|
|
@@ -54,6 +54,11 @@ class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
* @var integer
|
|
|
*/
|
|
|
protected $_port = null;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @var resource
|
|
|
+ */
|
|
|
+ protected $_socket = null;
|
|
|
|
|
|
/********************************************************************
|
|
|
* Constructor / Destructor
|
|
|
@@ -107,6 +112,11 @@ class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
if ($this->_cache instanceof Memcache) {
|
|
|
$this->_cache->close();
|
|
|
}
|
|
|
+ if (is_resource($this->_socket)) {
|
|
|
+ $cmd = 'quit' . self::EOL;
|
|
|
+ fwrite($this->_socket, $cmd);
|
|
|
+ fclose($this->_socket);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/********************************************************************
|
|
|
@@ -126,7 +136,11 @@ class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
*/
|
|
|
public function isExists($name)
|
|
|
{
|
|
|
- return in_array($name, $this->getQueues());
|
|
|
+ if (empty($this->_queues)) {
|
|
|
+ $this->getQueues();
|
|
|
+ }
|
|
|
+
|
|
|
+ return in_array($name, $this->_queues);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -192,14 +206,15 @@ class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
*/
|
|
|
public function getQueues()
|
|
|
{
|
|
|
+ $this->_queues = array();
|
|
|
+
|
|
|
$response = $this->_sendCommand('stats queue', array('END'));
|
|
|
- $list = array();
|
|
|
|
|
|
foreach ($response as $i => $line) {
|
|
|
- $list[] = str_replace('STAT ', '', $line);
|
|
|
+ $this->_queues[] = str_replace('STAT ', '', $line);
|
|
|
}
|
|
|
|
|
|
- return $list;
|
|
|
+ return $this->_queues;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -375,8 +390,10 @@ class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
*/
|
|
|
protected function _sendCommand($command, array $terminator, $include_term=false)
|
|
|
{
|
|
|
- $fp = fsockopen($this->_host, $this->_port, $errno, $errstr, 10);
|
|
|
- if ($fp === false) {
|
|
|
+ if (!is_resource($this->_socket)) {
|
|
|
+ $this->_socket = fsockopen($this->_host, $this->_port, $errno, $errstr, 10);
|
|
|
+ }
|
|
|
+ if ($this->_socket === false) {
|
|
|
require_once 'Zend/Queue/Exception.php';
|
|
|
throw new Zend_Queue_Exception("Could not open a connection to $this->_host:$this->_port errno=$errno : $errstr");
|
|
|
}
|
|
|
@@ -384,11 +401,11 @@ class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
$response = array();
|
|
|
|
|
|
$cmd = $command . self::EOL;
|
|
|
- fwrite($fp, $cmd);
|
|
|
+ fwrite($this->_socket, $cmd);
|
|
|
|
|
|
$continue_reading = true;
|
|
|
- while (!feof($fp) && $continue_reading) {
|
|
|
- $resp = trim(fgets($fp, 1024));
|
|
|
+ while (!feof($this->_socket) && $continue_reading) {
|
|
|
+ $resp = trim(fgets($this->_socket, 1024));
|
|
|
if (in_array($resp, $terminator)) {
|
|
|
if ($include_term) {
|
|
|
$response[] = $resp;
|
|
|
@@ -399,10 +416,6 @@ class Zend_Queue_Adapter_Memcacheq extends Zend_Queue_Adapter_AdapterAbstract
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- $cmd = 'quit' . self::EOL;
|
|
|
- fwrite($fp, $cmd);
|
|
|
- fclose($fp);
|
|
|
-
|
|
|
return $response;
|
|
|
}
|
|
|
}
|