StaticConnectionPool.php 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. <?php
  2. namespace Elasticsearch\ConnectionPool;
  3. use Elasticsearch\Common\Exceptions\NoNodesAvailableException;
  4. use Elasticsearch\ConnectionPool\Selectors\SelectorInterface;
  5. use Elasticsearch\Connections\Connection;
  6. use Elasticsearch\Connections\ConnectionFactoryInterface;
  /**
   * Connection pool over a fixed set of connections.
   *
   * Every connection is marked dead when the pool is constructed.
   * nextConnection() then hands out a connection that either reports itself
   * alive or answers a ping. Pings to a failing connection are spaced out with
   * an exponential back-off that starts at $pingTimeout seconds and is capped
   * at $maxPingTimeout seconds.
   */
  class StaticConnectionPool extends AbstractConnectionPool implements ConnectionPoolInterface
  {
      /**
       * Base back-off in seconds; doubled for every recorded ping failure.
       *
       * @var int
       */
      private $pingTimeout = 60;

      /**
       * Upper bound, in seconds, for the back-off between pings.
       *
       * @var int
       */
      private $maxPingTimeout = 3600;

      /**
       * {@inheritdoc}
       *
       * After the parent constructor has run, scheduleCheck() is called, which
       * marks every connection in the pool as dead.
       */
      public function __construct($connections, SelectorInterface $selector, ConnectionFactoryInterface $factory, $connectionPoolParams)
      {
          parent::__construct($connections, $selector, $factory, $connectionPoolParams);
          $this->scheduleCheck();
      }

      /**
       * Pick a usable connection from the pool.
       *
       * The selector is asked for a candidate once per configured connection.
       * A candidate is returned immediately if it reports itself alive, or if
       * its back-off has elapsed and it answers a ping. Candidates whose
       * back-off has not elapsed are remembered and force-pinged as a last
       * resort once the selection rounds are exhausted.
       *
       * @param bool $force Accepted for interface compatibility; not read by
       *                    this implementation.
       *
       * @return Connection
       * @throws \Elasticsearch\Common\Exceptions\NoNodesAvailableException
       *         When no candidate is alive and none answers a ping.
       */
      public function nextConnection($force = false)
      {
          // Keyed by object hash: the selector may return the same connection
          // in several rounds, and each distinct connection should be
          // force-pinged at most once in the fallback loop below.
          $skipped = array();

          $remaining = count($this->connections);
          while ($remaining--) {
              /** @var Connection $connection */
              $connection = $this->selector->select($this->connections);

              if ($connection->isAlive() === true) {
                  return $connection;
              }

              if ($this->readyToRevive($connection) === true) {
                  if ($connection->ping() === true) {
                      return $connection;
                  }
                  // Ping failed: this connection is not retried in the
                  // fallback loop, matching the original behaviour.
              } else {
                  $skipped[spl_object_hash($connection)] = $connection;
              }
          }

          // No candidate was alive or revivable: force pings on the connections
          // that were skipped because their back-off had not elapsed yet.
          foreach ($skipped as $connection) {
              if ($connection->ping() === true) {
                  return $connection;
              }
          }

          throw new NoNodesAvailableException("No alive nodes found in your cluster");
      }

      /**
       * Call markDead() on every connection in the pool.
       *
       * Invoked from the constructor.
       * NOTE(review): presumably this makes isAlive() return false so that the
       * first use of each connection goes through the ping path; confirm
       * against the Connection implementation.
       */
      public function scheduleCheck()
      {
          foreach ($this->connections as $connection) {
              $connection->markDead();
          }
      }

      /**
       * Tell whether the back-off period of a connection has elapsed.
       *
       * The back-off is $pingTimeout * 2^(ping failures), capped at
       * $maxPingTimeout, and is measured from the connection's last ping.
       *
       * @param Connection $connection
       *
       * @return bool True when last ping + back-off lies strictly in the past.
       */
      private function readyToRevive(Connection $connection)
      {
          $timeout = min(
              $this->pingTimeout * pow(2, $connection->getPingFailures()),
              $this->maxPingTimeout
          );

          return $connection->getLastPing() + $timeout < time();
      }
  }