Browse Source

Index: tests/Zend/Queue/Bugs/ZF-7650Test.php
===================================================================
--- tests/Zend/Queue/Bugs/ZF-7650Test.php (revision 0)
+++ tests/Zend/Queue/Bugs/ZF-7650Test.php (revision 0)
@@ -0,0 +1,132 @@
+<?php
+/**
+ * Zend Framework
+ *
+ * LICENSE
+ *
+ * This source file is subject to the new BSD license that is bundled
+ * with this package in the file LICENSE.txt.
+ * It is also available through the world-wide-web at this URL:
+ * http://framework.zend.com/license/new-bsd
+ * If you did not receive a copy of the license and are unable to
+ * obtain it through the world-wide-web, please send an email
+ * to license@zend.com so we can send you a copy immediately.
+ *
+ * @category Zend
+ * @package Zend_Queue
+ * @subpackage UnitTests
+ * @copyright Copyright (c) 2005-2009 Zend Technologies USA Inc. (http://www.zend.com)
+ * @license http://framework.zend.com/license/new-bsd New BSD License
+ * @version $Id: QueueTest.php 17667 2009-08-18 21:40:09Z mikaelkael $
+ */
+
+/*
+ * This code specifically tests for ZF-7650
+ */
+
+/** PHPUnit Test Case */
+require_once 'PHPUnit/Framework/TestCase.php';
+
+/** TestHelp.php */
+require_once dirname(__FILE__) . '/../../../TestHelper.php';
+
+/** Zend_Queue */
+require_once 'Zend/Queue.php';
+
+/** Zend_Queue */
+require_once 'Zend/Queue/Message.php';
+
+/** Zend_Queue_Adapter_Array */
+require_once 'Zend/Queue/Adapter/Array.php';
+
+/**
+ * @see Zend_Db_Select
+ */
+require_once 'Zend/Db/Select.php';
+
+/**
+ * @category Zend
+ * @package Zend_Queue
+ * @subpackage UnitTests
+ * @copyright Copyright (c) 2005-2009 Zend Technologies USA Inc. (http://www.zend.com)
+ * @license http://framework.zend.com/license/new-bsd New BSD License
+ * @group Zend_Queue
+ */
+class Zend_Queue_QueueTest extends PHPUnit_Framework_TestCase
+{
+ public function test_ZF_7650()
+ {
+ // Zend_Queue_Adapter_Array
+ $queue = new Zend_Queue('Array');
+ $queue2 = $queue->createQueue('queue');
+
+ $queue->send('My Test Message 1');
+ $queue->send('My Test Message 2');
+
+ $messages = $queue->receive(0);
+ $this->assertEquals(0, count($messages));
+
+ // Zend_Queue_Adapter_Memcacheq
+ $driverOptions = array();
+ if (defined('TESTS_ZEND_QUEUE_MEMCACHEQ_HOST')) {
+ $driverOptions['host'] = TESTS_ZEND_QUEUE_MEMCACHEQ_HOST;
+ }
+ if (defined('TESTS_ZEND_QUEUE_MEMCACHEQ_PORT')) {
+ $driverOptions['port'] = TESTS_ZEND_QUEUE_MEMCACHEQ_PORT;
+ }
+ $options = array('name' => 'ZF7650', 'driverOptions' => $driverOptions);
+
+ $queue = new Zend_Queue('Memcacheq', $options);
+ $queue2 = $queue->createQueue('queue');
+
+ $queue->send('My Test Message 1');
+ $queue->send('My Test Message 2');
+
+ $messages = $queue->receive(0);
+ $this->assertEquals(0, count($messages));
+
+ // Zend_Queue_Adapter_Db
+ $driverOptions = array();
+ if (defined('TESTS_ZEND_QUEUE_DB')) {
+ require_once 'Zend/Json.php';
+ $driverOptions = Zend_Json::decode(TESTS_ZEND_QUEUE_DB);
+ }
+
+ $options = array(
+ 'name' => '/temp-queue/ZF7650',
+ 'options' => array(Zend_Db_Select::FOR_UPDATE => true),
+ 'driverOptions' => $driverOptions,
+ );
+
+ $queue = new Zend_Queue('Db', $options);
+ $queue2 = $queue->createQueue('queue');
+
+ $queue->send('My Test Message 1');
+ $queue->send('My Test Message 2');
+
+ $messages = $queue->receive(0);
+ $this->assertEquals(0, count($messages));
+
+ // Zend_Queue_Adapter_Activemq
+ $driverOptions = array();
+ if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_HOST')) {
+ $driverOptions['host'] = TESTS_ZEND_QUEUE_ACTIVEMQ_HOST;
+ }
+ if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_PORT')) {
+ $driverOptions['port'] = TESTS_ZEND_QUEUE_ACTIVEMQ_PORT;
+ }
+ if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_SCHEME')) {
+ $driverOptions['scheme'] = TESTS_ZEND_QUEUE_ACTIVEMQ_SCHEME;
+ }
+ $options = array('driverOptions' => $driverOptions);
+
+ $queue = new Zend_Queue('Activemq', $options);
+ $queue2 = $queue->createQueue('queue');
+
+ $queue->send('My Test Message 1');
+ $queue->send('My Test Message 2');
+
+ $messages = $queue->receive(0);
+ $this->assertEquals(0, count($messages));
+ }
+}
Index: library/Zend/Queue/Adapter/Activemq.php
===================================================================
--- library/Zend/Queue/Adapter/Activemq.php (revision 17776)
+++ library/Zend/Queue/Adapter/Activemq.php (working copy)
@@ -196,6 +196,9 @@
$queue = $this->_queue;
}

