collection = $collection; $this->batchType = $batchType; $this->writeOptions = $writeOptions; } /** * Adds a write operation to a batch * * @see http://php.net/manual/en/mongowritebatch.add.php * @param array|object $item * @return boolean */ public function add($item) { if (is_object($item)) { $item = (array)$item; } $this->validate($item); $this->addItem($item); return true; } /** * Executes a batch of write operations * * @see http://php.net/manual/en/mongowritebatch.execute.php * @param array $writeOptions * @return array */ final public function execute(array $writeOptions = []) { $writeOptions += $this->writeOptions; if (! count($this->items)) { return ['ok' => true]; } if (isset($writeOptions['j'])) { trigger_error('j parameter is not supported', E_WARNING); } if (isset($writeOptions['fsync'])) { trigger_error('fsync parameter is not supported', E_WARNING); } $options['writeConcern'] = $this->createWriteConcernFromArray($writeOptions); if (isset($writeOptions['ordered'])) { $options['ordered'] = $writeOptions['ordered']; } $collection = $this->collection->getCollection(); try { $result = $collection->BulkWrite($this->items, $options); $ok = true; } catch (\MongoDB\Driver\Exception\BulkWriteException $e) { $result = $e->getWriteResult(); $ok = false; } if ($ok === true) { $this->items = []; } switch ($this->batchType) { case self::COMMAND_UPDATE: $upsertedIds = []; foreach ($result->getUpsertedIds() as $index => $id) { $upsertedIds[] = [ 'index' => $index, '_id' => TypeConverter::toLegacy($id) ]; } $result = [ 'nMatched' => $result->getMatchedCount(), 'nModified' => $result->getModifiedCount(), 'nUpserted' => $result->getUpsertedCount(), 'ok' => $ok, ]; if (count($upsertedIds)) { $result['upserted'] = $upsertedIds; } return $result; case self::COMMAND_DELETE: return [ 'nRemoved' => $result->getDeletedCount(), 'ok' => $ok, ]; case self::COMMAND_INSERT: return [ 'nInserted' => $result->getInsertedCount(), 'ok' => $ok, ]; } } private function validate(array $item) { switch ($this->batchType) { case self::COMMAND_UPDATE: if (! isset($item['q'])) { throw new Exception("Expected \$item to contain 'q' key"); } if (! isset($item['u'])) { throw new Exception("Expected \$item to contain 'u' key"); } break; case self::COMMAND_DELETE: if (! isset($item['q'])) { throw new Exception("Expected \$item to contain 'q' key"); } if (! isset($item['limit'])) { throw new Exception("Expected \$item to contain 'limit' key"); } break; } } private function addItem(array $item) { switch ($this->batchType) { case self::COMMAND_UPDATE: $method = isset($item['multi']) ? 'updateMany' : 'updateOne'; $options = []; if (isset($item['upsert']) && $item['upsert']) { $options['upsert'] = true; } $this->items[] = [$method => [TypeConverter::fromLegacy($item['q']), TypeConverter::fromLegacy($item['u']), $options]]; break; case self::COMMAND_INSERT: $this->items[] = ['insertOne' => [TypeConverter::fromLegacy($item)]]; break; case self::COMMAND_DELETE: $method = $item['limit'] === 0 ? 'deleteMany' : 'deleteOne'; $this->items[] = [$method => [TypeConverter::fromLegacy($item['q'])]]; break; } } }