Sqs.php 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. <?php
  2. /**
  3. * LICENSE
  4. *
  5. * This source file is subject to the new BSD license that is bundled
  6. * with this package in the file LICENSE.txt.
  7. * It is also available through the world-wide-web at this URL:
  8. * http://framework.zend.com/license/new-bsd
  9. * If you did not receive a copy of the license and are unable to
  10. * obtain it through the world-wide-web, please send an email
  11. * to license@zend.com so we can send you a copy immediately.
  12. *
  13. * @category Zend
  14. * @package Zend_Cloud
  15. * @subpackage QueueService
  16. * @copyright Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
  17. * @license http://framework.zend.com/license/new-bsd New BSD License
  18. */
  19. require_once 'Zend/Service/Amazon/Sqs.php';
  20. require_once 'Zend/Cloud/QueueService/Adapter/AbstractAdapter.php';
  21. require_once 'Zend/Cloud/QueueService/Exception.php';
  22. require_once 'Zend/Cloud/QueueService/Message.php';
  23. /**
  24. * SQS adapter for simple queue service.
  25. *
  26. * @category Zend
  27. * @package Zend_Cloud
  28. * @subpackage QueueService
  29. * @copyright Copyright (c) 2005-2015 Zend Technologies USA Inc. (http://www.zend.com)
  30. * @license http://framework.zend.com/license/new-bsd New BSD License
  31. */
  class Zend_Cloud_QueueService_Adapter_Sqs
      extends Zend_Cloud_QueueService_Adapter_AbstractAdapter
  {
      /*
       * Options array keys for the SQS adapter.
       */
      const AWS_ACCESS_KEY = 'aws_accesskey';
      const AWS_SECRET_KEY = 'aws_secretkey';

      /**
       * Defaults
       *
       * NOTE(review): createQueue() uses this value (30) as the *key* it looks
       * up in its $options array, not as a fallback value. That is kept for
       * backward compatibility; confirm whether a named option key was intended.
       */
      const CREATE_TIMEOUT = 30;

      /**
       * SQS service instance.
       * @var Zend_Service_Amazon_Sqs
       */
      protected $_sqs;

      /**
       * Constructor
       *
       * Recognised option keys: self::AWS_ACCESS_KEY, self::AWS_SECRET_KEY,
       * self::MESSAGE_CLASS, self::MESSAGESET_CLASS and self::HTTP_ADAPTER.
       *
       * @param  array|Zend_Config $options
       * @return void
       * @throws Zend_Cloud_QueueService_Exception when $options is not an array
       *         (after Zend_Config conversion) or the SQS client cannot be created
       */
      public function __construct($options = array())
      {
          if ($options instanceof Zend_Config) {
              $options = $options->toArray();
          }

          if (!is_array($options)) {
              throw new Zend_Cloud_QueueService_Exception('Invalid options provided');
          }

          if (isset($options[self::MESSAGE_CLASS])) {
              $this->setMessageClass($options[self::MESSAGE_CLASS]);
          }

          if (isset($options[self::MESSAGESET_CLASS])) {
              $this->setMessageSetClass($options[self::MESSAGESET_CLASS]);
          }

          // Missing credentials are handed to the client as null, which is what
          // the unguarded array read produced before, but without raising an
          // undefined-index notice/warning.
          $accessKey = isset($options[self::AWS_ACCESS_KEY]) ? $options[self::AWS_ACCESS_KEY] : null;
          $secretKey = isset($options[self::AWS_SECRET_KEY]) ? $options[self::AWS_SECRET_KEY] : null;

          try {
              $this->_sqs = new Zend_Service_Amazon_Sqs($accessKey, $secretKey);
          } catch (Zend_Service_Amazon_Exception $e) {
              throw new Zend_Cloud_QueueService_Exception('Error on create: '.$e->getMessage(), $e->getCode(), $e);
          }

          if (isset($options[self::HTTP_ADAPTER])) {
              $this->_sqs->getHttpClient()->setAdapter($options[self::HTTP_ADAPTER]);
          }
      }

      /**
       * Create a queue. Returns the ID of the created queue (typically the URL).
       * It may take some time to create the queue. Check your vendor's
       * documentation for details.
       *
       * @param  string $name
       * @param  array  $options Optional; the entry keyed by self::CREATE_TIMEOUT,
       *                         when present, is forwarded as the timeout argument
       * @return string Queue ID (typically URL)
       * @throws Zend_Cloud_QueueService_Exception when the SQS client reports an error
       */
      public function createQueue($name, $options = null)
      {
          // $options defaults to null, so the lookup must be guarded: indexing
          // null (or a missing key) raises a notice/warning on modern PHP.
          // When nothing is supplied, null is forwarded exactly as before.
          $timeout = isset($options[self::CREATE_TIMEOUT]) ? $options[self::CREATE_TIMEOUT] : null;

          try {
              return $this->_sqs->create($name, $timeout);
          } catch (Zend_Service_Amazon_Exception $e) {
              throw new Zend_Cloud_QueueService_Exception('Error on queue creation: '.$e->getMessage(), $e->getCode(), $e);
          }
      }

      /**
       * Delete a queue. All messages in the queue will also be deleted.
       *
       * @param  string $queueId
       * @param  array  $options Unused by this adapter
       * @return boolean true if successful, false otherwise
       * @throws Zend_Cloud_QueueService_Exception when the SQS client reports an error
       */
      public function deleteQueue($queueId, $options = null)
      {
          try {
              return $this->_sqs->delete($queueId);
          } catch (Zend_Service_Amazon_Exception $e) {
              throw new Zend_Cloud_QueueService_Exception('Error on queue deletion: '.$e->getMessage(), $e->getCode(), $e);
          }
      }

      /**
       * List all queues.
       *
       * @param  array $options Unused by this adapter
       * @return array Queue IDs
       * @throws Zend_Cloud_QueueService_Exception when the SQS client reports an error
       */
      public function listQueues($options = null)
      {
          try {
              return $this->_sqs->getQueues();
          } catch (Zend_Service_Amazon_Exception $e) {
              throw new Zend_Cloud_QueueService_Exception('Error on listing queues: '.$e->getMessage(), $e->getCode(), $e);
          }
      }

      /**
       * Get a key/value array of metadata for the given queue.
       *
       * The client is asked for the 'All' attribute. An array result is returned
       * unchanged; any other result is wrapped as array('All' => $result).
       *
       * @param  string $queueId
       * @param  array  $options Unused by this adapter
       * @return array
       * @throws Zend_Cloud_QueueService_Exception when the SQS client reports an error
       */
      public function fetchQueueMetadata($queueId, $options = null)
      {
          try {
              // TODO: ZF-9050 Fix the SQS client library in trunk to return all attribute values
              $attributes = $this->_sqs->getAttribute($queueId, 'All');
              if (is_array($attributes)) {
                  return $attributes;
              }

              // Reuse the value already fetched rather than issuing a second,
              // identical getAttribute() request.
              return array('All' => $attributes);
          } catch (Zend_Service_Amazon_Exception $e) {
              throw new Zend_Cloud_QueueService_Exception('Error on fetching queue metadata: '.$e->getMessage(), $e->getCode(), $e);
          }
      }

      /**
       * Store a key/value array of metadata for the specified queue.
       *
       * Not supported by this adapter: the method unconditionally throws.
       *
       * @param  string $queueId
       * @param  array  $metadata
       * @param  array  $options
       * @return void
       * @throws Zend_Cloud_OperationNotAvailableException always
       */
      public function storeQueueMetadata($queueId, $metadata, $options = null)
      {
          // TODO Add support for SetQueueAttributes to client library
          require_once 'Zend/Cloud/OperationNotAvailableException.php';
          throw new Zend_Cloud_OperationNotAvailableException('Amazon SQS doesn\'t currently support storing metadata');
      }

      /**
       * Send a message to the specified queue.
       *
       * @param  string $queueId
       * @param  string $message
       * @param  array  $options Unused by this adapter
       * @return string Message ID
       * @throws Zend_Cloud_QueueService_Exception when the SQS client reports an error
       */
      public function sendMessage($queueId, $message, $options = null)
      {
          try {
              return $this->_sqs->send($queueId, $message);
          } catch (Zend_Service_Amazon_Exception $e) {
              throw new Zend_Cloud_QueueService_Exception('Error on sending message: '.$e->getMessage(), $e->getCode(), $e);
          }
      }

      /**
       * Receive at most $max messages from the specified queue and return them
       * wrapped in the configured message set class.
       *
       * @param  string $queueId
       * @param  int    $max
       * @param  array  $options Optional; the entry keyed by
       *                         self::VISIBILITY_TIMEOUT, when present, is
       *                         forwarded to the client's receive() call
       * @return object Instance of the configured message set class
       * @throws Zend_Cloud_QueueService_Exception when the SQS client reports an error
       */
      public function receiveMessages($queueId, $max = 1, $options = null)
      {
          // Guarded lookup: $options defaults to null and the key is optional.
          // A missing value is forwarded as null, as it effectively was before.
          $visibilityTimeout = isset($options[self::VISIBILITY_TIMEOUT])
              ? $options[self::VISIBILITY_TIMEOUT]
              : null;

          try {
              return $this->_makeMessages($this->_sqs->receive($queueId, $max, $visibilityTimeout));
          } catch (Zend_Service_Amazon_Exception $e) {
              // Message text (including its spelling) is kept unchanged because
              // callers may match on it.
              throw new Zend_Cloud_QueueService_Exception('Error on recieving messages: '.$e->getMessage(), $e->getCode(), $e);
          }
      }

      /**
       * Wrap raw SQS messages in the configured message and message set classes.
       *
       * Each raw message is passed to the message class constructor as
       * ($message['body'], $message); the resulting list is passed to the
       * message set class constructor.
       *
       * @param  array $messages Raw messages as returned by the SQS client
       * @return object Instance of the configured message set class
       */
      protected function _makeMessages($messages)
      {
          $messageClass = $this->getMessageClass();
          $setClass     = $this->getMessageSetClass();
          $result       = array();

          foreach ($messages as $message) {
              $result[] = new $messageClass($message['body'], $message);
          }

          return new $setClass($result);
      }

      /**
       * Delete the specified message from the specified queue.
       *
       * @param  string $queueId
       * @param  Zend_Cloud_QueueService_Message|array $message Message object, or
       *         the raw message data containing a 'handle' entry
       * @param  array  $options Unused by this adapter
       * @return mixed Whatever the SQS client's deleteMessage() returns
       * @throws Zend_Cloud_QueueService_Exception when the message carries no
       *         'handle' entry or the SQS client reports an error
       */
      public function deleteMessage($queueId, $message, $options = null)
      {
          try {
              if ($message instanceof Zend_Cloud_QueueService_Message) {
                  $message = $message->getMessage();
              }

              // Fail with the adapter's own exception instead of a PHP
              // notice/TypeError (or a request carrying a null handle) when the
              // message has no usable 'handle' entry.
              $isIndexable = is_array($message) || $message instanceof ArrayAccess;
              if (!$isIndexable || !isset($message['handle'])) {
                  throw new Zend_Cloud_QueueService_Exception('Error on deleting a message: message has no handle');
              }

              return $this->_sqs->deleteMessage($queueId, $message['handle']);
          } catch (Zend_Service_Amazon_Exception $e) {
              throw new Zend_Cloud_QueueService_Exception('Error on deleting a message: '.$e->getMessage(), $e->getCode(), $e);
          }
      }

      /**
       * Peek at the messages from the specified queue without removing them.
       *
       * Implemented as a receive() call whose third argument is 0, i.e. the
       * same argument slot receiveMessages() fills from VISIBILITY_TIMEOUT.
       *
       * @param  string $queueId
       * @param  int    $num How many messages
       * @param  array  $options Unused by this adapter
       * @return object Instance of the configured message set class
       * @throws Zend_Cloud_QueueService_Exception when the SQS client reports an error
       */
      public function peekMessages($queueId, $num = 1, $options = null)
      {
          try {
              return $this->_makeMessages($this->_sqs->receive($queueId, $num, 0));
          } catch (Zend_Service_Amazon_Exception $e) {
              throw new Zend_Cloud_QueueService_Exception('Error on peeking messages: '.$e->getMessage(), $e->getCode(), $e);
          }
      }

      /**
       * Get SQS implementation
       * @return Zend_Service_Amazon_Sqs
       */
      public function getClient()
      {
          return $this->_sqs;
      }
  }