Transport.php 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. <?php
  2. namespace Elasticsearch;
  3. use Elasticsearch\Common\Exceptions;
  4. use Elasticsearch\ConnectionPool\AbstractConnectionPool;
  5. use Elasticsearch\Connections\Connection;
  6. use Elasticsearch\Connections\ConnectionInterface;
  7. use GuzzleHttp\Ring\Future\FutureArrayInterface;
  8. use Psr\Log\LoggerInterface;
  /**
   * Class Transport
   *
   * Dispatches requests to connections obtained from a connection pool,
   * keeps count of the retries that have been granted, and asks the pool to
   * schedule a check whenever a request fails for a reason other than a
   * 4xx response.
   *
   * @category Elasticsearch
   * @package  Elasticsearch
   * @author   Zachary Tong <zach@elastic.co>
   * @license  http://www.apache.org/licenses/LICENSE-2.0 Apache2
   * @link     http://elastic.co
   */
  class Transport
  {
      /**
       * Pool that supplies the connection used for each request.
       *
       * @var AbstractConnectionPool
       */
      public $connectionPool;

      /**
       * Logger used for the "sniff on start" notice and for reporting that
       * no connection could be obtained.
       *
       * @var LoggerInterface
       */
      private $log;

      /**
       * Number of retries granted by shouldRetry() since the last successful
       * request.
       *
       * @var int
       */
      public $retryAttempts = 0;

      /**
       * Connection chosen by the most recent performRequest() call.
       *
       * @var Connection
       */
      public $lastConnection;

      /**
       * Upper bound on the number of retries shouldRetry() will grant.
       *
       * @var int
       */
      public $retries;

      /**
       * Transport class is responsible for dispatching requests to the
       * underlying cluster connections.
       *
       * @param int                                   $retries        Maximum number of retries to grant
       * @param bool                                  $sniffOnStart   When exactly true, a pool check is scheduled immediately
       * @param ConnectionPool\AbstractConnectionPool $connectionPool Pool that hands out connections
       * @param \Psr\Log\LoggerInterface              $log            PSR-3 logger
       */
      // @codingStandardsIgnoreStart
      // "Arguments with default values must be at the end of the argument list" - cannot change the interface
      public function __construct($retries, $sniffOnStart = false, AbstractConnectionPool $connectionPool, LoggerInterface $log)
      {
          // @codingStandardsIgnoreEnd
          $this->log            = $log;
          $this->connectionPool = $connectionPool;
          $this->retries        = $retries;

          if ($sniffOnStart === true) {
              $this->log->notice('Sniff on Start.');
              $this->connectionPool->scheduleCheck();
          }
      }

      /**
       * Returns the next connection offered by the connection pool.
       *
       * @return ConnectionInterface Connection
       */
      public function getConnection()
      {
          return $this->connectionPool->nextConnection();
      }

      /**
       * Perform a request to the Cluster.
       *
       * The request is delegated to a connection taken from the pool. Two
       * callbacks are attached to the returned future's promise: on success
       * the retry counter is reset, on failure the pool is asked to schedule
       * a check unless the failure carries a 4xx status code.
       *
       * @param string $method  HTTP method to use
       * @param string $uri     HTTP URI to send request to
       * @param null   $params  Optional query parameters
       * @param null   $body    Optional query body
       * @param array  $options Options forwarded unchanged to the connection
       *
       * @throws Common\Exceptions\NoNodesAvailableException|\Exception
       * @return FutureArrayInterface
       */
      public function performRequest($method, $uri, $params = null, $body = null, $options = [])
      {
          try {
              $connection = $this->getConnection();
          } catch (Exceptions\NoNodesAvailableException $exception) {
              $this->log->critical('No alive nodes found in cluster');
              throw $exception;
          }

          $this->lastConnection = $connection;

          $future = $connection->performRequest(
              $method,
              $uri,
              $params,
              $body,
              $options,
              $this
          );

          $future->promise()->then(
              // onSuccess
              function ($response) {
                  // Note, this could be a 4xx or 5xx error
                  $this->retryAttempts = 0;
              },
              // onFailure
              function ($response) {
                  // Ignore 400 level errors, as that means the server responded just fine.
                  // Anything else schedules a check on the pool.
                  if (!$this->isClientError($response)) {
                      $this->connectionPool->scheduleCheck();
                  }
              }
          );

          return $future;
      }

      /**
       * Resolves a future synchronously or hands it back untouched, depending
       * on the `client.future` option.
       *
       * - option missing, null or false: wait() is called repeatedly until the
       *   value is no longer a FutureArrayInterface, and that value is returned;
       * - option true or 'lazy': the future is returned as-is;
       * - any other value: null is returned.
       *
       * @param FutureArrayInterface $result  Response of a request (promise)
       * @param array                $options Options for transport
       *
       * @return FutureArrayInterface|array|null
       */
      public function resultOrFuture($result, $options = [])
      {
          $async = isset($options['client']['future']) ? $options['client']['future'] : null;

          if ($async === null || $async === false) {
              // Futures may resolve to further futures; unwrap until a plain value remains.
              do {
                  $result = $result->wait();
              } while ($result instanceof FutureArrayInterface);

              return $result;
          }

          if ($async === true || $async === 'lazy') {
              return $result;
          }

          // Unrecognised setting: keep the historical behaviour of returning nothing.
          return null;
      }

      /**
       * Decides whether another retry may be made and, if so, counts it.
       *
       * @param mixed $request Not inspected by this method
       *
       * @return bool True while fewer than $retries retries have been granted
       */
      public function shouldRetry($request)
      {
          if ($this->retryAttempts >= $this->retries) {
              return false;
          }

          $this->retryAttempts += 1;

          return true;
      }

      /**
       * Returns the last used connection so that it may be inspected. Mainly
       * for debugging/testing purposes.
       *
       * @return Connection
       */
      public function getLastConnection()
      {
          return $this->lastConnection;
      }

      /**
       * Tells whether a failure value carries a 4xx status code.
       *
       * The value handed to a failure callback is either read with array
       * syntax (a response array or ArrayAccess object exposing 'code') or,
       * when it is an exception, through getCode(). Using array syntax on a
       * plain exception object raises an error, which would prevent the
       * caller from ever scheduling a pool check.
       *
       * @param mixed $failure Value passed to the failure callback
       *
       * @return bool True only when a code is present and 400 <= code < 500
       */
      private function isClientError($failure)
      {
          $code = null;

          if ($failure instanceof \Exception || $failure instanceof \Throwable) {
              $code = $failure->getCode();
          } elseif ((is_array($failure) || $failure instanceof \ArrayAccess) && isset($failure['code'])) {
              $code = $failure['code'];
          }

          return $code !== null && $code >= 400 && $code < 500;
      }
  }