*/ class Client { /** * Config with defaults. * * log: Set to true, to enable logging, set a string to log to a specific file * retryOnConflict: Use in \Elastica\Client::updateDocument * bigintConversion: Set to true to enable the JSON bigint to string conversion option (see issue #717) * * @var array */ protected $_config = [ 'host' => null, 'port' => null, 'path' => null, 'url' => null, 'proxy' => null, 'transport' => null, 'persistent' => true, 'timeout' => null, 'connections' => [], // host, port, path, timeout, transport, compression, persistent, timeout, username, password, config -> (curl, headers, url) 'roundRobin' => false, 'log' => false, 'retryOnConflict' => 0, 'bigintConversion' => false, 'username' => null, 'password' => null, ]; /** * @var callback */ protected $_callback; /** * @var Connection\ConnectionPool */ protected $_connectionPool; /** * @var \Elastica\Request|null */ protected $_lastRequest; /** * @var \Elastica\Response|null */ protected $_lastResponse; /** * @var LoggerInterface */ protected $_logger; /** * @var string */ protected $_version; /** * Creates a new Elastica client. * * @param array $config OPTIONAL Additional config options * @param callback $callback OPTIONAL Callback function which can be used to be notified about errors (for example connection down) * @param LoggerInterface $logger */ public function __construct(array $config = [], $callback = null, LoggerInterface $logger = null) { $this->_callback = $callback; if (!$logger && isset($config['log']) && $config['log']) { $logger = new Log($config['log']); } $this->_logger = $logger ?: new NullLogger(); $this->setConfig($config); $this->_initConnections(); } /** * Get current version. * * @return string */ public function getVersion() { if ($this->_version) { return $this->_version; } $data = $this->request('/')->getData(); return $this->_version = $data['version']['number']; } /** * Inits the client connections. */ protected function _initConnections() { $connections = []; foreach ($this->getConfig('connections') as $connection) { $connections[] = Connection::create($this->_prepareConnectionParams($connection)); } if (isset($this->_config['servers'])) { foreach ($this->getConfig('servers') as $server) { $connections[] = Connection::create($this->_prepareConnectionParams($server)); } } // If no connections set, create default connection if (empty($connections)) { $connections[] = Connection::create($this->_prepareConnectionParams($this->getConfig())); } if (!isset($this->_config['connectionStrategy'])) { if (true === $this->getConfig('roundRobin')) { $this->setConfigValue('connectionStrategy', 'RoundRobin'); } else { $this->setConfigValue('connectionStrategy', 'Simple'); } } $strategy = Connection\Strategy\StrategyFactory::create($this->getConfig('connectionStrategy')); $this->_connectionPool = new Connection\ConnectionPool($connections, $strategy, $this->_callback); } /** * Creates a Connection params array from a Client or server config array. * * @param array $config * * @return array */ protected function _prepareConnectionParams(array $config) { $params = []; $params['config'] = []; foreach ($config as $key => $value) { if (in_array($key, ['bigintConversion', 'curl', 'headers', 'url'])) { $params['config'][$key] = $value; } else { $params[$key] = $value; } } return $params; } /** * Sets specific config values (updates and keeps default values). * * @param array $config Params * * @return $this */ public function setConfig(array $config) { foreach ($config as $key => $value) { $this->_config[$key] = $value; } return $this; } /** * Returns a specific config key or the whole * config array if not set. * * @param string $key Config key * * @throws \Elastica\Exception\InvalidException * * @return array|string Config value */ public function getConfig($key = '') { if (empty($key)) { return $this->_config; } if (!array_key_exists($key, $this->_config)) { throw new InvalidException('Config key is not set: '.$key); } return $this->_config[$key]; } /** * Sets / overwrites a specific config value. * * @param string $key Key to set * @param mixed $value Value * * @return $this */ public function setConfigValue($key, $value) { return $this->setConfig([$key => $value]); } /** * @param array|string $keys config key or path of config keys * @param mixed $default default value will be returned if key was not found * * @return mixed */ public function getConfigValue($keys, $default = null) { $value = $this->_config; foreach ((array) $keys as $key) { if (isset($value[$key])) { $value = $value[$key]; } else { return $default; } } return $value; } /** * Returns the index for the given connection. * * @param string $name Index name to create connection to * * @return \Elastica\Index Index for the given name */ public function getIndex($name) { return new Index($this, $name); } /** * Adds a HTTP Header. * * @param string $header The HTTP Header * @param string $headerValue The HTTP Header Value * * @throws \Elastica\Exception\InvalidException If $header or $headerValue is not a string * * @return $this */ public function addHeader($header, $headerValue) { if (is_string($header) && is_string($headerValue)) { $this->_config['headers'][$header] = $headerValue; } else { throw new InvalidException('Header must be a string'); } return $this; } /** * Remove a HTTP Header. * * @param string $header The HTTP Header to remove * * @throws \Elastica\Exception\InvalidException If $header is not a string * * @return $this */ public function removeHeader($header) { if (is_string($header)) { if (array_key_exists($header, $this->_config['headers'])) { unset($this->_config['headers'][$header]); } } else { throw new InvalidException('Header must be a string'); } return $this; } /** * Uses _bulk to send documents to the server. * * Array of \Elastica\Document as input. Index and type has to be * set inside the document, because for bulk settings documents, * documents can belong to any type and index * * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html * * @param array|\Elastica\Document[] $docs Array of Elastica\Document * @param array $requestParams * * @throws \Elastica\Exception\InvalidException If docs is empty * * @return \Elastica\Bulk\ResponseSet Response object */ public function updateDocuments(array $docs, array $requestParams = []) { if (empty($docs)) { throw new InvalidException('Array has to consist of at least one element'); } $bulk = new Bulk($this); $bulk->addDocuments($docs, Action::OP_TYPE_UPDATE); foreach ($requestParams as $key => $value) { $bulk->setRequestParam($key, $value); } return $bulk->send(); } /** * Uses _bulk to send documents to the server. * * Array of \Elastica\Document as input. Index and type has to be * set inside the document, because for bulk settings documents, * documents can belong to any type and index * * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html * * @param array|\Elastica\Document[] $docs Array of Elastica\Document * @param array $requestParams * * @throws \Elastica\Exception\InvalidException If docs is empty * * @return \Elastica\Bulk\ResponseSet Response object */ public function addDocuments(array $docs, array $requestParams = []) { if (empty($docs)) { throw new InvalidException('Array has to consist of at least one element'); } $bulk = new Bulk($this); $bulk->addDocuments($docs); foreach ($requestParams as $key => $value) { $bulk->setRequestParam($key, $value); } return $bulk->send(); } /** * Update document, using update script. Requires elasticsearch >= 0.19.0. * * @param int|string $id document id * @param array|\Elastica\Script\AbstractScript|\Elastica\Document $data raw data for request body * @param string $index index to update * @param string $type type of index to update * @param array $options array of query params to use for query. For possible options check es api * * @return \Elastica\Response * * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html */ public function updateDocument($id, $data, $index, $type, array $options = []) { $endpoint = new Update(); $endpoint->setID($id); $endpoint->setIndex($index); $endpoint->setType($type); if ($data instanceof AbstractScript) { $requestData = $data->toArray(); } elseif ($data instanceof Document) { $requestData = ['doc' => $data->getData()]; if ($data->getDocAsUpsert()) { $requestData['doc_as_upsert'] = true; } $docOptions = $data->getOptions( [ 'version', 'version_type', 'routing', 'percolate', 'parent', 'fields', 'retry_on_conflict', 'consistency', 'replication', 'refresh', 'timeout', ] ); $options += $docOptions; // set fields param to source only if options was not set before if ($data instanceof Document && ($data->isAutoPopulate() || $this->getConfigValue(['document', 'autoPopulate'], false)) && !isset($options['fields']) ) { $options['fields'] = '_source'; } } else { $requestData = $data; } //If an upsert document exists if ($data instanceof AbstractScript || $data instanceof Document) { if ($data->hasUpsert()) { $requestData['upsert'] = $data->getUpsert()->getData(); } } if (!isset($options['retry_on_conflict'])) { if ($retryOnConflict = $this->getConfig('retryOnConflict')) { $options['retry_on_conflict'] = $retryOnConflict; } } $endpoint->setBody($requestData); $endpoint->setParams($options); $response = $this->requestEndpoint($endpoint); if ($response->isOk() && $data instanceof Document && ($data->isAutoPopulate() || $this->getConfigValue(['document', 'autoPopulate'], false)) ) { $responseData = $response->getData(); if (isset($responseData['_version'])) { $data->setVersion($responseData['_version']); } if (isset($options['fields'])) { $this->_populateDocumentFieldsFromResponse($response, $data, $options['fields']); } } return $response; } /** * @param \Elastica\Response $response * @param \Elastica\Document $document * @param string $fields Array of field names to be populated or '_source' if whole document data should be updated */ protected function _populateDocumentFieldsFromResponse(Response $response, Document $document, $fields) { $responseData = $response->getData(); if ('_source' == $fields) { if (isset($responseData['get']['_source']) && is_array($responseData['get']['_source'])) { $document->setData($responseData['get']['_source']); } } else { $keys = explode(',', $fields); $data = $document->getData(); foreach ($keys as $key) { if (isset($responseData['get']['fields'][$key])) { $data[$key] = $responseData['get']['fields'][$key]; } elseif (isset($data[$key])) { unset($data[$key]); } } $document->setData($data); } } /** * Bulk deletes documents. * * @param array|\Elastica\Document[] $docs * @param array $requestParams * * @throws \Elastica\Exception\InvalidException * * @return \Elastica\Bulk\ResponseSet */ public function deleteDocuments(array $docs, array $requestParams = []) { if (empty($docs)) { throw new InvalidException('Array has to consist of at least one element'); } $bulk = new Bulk($this); $bulk->addDocuments($docs, Action::OP_TYPE_DELETE); foreach ($requestParams as $key => $value) { $bulk->setRequestParam($key, $value); } return $bulk->send(); } /** * Returns the status object for all indices. * * @return \Elastica\Status Status object */ public function getStatus() { return new Status($this); } /** * Returns the current cluster. * * @return \Elastica\Cluster Cluster object */ public function getCluster() { return new Cluster($this); } /** * Establishes the client connections. */ public function connect() { return $this->_initConnections(); } /** * @param \Elastica\Connection $connection * * @return $this */ public function addConnection(Connection $connection) { $this->_connectionPool->addConnection($connection); return $this; } /** * Determines whether a valid connection is available for use. * * @return bool */ public function hasConnection() { return $this->_connectionPool->hasConnection(); } /** * @throws \Elastica\Exception\ClientException * * @return \Elastica\Connection */ public function getConnection() { return $this->_connectionPool->getConnection(); } /** * @return \Elastica\Connection[] */ public function getConnections() { return $this->_connectionPool->getConnections(); } /** * @return \Elastica\Connection\Strategy\StrategyInterface */ public function getConnectionStrategy() { return $this->_connectionPool->getStrategy(); } /** * @param array|\Elastica\Connection[] $connections * * @return $this */ public function setConnections(array $connections) { $this->_connectionPool->setConnections($connections); return $this; } /** * Deletes documents with the given ids, index, type from the index. * * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html * * @param array $ids Document ids * @param string|\Elastica\Index $index Index name * @param string|\Elastica\Type $type Type of documents * @param string|bool $routing Optional routing key for all ids * * @throws \Elastica\Exception\InvalidException * * @return \Elastica\Bulk\ResponseSet Response object */ public function deleteIds(array $ids, $index, $type, $routing = false) { if (empty($ids)) { throw new InvalidException('Array has to consist of at least one id'); } $bulk = new Bulk($this); $bulk->setIndex($index); $bulk->setType($type); foreach ($ids as $id) { $action = new Action(Action::OP_TYPE_DELETE); $action->setId($id); if (!empty($routing)) { $action->setRouting($routing); } $bulk->addAction($action); } return $bulk->send(); } /** * Bulk operation. * * Every entry in the params array has to exactly on array * of the bulk operation. An example param array would be: * * array( * array('index' => array('_index' => 'test', '_type' => 'user', '_id' => '1')), * array('user' => array('name' => 'hans')), * array('delete' => array('_index' => 'test', '_type' => 'user', '_id' => '2')) * ); * * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html * * @param array $params Parameter array * * @throws \Elastica\Exception\ResponseException * @throws \Elastica\Exception\InvalidException * * @return \Elastica\Bulk\ResponseSet Response object */ public function bulk(array $params) { if (empty($params)) { throw new InvalidException('Array has to consist of at least one param'); } $bulk = new Bulk($this); $bulk->addRawData($params); return $bulk->send(); } /** * Makes calls to the elasticsearch server based on this index. * * It's possible to make any REST query directly over this method * * @param string $path Path to call * @param string $method Rest method to use (GET, POST, DELETE, PUT) * @param array|string $data OPTIONAL Arguments as array or pre-encoded string * @param array $query OPTIONAL Query params * @param string $contentType Content-Type sent with this request * * @throws Exception\ConnectionException|Exception\ClientException * * @return Response Response object */ public function request($path, $method = Request::GET, $data = [], array $query = [], $contentType = Request::DEFAULT_CONTENT_TYPE) { $connection = $this->getConnection(); $request = $this->_lastRequest = new Request($path, $method, $data, $query, $connection, $contentType); $this->_lastResponse = null; try { $response = $this->_lastResponse = $request->send(); } catch (ConnectionException $e) { $this->_connectionPool->onFail($connection, $e, $this); $this->_log($e); // In case there is no valid connection left, throw exception which caused the disabling of the connection. if (!$this->hasConnection()) { throw $e; } return $this->request($path, $method, $data, $query); } $this->_log($request); return $response; } /** * Makes calls to the elasticsearch server with usage official client Endpoint. * * @param AbstractEndpoint $endpoint * * @return Response */ public function requestEndpoint(AbstractEndpoint $endpoint) { return $this->request( ltrim($endpoint->getURI(), '/'), $endpoint->getMethod(), null === $endpoint->getBody() ? [] : $endpoint->getBody(), $endpoint->getParams() ); } /** * logging. * * @deprecated Overwriting Client->_log is deprecated. Handle logging functionality by using a custom LoggerInterface. * * @param mixed $context */ protected function _log($context) { if ($context instanceof ConnectionException) { $this->_logger->error('Elastica Request Failure', [ 'exception' => $context, 'request' => $context->getRequest()->toArray(), 'retry' => $this->hasConnection(), ]); return; } if ($context instanceof Request) { $this->_logger->debug('Elastica Request', [ 'request' => $context->toArray(), 'response' => $this->_lastResponse ? $this->_lastResponse->getData() : null, 'responseStatus' => $this->_lastResponse ? $this->_lastResponse->getStatus() : null, ]); return; } $this->_logger->debug('Elastica Request', [ 'message' => $context, ]); } /** * Optimizes all search indices. * * @param array $args OPTIONAL Optional arguments * * @return \Elastica\Response Response object * * @deprecated Replaced by forcemergeAll * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-optimize.html */ public function optimizeAll($args = []) { trigger_error('Deprecated: Elastica\Client::optimizeAll() is deprecated and will be removed in further Elastica releases. Use Elastica\Client::forcemergeAll() instead.', E_USER_DEPRECATED); return $this->forcemergeAll($args); } /** * Force merges all search indices. * * @param array $args OPTIONAL Optional arguments * * @return \Elastica\Response Response object * * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-forcemerge.html */ public function forcemergeAll($args = []) { $endpoint = new ForceMerge(); $endpoint->setParams($args); return $this->requestEndpoint($endpoint); } /** * Refreshes all search indices. * * @return \Elastica\Response Response object * * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-refresh.html */ public function refreshAll() { return $this->requestEndpoint(new Refresh()); } /** * @return Request|null */ public function getLastRequest() { return $this->_lastRequest; } /** * @return Response|null */ public function getLastResponse() { return $this->_lastResponse; } /** * Replace the existing logger. * * @param LoggerInterface $logger * * @return $this */ public function setLogger(LoggerInterface $logger) { $this->_logger = $logger; return $this; } }