Bulk.php 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. <?php
  2. namespace Elastica;
  3. use Elastica\Bulk\Action;
  4. use Elastica\Bulk\Action\AbstractDocument as AbstractDocumentAction;
  5. use Elastica\Bulk\Response as BulkResponse;
  6. use Elastica\Bulk\ResponseSet;
  7. use Elastica\Exception\Bulk\ResponseException as BulkResponseException;
  8. use Elastica\Exception\InvalidException;
  9. use Elastica\Script\AbstractScript;
  /**
   * Collects actions for the Elasticsearch bulk API and sends them in one request.
   *
   * Actions are queued with the add* methods, serialized by toString() and
   * POSTed as an NDJSON body by send().
   */
  class Bulk
  {
      const DELIMITER = "\n";

      /**
       * @var \Elastica\Client client used by send()
       */
      protected $_client;

      /**
       * @var \Elastica\Bulk\Action[] queued actions, in insertion order
       */
      protected $_actions = [];

      /**
       * @var string|null index name used as the first path segment
       */
      protected $_index;

      /**
       * @var string|null type name used as the second path segment
       */
      protected $_type;

      /**
       * @var array request parameters to the bulk api
       */
      protected $_requestParams = [];

      /**
       * @param \Elastica\Client $client client used to send the bulk request
       */
      public function __construct(Client $client)
      {
          $this->_client = $client;
      }

      /**
       * Set the index that is prefixed to the request path.
       *
       * @param string|\Elastica\Index $index index name or Index object
       *
       * @return $this
       */
      public function setIndex($index)
      {
          if ($index instanceof Index) {
              $index = $index->getName();
          }

          $this->_index = (string) $index;

          return $this;
      }

      /**
       * @return string|null
       */
      public function getIndex()
      {
          return $this->_index;
      }

      /**
       * @return bool true when an index is set and is not the empty string
       */
      public function hasIndex()
      {
          return null !== $this->getIndex() && '' !== $this->getIndex();
      }

      /**
       * Set the type used in the request path.
       *
       * When a Type object is given, its index is also set as the bulk index.
       *
       * @param string|\Elastica\Type $type type name or Type object
       *
       * @return $this
       */
      public function setType($type)
      {
          if ($type instanceof Type) {
              $this->setIndex($type->getIndex()->getName());
              $type = $type->getName();
          }

          $this->_type = (string) $type;

          return $this;
      }

      /**
       * @return string|null
       */
      public function getType()
      {
          return $this->_type;
      }

      /**
       * @return bool true when a type is set and is not the empty string
       */
      public function hasType()
      {
          return null !== $this->getType() && '' !== $this->getType();
      }

      /**
       * Build the endpoint path: "_bulk", "{index}/_bulk" or "{index}/{type}/_bulk".
       *
       * The type segment is only used when an index is also set.
       *
       * @return string
       */
      public function getPath()
      {
          $path = '';

          if ($this->hasIndex()) {
              $path .= $this->getIndex().'/';

              if ($this->hasType()) {
                  $path .= $this->getType().'/';
              }
          }

          $path .= '_bulk';

          return $path;
      }

      /**
       * Queue a single action.
       *
       * @param \Elastica\Bulk\Action $action
       *
       * @return $this
       */
      public function addAction(Action $action)
      {
          $this->_actions[] = $action;

          return $this;
      }

      /**
       * Queue several actions, preserving their order.
       *
       * @param \Elastica\Bulk\Action[] $actions
       *
       * @return $this
       */
      public function addActions(array $actions)
      {
          foreach ($actions as $action) {
              $this->addAction($action);
          }

          return $this;
      }

      /**
       * @return \Elastica\Bulk\Action[]
       */
      public function getActions()
      {
          return $this->_actions;
      }

      /**
       * Queue a document action built by AbstractDocumentAction::create().
       *
       * @param \Elastica\Document $document
       * @param string|null        $opType   op type passed to AbstractDocumentAction::create()
       *
       * @return $this
       */
      public function addDocument(Document $document, $opType = null)
      {
          $action = AbstractDocumentAction::create($document, $opType);

          return $this->addAction($action);
      }

      /**
       * Queue one document action per document.
       *
       * @param \Elastica\Document[] $documents
       * @param string|null          $opType    op type applied to every document
       *
       * @return $this
       */
      public function addDocuments(array $documents, $opType = null)
      {
          foreach ($documents as $document) {
              $this->addDocument($document, $opType);
          }

          return $this;
      }

      /**
       * Queue a script action built by AbstractDocumentAction::create().
       *
       * @param \Elastica\Script\AbstractScript $script
       * @param string|null                     $opType op type passed to AbstractDocumentAction::create()
       *
       * @return $this
       */
      public function addScript(AbstractScript $script, $opType = null)
      {
          $action = AbstractDocumentAction::create($script, $opType);

          return $this->addAction($action);
      }

      /**
       * Queue one script action per script.
       *
       * @param \Elastica\Script\AbstractScript[] $scripts
       * @param string|null                       $opType  op type applied to every script
       *
       * @return $this
       */
      public function addScripts(array $scripts, $opType = null)
      {
          foreach ($scripts as $script) {
              $this->addScript($script, $opType);
          }

          return $this;
      }

      /**
       * Queue scripts and/or documents, dispatching each entry by its class.
       *
       * A single (non-array) value is treated as a one-element list.
       *
       * @param \Elastica\Script\AbstractScript|\Elastica\Document|array $data
       * @param string|null                                              $opType
       *
       * @throws \InvalidArgumentException if an entry is neither a Document nor a Script
       *
       * @return $this
       */
      public function addData($data, $opType = null)
      {
          if (!is_array($data)) {
              $data = [$data];
          }

          foreach ($data as $actionData) {
              if ($actionData instanceof AbstractScript) {
                  $this->addScript($actionData, $opType);
              } elseif ($actionData instanceof Document) {
                  $this->addDocument($actionData, $opType);
              } else {
                  throw new \InvalidArgumentException('Data should be a Document, a Script or an array containing Documents and/or Scripts');
              }
          }

          return $this;
      }

      /**
       * Queue actions described by raw bulk rows.
       *
       * A row whose first key is a valid op type starts a new action, using that
       * key's value as metadata. The next row that is not an action row becomes
       * the pending action's source. An action row that directly follows another
       * action row is queued without a source.
       *
       * @param array $data list of arrays (metadata rows and source rows)
       *
       * @throws \Elastica\Exception\InvalidException if a row is not an array, or a
       *                                              source row has no preceding metadata row
       *
       * @return $this
       */
      public function addRawData(array $data)
      {
          // Action whose metadata has been read but whose source row has not been seen yet.
          $action = null;

          foreach ($data as $row) {
              if (!is_array($row)) {
                  throw new InvalidException('Invalid bulk data, should be array of array, Document or Bulk/Action');
              }

              $opType = key($row);
              $metadata = reset($row);

              if (Action::isValidOpType($opType)) {
                  // The previous action got no source row; queue it as-is.
                  if (null !== $action) {
                      $this->addAction($action);
                  }

                  $action = new Action($opType, $metadata);
              } elseif (null !== $action) {
                  $action->setSource($row);
                  $this->addAction($action);
                  $action = null;
              } else {
                  throw new InvalidException('Invalid bulk data, source must follow action metadata');
              }
          }

          // Queue a trailing action that had no source row.
          if (null !== $action) {
              $this->addAction($action);
          }

          return $this;
      }

      /**
       * Set a URL parameter on the bulk request.
       *
       * @param string $name  name of the parameter
       * @param string $value value of the parameter
       *
       * @return $this
       */
      public function setRequestParam($name, $value)
      {
          $this->_requestParams[$name] = $value;

          return $this;
      }

      /**
       * Set how long the request waits for shards to become available.
       *
       * Sent as the "timeout" request parameter. Requires Elasticsearch version >= 0.90.8.
       *
       * @param string $time timeout in Elasticsearch time format
       *
       * @return $this
       */
      public function setShardTimeout($time)
      {
          return $this->setRequestParam('timeout', $time);
      }

      /**
       * @return string the serialized bulk body (see toString())
       */
      public function __toString()
      {
          return $this->toString();
      }

      /**
       * Concatenate the string form of every queued action.
       *
       * @return string
       */
      public function toString()
      {
          $data = '';

          foreach ($this->getActions() as $action) {
              $data .= $action->toString();
          }

          return $data;
      }

      /**
       * Flatten the array rows of every queued action into a single list.
       *
       * @return array
       */
      public function toArray()
      {
          $data = [];

          foreach ($this->getActions() as $action) {
              foreach ($action->toArray() as $row) {
                  $data[] = $row;
              }
          }

          return $data;
      }

      /**
       * POST the serialized actions to getPath() with the configured request params.
       *
       * @throws \Elastica\Exception\Bulk\ResponseException if any item in the response reports an error
       *
       * @return \Elastica\Bulk\ResponseSet
       */
      public function send()
      {
          $path = $this->getPath();
          $data = $this->toString();

          $response = $this->_client->request($path, Request::POST, $data, $this->_requestParams, Request::NDJSON_CONTENT_TYPE);

          return $this->_processResponse($response);
      }

      /**
       * Pair each response item with the action at the same position.
       *
       * When auto-population is enabled (on the document itself, or via the
       * client's ['document', 'autoPopulate'] config), the returned "_id" is copied
       * to the action's data if it has no id yet. The returned "_version" is
       * copied whenever it is present.
       *
       * @param \Elastica\Response $response
       *
       * @throws \Elastica\Exception\Bulk\ResponseException if the response set contains an error
       * @throws \Elastica\Exception\InvalidException       if a response item has no matching action
       *
       * @return \Elastica\Bulk\ResponseSet
       */
      protected function _processResponse(Response $response)
      {
          $responseData = $response->getData();

          $actions = $this->getActions();

          $bulkResponses = [];

          if (isset($responseData['items']) && is_array($responseData['items'])) {
              foreach ($responseData['items'] as $key => $item) {
                  if (!isset($actions[$key])) {
                      throw new InvalidException('No action found for response item #'.$key);
                  }

                  $action = $actions[$key];

                  // Only the first entry of each item is read: its key is the op type,
                  // its value is the per-action result.
                  $opType = key($item);
                  $bulkResponseData = reset($item);

                  if ($action instanceof AbstractDocumentAction) {
                      $data = $action->getData();

                      // The client-wide setting applies whatever the data's own flag is.
                      $autoPopulate = ($data instanceof Document && $data->isAutoPopulate())
                          || $this->_client->getConfigValue(['document', 'autoPopulate'], false);

                      if ($autoPopulate) {
                          if (!$data->hasId() && isset($bulkResponseData['_id'])) {
                              $data->setId($bulkResponseData['_id']);
                          }
                          if (isset($bulkResponseData['_version'])) {
                              $data->setVersion($bulkResponseData['_version']);
                          }
                      }
                  }

                  $bulkResponses[] = new BulkResponse($bulkResponseData, $action, $opType);
              }
          }

          $bulkResponseSet = new ResponseSet($response, $bulkResponses);

          if ($bulkResponseSet->hasError()) {
              throw new BulkResponseException($bulkResponseSet);
          }

          return $bulkResponseSet;
      }
  }