MongoWriteBatch.php 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. <?php
  2. /*
  3. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  4. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  5. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  6. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  7. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  8. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  9. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  10. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  11. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  12. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  13. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  14. */
  15. if (class_exists('MongoWriteBatch', false)) {
  16. return;
  17. }
  18. use Alcaeus\MongoDbAdapter\TypeConverter;
  19. use Alcaeus\MongoDbAdapter\Helper\WriteConcernConverter;
  20. use MongoDB\Driver\Exception\BulkWriteException;
  21. use MongoDB\Driver\WriteError;
  22. use MongoDB\Driver\WriteResult;
  23. /**
  24. * MongoWriteBatch allows you to "batch up" multiple operations (of the same type)
  25. * and ship them all to MongoDB at the same time. This can be especially
  26. * useful when operating on many documents at the same time to reduce roundtrips.
  27. *
  28. * @see http://php.net/manual/en/class.mongowritebatch.php
  29. */
  30. class MongoWriteBatch
  31. {
  32. use WriteConcernConverter;
  33. const COMMAND_INSERT = 1;
  34. const COMMAND_UPDATE = 2;
  35. const COMMAND_DELETE = 3;
  36. /**
  37. * @var MongoCollection
  38. */
  39. private $collection;
  40. /**
  41. * @var int
  42. */
  43. private $batchType;
  44. /**
  45. * @var array
  46. */
  47. private $writeOptions;
  48. /**
  49. * @var array
  50. */
  51. private $items = [];
  52. /**
  53. * Creates a new batch of write operations
  54. *
  55. * @see http://php.net/manual/en/mongowritebatch.construct.php
  56. * @param MongoCollection $collection
  57. * @param int $batchType
  58. * @param array $writeOptions
  59. */
  60. protected function __construct(MongoCollection $collection, $batchType, $writeOptions)
  61. {
  62. $this->collection = $collection;
  63. $this->batchType = $batchType;
  64. $this->writeOptions = $writeOptions;
  65. }
  66. /**
  67. * Adds a write operation to a batch
  68. *
  69. * @see http://php.net/manual/en/mongowritebatch.add.php
  70. * @param array|object $item
  71. * @return boolean
  72. */
  73. public function add($item)
  74. {
  75. if (is_object($item)) {
  76. $item = (array) $item;
  77. }
  78. $this->validate($item);
  79. $this->addItem($item);
  80. return true;
  81. }
  82. /**
  83. * Executes a batch of write operations
  84. *
  85. * @see http://php.net/manual/en/mongowritebatch.execute.php
  86. * @param array $writeOptions
  87. * @return array
  88. */
  89. final public function execute(array $writeOptions = [])
  90. {
  91. $writeOptions += $this->writeOptions;
  92. if (! count($this->items)) {
  93. return ['ok' => true];
  94. }
  95. if (isset($writeOptions['j'])) {
  96. trigger_error('j parameter is not supported', E_USER_WARNING);
  97. }
  98. if (isset($writeOptions['fsync'])) {
  99. trigger_error('fsync parameter is not supported', E_USER_WARNING);
  100. }
  101. $options['writeConcern'] = $this->createWriteConcernFromArray($writeOptions);
  102. if (isset($writeOptions['ordered'])) {
  103. $options['ordered'] = $writeOptions['ordered'];
  104. }
  105. try {
  106. $writeResult = $this->collection->getCollection()->bulkWrite($this->items, $options);
  107. $resultDocument = [];
  108. $ok = true;
  109. } catch (BulkWriteException $e) {
  110. $writeResult = $e->getWriteResult();
  111. $resultDocument = ['writeErrors' => $this->convertWriteErrors($writeResult)];
  112. $ok = false;
  113. }
  114. $this->items = [];
  115. switch ($this->batchType) {
  116. case self::COMMAND_UPDATE:
  117. if ($options['writeConcern']->getW() === 0) {
  118. $resultDocument += [
  119. 'nMatched' => 0,
  120. 'nModified' => 0,
  121. 'nUpserted' => 0,
  122. 'ok' => true,
  123. ];
  124. break;
  125. }
  126. $upsertedIds = [];
  127. foreach ($writeResult->getUpsertedIds() as $index => $id) {
  128. $upsertedIds[] = [
  129. 'index' => $index,
  130. '_id' => TypeConverter::toLegacy($id)
  131. ];
  132. }
  133. $resultDocument += [
  134. 'nMatched' => $writeResult->getMatchedCount(),
  135. 'nModified' => $writeResult->getModifiedCount(),
  136. 'nUpserted' => $writeResult->getUpsertedCount(),
  137. 'ok' => true,
  138. ];
  139. if (count($upsertedIds)) {
  140. $resultDocument['upserted'] = $upsertedIds;
  141. }
  142. break;
  143. case self::COMMAND_DELETE:
  144. if ($options['writeConcern']->getW() === 0) {
  145. $resultDocument += [
  146. 'nRemoved' => 0,
  147. 'ok' => true,
  148. ];
  149. break;
  150. }
  151. $resultDocument += [
  152. 'nRemoved' => $writeResult->getDeletedCount(),
  153. 'ok' => true,
  154. ];
  155. break;
  156. case self::COMMAND_INSERT:
  157. if ($options['writeConcern']->getW() === 0) {
  158. $resultDocument += [
  159. 'nInserted' => 0,
  160. 'ok' => true,
  161. ];
  162. break;
  163. }
  164. $resultDocument += [
  165. 'nInserted' => $writeResult->getInsertedCount(),
  166. 'ok' => true,
  167. ];
  168. break;
  169. }
  170. if (! $ok) {
  171. // Exception code is hardcoded to the value in ext-mongo, see
  172. // https://github.com/mongodb/mongo-php-driver-legacy/blob/ab4bc0d90e93b3f247f6bcb386d0abc8d2fa7d74/batch/write.c#L428
  173. throw new \MongoWriteConcernException('Failed write', 911, null, $resultDocument);
  174. }
  175. return $resultDocument;
  176. }
  177. private function validate(array $item)
  178. {
  179. switch ($this->batchType) {
  180. case self::COMMAND_UPDATE:
  181. if (! isset($item['q'])) {
  182. throw new Exception("Expected \$item to contain 'q' key");
  183. }
  184. if (! isset($item['u'])) {
  185. throw new Exception("Expected \$item to contain 'u' key");
  186. }
  187. break;
  188. case self::COMMAND_DELETE:
  189. if (! isset($item['q'])) {
  190. throw new Exception("Expected \$item to contain 'q' key");
  191. }
  192. if (! isset($item['limit'])) {
  193. throw new Exception("Expected \$item to contain 'limit' key");
  194. }
  195. break;
  196. }
  197. }
  198. private function addItem(array $item)
  199. {
  200. switch ($this->batchType) {
  201. case self::COMMAND_UPDATE:
  202. $method = isset($item['multi']) ? 'updateMany' : 'updateOne';
  203. $options = [];
  204. if (isset($item['upsert']) && $item['upsert']) {
  205. $options['upsert'] = true;
  206. }
  207. $this->items[] = [$method => [TypeConverter::fromLegacy($item['q']), TypeConverter::fromLegacy($item['u']), $options]];
  208. break;
  209. case self::COMMAND_INSERT:
  210. $this->items[] = ['insertOne' => [TypeConverter::fromLegacy($item)]];
  211. break;
  212. case self::COMMAND_DELETE:
  213. $method = $item['limit'] === 0 ? 'deleteMany' : 'deleteOne';
  214. $this->items[] = [$method => [TypeConverter::fromLegacy($item['q'])]];
  215. break;
  216. }
  217. }
  218. /**
  219. * @param WriteResult $result
  220. * @return array
  221. */
  222. private function convertWriteErrors(WriteResult $result)
  223. {
  224. $writeErrors = [];
  225. /** @var WriteError $writeError */
  226. foreach ($result->getWriteErrors() as $writeError) {
  227. $writeErrors[] = [
  228. 'index' => $writeError->getIndex(),
  229. 'code' => $writeError->getCode(),
  230. 'errmsg' => $writeError->getMessage(),
  231. ];
  232. }
  233. return $writeErrors;
  234. }
  235. }