MongoWriteBatch.php 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. <?php
  2. /*
  3. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  4. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  5. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  6. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  7. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  8. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  9. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  10. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  11. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  12. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  13. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  14. */
  15. use Alcaeus\MongoDbAdapter\TypeConverter;
  16. use Alcaeus\MongoDbAdapter\Helper\WriteConcernConverter;
  /**
   * MongoWriteBatch allows you to "batch up" multiple operations (of the same
   * type) and ship them all to MongoDB at the same time. This can be especially
   * useful when operating on many documents at once, to reduce round trips.
   *
   * @see http://php.net/manual/en/class.mongowritebatch.php
   */
  class MongoWriteBatch
  {
      use WriteConcernConverter;

      const COMMAND_INSERT = 1;
      const COMMAND_UPDATE = 2;
      const COMMAND_DELETE = 3;

      /**
       * Collection the batched operations are executed against.
       *
       * @var MongoCollection
       */
      private $collection;

      /**
       * One of the COMMAND_* constants; decides how added items are validated
       * and translated into bulk-write operations.
       *
       * @var int
       */
      private $batchType;

      /**
       * Default write options, used for any key not passed to execute().
       *
       * @var array
       */
      private $writeOptions;

      /**
       * Pending operations, already translated into the
       * [operationName => [arguments...]] shape handed to bulkWrite().
       *
       * @var array
       */
      private $items = [];

      /**
       * Creates a new batch of write operations
       *
       * @see http://php.net/manual/en/mongowritebatch.construct.php
       * @param MongoCollection $collection Collection to operate on
       * @param int $batchType One of the COMMAND_* constants
       * @param array $writeOptions Default write options for execute()
       */
      protected function __construct(MongoCollection $collection, $batchType, $writeOptions)
      {
          $this->collection = $collection;
          $this->batchType = $batchType;
          $this->writeOptions = $writeOptions;
      }

      /**
       * Adds a write operation to a batch
       *
       * Objects are cast to arrays before being validated and queued.
       *
       * @see http://php.net/manual/en/mongowritebatch.add.php
       * @param array|object $item
       * @return boolean Always true; invalid items raise an exception instead
       * @throws Exception If the item lacks the keys required by the batch type
       */
      public function add($item)
      {
          if (is_object($item)) {
              $item = (array) $item;
          }

          $this->validate($item);
          $this->addItem($item);

          return true;
      }

      /**
       * Executes a batch of write operations
       *
       * Options passed here take precedence over the options given to the
       * constructor. The queue is cleared only when the bulk write succeeds;
       * after a BulkWriteException the queued items are kept as they were.
       *
       * @see http://php.net/manual/en/mongowritebatch.execute.php
       * @param array $writeOptions
       * @return array ['ok' => true] for an empty batch; otherwise 'ok' (1.0 on
       *               success, 0.0 on a bulk write error) plus the 'nInserted',
       *               'nMatched', 'nModified', 'nUpserted' and 'nRemoved' counts
       */
      final public function execute(array $writeOptions = [])
      {
          // Array union: keys supplied by the caller win, defaults fill the gaps.
          $writeOptions += $this->writeOptions;

          if (! count($this->items)) {
              return ['ok' => true];
          }

          // trigger_error() only accepts the E_USER_* levels. Passing E_WARNING
          // fails with an "invalid error type" warning (PHP < 8) or a ValueError
          // (PHP >= 8) instead of reporting the unsupported option.
          if (isset($writeOptions['j'])) {
              trigger_error('j parameter is not supported', E_USER_WARNING);
          }
          if (isset($writeOptions['fsync'])) {
              trigger_error('fsync parameter is not supported', E_USER_WARNING);
          }

          $options = ['writeConcern' => $this->createWriteConcernFromArray($writeOptions)];
          if (isset($writeOptions['ordered'])) {
              $options['ordered'] = $writeOptions['ordered'];
          }

          $collection = $this->collection->getCollection();

          try {
              $result = $collection->bulkWrite($this->items, $options);
              $ok = 1.0;
          } catch (\MongoDB\Driver\Exception\BulkWriteException $e) {
              // Still report the counts of whatever was written before the error.
              $result = $e->getWriteResult();
              $ok = 0.0;
          }

          if ($ok === 1.0) {
              $this->items = [];
          }

          return [
              'ok' => $ok,
              'nInserted' => $result->getInsertedCount(),
              'nMatched' => $result->getMatchedCount(),
              'nModified' => $result->getModifiedCount(),
              'nUpserted' => $result->getUpsertedCount(),
              'nRemoved' => $result->getDeletedCount(),
          ];
      }

      /**
       * Checks that an item carries the keys required by the batch type.
       *
       * Updates need 'q' and 'u', deletes need 'q' and 'limit'. Inserts are not
       * checked. Because isset() is used, a key holding null counts as missing.
       *
       * @param array $item
       * @throws Exception If a required key is missing
       */
      private function validate(array $item)
      {
          switch ($this->batchType) {
              case self::COMMAND_UPDATE:
                  if (! isset($item['q']) || ! isset($item['u'])) {
                      throw new Exception('invalid item');
                  }
                  break;

              case self::COMMAND_DELETE:
                  if (! isset($item['q']) || ! isset($item['limit'])) {
                      throw new Exception('invalid item');
                  }
                  break;
          }
      }

      /**
       * Translates a validated item into a bulk-write operation and queues it.
       *
       * @param array $item
       */
      private function addItem(array $item)
      {
          switch ($this->batchType) {
              case self::COMMAND_UPDATE:
                  // Test the value, not just the key: 'multi' => false must stay a
                  // single-document update.
                  $method = ! empty($item['multi']) ? 'updateMany' : 'updateOne';

                  $options = [];
                  if (! empty($item['upsert'])) {
                      $options['upsert'] = true;
                  }

                  $this->items[] = [
                      $method => [
                          TypeConverter::fromLegacy($item['q']),
                          TypeConverter::fromLegacy($item['u']),
                          $options,
                      ],
                  ];
                  break;

              case self::COMMAND_INSERT:
                  $this->items[] = ['insertOne' => [TypeConverter::fromLegacy($item)]];
                  break;

              case self::COMMAND_DELETE:
                  // Only an integer limit of 0 selects deleteMany; any other value
                  // removes a single document.
                  $method = $item['limit'] === 0 ? 'deleteMany' : 'deleteOne';

                  $this->items[] = [$method => [TypeConverter::fromLegacy($item['q'])]];
                  break;
          }
      }
  }