Pipeline.php 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. <?php
  2. namespace Elastica;
  3. use Elastica\Exception\InvalidException;
  4. use Elastica\Exception\NotImplementedException;
  5. use Elastica\Processor\AbstractProcessor;
  6. use Elasticsearch\Endpoints\AbstractEndpoint;
  7. use Elasticsearch\Endpoints\Ingest\Pipeline\Delete;
  8. use Elasticsearch\Endpoints\Ingest\Pipeline\Get;
  9. use Elasticsearch\Endpoints\Ingest\Pipeline\Put;
  10. /**
  11. * Elastica Pipeline object.
  12. *
  13. * Handles Pipeline management & definition.
  14. *
  15. * @author Federico Panini <fpanini@gmail.com>
  16. *
  17. * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest-processors.html
  18. */
  19. class Pipeline extends Param
  20. {
  21. /**
  22. * @var string name of the pipeline
  23. */
  24. protected $id;
  25. /**
  26. * Client Object.
  27. *
  28. * @var Client Client object
  29. */
  30. protected $_client;
  31. /**
  32. * Processors array.
  33. *
  34. * @var array
  35. */
  36. protected $_processors = [];
  37. /**
  38. * Create a new Pipeline Object.
  39. *
  40. * @param Client $client
  41. */
  42. public function __construct(Client $client)
  43. {
  44. $this->_client = $client;
  45. }
  46. /**
  47. * Create a Pipeline.
  48. *
  49. * @return Response
  50. *
  51. * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/put-pipeline-api.html
  52. */
  53. public function create()
  54. {
  55. if (empty($this->id)) {
  56. throw new InvalidException('You should set a valid pipeline id');
  57. }
  58. if (empty($this->_params['description'])) {
  59. throw new InvalidException('You should set a valid processor description.');
  60. }
  61. if (empty($this->_processors['processors'])) {
  62. throw new InvalidException('You should set a valid processor of type Elastica\Processor\AbstractProcessor.');
  63. }
  64. $endpoint = new Put();
  65. $endpoint->setID($this->id);
  66. $endpoint->setBody($this->toArray());
  67. return $this->requestEndpoint($endpoint);
  68. }
  69. /**
  70. * Get a Pipeline Object.
  71. *
  72. * @param string $id Pipeline name
  73. *
  74. * @return Response
  75. *
  76. * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/get-pipeline-api.html
  77. */
  78. public function getPipeline(string $id)
  79. {
  80. $endpoint = new Get();
  81. $endpoint->setID($id);
  82. return $this->requestEndpoint($endpoint);
  83. }
  84. /**
  85. * Delete a Pipeline.
  86. *
  87. * @param string $id Pipeline name
  88. *
  89. * @return Response
  90. *
  91. * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/delete-pipeline-api.html
  92. */
  93. public function deletePipeline(string $id)
  94. {
  95. $endpoint = new Delete();
  96. $endpoint->setID($id);
  97. return $this->requestEndpoint($endpoint);
  98. }
  99. /**
  100. * @todo implement simulate API
  101. */
  102. public function simulate()
  103. {
  104. throw new NotImplementedException('simulate API on Pipeline not yet implemented.');
  105. }
  106. /**
  107. * Sets query as raw array. Will overwrite all already set arguments.
  108. *
  109. * @param array $processors array
  110. *
  111. * @return $this
  112. */
  113. public function setRawProcessors(array $processors)
  114. {
  115. $this->_processors = $processors;
  116. return $this;
  117. }
  118. /**
  119. * Add a processor.
  120. *
  121. * @param AbstractProcessor $processor
  122. *
  123. * @return $this
  124. */
  125. public function addProcessor(AbstractProcessor $processor)
  126. {
  127. if (empty($this->_processors)) {
  128. $this->_processors['processors'] = $processor->toArray();
  129. $this->_params['processors'] = [];
  130. } else {
  131. $this->_processors['processors'] = array_merge($this->_processors['processors'], $processor->toArray());
  132. }
  133. return $this;
  134. }
  135. /**
  136. * Set pipeline id.
  137. *
  138. * @param string $id
  139. */
  140. public function setId(string $id)
  141. {
  142. $this->id = $id;
  143. }
  144. /**
  145. * Sets the processors.
  146. *
  147. * @param array $processors array of AbstractProcessor object
  148. *
  149. * @return $this
  150. */
  151. public function setProcessors(array $processors)
  152. {
  153. return $this->setParam('processors', [$processors]);
  154. }
  155. /**
  156. * Set Description.
  157. *
  158. * @param string $description
  159. *
  160. * @return $this
  161. */
  162. public function setDescription(string $description)
  163. {
  164. return $this->setParam('description', $description);
  165. }
  166. /**
  167. * Converts the params to an array. A default implementation exist to create
  168. * the an array out of the class name (last part of the class name)
  169. * and the params.
  170. *
  171. * @return array Filter array
  172. */
  173. public function toArray()
  174. {
  175. $this->_params['processors'] = [$this->_processors['processors']];
  176. return $this->getParams();
  177. }
  178. /**
  179. * Returns index client.
  180. *
  181. * @return \Elastica\Client Index client object
  182. */
  183. public function getClient()
  184. {
  185. return $this->_client;
  186. }
  187. /**
  188. * Makes calls to the elasticsearch server with usage official client Endpoint based on this index.
  189. *
  190. * @param AbstractEndpoint $endpoint
  191. *
  192. * @return Response
  193. */
  194. public function requestEndpoint(AbstractEndpoint $endpoint)
  195. {
  196. $cloned = clone $endpoint;
  197. return $this->getClient()->requestEndpoint($cloned);
  198. }
  199. }