Connection.php 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722
  1. <?php
  2. namespace Elasticsearch\Connections;
  3. use Elasticsearch\Common\Exceptions\AlreadyExpiredException;
  4. use Elasticsearch\Common\Exceptions\BadRequest400Exception;
  5. use Elasticsearch\Common\Exceptions\Conflict409Exception;
  6. use Elasticsearch\Common\Exceptions\Curl\CouldNotConnectToHost;
  7. use Elasticsearch\Common\Exceptions\Curl\CouldNotResolveHostException;
  8. use Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException;
  9. use Elasticsearch\Common\Exceptions\Forbidden403Exception;
  10. use Elasticsearch\Common\Exceptions\MaxRetriesException;
  11. use Elasticsearch\Common\Exceptions\Missing404Exception;
  12. use Elasticsearch\Common\Exceptions\NoDocumentsToGetException;
  13. use Elasticsearch\Common\Exceptions\NoShardAvailableException;
  14. use Elasticsearch\Common\Exceptions\RequestTimeout408Exception;
  15. use Elasticsearch\Common\Exceptions\RoutingMissingException;
  16. use Elasticsearch\Common\Exceptions\ScriptLangNotSupportedException;
  17. use Elasticsearch\Common\Exceptions\ServerErrorResponseException;
  18. use Elasticsearch\Common\Exceptions\TransportException;
  19. use Elasticsearch\Serializers\SerializerInterface;
  20. use Elasticsearch\Transport;
  21. use GuzzleHttp\Ring\Core;
  22. use GuzzleHttp\Ring\Exception\ConnectException;
  23. use GuzzleHttp\Ring\Exception\RingException;
  24. use Psr\Log\LoggerInterface;
  25. /**
  26. * Class AbstractConnection
  27. *
  28. * @category Elasticsearch
  29. * @package Elasticsearch\Connections
  30. * @author Zachary Tong <zach@elastic.co>
  31. * @license http://www.apache.org/licenses/LICENSE-2.0 Apache2
  32. * @link http://elastic.co
  33. */
  34. class Connection implements ConnectionInterface
  35. {
  36. /** @var callable */
  37. protected $handler;
  38. /** @var SerializerInterface */
  39. protected $serializer;
  40. /**
  41. * @var string
  42. */
  43. protected $transportSchema = 'http'; // TODO depreciate this default
  44. /**
  45. * @var string
  46. */
  47. protected $host;
  48. /**
  49. * @var string || null
  50. */
  51. protected $path;
  52. /**
  53. * @var LoggerInterface
  54. */
  55. protected $log;
  56. /**
  57. * @var LoggerInterface
  58. */
  59. protected $trace;
  60. /**
  61. * @var array
  62. */
  63. protected $connectionParams;
  64. /** @var array */
  65. protected $headers = [];
  66. /** @var bool */
  67. protected $isAlive = false;
  68. /** @var float */
  69. private $pingTimeout = 1; //TODO expose this
  70. /** @var int */
  71. private $lastPing = 0;
  72. /** @var int */
  73. private $failedPings = 0;
  74. private $lastRequest = array();
  75. /**
  76. * Constructor
  77. *
  78. * @param $handler
  79. * @param array $hostDetails
  80. * @param array $connectionParams Array of connection-specific parameters
  81. * @param \Elasticsearch\Serializers\SerializerInterface $serializer
  82. * @param \Psr\Log\LoggerInterface $log Logger object
  83. * @param \Psr\Log\LoggerInterface $trace
  84. */
  85. public function __construct(
  86. $handler,
  87. $hostDetails,
  88. $connectionParams,
  89. SerializerInterface $serializer,
  90. LoggerInterface $log,
  91. LoggerInterface $trace
  92. ) {
  93. if (isset($hostDetails['port']) !== true) {
  94. $hostDetails['port'] = 9200;
  95. }
  96. if (isset($hostDetails['scheme'])) {
  97. $this->transportSchema = $hostDetails['scheme'];
  98. }
  99. if (isset($hostDetails['user']) && isset($hostDetails['pass'])) {
  100. $connectionParams['client']['curl'][CURLOPT_HTTPAUTH] = CURLAUTH_BASIC;
  101. $connectionParams['client']['curl'][CURLOPT_USERPWD] = $hostDetails['user'].':'.$hostDetails['pass'];
  102. }
  103. if (isset($connectionParams['client']['headers']) === true) {
  104. $this->headers = $connectionParams['client']['headers'];
  105. unset($connectionParams['client']['headers']);
  106. }
  107. $host = $hostDetails['host'].':'.$hostDetails['port'];
  108. $path = null;
  109. if (isset($hostDetails['path']) === true) {
  110. $path = $hostDetails['path'];
  111. }
  112. $this->host = $host;
  113. $this->path = $path;
  114. $this->log = $log;
  115. $this->trace = $trace;
  116. $this->connectionParams = $connectionParams;
  117. $this->serializer = $serializer;
  118. $this->handler = $this->wrapHandler($handler, $log, $trace);
  119. }
  120. /**
  121. * @param $method
  122. * @param $uri
  123. * @param null $params
  124. * @param null $body
  125. * @param array $options
  126. * @param \Elasticsearch\Transport $transport
  127. * @return mixed
  128. */
  129. public function performRequest($method, $uri, $params = null, $body = null, $options = [], Transport $transport = null)
  130. {
  131. if (isset($body) === true) {
  132. $body = $this->serializer->serialize($body);
  133. }
  134. $request = [
  135. 'http_method' => $method,
  136. 'scheme' => $this->transportSchema,
  137. 'uri' => $this->getURI($uri, $params),
  138. 'body' => $body,
  139. 'headers' => array_merge([
  140. 'Host' => [$this->host]
  141. ], $this->headers)
  142. ];
  143. $request = array_replace_recursive($request, $this->connectionParams, $options);
  144. // RingPHP does not like if client is empty
  145. if (empty($request['client'])) {
  146. unset($request['client']);
  147. }
  148. $handler = $this->handler;
  149. $future = $handler($request, $this, $transport, $options);
  150. return $future;
  151. }
  152. /** @return string */
  153. public function getTransportSchema()
  154. {
  155. return $this->transportSchema;
  156. }
  157. /** @return array */
  158. public function getLastRequestInfo()
  159. {
  160. return $this->lastRequest;
  161. }
  162. private function wrapHandler(callable $handler, LoggerInterface $logger, LoggerInterface $tracer)
  163. {
  164. return function (array $request, Connection $connection, Transport $transport = null, $options) use ($handler, $logger, $tracer) {
  165. $this->lastRequest = [];
  166. $this->lastRequest['request'] = $request;
  167. // Send the request using the wrapped handler.
  168. $response = Core::proxy($handler($request), function ($response) use ($connection, $transport, $logger, $tracer, $request, $options) {
  169. $this->lastRequest['response'] = $response;
  170. if (isset($response['error']) === true) {
  171. if ($response['error'] instanceof ConnectException || $response['error'] instanceof RingException) {
  172. $this->log->warning("Curl exception encountered.");
  173. $exception = $this->getCurlRetryException($request, $response);
  174. $this->logRequestFail(
  175. $request['http_method'],
  176. $response['effective_url'],
  177. $request['body'],
  178. $request['headers'],
  179. $response['status'],
  180. $response['body'],
  181. $response['transfer_stats']['total_time'],
  182. $exception
  183. );
  184. $node = $connection->getHost();
  185. $this->log->warning("Marking node $node dead.");
  186. $connection->markDead();
  187. // If the transport has not been set, we are inside a Ping or Sniff,
  188. // so we don't want to retrigger retries anyway.
  189. //
  190. // TODO this could be handled better, but we are limited because connectionpools do not
  191. // have access to Transport. Architecturally, all of this needs to be refactored
  192. if (isset($transport) === true) {
  193. $transport->connectionPool->scheduleCheck();
  194. $neverRetry = isset($request['client']['never_retry']) ? $request['client']['never_retry'] : false;
  195. $shouldRetry = $transport->shouldRetry($request);
  196. $shouldRetryText = ($shouldRetry) ? 'true' : 'false';
  197. $this->log->warning("Retries left? $shouldRetryText");
  198. if ($shouldRetry && !$neverRetry) {
  199. return $transport->performRequest(
  200. $request['http_method'],
  201. $request['uri'],
  202. [],
  203. $request['body'],
  204. $options
  205. );
  206. }
  207. }
  208. $this->log->warning("Out of retries, throwing exception from $node");
  209. // Only throw if we run out of retries
  210. throw $exception;
  211. } else {
  212. // Something went seriously wrong, bail
  213. $exception = new TransportException($response['error']->getMessage());
  214. $this->logRequestFail(
  215. $request['http_method'],
  216. $response['effective_url'],
  217. $request['body'],
  218. $request['headers'],
  219. $response['status'],
  220. $response['body'],
  221. $response['transfer_stats']['total_time'],
  222. $exception
  223. );
  224. throw $exception;
  225. }
  226. } else {
  227. $connection->markAlive();
  228. if (isset($response['body']) === true) {
  229. $response['body'] = stream_get_contents($response['body']);
  230. $this->lastRequest['response']['body'] = $response['body'];
  231. }
  232. if ($response['status'] >= 400 && $response['status'] < 500) {
  233. $ignore = isset($request['client']['ignore']) ? $request['client']['ignore'] : [];
  234. $this->process4xxError($request, $response, $ignore);
  235. } elseif ($response['status'] >= 500) {
  236. $ignore = isset($request['client']['ignore']) ? $request['client']['ignore'] : [];
  237. $this->process5xxError($request, $response, $ignore);
  238. }
  239. // No error, deserialize
  240. $response['body'] = $this->serializer->deserialize($response['body'], $response['transfer_stats']);
  241. }
  242. $this->logRequestSuccess(
  243. $request['http_method'],
  244. $response['effective_url'],
  245. $request['body'],
  246. $request['headers'],
  247. $response['status'],
  248. $response['body'],
  249. $response['transfer_stats']['total_time']
  250. );
  251. return isset($request['client']['verbose']) && $request['client']['verbose'] === true ? $response : $response['body'];
  252. });
  253. return $response;
  254. };
  255. }
  256. /**
  257. * @param string $uri
  258. * @param array $params
  259. *
  260. * @return string
  261. */
  262. private function getURI($uri, $params)
  263. {
  264. if (isset($params) === true && !empty($params)) {
  265. array_walk($params, function (&$value, &$key) {
  266. if ($value === true) {
  267. $value = 'true';
  268. } elseif ($value === false) {
  269. $value = 'false';
  270. }
  271. });
  272. $uri .= '?' . http_build_query($params);
  273. }
  274. if ($this->path !== null) {
  275. $uri = $this->path . $uri;
  276. }
  277. return $uri;
  278. }
  279. /**
  280. * Log a successful request
  281. *
  282. * @param string $method
  283. * @param string $fullURI
  284. * @param string $body
  285. * @param array $headers
  286. * @param string $statusCode
  287. * @param string $response
  288. * @param string $duration
  289. *
  290. * @return void
  291. */
  292. public function logRequestSuccess($method, $fullURI, $body, $headers, $statusCode, $response, $duration)
  293. {
  294. $this->log->debug('Request Body', array($body));
  295. $this->log->info(
  296. 'Request Success:',
  297. array(
  298. 'method' => $method,
  299. 'uri' => $fullURI,
  300. 'headers' => $headers,
  301. 'HTTP code' => $statusCode,
  302. 'duration' => $duration,
  303. )
  304. );
  305. $this->log->debug('Response', array($response));
  306. // Build the curl command for Trace.
  307. $curlCommand = $this->buildCurlCommand($method, $fullURI, $body);
  308. $this->trace->info($curlCommand);
  309. $this->trace->debug(
  310. 'Response:',
  311. array(
  312. 'response' => $response,
  313. 'method' => $method,
  314. 'uri' => $fullURI,
  315. 'HTTP code' => $statusCode,
  316. 'duration' => $duration,
  317. )
  318. );
  319. }
  320. /**
  321. * Log a a failed request
  322. *
  323. * @param string $method
  324. * @param string $fullURI
  325. * @param string $body
  326. * @param array $headers
  327. * @param null|string $statusCode
  328. * @param null|string $response
  329. * @param string $duration
  330. * @param \Exception|null $exception
  331. *
  332. * @return void
  333. */
  334. public function logRequestFail($method, $fullURI, $body, $headers, $statusCode, $response, $duration, \Exception $exception)
  335. {
  336. $this->log->debug('Request Body', array($body));
  337. $this->log->warning(
  338. 'Request Failure:',
  339. array(
  340. 'method' => $method,
  341. 'uri' => $fullURI,
  342. 'headers' => $headers,
  343. 'HTTP code' => $statusCode,
  344. 'duration' => $duration,
  345. 'error' => $exception->getMessage(),
  346. )
  347. );
  348. $this->log->warning('Response', array($response));
  349. // Build the curl command for Trace.
  350. $curlCommand = $this->buildCurlCommand($method, $fullURI, $body);
  351. $this->trace->info($curlCommand);
  352. $this->trace->debug(
  353. 'Response:',
  354. array(
  355. 'response' => $response,
  356. 'method' => $method,
  357. 'uri' => $fullURI,
  358. 'HTTP code' => $statusCode,
  359. 'duration' => $duration,
  360. )
  361. );
  362. }
  363. /**
  364. * @return bool
  365. */
  366. public function ping()
  367. {
  368. $options = [
  369. 'client' => [
  370. 'timeout' => $this->pingTimeout,
  371. 'never_retry' => true,
  372. 'verbose' => true
  373. ]
  374. ];
  375. try {
  376. $response = $this->performRequest('HEAD', '/', null, null, $options);
  377. $response = $response->wait();
  378. } catch (TransportException $exception) {
  379. $this->markDead();
  380. return false;
  381. }
  382. if ($response['status'] === 200) {
  383. $this->markAlive();
  384. return true;
  385. } else {
  386. $this->markDead();
  387. return false;
  388. }
  389. }
  390. /**
  391. * @return array
  392. */
  393. public function sniff()
  394. {
  395. $options = [
  396. 'client' => [
  397. 'timeout' => $this->pingTimeout,
  398. 'never_retry' => true
  399. ]
  400. ];
  401. return $this->performRequest('GET', '/_nodes/', null, null, $options);
  402. }
  403. /**
  404. * @return bool
  405. */
  406. public function isAlive()
  407. {
  408. return $this->isAlive;
  409. }
  410. public function markAlive()
  411. {
  412. $this->failedPings = 0;
  413. $this->isAlive = true;
  414. $this->lastPing = time();
  415. }
  416. public function markDead()
  417. {
  418. $this->isAlive = false;
  419. $this->failedPings += 1;
  420. $this->lastPing = time();
  421. }
  422. /**
  423. * @return int
  424. */
  425. public function getLastPing()
  426. {
  427. return $this->lastPing;
  428. }
  429. /**
  430. * @return int
  431. */
  432. public function getPingFailures()
  433. {
  434. return $this->failedPings;
  435. }
  436. /**
  437. * @return string
  438. */
  439. public function getHost()
  440. {
  441. return $this->host;
  442. }
  443. /**
  444. * @return null|string
  445. */
  446. public function getUserPass()
  447. {
  448. if (isset($this->connectionParams['client']['curl'][CURLOPT_USERPWD]) === true) {
  449. return $this->connectionParams['client']['curl'][CURLOPT_USERPWD];
  450. }
  451. return null;
  452. }
  453. /**
  454. * @return null|string
  455. */
  456. public function getPath()
  457. {
  458. return $this->path;
  459. }
  460. /**
  461. * @param $request
  462. * @param $response
  463. * @return \Elasticsearch\Common\Exceptions\Curl\CouldNotConnectToHost|\Elasticsearch\Common\Exceptions\Curl\CouldNotResolveHostException|\Elasticsearch\Common\Exceptions\Curl\OperationTimeoutException|\Elasticsearch\Common\Exceptions\MaxRetriesException
  464. */
  465. protected function getCurlRetryException($request, $response)
  466. {
  467. $exception = null;
  468. $message = $response['error']->getMessage();
  469. $exception = new MaxRetriesException($message);
  470. switch ($response['curl']['errno']) {
  471. case 6:
  472. $exception = new CouldNotResolveHostException($message, null, $exception);
  473. break;
  474. case 7:
  475. $exception = new CouldNotConnectToHost($message, null, $exception);
  476. break;
  477. case 28:
  478. $exception = new OperationTimeoutException($message, null, $exception);
  479. break;
  480. }
  481. return $exception;
  482. }
  483. /**
  484. * Construct a string cURL command
  485. *
  486. * @param string $method HTTP method
  487. * @param string $uri Full URI of request
  488. * @param string $body Request body
  489. *
  490. * @return string
  491. */
  492. private function buildCurlCommand($method, $uri, $body)
  493. {
  494. if (strpos($uri, '?') === false) {
  495. $uri .= '?pretty=true';
  496. } else {
  497. str_replace('?', '?pretty=true', $uri);
  498. }
  499. $curlCommand = 'curl -X' . strtoupper($method);
  500. $curlCommand .= " '" . $uri . "'";
  501. if (isset($body) === true && $body !== '') {
  502. $curlCommand .= " -d '" . $body . "'";
  503. }
  504. return $curlCommand;
  505. }
  506. /**
  507. * @param $request
  508. * @param $response
  509. * @param $ignore
  510. * @throws \Elasticsearch\Common\Exceptions\AlreadyExpiredException|\Elasticsearch\Common\Exceptions\BadRequest400Exception|\Elasticsearch\Common\Exceptions\Conflict409Exception|\Elasticsearch\Common\Exceptions\Forbidden403Exception|\Elasticsearch\Common\Exceptions\Missing404Exception|\Elasticsearch\Common\Exceptions\ScriptLangNotSupportedException|null
  511. */
  512. private function process4xxError($request, $response, $ignore)
  513. {
  514. $statusCode = $response['status'];
  515. $responseBody = $response['body'];
  516. /** @var \Exception $exception */
  517. $exception = $this->tryDeserialize400Error($response);
  518. if (array_search($response['status'], $ignore) !== false) {
  519. return;
  520. }
  521. if ($statusCode === 400 && strpos($responseBody, "AlreadyExpiredException") !== false) {
  522. $exception = new AlreadyExpiredException($responseBody, $statusCode);
  523. } elseif ($statusCode === 403) {
  524. $exception = new Forbidden403Exception($responseBody, $statusCode);
  525. } elseif ($statusCode === 404) {
  526. $exception = new Missing404Exception($responseBody, $statusCode);
  527. } elseif ($statusCode === 409) {
  528. $exception = new Conflict409Exception($responseBody, $statusCode);
  529. } elseif ($statusCode === 400 && strpos($responseBody, 'script_lang not supported') !== false) {
  530. $exception = new ScriptLangNotSupportedException($responseBody. $statusCode);
  531. } elseif ($statusCode === 408) {
  532. $exception = new RequestTimeout408Exception($responseBody, $statusCode);
  533. } else {
  534. $exception = new BadRequest400Exception($responseBody, $statusCode);
  535. }
  536. $this->logRequestFail(
  537. $request['http_method'],
  538. $response['effective_url'],
  539. $request['body'],
  540. $request['headers'],
  541. $response['status'],
  542. $response['body'],
  543. $response['transfer_stats']['total_time'],
  544. $exception
  545. );
  546. throw $exception;
  547. }
  548. /**
  549. * @param $request
  550. * @param $response
  551. * @param $ignore
  552. * @throws \Elasticsearch\Common\Exceptions\NoDocumentsToGetException|\Elasticsearch\Common\Exceptions\NoShardAvailableException|\Elasticsearch\Common\Exceptions\RoutingMissingException|\Elasticsearch\Common\Exceptions\ServerErrorResponseException
  553. */
  554. private function process5xxError($request, $response, $ignore)
  555. {
  556. $statusCode = $response['status'];
  557. $responseBody = $response['body'];
  558. /** @var \Exception $exception */
  559. $exception = $this->tryDeserialize500Error($response);
  560. $exceptionText = "[$statusCode Server Exception] ".$exception->getMessage();
  561. $this->log->error($exceptionText);
  562. $this->log->error($exception->getTraceAsString());
  563. if (array_search($statusCode, $ignore) !== false) {
  564. return;
  565. }
  566. if ($statusCode === 500 && strpos($responseBody, "RoutingMissingException") !== false) {
  567. $exception = new RoutingMissingException($exception->getMessage(), $statusCode, $exception);
  568. } elseif ($statusCode === 500 && preg_match('/ActionRequestValidationException.+ no documents to get/', $responseBody) === 1) {
  569. $exception = new NoDocumentsToGetException($exception->getMessage(), $statusCode, $exception);
  570. } elseif ($statusCode === 500 && strpos($responseBody, 'NoShardAvailableActionException') !== false) {
  571. $exception = new NoShardAvailableException($exception->getMessage(), $statusCode, $exception);
  572. } else {
  573. $exception = new ServerErrorResponseException($responseBody, $statusCode);
  574. }
  575. $this->logRequestFail(
  576. $request['http_method'],
  577. $response['effective_url'],
  578. $request['body'],
  579. $request['headers'],
  580. $response['status'],
  581. $response['body'],
  582. $response['transfer_stats']['total_time'],
  583. $exception
  584. );
  585. throw $exception;
  586. }
  587. private function tryDeserialize400Error($response)
  588. {
  589. return $this->tryDeserializeError($response, 'Elasticsearch\Common\Exceptions\BadRequest400Exception');
  590. }
  591. private function tryDeserialize500Error($response)
  592. {
  593. return $this->tryDeserializeError($response, 'Elasticsearch\Common\Exceptions\ServerErrorResponseException');
  594. }
  595. private function tryDeserializeError($response, $errorClass)
  596. {
  597. $error = $this->serializer->deserialize($response['body'], $response['transfer_stats']);
  598. if (is_array($error) === true) {
  599. // 2.0 structured exceptions
  600. if (isset($error['error']['reason']) === true) {
  601. // Try to use root cause first (only grabs the first root cause)
  602. $root = $error['error']['root_cause'];
  603. if (isset($root) && isset($root[0])) {
  604. $cause = $root[0]['reason'];
  605. $type = $root[0]['type'];
  606. } else {
  607. $cause = $error['error']['reason'];
  608. $type = $error['error']['type'];
  609. }
  610. $original = new $errorClass($response['body'], $response['status']);
  611. return new $errorClass("$type: $cause", $response['status'], $original);
  612. } elseif (isset($error['error']) === true) {
  613. // <2.0 semi-structured exceptions
  614. $original = new $errorClass($response['body'], $response['status']);
  615. return new $errorClass($error['error'], $response['status'], $original);
  616. }
  617. // <2.0 "i just blew up" nonstructured exception
  618. // $error is an array but we don't know the format, reuse the response body instead
  619. return new $errorClass($response['body'], $response['status']);
  620. }
  621. // <2.0 "i just blew up" nonstructured exception
  622. return new $errorClass($response['body']);
  623. }
  624. }