|
|
@@ -67,6 +67,11 @@ class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
protected $_messageTable = null;
|
|
|
|
|
|
/**
|
|
|
+ * @var Zend_Db_Table_Row_Abstract
|
|
|
+ */
|
|
|
+ protected $_messageRow = null;
|
|
|
+
|
|
|
+ /**
|
|
|
* Constructor
|
|
|
*
|
|
|
* @param array|Zend_Config $options
|
|
|
@@ -87,6 +92,33 @@ class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
throw new Zend_Queue_Exception('Options array item: Zend_Db_Select::FOR_UPDATE must be boolean');
|
|
|
}
|
|
|
|
|
|
+ if ($this->_options['dbAdapter'] !== null
|
|
|
+ && $this->_options['dbAdapter'] instanceof Zend_Db_Adapter_Abstract) {
|
|
|
+ $db = $this->_options['dbAdapter'];
|
|
|
+ } else {
|
|
|
+ $db = $this->_initDbAdapter();
|
|
|
+ }
|
|
|
+
|
|
|
+ $this->_queueTable = new Zend_Queue_Adapter_Db_Queue(array(
|
|
|
+ 'db' => $db,
|
|
|
+ ));
|
|
|
+
|
|
|
+ $this->_messageTable = new Zend_Queue_Adapter_Db_Message(array(
|
|
|
+ 'db' => $db,
|
|
|
+ ));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Initialize Db adapter using 'driverOptions' section of the _options array
|
|
|
+ *
|
|
|
+ * Throws an exception if the adapter cannot connect to DB.
|
|
|
+ *
|
|
|
+ * @return Zend_Db_Adapter_Abstract
|
|
|
+ * @throws Zend_Queue_Exception
|
|
|
+ */
|
|
|
+ protected function _initDbAdapter()
|
|
|
+ {
|
|
|
$options = &$this->_options['driverOptions'];
|
|
|
if (!array_key_exists('type', $options)) {
|
|
|
require_once 'Zend/Queue/Exception.php';
|
|
|
@@ -118,18 +150,12 @@ class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
|
|
|
try {
|
|
|
$db = Zend_Db::factory($type, $options);
|
|
|
-
|
|
|
- $this->_queueTable = new Zend_Queue_Adapter_Db_Queue(array(
|
|
|
- 'db' => $db,
|
|
|
- ));
|
|
|
-
|
|
|
- $this->_messageTable = new Zend_Queue_Adapter_Db_Message(array(
|
|
|
- 'db' => $db,
|
|
|
- ));
|
|
|
} catch (Zend_Db_Exception $e) {
|
|
|
require_once 'Zend/Queue/Exception.php';
|
|
|
throw new Zend_Queue_Exception('Error connecting to database: ' . $e->getMessage(), $e->getCode());
|
|
|
}
|
|
|
+
|
|
|
+ return $db;
|
|
|
}
|
|
|
|
|
|
/********************************************************************
|
|
|
@@ -149,7 +175,15 @@ class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
*/
|
|
|
public function isExists($name)
|
|
|
{
|
|
|
- return in_array($name, $this->getQueues());
|
|
|
+ $id = 0;
|
|
|
+
|
|
|
+ try {
|
|
|
+ $id = $this->getQueueId($name);
|
|
|
+ } catch (Zend_Queue_Exception $e) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ return ($id > 0);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -173,7 +207,7 @@ class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
|
|
|
$queue = $this->_queueTable->createRow();
|
|
|
$queue->queue_name = $name;
|
|
|
- $queue->timeout = ($timeout === null) ? self::CREATE_TIMEOUT_DEFAULT : (int)$timeout;
|
|
|
+ $queue->timeout = ($timeout === null) ? self::CREATE_TIMEOUT_DEFAULT : (int)$timeout;
|
|
|
|
|
|
try {
|
|
|
if ($queue->save()) {
|
|
|
@@ -278,6 +312,10 @@ class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
*/
|
|
|
public function send($message, Zend_Queue $queue = null)
|
|
|
{
|
|
|
+ if ($this->_messageRow === null) {
|
|
|
+ $this->_messageRow = $this->_messageTable->createRow();
|
|
|
+ }
|
|
|
+
|
|
|
if ($queue === null) {
|
|
|
$queue = $this->_queue;
|
|
|
}
|
|
|
@@ -294,7 +332,7 @@ class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
|
|
|
}
|
|
|
|
|
|
- $msg = $this->_messageTable->createRow();
|
|
|
+ $msg = clone $this->_messageRow;
|
|
|
$msg->queue_id = $this->getQueueId($queue->getName());
|
|
|
$msg->created = time();
|
|
|
$msg->body = $message;
|
|
|
@@ -470,6 +508,10 @@ class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
*/
|
|
|
protected function getQueueId($name)
|
|
|
{
|
|
|
+ if (array_key_exists($name, $this->_queues)) {
|
|
|
+ return $this->_queues[$name];
|
|
|
+ }
|
|
|
+
|
|
|
$query = $this->_queueTable->select();
|
|
|
$query->from($this->_queueTable, array('queue_id'))
|
|
|
->where('queue_name=?', $name);
|
|
|
@@ -481,6 +523,8 @@ class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
throw new Zend_Queue_Exception('Queue does not exist: ' . $name);
|
|
|
}
|
|
|
|
|
|
- return (int)$queue->queue_id;
|
|
|
+ $this->_queues[$name] = (int)$queue->queue_id;
|
|
|
+
|
|
|
+ return $this->_queues[$name];
|
|
|
}
|
|
|
}
|