瀏覽代碼

ZF-7948 Zend_Queue_Adapter_Activemq should only subscribe once - also fixes ActivemqTest to actually run

git-svn-id: http://framework.zend.com/svn/framework/standard/trunk@23498 44c647ce-9c0f-0410-b52a-842ac1e357ba
mjh_ca 15 年之前
父節點
當前提交
7f5aeaa979

+ 35 - 5
library/Zend/Queue/Adapter/Activemq.php

@@ -56,6 +56,11 @@ class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract
     private $_client = null;
 
     /**
+     * @var array
+     */
+    private $_subscribed = array();
+
+    /**
      * Constructor
      *
      * @param  array|Zend_Config $config An array having configuration data
@@ -177,6 +182,33 @@ class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract
     }
 
     /**
+     * Checks if the client is subscribed to the queue
+     *
+     * @param  Zend_Queue $queue
+     * @return boolean
+     */
+    protected function _isSubscribed(Zend_Queue $queue)
+    {
+        return isset($this->_subscribed[$queue->getName()]);
+    }
+
+    /**
+      * Subscribes the client to the queue.
+      *
+      * @param  Zend_Queue $queue
+      * @return void
+      */
+    protected function _subscribe(Zend_Queue $queue)
+    {
+        $frame = $this->_client->createFrame();
+        $frame->setCommand('SUBSCRIBE');
+        $frame->setHeader('destination', $queue->getName());
+        $frame->setHeader('ack', 'client');
+        $this->_client->send($frame);
+        $this->_subscribed[$queue->getName()] = true;
+    }
+
+    /**
      * Return the first element in the queue
      *
      * @param  integer    $maxMessages
@@ -200,11 +232,9 @@ class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract
         $data = array();
 
         // signal that we are reading
-        $frame = $this->_client->createFrame();
-        $frame->setCommand('SUBSCRIBE');
-        $frame->setHeader('destination', $queue->getName());
-        $frame->setHeader('ack','client');
-        $this->_client->send($frame);
+        if (!$this->_isSubscribed($queue)){
+            $this->_subscribe($queue);
+        }
 
         if ($maxMessages > 0) {
             if ($this->_client->canRead()) {

+ 101 - 0
tests/Zend/Queue/Adapter/ActivemqOfflineTest.php

@@ -0,0 +1,101 @@
+<?php
+/**
+ * Zend Framework
+ *
+ * LICENSE
+ *
+ * This source file is subject to the new BSD license that is bundled
+ * with this package in the file LICENSE.txt.
+ * It is also available through the world-wide-web at this URL:
+ * http://framework.zend.com/license/new-bsd
+ * If you did not receive a copy of the license and are unable to
+ * obtain it through the world-wide-web, please send an email
+ * to license@zend.com so we can send you a copy immediately.
+ *
+ * @category   Zend
+ * @package    Zend_Queue
+ * @subpackage UnitTests
+ * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
+ * @license    http://framework.zend.com/license/new-bsd     New BSD License
+ * @version    $Id: ActivemqTest.php 20096 2010-01-06 02:05:09Z bkarwin $
+ */
+
+/** PHPUnit Test Case */
+require_once 'PHPUnit/Framework/TestCase.php';
+
+/** TestHelp.php */
+require_once dirname(__FILE__) . '/../../../TestHelper.php';
+
+require_once 'Zend/Queue/Adapter/Activemq.php';
+require_once 'Zend/Queue/Stomp/Client.php';
+require_once 'Zend/Queue/Stomp/Frame.php';
+
+/**
+ * @category   Zend
+ * @package    Zend_Queue
+ * @subpackage UnitTests
+ * @copyright  Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
+ * @license    http://framework.zend.com/license/new-bsd     New BSD License
+ * @group      Zend_Queue
+ */
+class Zend_Queue_Adapter_ActivemqOfflineTest extends PHPUnit_Framework_TestCase
+{
+    /**
+     * @group ZF-7948
+     */
+    public function testSubscribesOncePerQueue()
+    {
+        $stompClient = new StompClientMock;
+        $options['driverOptions']['stompClient'] = $stompClient;
+        $adapter = new Zend_Queue_Adapter_Activemq($options);
+
+        $queue = new Zend_Queue('array', array('name' => 'foo'));
+        $adapter->receive(null, null, $queue);
+        $adapter->receive(null, null, $queue);
+
+        // iterate through mock StompClient and ensure SUBSCRIBE is only sent once per queue
+        $subscribes = 0;
+        foreach ($stompClient->frameStack as $frame) {
+            if ($frame->getCommand() === 'SUBSCRIBE') {
+                $subscribes++;
+            }
+        }
+
+        $this->assertEquals(1, $subscribes);
+    }
+}
+
+class StompClientMock extends Zend_Queue_Stomp_Client
+{
+    public $frameStack = array();
+    public $responseStack = array();
+
+    public function __construct() {
+        // spoof a successful connection in the response stack
+        $frame = new Zend_Queue_Stomp_Frame;
+        $frame->setCommand('CONNECTED');
+        $this->responseStack[] = $frame;
+    }
+    public function __destruct() {}
+
+    public function send(Zend_Queue_Stomp_FrameInterface $frame)
+    {
+        $this->frameStack[] = $frame;
+        return $this;
+    }
+
+    public function receive()
+    {
+        return array_shift($this->responseStack);
+    }
+
+    public function canRead()
+    {
+        return count($this->responseStack) > 0;
+    }
+
+    public function createFrame()
+    {
+        return new Zend_Queue_Stomp_Frame;
+    }
+}