+ // read
+ $data = array();
+
// signal that we are reading
$frame = $this->_client->createFrame();
$frame->setCommand('SUBSCRIBE');
@@ -203,26 +206,26 @@
$frame->setHeader('ack','client');
$this->_client->send($frame);

- // read
- $data = array();
- if ($this->_client->canRead()) {
- for ($i = 0; $i < $maxMessages; $i++) {
- $response = $this->_client->receive();
+ if ($maxMessages > 0) {
+ if ($this->_client->canRead()) {
+ for ($i = 0; $i < $maxMessages; $i++) {
+ $response = $this->_client->receive();

- switch ($response->getCommand()) {
- case 'MESSAGE':
- $datum = array(
- 'message_id' => $response->getHeader('message-id'),
- 'handle' => $response->getHeader('message-id'),
- 'body' => $response->getBody(),
- 'md5' => md5($response->getBody())
- );
- $data[] = $datum;
- break;
- default:
- $block = print_r($response, true);
- require_once 'Zend/Queue/Exception.php';
- throw new Zend_Queue_Exception('Invalid response received: ' . $block);
+ switch ($response->getCommand()) {
+ case 'MESSAGE':
+ $datum = array(
+ 'message_id' => $response->getHeader('message-id'),
+ 'handle' => $response->getHeader('message-id'),
+ 'body' => $response->getBody(),
+ 'md5' => md5($response->getBody())
+ );
+ $data[] = $datum;
+ break;
+ default:
+ $block = print_r($response, true);
+ require_once 'Zend/Queue/Exception.php';
+ throw new Zend_Queue_Exception('Invalid response received: ' . $block);
+ }
}
}
}
Index: library/Zend/Queue/Adapter/Array.php
===================================================================
--- library/Zend/Queue/Adapter/Array.php (revision 17776)
+++ library/Zend/Queue/Adapter/Array.php (working copy)
@@ -223,22 +223,27 @@
}

$data = array();
- $start_time = microtime(true);
+ if ($maxMessages > 0) {
+ $start_time = microtime(true);

- $count = 0;
- $temp = &$this->_data[$queue->getName()];
- foreach ($temp as $key=>&$msg) {
- if (($msg['handle'] === null)
- || ($msg['timeout'] + $timeout < $start_time)
- ) {
- $msg['handle'] = md5(uniqid(rand(), true));
- $msg['timeout'] = microtime(true);
- $data[] = $msg;
- $count++;
- }
+ $count = 0;
+ $temp = &$this->_data[$queue->getName()];
+ foreach ($temp as $key=>&$msg) {
+ // count check has to be first, as someone can ask for 0 messages
+ // ZF-7650
+ if ($count >= $maxMessages) {
+ break;
+ }

- if ($count >= $maxMessages) {
- break;
+ if (($msg['handle'] === null)
+ || ($msg['timeout'] + $timeout < $start_time)
+ ) {
+ $msg['handle'] = md5(uniqid(rand(), true));
+ $msg['timeout'] = microtime(true);
+ $data[] = $msg;
+ $count++;
+ }
+
}
}

Index: library/Zend/Queue/Adapter/Memcacheq.php
===================================================================
--- library/Zend/Queue/Adapter/Memcacheq.php (revision 17776)
+++ library/Zend/Queue/Adapter/Memcacheq.php (working copy)
@@ -279,6 +279,7 @@
if ($maxMessages === null) {
$maxMessages = 1;
}
+
if ($timeout === null) {
$timeout = self::RECEIVE_TIMEOUT_DEFAULT;
}
@@ -287,13 +288,15 @@
}

$msgs = array();
- for ($i = 0; $i < $maxMessages; $i++) {
- $data = array(
- 'handle' => md5(uniqid(rand(), true)),
- 'body' => $this->_cache->get($queue->getName()),
- );
+ if ($maxMessages > 0 ) {
+ for ($i = 0; $i < $maxMessages; $i++) {
+ $data = array(
+ 'handle' => md5(uniqid(rand(), true)),
+ 'body' => $this->_cache->get($queue->getName()),
+ );

- $msgs[] = $data;
+ $msgs[] = $data;
+ }
}

$options = array(
Index: library/Zend/Queue/Adapter/Db.php
===================================================================
--- library/Zend/Queue/Adapter/Db.php (revision 17776)
+++ library/Zend/Queue/Adapter/Db.php (working copy)
@@ -346,41 +346,43 @@

// start transaction handling
try {
- $db->beginTransaction();
+ if ( $maxMessages > 0 ) { // ZF-7666 LIMIT 0 clause not included.
+ $db->beginTransaction();

- $query = $db->select();
- if ($this->_options['options'][Zend_Db_Select::FOR_UPDATE]) {
- // turn on forUpdate
- $query->forUpdate();
- }
- $query->from($info['name'], array('*'))
- ->where('queue_id=?', $this->getQueueId($queue->getName()))
- ->where('handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime)
- ->limit($maxMessages);
+ $query = $db->select();
+ if ($this->_options['options'][Zend_Db_Select::FOR_UPDATE]) {
+ // turn on forUpdate
+ $query->forUpdate();
+ }
+ $query->from($info['name'], array('*'))
+ ->where('queue_id=?', $this->getQueueId($queue->getName()))
+ ->where('handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime)
+ ->limit($maxMessages);

- foreach ($db->fetchAll($query) as $data) {
- // setup our changes to the message
- $data['handle'] = md5(uniqid(rand(), true));
+ foreach ($db->fetchAll($query) as $data) {
+ // setup our changes to the message
+ $data['handle'] = md5(uniqid(rand(), true));

- $update = array(
- 'handle' => $data['handle'],
- 'timeout' => $microtime,
- );
+ $update = array(
+ 'handle' => $data['handle'],
+ 'timeout' => $microtime,
+ );

- // update the database
- $where = array();
- $where[] = $db->quoteInto('message_id=?', $data['message_id']);
- $where[] = 'handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime;
+ // update the database
+ $where = array();
+ $where[] = $db->quoteInto('message_id=?', $data['message_id']);
+ $where[] = 'handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime;

- $count = $db->update($info['name'], $update, $where);
+ $count = $db->update($info['name'], $update, $where);

- // we check count to make sure no other thread has gotten
- // the rows after our select, but before our update.
- if ($count > 0) {
- $msgs[] = $data;
+ // we check count to make sure no other thread has gotten
+ // the rows after our select, but before our update.
+ if ($count > 0) {
+ $msgs[] = $data;
+ }
}
+ $db->commit();
}
- $db->commit();
} catch (Exception $e) {
$db->rollBack();



git-svn-id: http://framework.zend.com/svn/framework/standard/trunk@17778 44c647ce-9c0f-0410-b52a-842ac1e357ba

danlo 16 years ago
parent
commit
576af6600d

+ 23 - 20
library/Zend/Queue/Adapter/Activemq.php

@@ -196,6 +196,9 @@ class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract
             $queue = $this->_queue;
         }
 
+        // read
+        $data = array();
+
         // signal that we are reading
         $frame = $this->_client->createFrame();
         $frame->setCommand('SUBSCRIBE');
@@ -203,26 +206,26 @@ class Zend_Queue_Adapter_Activemq extends Zend_Queue_Adapter_AdapterAbstract
         $frame->setHeader('ack','client');
         $this->_client->send($frame);
 
-        // read
-        $data = array();
-        if ($this->_client->canRead()) {
-            for ($i = 0; $i < $maxMessages; $i++) {
-                $response = $this->_client->receive();
-
-                switch ($response->getCommand()) {
-                    case 'MESSAGE':
-                        $datum = array(
-                            'message_id' => $response->getHeader('message-id'),
-                            'handle'     => $response->getHeader('message-id'),
-                            'body'       => $response->getBody(),
-                            'md5'        => md5($response->getBody())
-                        );
-                        $data[] = $datum;
-                        break;
-                    default:
-                        $block = print_r($response, true);
-                        require_once 'Zend/Queue/Exception.php';
-                        throw new Zend_Queue_Exception('Invalid response received: ' . $block);
+        if ($maxMessages > 0) {
+            if ($this->_client->canRead()) {
+                for ($i = 0; $i < $maxMessages; $i++) {
+                    $response = $this->_client->receive();
+
+                    switch ($response->getCommand()) {
+                        case 'MESSAGE':
+                            $datum = array(
+                                'message_id' => $response->getHeader('message-id'),
+                                'handle'     => $response->getHeader('message-id'),
+                                'body'       => $response->getBody(),
+                                'md5'        => md5($response->getBody())
+                            );
+                            $data[] = $datum;
+                            break;
+                        default:
+                            $block = print_r($response, true);
+                            require_once 'Zend/Queue/Exception.php';
+                            throw new Zend_Queue_Exception('Invalid response received: ' . $block);
+                    }
                 }
             }
         }

+ 20 - 15
library/Zend/Queue/Adapter/Array.php

@@ -223,22 +223,27 @@ class Zend_Queue_Adapter_Array extends Zend_Queue_Adapter_AdapterAbstract
         }
 
         $data       = array();
-        $start_time = microtime(true);
-
-        $count = 0;
-        $temp = &$this->_data[$queue->getName()];
-        foreach ($temp as $key=>&$msg) {
-            if (($msg['handle'] === null)
-                || ($msg['timeout'] + $timeout < $start_time)
-            ) {
-                $msg['handle']  = md5(uniqid(rand(), true));
-                $msg['timeout'] = microtime(true);
-                $data[] = $msg;
-                $count++;
-            }
+        if ($maxMessages > 0) {
+            $start_time = microtime(true);
+
+            $count = 0;
+            $temp = &$this->_data[$queue->getName()];
+            foreach ($temp as $key=>&$msg) {
+                // count check has to be first, as someone can ask for 0 messages
+                // ZF-7650
+                if ($count >= $maxMessages) {
+                    break;
+                }
+
+                if (($msg['handle'] === null)
+                    || ($msg['timeout'] + $timeout < $start_time)
+                ) {
+                    $msg['handle']  = md5(uniqid(rand(), true));
+                    $msg['timeout'] = microtime(true);
+                    $data[] = $msg;
+                    $count++;
+                }
 
-            if ($count >= $maxMessages) {
-                break;
             }
         }
 

+ 34 - 32
library/Zend/Queue/Adapter/Db.php

@@ -346,41 +346,43 @@ class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
 
         // start transaction handling
         try {
-            $db->beginTransaction();
+            if ( $maxMessages > 0 ) { // ZF-7666 LIMIT 0 clause not included.
+                $db->beginTransaction();
 
-            $query = $db->select();
-            if ($this->_options['options'][Zend_Db_Select::FOR_UPDATE]) {
-                // turn on forUpdate
-                $query->forUpdate();
-            }
-            $query->from($info['name'], array('*'))
-                  ->where('queue_id=?', $this->getQueueId($queue->getName()))
-                  ->where('handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime)
-                  ->limit($maxMessages);
-
-            foreach ($db->fetchAll($query) as $data) {
-                // setup our changes to the message
-                $data['handle'] = md5(uniqid(rand(), true));
-
-                $update = array(
-                    'handle'  => $data['handle'],
-                    'timeout' => $microtime,
-                );
-
-                // update the database
-                $where   = array();
-                $where[] = $db->quoteInto('message_id=?', $data['message_id']);
-                $where[] = 'handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime;
-
-                $count = $db->update($info['name'], $update, $where);
-
-                // we check count to make sure no other thread has gotten
-                // the rows after our select, but before our update.
-                if ($count > 0) {
-                    $msgs[] = $data;
+                $query = $db->select();
+                if ($this->_options['options'][Zend_Db_Select::FOR_UPDATE]) {
+                    // turn on forUpdate
+                    $query->forUpdate();
+                }
+                $query->from($info['name'], array('*'))
+                      ->where('queue_id=?', $this->getQueueId($queue->getName()))
+                      ->where('handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime)
+                      ->limit($maxMessages);
+
+                foreach ($db->fetchAll($query) as $data) {
+                    // setup our changes to the message
+                    $data['handle'] = md5(uniqid(rand(), true));
+
+                    $update = array(
+                        'handle'  => $data['handle'],
+                        'timeout' => $microtime,
+                    );
+
+                    // update the database
+                    $where   = array();
+                    $where[] = $db->quoteInto('message_id=?', $data['message_id']);
+                    $where[] = 'handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime;
+
+                    $count = $db->update($info['name'], $update, $where);
+
+                    // we check count to make sure no other thread has gotten
+                    // the rows after our select, but before our update.
+                    if ($count > 0) {
+                        $msgs[] = $data;
+                    }
                 }
+                $db->commit();
             }
-            $db->commit();
         } catch (Exception $e) {
             $db->rollBack();
 

+ 10 - 7
library/Zend/Queue/Adapter/Memcacheq.php

@@ -279,6 +279,7 @@ class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract
         if ($maxMessages === null) {
             $maxMessages = 1;
         }
+
         if ($timeout === null) {
             $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
         }
@@ -287,13 +288,15 @@ class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract
         }
 
         $msgs = array();
-        for ($i = 0; $i < $maxMessages; $i++) {
-            $data = array(
-                'handle' => md5(uniqid(rand(), true)),
-                'body'   => $this->_cache->get($queue->getName()),
-            );
-
-            $msgs[] = $data;
+        if ($maxMessages > 0 ) {
+            for ($i = 0; $i < $maxMessages; $i++) {
+                $data = array(
+                    'handle' => md5(uniqid(rand(), true)),
+                    'body'   => $this->_cache->get($queue->getName()),
+                );
+
+                $msgs[] = $data;
+            }
         }
 
         $options = array(

+ 132 - 0
tests/Zend/Queue/Bugs/ZF-7650Test.php

@@ -0,0 +1,132 @@
+<?php
+/**
+ * Zend Framework
+ *
+ * LICENSE
+ *
+ * This source file is subject to the new BSD license that is bundled
+ * with this package in the file LICENSE.txt.
+ * It is also available through the world-wide-web at this URL:
+ * http://framework.zend.com/license/new-bsd
+ * If you did not receive a copy of the license and are unable to
+ * obtain it through the world-wide-web, please send an email
+ * to license@zend.com so we can send you a copy immediately.
+ *
+ * @category   Zend
+ * @package    Zend_Queue
+ * @subpackage UnitTests
+ * @copyright  Copyright (c) 2005-2009 Zend Technologies USA Inc. (http://www.zend.com)
+ * @license    http://framework.zend.com/license/new-bsd     New BSD License
+ * @version    $Id: QueueTest.php 17667 2009-08-18 21:40:09Z mikaelkael $
+ */
+
+/*
+ * This code specifically tests for ZF-7650
+ */
+
+/** PHPUnit Test Case */
+require_once 'PHPUnit/Framework/TestCase.php';
+
+/** TestHelp.php */
+require_once dirname(__FILE__) . '/../../../TestHelper.php';
+
+/** Zend_Queue */
+require_once 'Zend/Queue.php';
+
+/** Zend_Queue */
+require_once 'Zend/Queue/Message.php';
+
+/** Zend_Queue_Adapter_Array */
+require_once 'Zend/Queue/Adapter/Array.php';
+
+/**
+ * @see Zend_Db_Select
+ */
+require_once 'Zend/Db/Select.php';
+
+/**
+ * @category   Zend
+ * @package    Zend_Queue
+ * @subpackage UnitTests
+ * @copyright  Copyright (c) 2005-2009 Zend Technologies USA Inc. (http://www.zend.com)
+ * @license    http://framework.zend.com/license/new-bsd     New BSD License
+ * @group      Zend_Queue
+ */
+class Zend_Queue_QueueTest extends PHPUnit_Framework_TestCase
+{
+    public function test_ZF_7650()
+    {
+        // Zend_Queue_Adapter_Array
+        $queue = new Zend_Queue('Array');
+        $queue2 = $queue->createQueue('queue');
+
+        $queue->send('My Test Message 1');
+        $queue->send('My Test Message 2');
+
+        $messages = $queue->receive(0);
+        $this->assertEquals(0, count($messages));
+
+        // Zend_Queue_Adapter_Memcacheq
+        $driverOptions = array();
+        if (defined('TESTS_ZEND_QUEUE_MEMCACHEQ_HOST')) {
+            $driverOptions['host'] = TESTS_ZEND_QUEUE_MEMCACHEQ_HOST;
+        }
+        if (defined('TESTS_ZEND_QUEUE_MEMCACHEQ_PORT')) {
+            $driverOptions['port'] = TESTS_ZEND_QUEUE_MEMCACHEQ_PORT;
+        }
+        $options = array('name' => 'ZF7650', 'driverOptions' => $driverOptions);
+
+        $queue = new Zend_Queue('Memcacheq', $options);
+        $queue2 = $queue->createQueue('queue');
+
+        $queue->send('My Test Message 1');
+        $queue->send('My Test Message 2');
+
+        $messages = $queue->receive(0);
+        $this->assertEquals(0, count($messages));
+
+        // Zend_Queue_Adapter_Db
+        $driverOptions = array();
+        if (defined('TESTS_ZEND_QUEUE_DB')) {
+            require_once 'Zend/Json.php';
+            $driverOptions = Zend_Json::decode(TESTS_ZEND_QUEUE_DB);
+        }
+
+        $options = array(
+            'name'          => '/temp-queue/ZF7650',
+            'options'       => array(Zend_Db_Select::FOR_UPDATE => true),
+            'driverOptions' => $driverOptions,
+        );
+
+        $queue = new Zend_Queue('Db', $options);
+        $queue2 = $queue->createQueue('queue');
+
+        $queue->send('My Test Message 1');
+        $queue->send('My Test Message 2');
+
+        $messages = $queue->receive(0);
+        $this->assertEquals(0, count($messages));
+
+        // Zend_Queue_Adapter_Activemq
+        $driverOptions = array();
+        if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_HOST')) {
+            $driverOptions['host'] = TESTS_ZEND_QUEUE_ACTIVEMQ_HOST;
+        }
+        if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_PORT')) {
+            $driverOptions['port'] = TESTS_ZEND_QUEUE_ACTIVEMQ_PORT;
+        }
+        if (defined('TESTS_ZEND_QUEUE_ACTIVEMQ_SCHEME')) {
+            $driverOptions['scheme'] = TESTS_ZEND_QUEUE_ACTIVEMQ_SCHEME;
+        }
+        $options = array('driverOptions' => $driverOptions);
+
+        $queue = new Zend_Queue('Activemq', $options);
+        $queue2 = $queue->createQueue('queue');
+
+        $queue->send('My Test Message 1');
+        $queue->send('My Test Message 2');
+
+        $messages = $queue->receive(0);
+        $this->assertEquals(0, count($messages));
+    }
+}