Mongoadapter.php 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390
  1. <?php
  2. require_once 'Zend/Queue/Adapter/AdapterAbstract.php';
  3. /**
  4. * Class for using a standard PHP array as a queue
  5. *
  6. * @category Zend
  7. * @package Zend_Queue
  8. * @subpackage Adapter
  9. * @copyright Copyright (c) 2005-2012 Zend Technologies USA Inc. (http://www.zend.com)
  10. * @license http://framework.zend.com/license/new-bsd New BSD License
  11. */
  12. class Mooses_Mongodb_Queue_Mongoadapter extends Zend_Queue_Adapter_AdapterAbstract {
  13. /**
  14. * @var array
  15. */
  16. protected $_data = array();
  17. protected $_dataContainer = NULL;
  18. protected $_queueCollection = NULL;
  19. protected static $instance = NULL;
  20. /**
  21. * Constructor
  22. *
  23. * @param array|Zend_Config $options
  24. * @param Zend_Queue|null $queue
  25. * @return void
  26. */
  27. public function __construct($options, Zend_Queue $queue = null)
  28. {
  29. $this->_dataContainer = new Mooses_Mongodb_Queue_Abstractadapter();
  30. $this->_queueCollection = new Mooses_Mongodb_Queue_Queueadapter();
  31. if(is_null($options)){
  32. $options = array();
  33. }
  34. parent::__construct($options, $queue);
  35. }
  36. public static function getInstance() {
  37. if(is_null(self::$instance)) {
  38. $_calledClass = get_called_class();
  39. self::$instance = new $_calledClass();
  40. }
  41. return self::$instance;
  42. }
  43. /********************************************************************
  44. * Queue management functions
  45. *********************************************************************/
  46. /**
  47. * Does a queue already exist?
  48. *
  49. * Throws an exception if the adapter cannot determine if a queue exists.
  50. * use isSupported('isExists') to determine if an adapter can test for
  51. * queue existance.
  52. *
  53. * @param string $name
  54. * @return boolean
  55. */
  56. public function isExists($name)
  57. {
  58. try {
  59. $_existantQueue = $this->_queueCollection->loadByAttribute("queue_name", $name);
  60. return (($_existantQueue) ? true : false);
  61. } catch(Exception $e){
  62. var_dump($e->getMessage());
  63. die($e->getTraceAsString());
  64. }
  65. }
  66. /**
  67. * Create a new queue
  68. *
  69. * Visibility timeout is how long a message is left in the queue "invisible"
  70. * to other readers. If the message is acknowleged (deleted) before the
  71. * timeout, then the message is deleted. However, if the timeout expires
  72. * then the message will be made available to other queue readers.
  73. *
  74. * @param string $name queue name
  75. * @param integer $timeout default visibility timeout
  76. * @return boolean
  77. */
  78. public function create($name, $timeout=null)
  79. {
  80. if ($this->isExists($name)) {
  81. return false;
  82. }
  83. if ($timeout === null) {
  84. $timeout = self::CREATE_TIMEOUT_DEFAULT;
  85. }
  86. $_newQueue = $this->_queueCollection->setData("queue_id", uniqid("vfq_"))
  87. ->setData("queue_name", $name)
  88. ->setData("timeout", 86400);
  89. $_newQueue->save(true);
  90. return true;
  91. }
  92. /**
  93. * Delete a queue and all of it's messages
  94. *
  95. * Returns false if the queue is not found, true if the queue exists
  96. *
  97. * @param string $name queue name
  98. * @return boolean
  99. */
  100. public function delete($name)
  101. {
  102. try {
  103. $_existantQueue = $this->_queueCollection->loadByAttribute("name", $name);
  104. if ($_existantQueue) {
  105. $_collectionMessages = $this->_dataContainer->getCollection()->addAttributeToFilter("queue_id", $_existantQueue->getData("queue_id"))->getCollectionData();
  106. foreach ($_collectionMessages as $_message) {
  107. $_message->delete(false);
  108. }
  109. $this->_dataContainer->load($_existantQueue->getId())->delete(false);
  110. }
  111. return true;
  112. } catch (Exception $e){
  113. return false;
  114. }
  115. }
  116. /**
  117. * Get an array of all available queues
  118. *
  119. * Not all adapters support getQueues(), use isSupported('getQueues')
  120. * to determine if the adapter supports this feature.
  121. *
  122. * @return array
  123. */
  124. public function getQueues()
  125. {
  126. $_collectionQueues = $this->_queueCollection->getCollection()->getCollectionData();
  127. return $_collectionQueues;
  128. }
  129. /**
  130. * Return the approximate number of messages in the queue
  131. *
  132. * @param Zend_Queue $queue
  133. * @return integer
  134. * @throws Zend_Queue_Exception
  135. */
  136. public function count(Zend_Queue $queue=null)
  137. {
  138. if ($queue === null) {
  139. $queue = $this->queue;
  140. }
  141. if ($this->isExists($queue->getName())) {
  142. /**
  143. * @see Zend_Queue_Exception
  144. */
  145. // require_once 'Zend/Queue/Exception.php';
  146. throw new Zend_Queue_Exception('Queue does not exist');
  147. }
  148. $_messages = $queue->count();
  149. return $_messages;
  150. }
  151. /********************************************************************
  152. * Messsage management functions
  153. *********************************************************************/
  154. /**
  155. * Send a message to the queue
  156. *
  157. * @param string $message Message to send to the active queue
  158. * @param Zend_Queue $queue
  159. * @return Zend_Queue_Message
  160. * @throws Zend_Queue_Exception
  161. */
  162. public function send($message, Zend_Queue $queue=null)
  163. {
  164. if ($queue === null) {
  165. $queue = $this->_queue;
  166. }
  167. if (!$this->isExists($queue->getName())) {
  168. throw new Zend_Queue_Exception('Queue does not exist:' . $queue->getName());
  169. }
  170. // create the message
  171. $data = array(
  172. 'message_id' => md5(uniqid(rand(), true)),
  173. 'body' => $message,
  174. 'md5' => md5($message),
  175. 'handle' => null,
  176. 'created' => time(),
  177. 'queue_name' => $queue->getName(),
  178. 'retrials' => 0
  179. );
  180. // add $data to the "queue"
  181. $_queueAdapter = $queue->getAdapter();
  182. $_queueAdapter->setData($data)->save();
  183. $options = array(
  184. 'queue' => $queue,
  185. 'data' => $data,
  186. );
  187. $classname = "Mooses_Mongodb_Queue_Abstractadapter";
  188. if (!class_exists($classname)) {
  189. require_once 'Zend/Loader.php';
  190. Zend_Loader::loadClass($classname);
  191. }
  192. return new $classname($options);
  193. }
  194. /**
  195. * Get messages in the queue
  196. *
  197. * @param integer $maxMessages Maximum number of messages to return
  198. * @param integer $timeout Visibility timeout for these messages
  199. * @param Zend_Queue $queue
  200. * @return Zend_Queue_Message_Iterator
  201. */
  202. public function receive($maxMessages = null, $timeout = 1800, Zend_Queue $queue = null)
  203. {
  204. if ($maxMessages === null) {
  205. $maxMessages = 1;
  206. }
  207. if ($timeout === null) {
  208. $timeout = self::RECEIVE_TIMEOUT_DEFAULT;
  209. }
  210. if ($queue === null) {
  211. $queue = $this->_queue;
  212. }
  213. $data = array();
  214. if ($maxMessages > 0) {
  215. $start_time = microtime(true);
  216. $count = 0;
  217. $_collectionModel = new Mooses_Mongodb_Queue_Abstractadapter();
  218. $_collectionData = $_collectionModel
  219. ->loadByAttribute("queue_name", $queue->getName(), true);
  220. foreach(array_reverse($_collectionData) as $key => $msg){
  221. if ($count >= $maxMessages) {
  222. break;
  223. }
  224. $_messageId = $msg->getData("message_id");
  225. $_retrials = (int) $msg->getData("retrials");
  226. if (($msg->getData('handle') === null) || (!is_null($msg->getData("handle")) && $_retrials < 3)) {
  227. $_randomId = md5(uniqid(rand(), true));
  228. $_microtime = microtime(true);
  229. $_retrials = (int) ($_retrials + 1);
  230. $msg->setData('handle', $_randomId)->setData("timeout", $_microtime);
  231. $msg->update(array("message_id" => $_messageId), array('$set' => array("handle" => $_randomId, "timeout" => $_microtime, "retrials" => $_retrials)));
  232. $data[] = $msg->getData();
  233. $count++;
  234. // } elseif(($msg->getData('handle') === null && $_timeout < $start_time)){
  235. // $_logger = Mooses_Logger::getInstance("vfctc_ordersynch_error");
  236. // $_logger->log("SYNCH_ORDERS", Zend_Log::WARN, "Too early to process this message: {\"message_id\": \"" . $_messageId . "\"}");
  237. } elseif($_retrials > 2){
  238. $_logger = Mooses_Logger::getInstance("vfctc_ordersynch_error");
  239. $_logger->log("SYNCH_ORDERS", Zend_Log::EMERG, "Too many retrials on message: {\"message_id\": \"" . $_messageId . "\"}");
  240. $msg->update(array("message_id" => $_messageId), array('$set' => array("queue_name" => "error_queue")));
  241. }
  242. }
  243. }
  244. $options = array(
  245. 'queue' => $queue,
  246. 'data' => $data,
  247. 'messageClass' => $queue->getMessageClass(),
  248. );
  249. $classname = $queue->getMessageSetClass();
  250. if (!class_exists($classname)) {
  251. require_once 'Zend/Loader.php';
  252. Zend_Loader::loadClass($classname);
  253. }
  254. return new $classname($options);
  255. }
  256. public function addRetryErrorMessage($_messageId, $_error, $_oldRetrialMessages = array()){
  257. $_collectionModel = new Mooses_Mongodb_Queue_Abstractadapter();
  258. $_message = $_collectionModel->loadByAttribute("message_id", $_messageId, false);
  259. if($_message){
  260. $_retrialMessages = array();
  261. if(is_array($_oldRetrialMessages)) {
  262. $_retrialMessages = array_merge($_retrialMessages, $_oldRetrialMessages);
  263. }
  264. if(is_object($_error) || is_array($_error)){
  265. $_error = json_encode($_error);
  266. }
  267. array_push($_retrialMessages, array("date" => date("Y-m-d H:i:s"), "message" => $_error));
  268. $_collectionModel->update(array("message_id" => $_messageId), array('$set' => array("retrial_messages" => $_retrialMessages)));
  269. };
  270. }
  271. /**
  272. * Delete a message from the queue
  273. *
  274. * Returns true if the message is deleted, false if the deletion is
  275. * unsuccessful.
  276. *
  277. * @param Zend_Queue_Message $message
  278. * @return boolean
  279. * @throws Zend_Queue_Exception
  280. */
  281. public function deleteMessage(Zend_Queue_Message $message)
  282. {
  283. try {
  284. $queue = new Mooses_Mongodb_Queue_Abstractadapter();
  285. $_messageData = $message->toArray();
  286. return $queue->remove(array("handle" => $_messageData['handle']), array("justOne" => true));
  287. } catch (Exception $e){
  288. return false;
  289. }
  290. }
  291. /********************************************************************
  292. * Supporting functions
  293. *********************************************************************/
  294. /**
  295. * Return a list of queue capabilities functions
  296. *
  297. * $array['function name'] = true or false
  298. * true is supported, false is not supported.
  299. *
  300. * @param string $name
  301. * @return array
  302. */
  303. public function getCapabilities()
  304. {
  305. return array(
  306. 'create' => true,
  307. 'delete' => true,
  308. 'send' => true,
  309. 'receive' => true,
  310. 'deleteMessage' => true,
  311. 'getQueues' => true,
  312. 'count' => true,
  313. 'isExists' => true,
  314. );
  315. }
  316. /********************************************************************
  317. * Functions that are not part of the Zend_Queue_Adapter_Abstract
  318. *********************************************************************/
  319. /**
  320. * serialize
  321. */
  322. public function __sleep()
  323. {
  324. return array('_data');
  325. }
  326. /*
  327. * These functions are debug helpers.
  328. */
  329. /**
  330. * returns underlying _data array
  331. * $queue->getAdapter()->getData();
  332. *
  333. * @return $this;
  334. */
  335. public function getData()
  336. {
  337. //$this->_dataContainer->
  338. }
  339. /**
  340. * sets the underlying _data array
  341. * $queue->getAdapter()->setData($data);
  342. *
  343. * @param array $data
  344. * @return $this;
  345. */
  346. public function setData($data)
  347. {
  348. if(is_array($data)) {
  349. foreach ($data as $_key => $_singleData) {
  350. $this->_dataContainer->setData($_key, $_singleData);
  351. }
  352. }
  353. return $this;
  354. }
  355. public function save(){
  356. return $this->_dataContainer->save(true, false);
  357. }
  358. }