Sqs.php 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. <?php
  2. /**
  3. * Zend Framework
  4. *
  5. * LICENSE
  6. *
  7. * This source file is subject to the new BSD license that is bundled
  8. * with this package in the file LICENSE.txt.
  9. * It is also available through the world-wide-web at this URL:
  10. * http://framework.zend.com/license/new-bsd
  11. * If you did not receive a copy of the license and are unable to
  12. * obtain it through the world-wide-web, please send an email
  13. * to license@zend.com so we can send you a copy immediately.
  14. *
  15. * @category Zend
  16. * @package Zend_Service
  17. * @subpackage Amazon_Sqs
  18. * @copyright Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
  19. * @license http://framework.zend.com/license/new-bsd New BSD License
  20. * @version $Id$
  21. */
  22. /**
  23. * @see Zend_Service_Amazon_Abstract
  24. */
  25. require_once 'Zend/Service/Amazon/Abstract.php';
  26. /**
  27. * @see Zend_Crypt_Hmac
  28. */
  29. require_once 'Zend/Crypt/Hmac.php';
  30. /**
  31. * Class for connecting to the Amazon Simple Queue Service (SQS)
  32. *
  33. * @category Zend
  34. * @package Zend_Service
  35. * @subpackage Amazon_Sqs
  36. * @copyright Copyright (c) 2005-2010 Zend Technologies USA Inc. (http://www.zend.com)
  37. * @license http://framework.zend.com/license/new-bsd New BSD License
  38. * @see http://aws.amazon.com/sqs/ Amazon Simple Queue Service
  39. */
  40. class Zend_Service_Amazon_Sqs extends Zend_Service_Amazon_Abstract
  41. {
  42. /**
  43. * Default timeout for createQueue() function
  44. */
  45. const CREATE_TIMEOUT_DEFAULT = 30;
  46. /**
  47. * HTTP end point for the Amazon SQS service
  48. */
  49. protected $_sqsEndpoint = 'queue.amazonaws.com';
  50. /**
  51. * The API version to use
  52. */
  53. protected $_sqsApiVersion = '2009-02-01';
  54. /**
  55. * Signature Version
  56. */
  57. protected $_sqsSignatureVersion = '2';
  58. /**
  59. * Signature Encoding Method
  60. */
  61. protected $_sqsSignatureMethod = 'HmacSHA256';
  62. /**
  63. * Constructor
  64. *
  65. * @param string $accessKey
  66. * @param string $secretKey
  67. * @param string $region
  68. */
  69. public function __construct($accessKey = null, $secretKey = null, $region = null)
  70. {
  71. parent::__construct($accessKey, $secretKey, $region);
  72. }
  73. /**
  74. * Create a new queue
  75. *
  76. * Visibility timeout is how long a message is left in the queue "invisible"
  77. * to other readers. If the message is acknowleged (deleted) before the
  78. * timeout, then the message is deleted. However, if the timeout expires
  79. * then the message will be made available to other queue readers.
  80. *
  81. * @param string $queue_name queue name
  82. * @param integer $timeout default visibility timeout
  83. * @return string|boolean
  84. * @throws Zend_Service_Amazon_Sqs_Exception
  85. */
  86. public function create($queue_name, $timeout = null)
  87. {
  88. $params = array();
  89. $params['QueueName'] = $queue_name;
  90. $timeout = ($timeout === null) ? self::CREATE_TIMEOUT_DEFAULT : (int)$timeout;
  91. $params['DefaultVisibilityTimeout'] = $timeout;
  92. $retry_count = 0;
  93. do {
  94. $retry = false;
  95. $result = $this->_makeRequest(null, 'CreateQueue', $params);
  96. if (!isset($result->CreateQueueResult->QueueUrl)
  97. || empty($result->CreateQueueResult->QueueUrl)
  98. ) {
  99. if ($result->Error->Code == 'AWS.SimpleQueueService.QueueNameExists') {
  100. return false;
  101. } elseif ($result->Error->Code == 'AWS.SimpleQueueService.QueueDeletedRecently') {
  102. // Must sleep for 60 seconds, then try re-creating the queue
  103. sleep(60);
  104. $retry = true;
  105. $retry_count++;
  106. } else {
  107. require_once 'Zend/Service/Amazon/Sqs/Exception.php';
  108. throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
  109. }
  110. } else {
  111. return (string) $result->CreateQueueResult->QueueUrl;
  112. }
  113. } while ($retry);
  114. return false;
  115. }
  116. /**
  117. * Delete a queue and all of it's messages
  118. *
  119. * Returns false if the queue is not found, true if the queue exists
  120. *
  121. * @param string $queue_url queue URL
  122. * @return boolean
  123. * @throws Zend_Service_Amazon_Sqs_Exception
  124. */
  125. public function delete($queue_url)
  126. {
  127. $result = $this->_makeRequest($queue_url, 'DeleteQueue');
  128. if ($result->Error->Code !== null) {
  129. require_once 'Zend/Service/Amazon/Sqs/Exception.php';
  130. throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
  131. }
  132. return true;
  133. }
  134. /**
  135. * Get an array of all available queues
  136. *
  137. * @return array
  138. * @throws Zend_Service_Amazon_Sqs_Exception
  139. */
  140. public function getQueues()
  141. {
  142. $result = $this->_makeRequest(null, 'ListQueues');
  143. if (isset($result->Error)) {
  144. require_once 'Zend/Service/Amazon/Sqs/Exception.php';
  145. throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
  146. }
  147. if (!isset($result->ListQueuesResult->QueueUrl)
  148. || empty($result->ListQueuesResult->QueueUrl)
  149. ) {
  150. return array();
  151. }
  152. $queues = array();
  153. foreach ($result->ListQueuesResult->QueueUrl as $queue_url) {
  154. $queues[] = (string)$queue_url;
  155. }
  156. return $queues;
  157. }
  158. /**
  159. * Return the approximate number of messages in the queue
  160. *
  161. * @param string $queue_url Queue URL
  162. * @return integer
  163. * @throws Zend_Service_Amazon_Sqs_Exception
  164. */
  165. public function count($queue_url)
  166. {
  167. return (int)$this->getAttribute($queue_url, 'ApproximateNumberOfMessages');
  168. }
  169. /**
  170. * Send a message to the queue
  171. *
  172. * @param string $queue_url Queue URL
  173. * @param string $message Message to send to the queue
  174. * @return string Message ID
  175. * @throws Zend_Service_Amazon_Sqs_Exception
  176. */
  177. public function send($queue_url, $message)
  178. {
  179. $params = array();
  180. $params['MessageBody'] = urlencode($message);
  181. $checksum = md5($params['MessageBody']);
  182. $result = $this->_makeRequest($queue_url, 'SendMessage', $params);
  183. if (!isset($result->SendMessageResult->MessageId)
  184. || empty($result->SendMessageResult->MessageId)
  185. ) {
  186. require_once 'Zend/Service/Amazon/Sqs/Exception.php';
  187. throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
  188. } else if ((string) $result->SendMessageResult->MD5OfMessageBody != $checksum) {
  189. require_once 'Zend/Service/Amazon/Sqs/Exception.php';
  190. throw new Zend_Service_Amazon_Sqs_Exception('MD5 of body does not match message sent');
  191. }
  192. return (string) $result->SendMessageResult->MessageId;
  193. }
  194. /**
  195. * Get messages in the queue
  196. *
  197. * @param string $queue_url Queue name
  198. * @param integer $max_messages Maximum number of messages to return
  199. * @param integer $timeout Visibility timeout for these messages
  200. * @return array
  201. * @throws Zend_Service_Amazon_Sqs_Exception
  202. */
  203. public function receive($queue_url, $max_messages = null, $timeout = null)
  204. {
  205. $params = array();
  206. // If not set, the visibility timeout on the queue is used
  207. if ($timeout !== null) {
  208. $params['VisibilityTimeout'] = (int)$timeout;
  209. }
  210. // SQS will default to only returning one message
  211. if ($max_messages !== null) {
  212. $params['MaxNumberOfMessages'] = (int)$max_messages;
  213. }
  214. $result = $this->_makeRequest($queue_url, 'ReceiveMessage', $params);
  215. if (!isset($result->ReceiveMessageResult->Message)
  216. || empty($result->ReceiveMessageResult->Message)
  217. ) {
  218. require_once 'Zend/Service/Amazon/Sqs/Exception.php';
  219. throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
  220. }
  221. $data = array();
  222. foreach ($result->ReceiveMessageResult->Message as $message) {
  223. $data[] = array(
  224. 'message_id' => (string)$message->MessageId,
  225. 'handle' => (string)$message->ReceiptHandle,
  226. 'md5' => (string)$message->MD5OfBody,
  227. 'body' => urldecode((string)$message->Body),
  228. );
  229. }
  230. return $data;
  231. }
  232. /**
  233. * Delete a message from the queue
  234. *
  235. * Returns true if the message is deleted, false if the deletion is
  236. * unsuccessful.
  237. *
  238. * @param string $queue_url Queue URL
  239. * @param string $handle Message handle as returned by SQS
  240. * @return boolean
  241. * @throws Zend_Service_Amazon_Sqs_Exception
  242. */
  243. public function deleteMessage($queue_url, $handle)
  244. {
  245. $params = array();
  246. $params['ReceiptHandle'] = (string)$handle;
  247. $result = $this->_makeRequest($queue_url, 'DeleteMessage', $params);
  248. if (isset($result->Error->Code)
  249. && !empty($result->Error->Code)
  250. ) {
  251. return false;
  252. }
  253. // Will always return true unless ReceiptHandle is malformed
  254. return true;
  255. }
  256. /**
  257. * Get the attributes for the queue
  258. *
  259. * @param string $queue_url Queue URL
  260. * @param string $attribute
  261. * @return string
  262. * @throws Zend_Service_Amazon_Sqs_Exception
  263. */
  264. public function getAttribute($queue_url, $attribute = 'All')
  265. {
  266. $params = array();
  267. $params['AttributeName'] = $attribute;
  268. $result = $this->_makeRequest($queue_url, 'GetQueueAttributes', $params);
  269. if (!isset($result->GetQueueAttributesResult->Attribute)
  270. || empty($result->GetQueueAttributesResult->Attribute)
  271. ) {
  272. require_once 'Zend/Service/Amazon/Sqs/Exception.php';
  273. throw new Zend_Service_Amazon_Sqs_Exception($result->Error->Code);
  274. }
  275. if(count($result->GetQueueAttributesResult->Attribute) > 1) {
  276. $attr_result = array();
  277. foreach($result->GetQueueAttributesResult->Attribute as $attribute) {
  278. $attr_result[(string)$attribute->Name] = (string)$attribute->Value;
  279. }
  280. return $attr_result;
  281. } else {
  282. return (string) $result->GetQueueAttributesResult->Attribute->Value;
  283. }
  284. }
  285. /**
  286. * Make a request to Amazon SQS
  287. *
  288. * @param string $queue Queue Name
  289. * @param string $action SQS action
  290. * @param array $params
  291. * @return SimpleXMLElement
  292. */
  293. private function _makeRequest($queue_url, $action, $params = array())
  294. {
  295. $params['Action'] = $action;
  296. $params = $this->addRequiredParameters($queue_url, $params);
  297. if ($queue_url === null) {
  298. $queue_url = '/';
  299. }
  300. $client = self::getHttpClient();
  301. switch ($action) {
  302. case 'ListQueues':
  303. case 'CreateQueue':
  304. $client->setUri('http://'.$this->_sqsEndpoint);
  305. break;
  306. default:
  307. $client->setUri($queue_url);
  308. break;
  309. }
  310. $retry_count = 0;
  311. do {
  312. $retry = false;
  313. $client->resetParameters();
  314. $client->setParameterGet($params);
  315. $response = $client->request('GET');
  316. $response_code = $response->getStatus();
  317. // Some 5xx errors are expected, so retry automatically
  318. if ($response_code >= 500 && $response_code < 600 && $retry_count <= 5) {
  319. $retry = true;
  320. $retry_count++;
  321. sleep($retry_count / 4 * $retry_count);
  322. }
  323. } while ($retry);
  324. unset($client);
  325. return new SimpleXMLElement($response->getBody());
  326. }
  327. /**
  328. * Adds required authentication and version parameters to an array of
  329. * parameters
  330. *
  331. * The required parameters are:
  332. * - AWSAccessKey
  333. * - SignatureVersion
  334. * - Timestamp
  335. * - Version and
  336. * - Signature
  337. *
  338. * If a required parameter is already set in the <tt>$parameters</tt> array,
  339. * it is overwritten.
  340. *
  341. * @param string $queue_url Queue URL
  342. * @param array $parameters the array to which to add the required
  343. * parameters.
  344. * @return array
  345. */
  346. protected function addRequiredParameters($queue_url, array $parameters)
  347. {
  348. $parameters['AWSAccessKeyId'] = $this->_getAccessKey();
  349. $parameters['SignatureVersion'] = $this->_sqsSignatureVersion;
  350. $parameters['Timestamp'] = gmdate('Y-m-d\TH:i:s\Z', time()+10);
  351. $parameters['Version'] = $this->_sqsApiVersion;
  352. $parameters['SignatureMethod'] = $this->_sqsSignatureMethod;
  353. $parameters['Signature'] = $this->_signParameters($queue_url, $parameters);
  354. return $parameters;
  355. }
  356. /**
  357. * Computes the RFC 2104-compliant HMAC signature for request parameters
  358. *
  359. * This implements the Amazon Web Services signature, as per the following
  360. * specification:
  361. *
  362. * 1. Sort all request parameters (including <tt>SignatureVersion</tt> and
  363. * excluding <tt>Signature</tt>, the value of which is being created),
  364. * ignoring case.
  365. *
  366. * 2. Iterate over the sorted list and append the parameter name (in its
  367. * original case) and then its value. Do not URL-encode the parameter
  368. * values before constructing this string. Do not use any separator
  369. * characters when appending strings.
  370. *
  371. * @param string $queue_url Queue URL
  372. * @param array $parameters the parameters for which to get the signature.
  373. *
  374. * @return string the signed data.
  375. */
  376. protected function _signParameters($queue_url, array $paramaters)
  377. {
  378. $data = "GET\n";
  379. $data .= $this->_sqsEndpoint . "\n";
  380. if ($queue_url !== null) {
  381. $data .= parse_url($queue_url, PHP_URL_PATH);
  382. }
  383. else {
  384. $data .= '/';
  385. }
  386. $data .= "\n";
  387. uksort($paramaters, 'strcmp');
  388. unset($paramaters['Signature']);
  389. $arrData = array();
  390. foreach($paramaters as $key => $value) {
  391. $arrData[] = $key . '=' . str_replace('%7E', '~', urlencode($value));
  392. }
  393. $data .= implode('&', $arrData);
  394. $hmac = Zend_Crypt_Hmac::compute($this->_getSecretKey(), 'SHA256', $data, Zend_Crypt_Hmac::BINARY);
  395. return base64_encode($hmac);
  396. }
  397. }