SniffingConnectionPool.php 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. <?php
  2. namespace Elasticsearch\ConnectionPool;
  3. use Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException;
  4. use Elasticsearch\Common\Exceptions\NoNodesAvailableException;
  5. use Elasticsearch\ConnectionPool\Selectors\SelectorInterface;
  6. use Elasticsearch\Connections\Connection;
  7. use Elasticsearch\Connections\ConnectionFactoryInterface;
  8. class SniffingConnectionPool extends AbstractConnectionPool implements ConnectionPoolInterface
  9. {
  10. /** @var int */
  11. private $sniffingInterval = 300;
  12. /** @var int */
  13. private $nextSniff = -1;
  14. /**
  15. * {@inheritdoc}
  16. */
  17. public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams)
  18. {
  19. parent::__construct($connections, $selector, $factory, $connectionPoolParams);
  20. $this->setConnectionPoolParams($connectionPoolParams);
  21. $this->nextSniff = time() + $this->sniffingInterval;
  22. }
  23. /**
  24. * @param bool $force
  25. *
  26. * @return Connection
  27. * @throws \Elasticsearch\Common\Exceptions\NoNodesAvailableException
  28. */
  29. public function nextConnection($force = false)
  30. {
  31. $this->sniff($force);
  32. $size = count($this->connections);
  33. while ($size--) {
  34. /** @var Connection $connection */
  35. $connection = $this->selector->select($this->connections);
  36. if ($connection->isAlive() === true || $connection->ping() === true) {
  37. return $connection;
  38. }
  39. }
  40. if ($force === true) {
  41. throw new NoNodesAvailableException("No alive nodes found in your cluster");
  42. }
  43. return $this->nextConnection(true);
  44. }
  45. public function scheduleCheck()
  46. {
  47. $this->nextSniff = -1;
  48. }
  49. /**
  50. * @param bool $force
  51. */
  52. private function sniff($force = false)
  53. {
  54. if ($force === false && $this->nextSniff >= time()) {
  55. return;
  56. }
  57. $total = count($this->connections);
  58. while ($total--) {
  59. /** @var Connection $connection */
  60. $connection = $this->selector->select($this->connections);
  61. if ($connection->isAlive() xor $force) {
  62. continue;
  63. }
  64. if ($this->sniffConnection($connection) === true) {
  65. return;
  66. }
  67. }
  68. if ($force === true) {
  69. return;
  70. }
  71. foreach ($this->seedConnections as $connection) {
  72. if ($this->sniffConnection($connection) === true) {
  73. return;
  74. }
  75. }
  76. }
  77. /**
  78. * @param Connection $connection
  79. * @return bool
  80. */
  81. private function sniffConnection(Connection $connection)
  82. {
  83. try {
  84. $response = $connection->sniff();
  85. } catch (OperationTimeoutException $exception) {
  86. return false;
  87. }
  88. $nodes = $this->parseClusterState($connection->getTransportSchema(), $response);
  89. if (count($nodes) === 0) {
  90. return false;
  91. }
  92. $this->connections = array();
  93. foreach ($nodes as $node) {
  94. $nodeDetails = array(
  95. 'host' => $node['host'],
  96. 'port' => $node['port']
  97. );
  98. $this->connections[] = $this->connectionFactory->create($nodeDetails);
  99. }
  100. $this->nextSniff = time() + $this->sniffingInterval;
  101. return true;
  102. }
  103. private function parseClusterState($transportSchema, $nodeInfo)
  104. {
  105. $pattern = '/([^:]*):([0-9]+)/';
  106. $schemaAddress = $transportSchema . '_address';
  107. $hosts = array();
  108. foreach ($nodeInfo['nodes'] as $node) {
  109. if (isset($node['http']) === true && isset($node['http']['publish_address']) === true) {
  110. if (preg_match($pattern, $node['http']['publish_address'], $match) === 1) {
  111. $hosts[] = array(
  112. 'host' => $match[1],
  113. 'port' => (int) $match[2],
  114. );
  115. }
  116. }
  117. }
  118. return $hosts;
  119. }
  120. private function setConnectionPoolParams($connectionPoolParams)
  121. {
  122. if (isset($connectionPoolParams['sniffingInterval']) === true) {
  123. $this->sniffingInterval = $connectionPoolParams['sniffingInterval'];
  124. }
  125. }
  126. }