+ 3 - 3
tests/Zend/Queue/Adapter/ActivemqTest.php

@@ -63,13 +63,13 @@ class Zend_Queue_Adapter_ActivemqTest extends Zend_Queue_Adapter_AdapterTest
     {
         $driverOptions = array();
         if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_HOST')) {
-            $driverOptions['host'] = TESTS_ZEND_QUEUE_APACHEMQ_HOST;
+            $driverOptions['host'] = TESTS_ZEND_QUEUE_ACTIVEMQ_HOST;
         }
         if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_PORT')) {
-            $driverOptions['port'] = TESTS_ZEND_QUEUE_APACHEMQ_PORT;
+            $driverOptions['port'] = TESTS_ZEND_QUEUE_ACTIVEMQ_PORT;
         }
         if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_SCHEME')) {
-            $driverOptions['scheme'] = TESTS_ZEND_QUEUE_APACHEMQ_SCHEME;
+            $driverOptions['scheme'] = TESTS_ZEND_QUEUE_ACTIVEMQ_SCHEME;
         }
         return array('driverOptions' => $driverOptions);
     }

+ 12 - 2
tests/Zend/Queue/Adapter/AdapterTest.php

@@ -148,11 +148,21 @@ abstract class Zend_Queue_Adapter_AdapterTest extends PHPUnit_Framework_TestCase
         try {
             $queue = new Zend_Queue($this->getAdapterName(), $config);
         } catch (Zend_Queue_Exception $e) {
-            $this->markTestSkipped();
+            $this->markTestSkipped($e->getMessage());
             restore_error_handler();
             return false;
         }
-        restore_error_handler();
+
+        // a PHP level error occurred, mark test as failed with error as reason
+        // (misconfigured test? undefined constant?)
+        if ($this->error) {
+            $err = error_get_last();
+            $this->markTestFailed($err['message']);
+            restore_error_handler();
+            return false;
+        }
+
+        restore_error_handler();        
 
         return $queue;
     }

+ 2 - 0
tests/Zend/Queue/AllTests.php

@@ -49,6 +49,7 @@ require_once 'Zend/Queue/Stomp/ClientTest.php';
 
 // Message Queues dependent on Stomp
 require_once 'Zend/Queue/Adapter/ActivemqTest.php';
+require_once 'Zend/Queue/Adapter/ActivemqOfflineTest.php';
 
 /**
  * @category   Zend
@@ -94,6 +95,7 @@ class Zend_Queue_AllTests
 
         // Message Queues dependent on Stomp
         $suite->addTestSuite('Zend_Queue_Adapter_ActivemqTest');
+        $suite->addTestSuite('Zend_Queue_Adapter_ActivemqOfflineTest');
 
         return $suite;
     }