Db.php 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488
  1. <?php
  2. /**
  3. * Zend Framework
  4. *
  5. * LICENSE
  6. *
  7. * This source file is subject to the new BSD license that is bundled
  8. * with this package in the file LICENSE.txt.
  9. * It is also available through the world-wide-web at this URL:
  10. * http://framework.zend.com/license/new-bsd
  11. * If you did not receive a copy of the license and are unable to
  12. * obtain it through the world-wide-web, please send an email
  13. * to license@zend.com so we can send you a copy immediately.
  14. *
  15. * @category Zend
  16. * @package Zend_Queue
  17. * @subpackage Adapter
  18. * @copyright Copyright (c) 2005-2009 Zend Technologies USA Inc. (http://www.zend.com)
  19. * @license http://framework.zend.com/license/new-bsd New BSD License
  20. * @version $Id$
  21. */
  22. /**
  23. * @see Zend_Queue_Adapter_AdapterAbstract
  24. */
  25. require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
  26. /**
  27. * @see Zend_Db_Select
  28. */
  29. require_once 'Zend/Db/Select.php';
  30. /**
  31. * @see Zend_Db
  32. */
  33. require_once 'Zend/Db.php';
  34. /**
  35. * @see Zend_Queue_Adapter_Db_Queue
  36. */
  37. require_once 'Zend/Queue/Adapter/Db/Queue.php';
  38. /**
  39. * @see Zend_Queue_Adapter_Db_Message
  40. */
  41. require_once 'Zend/Queue/Adapter/Db/Message.php';
  42. /**
  43. * Class for using connecting to a Zend_Db-based queuing system
  44. *
  45. * @category Zend
  46. * @package Zend_Queue
  47. * @subpackage Adapter
  48. * @copyright Copyright (c) 2005-2009 Zend Technologies USA Inc. (http://www.zend.com)
  49. * @license http://framework.zend.com/license/new-bsd New BSD License
  50. */
  51. class Zend_Queue_Adapter_Db extends Zend_Queue_Adapter_AdapterAbstract
  52. {
  53. /**
  54. * @var Zend_Queue_Adapter_Db_Queue
  55. */
  56. protected $_queueTable = null;
  57. /**
  58. * @var Zend_Queue_Adapter_Db_Message
  59. */
  60. protected $_messageTable = null;
  61. /**
  62. * Constructor
  63. *
  64. * @param array|Zend_Config $options
  65. * @param Zend_Queue|null $queue
  66. * @return void
  67. */
  68. public function __construct($options, Zend_Queue $queue = null)
  69. {
  70. parent::__construct($options, $queue);
  71. if (!isset($this->_options['options'][Zend_Db_Select::FOR_UPDATE])) {
  72. // turn off auto update by default
  73. $this->_options['options'][Zend_Db_Select::FOR_UPDATE] = false;
  74. }
  75. if (!is_bool($this->_options['options'][Zend_Db_Select::FOR_UPDATE])) {
  76. require_once 'Zend/Queue/Exception.php';
  77. throw new Zend_Queue_Exception('Options array item: Zend_Db_Select::FOR_UPDATE must be boolean');
  78. }
  79. $options = &$this->_options['driverOptions'];
  80. if (!array_key_exists('type', $options)) {
  81. require_once 'Zend/Queue/Exception.php';
  82. throw new Zend_Queue_Exception("Configuration array must have a key for 'type' for the database type to use");
  83. }
  84. if (!array_key_exists('host', $options)) {
  85. require_once 'Zend/Queue/Exception.php';
  86. throw new Zend_Queue_Exception("Configuration array must have a key for 'host' for the host to use");
  87. }
  88. if (!array_key_exists('username', $options)) {
  89. require_once 'Zend/Queue/Exception.php';
  90. throw new Zend_Queue_Exception("Configuration array must have a key for 'username' for the username to use");
  91. }
  92. if (!array_key_exists('password', $options)) {
  93. require_once 'Zend/Queue/Exception.php';
  94. throw new Zend_Queue_Exception("Configuration array must have a key for 'password' for the password to use");
  95. }
  96. if (!array_key_exists('dbname', $options)) {
  97. require_once 'Zend/Queue/Exception.php';
  98. throw new Zend_Queue_Exception("Configuration array must have a key for 'dbname' for the database to use");
  99. }
  100. try {
  101. $db = Zend_Db::factory($options['type'], $options);
  102. $this->_queueTable = new Zend_Queue_Adapter_Db_Queue(array(
  103. 'db' => $db,
  104. ));
  105. $this->_messageTable = new Zend_Queue_Adapter_Db_Message(array(
  106. 'db' => $db,
  107. ));
  108. } catch (Zend_Db_Exception $e) {
  109. require_once 'Zend/Queue/Exception.php';
  110. throw new Zend_Queue_Exception('Error connecting to database: ' . $e->getMessage(), $e->getCode());
  111. }
  112. }
  113. /********************************************************************
  114. * Queue management functions
  115. *********************************************************************/
  116. /**
  117. * Does a queue already exist?
  118. *
  119. * Throws an exception if the adapter cannot determine if a queue exists.
  120. * use isSupported('isExists') to determine if an adapter can test for
  121. * queue existance.
  122. *
  123. * @param string $name
  124. * @return boolean
  125. * @throws Zend_Queue_Exception
  126. */
  127. public function isExists($name)
  128. {
  129. return in_array($name, $this->getQueues());
  130. }
  131. /**
  132. * Create a new queue
  133. *
  134. * Visibility timeout is how long a message is left in the queue "invisible"
  135. * to other readers. If the message is acknowleged (deleted) before the
  136. * timeout, then the message is deleted. However, if the timeout expires
  137. * then the message will be made available to other queue readers.
  138. *
  139. * @param string $name queue name
  140. * @param integer $timeout default visibility timeout
  141. * @return boolean
  142. * @throws Zend_Queue_Exception - database error
  143. */
  144. public function create($name, $timeout = null)
  145. {
  146. if ($this->isExists($name)) {
  147. $this->getLogger()->warn('Create queue failed. Queue already exists: ' . $name);
  148. return false;
  149. }
  150. $queue = $this->_queueTable->createRow();
  151. $queue->queue_name = $name;
  152. $queue->timeout = ($timeout === null) ? self::CREATE_TIMEOUT_DEFAULT : (int)$timeout;
  153. try {
  154. if ($id = $queue->save()) {
  155. $this->getLogger()->info('Queue created: ' . $name);
  156. return true;
  157. }
  158. } catch (Exception $e) {
  159. $this->getLogger()->err($e->getMessage() . ' code ' . $e->getCode());
  160. /**
  161. * @see Zend_Queue_Exception
  162. */
  163. require_once 'Zend/Queue/Exception.php';
  164. throw new Zend_Queue_Exception($e->getMessage(), $e->getCode());
  165. }
  166. return false;
  167. }
  168. /**
  169. * Delete a queue and all of it's messages
  170. *
  171. * Returns false if the queue is not found, true if the queue exists
  172. *
  173. * @param string $name queue name
  174. * @return boolean
  175. * @throws Zend_Queue_Exception - database error
  176. */
  177. public function delete($name)
  178. {
  179. $id = $this->getQueueId($name); // get primary key
  180. // if the queue does not exist then it must already be deleted.
  181. $list = $this->_queueTable->find($id);
  182. if (count($list) === 0) {
  183. return false;
  184. }
  185. $queue = $list->current();
  186. if ($queue instanceof Zend_Db_Table_Row_Abstract) {
  187. try {
  188. $queue->delete();
  189. } catch (Exception $e) {
  190. require_once 'Zend/Queue/Exception.php';
  191. throw new Zend_Queue_Exception($e->getMessage(), $e->getCode());
  192. }
  193. }
  194. return true;
  195. }
  196. /*
  197. * Get an array of all available queues
  198. *
  199. * Not all adapters support getQueues(), use isSupported('getQueues')
  200. * to determine if the adapter supports this feature.
  201. *
  202. * @return array
  203. * @throws Zend_Queue_Exception - database error
  204. */
  205. public function getQueues()
  206. {
  207. $query = $this->_queueTable->select();
  208. $query->from($this->_queueTable, array('queue_id', 'queue_name'));
  209. $list = array();
  210. foreach ($this->_queueTable->fetchAll($query) as $queue) {
  211. $list[] = $queue->queue_name;
  212. }
  213. return $list;
  214. }
  215. /**
  216. * Return the approximate number of messages in the queue
  217. *
  218. * @param Zend_Queue $queue
  219. * @return integer
  220. * @throws Zend_Queue_Exception
  221. */
  222. public function count(Zend_Queue $queue = null)
  223. {
  224. if ($queue === null) {
  225. $queue = $this->_queue;
  226. }
  227. $info = $this->_messageTable->info();
  228. $db = $this->_messageTable->getAdapter();
  229. $query = $db->select();
  230. $query->from($info['name'], array(new Zend_Db_Expr('COUNT(1)')))
  231. ->where('queue_id=?', $this->getQueueId($queue->getName()));
  232. // return count results
  233. return (int) $db->fetchOne($query);
  234. }
  235. /********************************************************************
  236. * Messsage management functions
  237. *********************************************************************/
  238. /**
  239. * Send a message to the queue
  240. *
  241. * @param string $message Message to send to the active queue
  242. * @param Zend_Queue $queue
  243. * @return Zend_Queue_Message
  244. * @throws Zend_Queue_Exception - database error
  245. */
  246. public function send($message, Zend_Queue $queue = null)
  247. {
  248. if ($queue === null) {
  249. $queue = $this->_queue;
  250. }
  251. if (is_string($message)) {
  252. $message = trim($message);
  253. } elseif (is_scalar($message)) {
  254. $message = (string) $message;
  255. } else {
  256. $message = serialize($message);
  257. }
  258. if (!$this->isExists($queue->getName())) {
  259. require_once 'Zend/Queue/Exception.php';
  260. throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
  261. }
  262. $msg = $this->_messageTable->createRow();
  263. $msg->queue_id = $this->getQueueId($queue->getName());
  264. $msg->created = time();
  265. $msg->body = $message;
  266. $msg->md5 = md5($message);
  267. // $msg->timeout = ??? @TODO
  268. try {
  269. $msg->save();
  270. } catch (Exception $e) {
  271. require_once 'Zend/Queue/Exception.php';
  272. throw new Zend_Queue_Exception($e->getMessage(), $e->getCode());
  273. }
  274. $options = array(
  275. 'queue' => $queue,
  276. 'data' => $msg->toArray(),
  277. );
  278. $classname = $queue->getMessageClass();
  279. if (!class_exists($classname)) {
  280. require_once 'Zend/Loader.php';
  281. Zend_Loader::loadClass($classname);
  282. }
  283. return new $classname($options);
  284. }
  285. /**
  286. * Get messages in the queue
  287. *
  288. * @param integer $maxMessages Maximum number of messages to return
  289. * @param integer $timeout Visibility timeout for these messages
  290. * @param Zend_Queue $queue
  291. * @return Zend_Queue_Message_Iterator
  292. * @throws Zend_Queue_Exception - database error
  293. */
  294. public function receive($maxMessages = null, $timeout = null, Zend_Queue $queue = null)
  295. {
  296. if ($maxMessages === null) {
  297. $maxMessages = 1;
  298. }
  299. if ($timeout === null) {
  300. $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
  301. }
  302. if ($queue === null) {
  303. $queue = $this->_queue;
  304. }
  305. $msgs = array();
  306. $info = $this->_messageTable->info();
  307. $microtime = microtime(true); // cache microtime
  308. $db = $this->_messageTable->getAdapter();
  309. // start transaction handling
  310. try {
  311. $db->beginTransaction();
  312. $query = $db->select();
  313. if ($this->_config['options'][Zend_Db_Select::FOR_UPDATE]) {
  314. // turn on forUpdate
  315. $query->forUpdate();
  316. }
  317. $query->from($info['name'], array('*'))
  318. ->where('queue_id=?', $this->getQueueId($queue->getName()))
  319. ->where('handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime)
  320. ->limit($maxMessages);
  321. foreach ($db->fetchAll($query) as $data) {
  322. // setup our changes to the message
  323. $data['handle'] = md5(uniqid(rand(), true));
  324. $update = array(
  325. 'handle' => $data['handle'],
  326. 'timeout' => $microtime,
  327. );
  328. // update the database
  329. $where = array();
  330. $where[] = $db->quoteInto('message_id=?', $data['message_id']);
  331. $where[] = 'handle IS NULL OR timeout+' . (int)$timeout . ' < ' . (int)$microtime;
  332. $count = $db->update($info['name'], $update, $where);
  333. // we check count to make sure no other thread has gotten
  334. // the rows after our select, but before our update.
  335. if ($count > 0) {
  336. $msgs[] = $data;
  337. }
  338. }
  339. $db->commit();
  340. } catch (Exception $e) {
  341. $db->rollBack();
  342. require_once 'Zend/Queue/Exception.php';
  343. throw new Zend_Queue_Exception($e->getMessage(), $e->getCode());
  344. }
  345. $options = array(
  346. 'queue' => $queue,
  347. 'data' => $msgs,
  348. 'messageClass' => $queue->getMessageClass(),
  349. );
  350. $classname = $queue->getMessageSetClass();
  351. if (!class_exists($classname)) {
  352. require_once 'Zend/Loader.php';
  353. Zend_Loader::loadClass($classname);
  354. }
  355. return new $classname($options);
  356. }
  357. /**
  358. * Delete a message from the queue
  359. *
  360. * Returns true if the message is deleted, false if the deletion is
  361. * unsuccessful.
  362. *
  363. * @param Zend_Queue_Message $message
  364. * @return boolean
  365. * @throws Zend_Queue_Exception - database error
  366. */
  367. public function deleteMessage(Zend_Queue_Message $message)
  368. {
  369. $db = $this->_messageTable->getAdapter();
  370. $where = $db->quoteInto('handle=?', $message->handle);
  371. if ($this->_messageTable->delete($where)) {
  372. return true;
  373. }
  374. return false;
  375. }
  376. /********************************************************************
  377. * Supporting functions
  378. *********************************************************************/
  379. /**
  380. * Return a list of queue capabilities functions
  381. *
  382. * $array['function name'] = true or false
  383. * true is supported, false is not supported.
  384. *
  385. * @param string $name
  386. * @return array
  387. */
  388. public function getCapabilities()
  389. {
  390. return array(
  391. 'create' => true,
  392. 'delete' => true,
  393. 'send' => true,
  394. 'receive' => true,
  395. 'deleteMessage' => true,
  396. 'getQueues' => true,
  397. 'count' => true,
  398. 'isExists' => true,
  399. );
  400. }
  401. /********************************************************************
  402. * Functions that are not part of the Zend_Queue_Adapter_Abstract
  403. *********************************************************************/
  404. /**
  405. * Get the queue ID
  406. *
  407. * Returns the queue's row identifier.
  408. *
  409. * @param string $name
  410. * @return integer|null
  411. * @throws Zend_Queue_Exception
  412. */
  413. protected function getQueueId($name)
  414. {
  415. $query = $this->_queueTable->select();
  416. $query->from($this->_queueTable, array('queue_id'))
  417. ->where('queue_name=?', $name);
  418. $queue = $this->_queueTable->fetchRow($query);
  419. if ($queue === null) {
  420. require_once 'Zend/Queue/Exception.php';
  421. throw new Zend_Queue_Exception('Queue does not exist: ' . $name);
  422. }
  423. return (int)$queue->queue_id;
  424. }
  425. }