MongoWriteBatch.php 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. <?php
  2. /*
  3. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  4. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  5. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  6. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  7. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  8. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  9. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  10. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  11. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  12. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  13. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  14. */
  15. if (class_exists('MongoWriteBatch', false)) {
  16. return;
  17. }
  18. use Alcaeus\MongoDbAdapter\TypeConverter;
  19. use Alcaeus\MongoDbAdapter\Helper\WriteConcernConverter;
  /**
   * MongoWriteBatch collects multiple write operations of the same type
   * (insert, update or delete) for one collection and sends them to MongoDB
   * in a single bulk write. This is especially useful when operating on many
   * documents at once, because it reduces round trips.
   *
   * @see http://php.net/manual/en/class.mongowritebatch.php
   */
  class MongoWriteBatch
  {
      use WriteConcernConverter;

      const COMMAND_INSERT = 1;
      const COMMAND_UPDATE = 2;
      const COMMAND_DELETE = 3;

      /**
       * Collection the batch is executed against.
       *
       * @var MongoCollection
       */
      private $collection;

      /**
       * One of the COMMAND_* constants; decides how items are validated,
       * converted and how the result of execute() is shaped.
       *
       * @var int
       */
      private $batchType;

      /**
       * Default write options; merged beneath the options given to execute().
       *
       * @var array
       */
      private $writeOptions;

      /**
       * Pending operations, already converted to the
       * [operationName => [arguments...]] entries handed to bulkWrite().
       *
       * @var array
       */
      private $items = [];

      /**
       * Creates a new batch of write operations
       *
       * @see http://php.net/manual/en/mongowritebatch.construct.php
       * @param MongoCollection $collection Collection to write to
       * @param int $batchType One of the COMMAND_* constants
       * @param array $writeOptions Default options applied on every execute()
       */
      protected function __construct(MongoCollection $collection, $batchType, $writeOptions)
      {
          $this->collection = $collection;
          $this->batchType = $batchType;
          $this->writeOptions = $writeOptions;
      }

      /**
       * Adds a write operation to a batch
       *
       * Objects are cast to arrays before being validated and queued.
       *
       * @see http://php.net/manual/en/mongowritebatch.add.php
       * @param array|object $item
       * @return boolean Always true; invalid items raise an exception instead
       * @throws Exception If a key required by the batch type is missing
       */
      public function add($item)
      {
          if (is_object($item)) {
              $item = (array) $item;
          }

          $this->validate($item);
          $this->addItem($item);

          return true;
      }

      /**
       * Executes a batch of write operations
       *
       * The returned array always carries an 'ok' flag. Depending on the batch
       * type it also contains 'nInserted', 'nRemoved', or 'nMatched' /
       * 'nModified' / 'nUpserted' (plus 'upserted' when ids were upserted).
       * Queued items are discarded only after a successful write; when a
       * BulkWriteException is caught they stay queued and 'ok' is false.
       *
       * @see http://php.net/manual/en/mongowritebatch.execute.php
       * @param array $writeOptions Overrides for the options given at construction
       * @return array
       */
      final public function execute(array $writeOptions = [])
      {
          // Array union: call-time options win, constructor options fill the gaps.
          $writeOptions += $this->writeOptions;

          if (! count($this->items)) {
              return ['ok' => true];
          }

          // trigger_error() only accepts the E_USER_* levels. Passing E_WARNING
          // raises a ValueError on PHP 8 and an "Invalid error type specified"
          // warning on older versions instead of the intended message.
          if (isset($writeOptions['j'])) {
              trigger_error('j parameter is not supported', E_USER_WARNING);
          }
          if (isset($writeOptions['fsync'])) {
              trigger_error('fsync parameter is not supported', E_USER_WARNING);
          }

          $options = [
              'writeConcern' => $this->createWriteConcernFromArray($writeOptions),
          ];
          if (isset($writeOptions['ordered'])) {
              $options['ordered'] = $writeOptions['ordered'];
          }

          $collection = $this->collection->getCollection();

          try {
              $result = $collection->bulkWrite($this->items, $options);
              $ok = true;
              // Only a successful write empties the queue.
              $this->items = [];
          } catch (\MongoDB\Driver\Exception\BulkWriteException $e) {
              // Report whatever was written before the failure.
              $result = $e->getWriteResult();
              $ok = false;
          }

          // NOTE: a batch type outside the three COMMAND_* constants matches no
          // case below, so the method then returns nothing (null).
          switch ($this->batchType) {
              case self::COMMAND_UPDATE:
                  $upsertedIds = [];
                  foreach ($result->getUpsertedIds() as $index => $id) {
                      $upsertedIds[] = [
                          'index' => $index,
                          '_id' => TypeConverter::toLegacy($id),
                      ];
                  }

                  $summary = [
                      'nMatched' => $result->getMatchedCount(),
                      'nModified' => $result->getModifiedCount(),
                      'nUpserted' => $result->getUpsertedCount(),
                      'ok' => $ok,
                  ];
                  if (count($upsertedIds)) {
                      $summary['upserted'] = $upsertedIds;
                  }

                  return $summary;

              case self::COMMAND_DELETE:
                  return [
                      'nRemoved' => $result->getDeletedCount(),
                      'ok' => $ok,
                  ];

              case self::COMMAND_INSERT:
                  return [
                      'nInserted' => $result->getInsertedCount(),
                      'ok' => $ok,
                  ];
          }
      }

      /**
       * Checks that an item carries the keys required by the batch type.
       *
       * Updates need 'q' and 'u', deletes need 'q' and 'limit'; inserts are
       * not checked. A key whose value is null counts as missing.
       *
       * @param array $item
       * @throws Exception If a required key is missing
       */
      private function validate(array $item)
      {
          switch ($this->batchType) {
              case self::COMMAND_UPDATE:
                  if (! isset($item['q'])) {
                      throw new Exception("Expected \$item to contain 'q' key");
                  }
                  if (! isset($item['u'])) {
                      throw new Exception("Expected \$item to contain 'u' key");
                  }
                  break;

              case self::COMMAND_DELETE:
                  if (! isset($item['q'])) {
                      throw new Exception("Expected \$item to contain 'q' key");
                  }
                  if (! isset($item['limit'])) {
                      throw new Exception("Expected \$item to contain 'limit' key");
                  }
                  break;
          }
      }

      /**
       * Converts a validated item into a bulk-write operation and queues it.
       *
       * @param array $item
       */
      private function addItem(array $item)
      {
          switch ($this->batchType) {
              case self::COMMAND_UPDATE:
                  // Test the value, not mere presence: ['multi' => false] must
                  // stay a single-document update.
                  $method = ! empty($item['multi']) ? 'updateMany' : 'updateOne';

                  $options = [];
                  if (! empty($item['upsert'])) {
                      $options['upsert'] = true;
                  }

                  $this->items[] = [
                      $method => [
                          TypeConverter::fromLegacy($item['q']),
                          TypeConverter::fromLegacy($item['u']),
                          $options,
                      ],
                  ];
                  break;

              case self::COMMAND_INSERT:
                  $this->items[] = ['insertOne' => [TypeConverter::fromLegacy($item)]];
                  break;

              case self::COMMAND_DELETE:
                  // A limit of exactly integer 0 removes every match; any other
                  // value removes a single document.
                  $method = $item['limit'] === 0 ? 'deleteMany' : 'deleteOne';
                  $this->items[] = [$method => [TypeConverter::fromLegacy($item['q'])]];
                  break;
          }
      }
  }