collection = $collection;
        $this->batchType = $batchType;
        $this->writeOptions = $writeOptions;
    }

    /**
     * Adds a write operation to a batch
     *
     * Objects are cast to arrays before the item is validated and queued.
     *
     * @see http://php.net/manual/en/mongowritebatch.add.php
     * @param array|object $item
     * @return bool Always true; an invalid item raises an exception instead
     * @throws Exception If the item lacks the keys required by the batch type
     */
    public function add($item)
    {
        if (is_object($item)) {
            $item = (array) $item;
        }

        $this->validate($item);
        $this->addItem($item);

        return true;
    }

    /**
     * Executes a batch of write operations
     *
     * Returns early with ['ok' => true] when nothing has been queued. On
     * success the queue is emptied; when the bulk write raises a
     * BulkWriteException the queued items are kept and the counters are taken
     * from the write result attached to the exception.
     *
     * @see http://php.net/manual/en/mongowritebatch.execute.php
     * @param array $writeOptions Per-call options; they take precedence over
     *                            the options the batch was constructed with
     * @return array 'ok' plus the nInserted/nMatched/nModified/nUpserted/nRemoved counters
     */
    final public function execute(array $writeOptions = [])
    {
        // Array union keeps the left-hand keys, so per-call options win over
        // the defaults stored on the batch.
        $writeOptions += $this->writeOptions;

        if (empty($this->items)) {
            return ['ok' => true];
        }

        // trigger_error() only accepts the E_USER_* levels; passing E_WARNING
        // would fail instead of emitting the intended message.
        foreach (['j', 'fsync'] as $unsupported) {
            if (isset($writeOptions[$unsupported])) {
                trigger_error($unsupported . ' parameter is not supported', E_USER_WARNING);
            }
        }

        $options = [
            'writeConcern' => $this->createWriteConcernFromArray($writeOptions),
        ];
        if (isset($writeOptions['ordered'])) {
            $options['ordered'] = $writeOptions['ordered'];
        }

        $collection = $this->collection->getCollection();

        try {
            $result = $collection->bulkWrite($this->items, $options);
            $ok = 1.0;
        } catch (\MongoDB\Driver\Exception\BulkWriteException $e) {
            // Report whatever was written before the failure.
            $result = $e->getWriteResult();
            $ok = 0.0;
        }

        if ($ok === 1.0) {
            $this->items = [];
        }

        return [
            'ok' => $ok,
            'nInserted' => $result->getInsertedCount(),
            'nMatched' => $result->getMatchedCount(),
            'nModified' => $result->getModifiedCount(),
            'nUpserted' => $result->getUpsertedCount(),
            'nRemoved' => $result->getDeletedCount(),
        ];
    }

    /**
     * Checks that an item carries the keys required by the batch type
     *
     * Updates need 'q' and 'u', deletes need 'q' and 'limit'. Other batch
     * types are not checked here.
     *
     * @param array $item
     * @throws Exception If a required key is missing or null
     */
    private function validate(array $item)
    {
        switch ($this->batchType) {
            case self::COMMAND_UPDATE:
                if (! isset($item['q']) || ! isset($item['u'])) {
                    throw new Exception('invalid item');
                }
                break;

            case self::COMMAND_DELETE:
                if (! isset($item['q']) || ! isset($item['limit'])) {
                    throw new Exception('invalid item');
                }
                break;
        }
    }

    /**
     * Converts a legacy batch item into a bulk write operation and queues it
     *
     * @param array $item An item that has already passed validate()
     */
    private function addItem(array $item)
    {
        switch ($this->batchType) {
            case self::COMMAND_UPDATE:
                // Test the flag's value, not just its presence, so that an
                // explicit 'multi' => false still updates a single document.
                $method = ! empty($item['multi']) ? 'updateMany' : 'updateOne';

                $options = [];
                if (isset($item['upsert']) && $item['upsert']) {
                    $options['upsert'] = true;
                }

                $this->items[] = [
                    $method => [
                        TypeConverter::fromLegacy($item['q']),
                        TypeConverter::fromLegacy($item['u']),
                        $options,
                    ],
                ];
                break;

            case self::COMMAND_INSERT:
                $this->items[] = ['insertOne' => [TypeConverter::fromLegacy($item)]];
                break;

            case self::COMMAND_DELETE:
                // A limit of 0 removes every matching document.
                $method = $item['limit'] === 0 ? 'deleteMany' : 'deleteOne';

                $this->items[] = [$method => [TypeConverter::fromLegacy($item['q'])]];
                break;
        }
    }
}