فهرست منبع

Merge pull request #10 from alcaeus/implement-command-cursor

Implement command cursors and aggregation
Andreas 10 سال پیش
والد
کامیت
b3f7d48206

+ 395 - 0
lib/Alcaeus/MongoDbAdapter/AbstractCursor.php

@@ -0,0 +1,395 @@
+<?php
+/*
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+namespace Alcaeus\MongoDbAdapter;
+use MongoDB\Collection;
+use MongoDB\Driver\Cursor;
+use MongoDB\Driver\ReadPreference;
+
+/**
+ * @internal
+ */
+abstract class AbstractCursor
+{
+    /**
+     * @var int
+     */
+    protected $batchSize;
+
+    /**
+     * @var Collection
+     */
+    protected $collection;
+
+    /**
+     * @var \MongoClient
+     */
+    protected $connection;
+
+    /**
+     * @var Cursor
+     */
+    protected $cursor;
+
+    /**
+     * @var \MongoDB\Database
+     */
+    protected $db;
+
+    /**
+     * @var \IteratorIterator
+     */
+    protected $iterator;
+
+    /**
+     * @var string
+     */
+    protected $ns;
+
+    /**
+     * @var array
+     */
+    protected $optionNames = [
+        'batchSize',
+        'readPreference',
+    ];
+
+    /**
+     * @var array
+     */
+    protected $readPreference = [];
+
+    /**
+     * @return Cursor
+     */
+    abstract protected function ensureCursor();
+
+    /**
+     * @return array
+     */
+    abstract protected function getCursorInfo();
+
+    /**
+     * Create a new cursor
+     * @link http://www.php.net/manual/en/mongocursor.construct.php
+     * @param \MongoClient $connection Database connection.
+     * @param string $ns Full name of database and collection.
+     */
+    public function __construct(\MongoClient $connection, $ns)
+    {
+        $this->connection = $connection;
+        $this->ns = $ns;
+
+        $nsParts = explode('.', $ns);
+        $dbName = array_shift($nsParts);
+        $collectionName = implode('.', $nsParts);
+
+        $this->db = $connection->selectDB($dbName)->getDb();
+
+        if ($collectionName) {
+            $this->collection = $connection->selectCollection($dbName, $collectionName)->getCollection();
+        }
+    }
+
+    /**
+     * Returns the current element
+     * @link http://www.php.net/manual/en/mongocursor.current.php
+     * @return array
+     */
+    public function current()
+    {
+        $document = $this->ensureIterator()->current();
+        if ($document !== null) {
+            $document = TypeConverter::convertObjectToLegacyArray($document);
+        }
+
+        return $document;
+    }
+
+    /**
+     * Returns the current result's _id
+     * @link http://www.php.net/manual/en/mongocursor.key.php
+     * @return string The current result's _id as a string.
+     */
+    public function key()
+    {
+        return $this->ensureIterator()->key();
+    }
+
+    /**
+     * Advances the cursor to the next result
+     * @link http://www.php.net/manual/en/mongocursor.next.php
+     * @throws \MongoConnectionException
+     * @throws \MongoCursorTimeoutException
+     * @return void
+     */
+    public function next()
+    {
+        $this->ensureIterator()->next();
+    }
+
+    /**
+     * Returns the cursor to the beginning of the result set
+     * @throws \MongoConnectionException
+     * @throws \MongoCursorTimeoutException
+     * @return void
+     */
+    public function rewind()
+    {
+        // We can recreate the cursor to allow it to be rewound
+        $this->reset();
+        $this->ensureIterator()->rewind();
+    }
+
+    /**
+     * Checks if the cursor is reading a valid result.
+     * @link http://www.php.net/manual/en/mongocursor.valid.php
+     * @return boolean If the current result is not null.
+     */
+    public function valid()
+    {
+        return $this->ensureIterator()->valid();
+    }
+
+    /**
+     * Limits the number of elements returned in one batch.
+     *
+     * @link http://docs.php.net/manual/en/mongocursor.batchsize.php
+     * @param int $batchSize The number of results to return per batch
+     * @return $this Returns this cursor.
+     */
+    public function batchSize($batchSize)
+    {
+        $this->batchSize = $batchSize;
+
+        return $this;
+    }
+
+    /**
+     * Checks if there are documents that have not been sent yet from the database for this cursor
+     * @link http://www.php.net/manual/en/mongocursor.dead.php
+     * @return boolean Returns if there are more results that have not been sent to the client, yet.
+     */
+    public function dead()
+    {
+        return $this->ensureCursor()->isDead();
+    }
+
+    /**
+     * Get the read preference for this query
+     * @link http://www.php.net/manual/en/mongocursor.getreadpreference.php
+     * @return array
+     */
+    public function getReadPreference()
+    {
+        return $this->readPreference;
+    }
+
+    /**
+     * @return array
+     */
+    public function info()
+    {
+        return $this->getCursorInfo() + $this->getIterationInfo();
+    }
+
+    /**
+     * @link http://www.php.net/manual/en/mongocursor.setreadpreference.php
+     * @param string $readPreference
+     * @param array $tags
+     * @return $this Returns this cursor.
+     */
+    public function setReadPreference($readPreference, $tags = null)
+    {
+        $availableReadPreferences = [
+            \MongoClient::RP_PRIMARY,
+            \MongoClient::RP_PRIMARY_PREFERRED,
+            \MongoClient::RP_SECONDARY,
+            \MongoClient::RP_SECONDARY_PREFERRED,
+            \MongoClient::RP_NEAREST
+        ];
+        if (! in_array($readPreference, $availableReadPreferences)) {
+            trigger_error("The value '$readPreference' is not valid as read preference type", E_WARNING);
+            return $this;
+        }
+
+        if ($readPreference == \MongoClient::RP_PRIMARY && count($tags)) {
+            trigger_error("You can't use read preference tags with a read preference of PRIMARY", E_WARNING);
+            return $this;
+        }
+
+        $this->readPreference = [
+            'type' => $readPreference,
+            'tagsets' => $tags
+        ];
+
+        return $this;
+    }
+
+    /**
+     * Sets a client-side timeout for this query
+     * @link http://www.php.net/manual/en/mongocursor.timeout.php
+     * @param int $ms The number of milliseconds for the cursor to wait for a response. By default, the cursor will wait forever.
+     * @return $this Returns this cursor
+     */
+    public function timeout($ms)
+    {
+        $this->notImplemented();
+    }
+
+    /**
+     * Applies all options set on the cursor, overwriting any options that have already been set
+     *
+     * @param array $optionNames Array of option names to be applied (will be read from properties)
+     * @return array
+     */
+    protected function getOptions($optionNames = null)
+    {
+        $options = [];
+
+        if ($optionNames === null) {
+            $optionNames = $this->optionNames;
+        }
+
+        foreach ($optionNames as $option) {
+            $converter = 'convert' . ucfirst($option);
+            $value = method_exists($this, $converter) ? $this->$converter() : $this->$option;
+
+            if ($value === null) {
+                continue;
+            }
+
+            $options[$option] = $value;
+        }
+
+        return $options;
+    }
+
+    /**
+     * @return ReadPreference|null
+     */
+    protected function convertReadPreference()
+    {
+        $type = array_key_exists('type', $this->readPreference) ? $this->readPreference['type'] : null;
+        if ($type === null) {
+            return null;
+        }
+
+        switch ($type) {
+            case \MongoClient::RP_PRIMARY_PREFERRED:
+                $mode = ReadPreference::RP_PRIMARY_PREFERRED;
+                break;
+            case \MongoClient::RP_SECONDARY:
+                $mode = ReadPreference::RP_SECONDARY;
+                break;
+            case \MongoClient::RP_SECONDARY_PREFERRED:
+                $mode = ReadPreference::RP_SECONDARY_PREFERRED;
+                break;
+            case \MongoClient::RP_NEAREST:
+                $mode = ReadPreference::RP_NEAREST;
+                break;
+            default:
+                $mode = ReadPreference::RP_PRIMARY;
+        }
+
+        $tagSets = array_key_exists('tagsets', $this->readPreference) ? $this->readPreference['tagsets'] : [];
+
+        return new ReadPreference($mode, $tagSets);
+    }
+
+    /**
+     * @return \IteratorIterator
+     */
+    protected function ensureIterator()
+    {
+        if ($this->iterator === null) {
+            $this->iterator = new \IteratorIterator($this->ensureCursor());
+        }
+
+        return $this->iterator;
+    }
+
+    /**
+     * @throws \MongoCursorException
+     */
+    protected function errorIfOpened()
+    {
+        if ($this->cursor === null) {
+            return;
+        }
+
+        throw new \MongoCursorException('cannot modify cursor after beginning iteration.');
+    }
+
+    /**
+     * @return array
+     */
+    protected function getIterationInfo()
+    {
+        $iterationInfo = [
+            'started_iterating' => $this->cursor !== null,
+        ];
+
+        if ($this->cursor !== null) {
+            switch ($this->cursor->getServer()->getType()) {
+                case \MongoDB\Driver\Server::TYPE_RS_ARBITER:
+                    $typeString = 'ARBITER';
+                    break;
+                case \MongoDB\Driver\Server::TYPE_MONGOS:
+                    $typeString = 'MONGOS';
+                    break;
+                case \MongoDB\Driver\Server::TYPE_RS_PRIMARY:
+                    $typeString = 'PRIMARY';
+                    break;
+                case \MongoDB\Driver\Server::TYPE_RS_SECONDARY:
+                    $typeString = 'SECONDARY';
+                    break;
+                default:
+                    $typeString = 'STANDALONE';
+            }
+
+            $iterationInfo += [
+                'id' => (string) $this->cursor->getId(),
+                'at' => null, // @todo Complete info for cursor that is iterating
+                'numReturned' => null, // @todo Complete info for cursor that is iterating
+                'server' => null, // @todo Complete info for cursor that is iterating
+                'host' => $this->cursor->getServer()->getHost(),
+                'port' => $this->cursor->getServer()->getPort(),
+                'connection_type_desc' => $typeString,
+            ];
+        }
+
+        return $iterationInfo;
+    }
+
+    /**
+     * @throws \Exception
+     */
+    protected function notImplemented()
+    {
+        throw new \Exception('Not implemented');
+    }
+
+    /**
+     * Clears the cursor
+     *
+     * This is generic but implemented as protected since it's only exposed in MongoCursor
+     */
+    protected function reset()
+    {
+        $this->cursor = null;
+        $this->iterator = null;
+    }
+}

+ 37 - 6
lib/Alcaeus/MongoDbAdapter/TypeConverter.php

@@ -22,14 +22,14 @@ class TypeConverter
 {
     public static function convertLegacyArrayToObject($array)
     {
-        // TODO: provide actual class
-        $result = new \stdClass();
+        // TODO: provide actual class once mongodb/mongo-php-library#78 has been merged
+        $result = [];
 
         foreach ($array as $key => $value) {
-            $result->$key = (is_array($value)) ? static::convertLegacyArrayToObject($value) : static::convertToBSONType($value);
+            $result[$key] = (is_array($value)) ? static::convertLegacyArrayToObject($value) : static::convertToBSONType($value);
         }
 
-        return $result;
+        return self::ensureCorrectType($result);
     }
 
     public static function convertObjectToLegacyArray($object)
@@ -37,8 +37,8 @@ class TypeConverter
         $result = [];
 
         foreach ($object as $key => $value) {
-            // TODO: maybe add a more meaningful check instead of stdClass?
-            $result[$key] = ($value instanceof \stdClass) ? static::convertObjectToLegacyArray($value) : static::convertToLegacyType($value);
+            // TODO: use actual class instead of \stdClass once mongodb/mongo-php-library#78 has been merged
+            $result[$key] = ($value instanceof \stdClass || is_array($value)) ? static::convertObjectToLegacyArray($value) : static::convertToLegacyType($value);
         }
 
         return $result;
@@ -65,4 +65,35 @@ class TypeConverter
                 return $value;
         }
     }
+
+    /**
+     * @param array $array
+     * @return bool
+     */
+    public static function isNumericArray(array $array)
+    {
+        return $array === [] || is_numeric(array_keys($array)[0]);
+    }
+
+    /**
+     * Converts all arrays with non-numeric keys to stdClass
+     *
+     * @param array $array
+     * @return array|\stdClass
+     */
+    private static function ensureCorrectType(array $array)
+    {
+        // Empty arrays are left untouched since they may be an empty list or empty document
+        if (static::isNumericArray($array)) {
+            return $array;
+        }
+
+        // Can convert array to stdClass
+        $object = new \stdClass();
+        foreach ($array as $key => $value) {
+            $object->$key = $value;
+        }
+
+        return $object;
+    }
 }

+ 45 - 7
lib/Mongo/MongoCollection.php

@@ -100,12 +100,36 @@ class MongoCollection
      * @link http://www.php.net/manual/en/mongocollection.aggregate.php
      * @param array $pipeline
      * @param array $op
-     * @param array $pipelineOperators
      * @return array
      */
-    public function aggregate(array $pipeline, array $op, array $pipelineOperators)
+    public function aggregate(array $pipeline, array $op = [])
     {
-        $this->notImplemented();
+        if (! TypeConverter::isNumericArray($pipeline)) {
+            $pipeline = [];
+            $options = [];
+
+            $i = 0;
+            foreach (func_get_args() as $operator) {
+                $i++;
+                if (! is_array($operator)) {
+                    trigger_error("Argument $i is not an array", E_WARNING);
+                    return;
+                }
+
+                $pipeline[] = $operator;
+            }
+        } else {
+            $options = $op;
+        }
+
+        $command = [
+            'aggregate' => $this->name,
+            'pipeline' => $pipeline
+        ];
+
+        $command += $options;
+
+        return $this->db->command($command, [], $hash);
     }
 
     /**
@@ -114,9 +138,23 @@ class MongoCollection
      * @param array $options
      * @return MongoCommandCursor
      */
-    public function aggregateCursor(array $pipeline, array $options)
+    public function aggregateCursor(array $pipeline, array $options = [])
     {
-        $this->notImplemented();
+        // Build command manually, can't use mongo-php-library here
+        $command = [
+            'aggregate' => $this->name,
+            'pipeline' => $pipeline
+        ];
+
+        // Convert cursor option
+        if (! isset($options['cursor']) || $options['cursor'] === true || $options['cursor'] === []) {
+            // Cursor option needs to be an object convert bools and empty arrays since those won't be handled by TypeConverter
+            $options['cursor'] = new \stdClass;
+        }
+
+        $command += $options;
+
+        return new MongoCommandCursor($this->db->getConnection(), (string) $this, $command);
     }
 
     /**
@@ -323,7 +361,7 @@ class MongoCollection
      * @param array $query An optional query parameters
      * @return array|bool Returns an array of distinct values, or <b>FALSE</b> on failure
      */
-    public function distinct($key, array $query = NULL)
+    public function distinct($key, array $query = [])
     {
         return array_map([TypeConverter::class, 'convertToLegacyType'], $this->collection->distinct($key, $query));
     }
@@ -351,7 +389,7 @@ class MongoCollection
      */
     public function findOne(array $query = array(), array $fields = array())
     {
-        $document = $this->collection->findOne(TypeConverter::convertLegacyArrayToObject($query));
+        $document = $this->collection->findOne(TypeConverter::convertLegacyArrayToObject($query), ['projection' => $fields]);
         if ($document !== null) {
             $document = TypeConverter::convertObjectToLegacyArray($document);
         }

+ 47 - 39
lib/Mongo/MongoCommandCursor.php

@@ -13,57 +13,65 @@
  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-class MongoCommandCursor implements MongoCursorInterface, Iterator{
-    /**
-     * Return the current element
-     * @link http://php.net/manual/en/iterator.current.php
-     * @return mixed Can return any type.
-     * @since 5.0.0
-     */
-    public function current(){}
+use Alcaeus\MongoDbAdapter\AbstractCursor;
+use Alcaeus\MongoDbAdapter\TypeConverter;
 
+class MongoCommandCursor extends AbstractCursor implements MongoCursorInterface
+{
     /**
-     * Move forward to next element
-     * @link http://php.net/manual/en/iterator.next.php
-     * @return void Any returned value is ignored.
-     * @since 5.0.0
+     * @var array
      */
-    public function next(){}
+    private $command;
 
     /**
-     * Return the key of the current element
-     * @link http://php.net/manual/en/iterator.key.php
-     * @return mixed scalar on success, or null on failure.
-     * @since 5.0.0
+     * MongoCommandCursor constructor.
+     * @param MongoClient $connection
+     * @param string $ns
+     * @param array $command
      */
-    public function key(){}
+    public function __construct(MongoClient $connection, $ns, array $command = [])
+    {
+        parent::__construct($connection, $ns);
+
+        $this->command = $command;
+    }
 
     /**
-     * Checks if current position is valid
-     * @link http://php.net/manual/en/iterator.valid.php
-     * @return boolean The return value will be casted to boolean and then evaluated.
-     * Returns true on success or false on failure.
-     * @since 5.0.0
+     * @param MongoClient $connection
+     * @param string $hash
+     * @param array $document
+     * @return MongoCommandCursor
      */
-    public function valid(){}
+    public static function createFromDocument(MongoClient $connection, $hash, array $document)
+    {
+        throw new \Exception('Not implemented');
+    }
 
     /**
-     * Rewind the Iterator to the first element
-     * @link http://php.net/manual/en/iterator.rewind.php
-     * @return void Any returned value is ignored.
-     * @since 5.0.0
+     * @return \MongoDB\Driver\Cursor
      */
-    public function rewind(){}
-
-    function batchSize(int $batchSize):MongoCursorInterface{}
-
-    function dead():bool{}
+    protected function ensureCursor()
+    {
+        if ($this->cursor === null) {
+            $this->cursor = $this->db->command(TypeConverter::convertLegacyArrayToObject($this->command), $this->getOptions());
+        }
 
-    function info():array{}
+        return $this->cursor;
+    }
 
-    function getReadPreference():array{}
-
-    function setReadPreference(string $read_preference, array $tags = null):MongoCursorInterface{}
-
-    function timeout(int $ms):MongoCursorInterface{}
+    /**
+     * @return array
+     */
+    protected function getCursorInfo()
+    {
+        return [
+            'ns' => $this->ns,
+            'limit' => 0,
+            'batchSize' => $this->batchSize,
+            'skip' => 0,
+            'flags' => 0,
+            'query' => $this->command,
+            'fields' => null,
+        ];
+    }
 }

+ 62 - 335
lib/Mongo/MongoCursor.php

@@ -13,8 +13,7 @@
  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  */
 
-use Alcaeus\MongoDbAdapter\TypeConverter;
-use MongoDB\Collection;
+use Alcaeus\MongoDbAdapter\AbstractCursor;
 use MongoDB\Driver\Cursor;
 use MongoDB\Driver\ReadPreference;
 use MongoDB\Operation\Find;
@@ -23,7 +22,7 @@ use MongoDB\Operation\Find;
  * Result object for database query.
  * @link http://www.php.net/manual/en/class.mongocursor.php
  */
-class MongoCursor implements Iterator
+class MongoCursor extends AbstractCursor implements Iterator
 {
     /**
      * @var bool
@@ -33,59 +32,47 @@ class MongoCursor implements Iterator
     /**
      * @var int
      */
-    static $timeout = 30000;
-
-    /**
-     * @var MongoClient
-     */
-    private $connection;
-
-    /**
-     * @var string
-     */
-    private $ns;
+    public static $timeout = 30000;
 
     /**
      * @var array
      */
-    private $query;
-
-    /**
-     * @var
-     */
-    private $filter;
+    protected $optionNames = [
+        'allowPartialResults',
+        'batchSize',
+        'cursorType',
+        'limit',
+        'maxTimeMS',
+        'modifiers',
+        'noCursorTimeout',
+        'projection',
+        'readPreference',
+        'skip',
+        'sort',
+    ];
 
     /**
-     * @var Collection
+     * @var array
      */
-    private $collection;
+    protected $projection;
 
     /**
-     * @var Cursor
+     * @var array
      */
-    private $cursor;
+    protected $query;
 
-    /**
-     * @var IteratorIterator
-     */
-    private $iterator;
-
-    private $allowPartialResults;
-    private $awaitData;
-    private $batchSize;
-    private $flags;
-    private $hint;
-    private $limit;
-    private $maxTimeMS;
-    private $noCursorTimeout;
-    private $oplogReplay;
-    private $options = [];
-    private $projection;
-    private $readPreference = [];
-    private $skip;
-    private $snapshot;
-    private $sort;
-    private $tailable;
+    protected $allowPartialResults;
+    protected $awaitData;
+    protected $flags = 0;
+    protected $hint;
+    protected $limit;
+    protected $maxTimeMS;
+    protected $noCursorTimeout;
+    protected $options = [];
+    protected $skip;
+    protected $snapshot;
+    protected $sort;
+    protected $tailable;
 
     /**
      * Create a new cursor
@@ -94,19 +81,13 @@ class MongoCursor implements Iterator
      * @param string $ns Full name of database and collection.
      * @param array $query Database query.
      * @param array $fields Fields to return.
-     * @return MongoCursor Returns the new cursor
      */
     public function __construct(MongoClient $connection, $ns, array $query = array(), array $fields = array())
     {
-        $this->connection = $connection;
-        $this->ns = $ns;
+        parent::__construct($connection, $ns);
+
         $this->query = $query;
         $this->projection = $fields;
-
-        $nsParts = explode('.', $ns);
-        $db = array_shift($nsParts);
-
-        $this->collection = $connection->selectCollection($db, implode('.', $nsParts))->getCollection();
     }
 
     /**
@@ -139,20 +120,6 @@ class MongoCursor implements Iterator
         return $this;
     }
 
-    /**
-     * Limits the number of elements returned in one batch.
-     *
-     * @link http://docs.php.net/manual/en/mongocursor.batchsize.php
-     * @param int $batchSize The number of results to return per batch
-     * @return MongoCursor Returns this cursor.
-     */
-    public function batchSize($batchSize)
-    {
-        $this->errorIfOpened();
-        $this->batchSize = $batchSize;
-
-        return $this;
-    }
 
     /**
      * Counts the number of results for this query
@@ -166,35 +133,15 @@ class MongoCursor implements Iterator
             return iterator_count($this->ensureIterator());
         }
 
-        $optionNames = ['hint', 'limit', 'maxTimeMS', 'skip'];
-        $options = $foundOnly ? $this->applyOptions($this->options, $optionNames) : $this->options;
-
-        return $this->collection->count($this->query, $options);
-    }
-
-    /**
-     * Returns the current element
-     * @link http://www.php.net/manual/en/mongocursor.current.php
-     * @return array
-     */
-    public function current()
-    {
-        $document = $this->ensureIterator()->current();
-        if ($document !== null) {
-            $document = TypeConverter::convertObjectToLegacyArray($document);
+        $optionNames = ['hint', 'maxTimeMS'];
+        if ($foundOnly) {
+            $optionNames = array_merge($optionNames, ['limit', 'skip']);
         }
 
-        return $document;
-    }
+        $options = $this->getOptions($optionNames) + $this->options;
 
-    /**
-     * Checks if there are documents that have not been sent yet from the database for this cursor
-     * @link http://www.php.net/manual/en/mongocursor.dead.php
-     * @return boolean Returns if there are more results that have not been sent to the client, yet.
-     */
-    public function dead()
-    {
-        return $this->ensureCursor()->isDead();
+        $count = $this->collection->count($this->query, $options);
+        return $count;
     }
 
     /**
@@ -205,7 +152,7 @@ class MongoCursor implements Iterator
      */
     protected function doQuery()
     {
-        $options = $this->applyOptions($this->options);
+        $options = $this->getOptions() + $this->options;
 
         $this->cursor = $this->collection->find($this->query, $options);
     }
@@ -250,16 +197,6 @@ class MongoCursor implements Iterator
     }
 
     /**
-     * Get the read preference for this query
-     * @link http://www.php.net/manual/en/mongocursor.getreadpreference.php
-     * @return array
-     */
-    public function getReadPreference()
-    {
-        return $this->readPreference;
-    }
-
-    /**
      * Checks if there are any more elements in this cursor
      * @link http://www.php.net/manual/en/mongocursor.hasnext.php
      * @throws MongoConnectionException
@@ -303,66 +240,6 @@ class MongoCursor implements Iterator
     }
 
     /**
-     * Gets the query, fields, limit, and skip for this cursor
-     * @link http://www.php.net/manual/en/mongocursor.info.php
-     * @return array The query, fields, limit, and skip for this cursor as an associative array.
-     */
-    public function info()
-    {
-        $info = [
-            'ns' => $this->ns,
-            'limit' => $this->limit,
-            'batchSize' => $this->batchSize,
-            'skip' => $this->skip,
-            'flags' => $this->flags,
-            'query' => $this->query,
-            'fields' => $this->projection,
-            'started_iterating' => $this->cursor !== null,
-        ];
-
-        if ($info['started_iterating']) {
-            switch ($this->cursor->getServer()->getType()) {
-                case \MongoDB\Driver\Server::TYPE_ARBITER:
-                    $typeString = 'ARBITER';
-                    break;
-                case \MongoDB\Driver\Server::TYPE_MONGOS:
-                    $typeString = 'MONGOS';
-                    break;
-                case \MongoDB\Driver\Server::TYPE_PRIMARY:
-                    $typeString = 'PRIMARY';
-                    break;
-                case \MongoDB\Driver\Server::TYPE_SECONDARY:
-                    $typeString = 'SECONDARY';
-                    break;
-                default:
-                    $typeString = 'STANDALONE';
-            }
-
-            $info = array_merge($info, [
-                'id' => (string) $this->cursor->getId(),
-                'at' => null, // @todo Complete info for cursor that is iterating
-                'numReturned' => null, // @todo Complete info for cursor that is iterating
-                'server' => null, // @todo Complete info for cursor that is iterating
-                'host' => $this->cursor->getServer()->getHost(),
-                'port' => $this->cursor->getServer()->getPort(),
-                'connection_type_desc' => $typeString,
-            ]);
-        }
-
-        return $info;
-    }
-
-    /**
-     * Returns the current result's _id
-     * @link http://www.php.net/manual/en/mongocursor.key.php
-     * @return string The current result's _id as a string.
-     */
-    public function key()
-    {
-        return $this->ensureIterator()->key();
-    }
-
-    /**
      * Limits the number of results returned
      * @link http://www.php.net/manual/en/mongocursor.limit.php
      * @param int $num The number of results to return.
@@ -391,18 +268,6 @@ class MongoCursor implements Iterator
     }
 
     /**
-     * Advances the cursor to the next result
-     * @link http://www.php.net/manual/en/mongocursor.next.php
-     * @throws MongoConnectionException
-     * @throws MongoCursorTimeoutException
-     * @return void
-     */
-    public function next()
-    {
-        $this->ensureIterator()->next();
-    }
-
-    /**
      * @link http://www.php.net/manual/en/mongocursor.partial.php
      * @param bool $okay [optional] <p>If receiving partial results is okay.</p>
      * @return MongoCursor Returns this cursor.
@@ -421,21 +286,7 @@ class MongoCursor implements Iterator
      */
     public function reset()
     {
-        $this->cursor = null;
-        $this->iterator = null;
-    }
-
-    /**
-     * Returns the cursor to the beginning of the result set
-     * @throws MongoConnectionException
-     * @throws MongoCursorTimeoutException
-     * @return void
-     */
-    public function rewind()
-    {
-        // Note: rewinding the cursor means recreating it internally
-        $this->reset();
-        $this->ensureIterator()->rewind();
+        parent::reset();
     }
 
     /**
@@ -450,39 +301,6 @@ class MongoCursor implements Iterator
     }
 
     /**
-     * @link http://www.php.net/manual/en/mongocursor.setreadpreference.php
-     * @param string $readPreference
-     * @param array $tags
-     * @return MongoCursor Returns this cursor.
-     */
-    public function setReadPreference($readPreference, array $tags = [])
-    {
-        $availableReadPreferences = [
-            MongoClient::RP_PRIMARY,
-            MongoClient::RP_PRIMARY_PREFERRED,
-            MongoClient::RP_SECONDARY,
-            MongoClient::RP_SECONDARY_PREFERRED,
-            MongoClient::RP_NEAREST
-        ];
-        if (! in_array($readPreference, $availableReadPreferences)) {
-            trigger_error("The value '$readPreference' is not valid as read preference type", E_WARNING);
-            return $this;
-        }
-
-        if ($readPreference == MongoClient::RP_PRIMARY && count($tags)) {
-            trigger_error("You can't use read preference tags with a read preference of PRIMARY", E_WARNING);
-            return $this;
-        }
-
-        $this->readPreference = [
-            'type' => $readPreference,
-            'tagsets' => $tags
-        ];
-
-        return $this;
-    }
-
-    /**
      * Skips a number of results
      * @link http://www.php.net/manual/en/mongocursor.skip.php
      * @param int $num The number of results to skip.
@@ -555,71 +373,9 @@ class MongoCursor implements Iterator
     }
 
     /**
-     * Sets a client-side timeout for this query
-     * @link http://www.php.net/manual/en/mongocursor.timeout.php
-     * @param int $ms The number of milliseconds for the cursor to wait for a response. By default, the cursor will wait forever.
-     * @throws MongoCursorTimeoutException
-     * @return MongoCursor Returns this cursor
-     */
-    public function timeout($ms)
-    {
-        $this->notImplemented();
-    }
-
-    /**
-     * Checks if the cursor is reading a valid result.
-     * @link http://www.php.net/manual/en/mongocursor.valid.php
-     * @return boolean If the current result is not null.
-     */
-    public function valid()
-    {
-        return $this->ensureIterator()->valid();
-    }
-
-    /**
-     * Applies all options set on the cursor, overwriting any options that have already been set
-     *
-     * @param array $options Existing options array
-     * @param array $optionNames Array of option names to be applied (will be read from properties)
-     * @return array
-     */
-    private function applyOptions($options, $optionNames = null)
-    {
-        if ($optionNames === null) {
-            $optionNames = [
-                'allowPartialResults',
-                'batchSize',
-                'cursorType',
-                'limit',
-                'maxTimeMS',
-                'modifiers',
-                'noCursorTimeout',
-                'oplogReplay',
-                'projection',
-                'readPreference',
-                'skip',
-                'sort',
-            ];
-        }
-
-        foreach ($optionNames as $option) {
-            $converter = 'convert' . ucfirst($option);
-            $value = method_exists($this, $converter) ? $this->$converter() : $this->$option;
-
-            if ($value === null) {
-                continue;
-            }
-
-            $options[$option] = $value;
-        }
-
-        return $options;
-    }
-
-    /**
      * @return int|null
      */
-    private function convertCursorType()
+    protected function convertCursorType()
     {
         if (! $this->tailable) {
             return null;
@@ -628,7 +384,7 @@ class MongoCursor implements Iterator
         return $this->awaitData ? Find::TAILABLE_AWAIT : Find::TAILABLE;
     }
 
-    private function convertModifiers()
+    protected function convertModifiers()
     {
         $modifiers = array_key_exists('modifiers', $this->options) ? $this->options['modifiers'] : [];
 
@@ -644,41 +400,22 @@ class MongoCursor implements Iterator
     }
 
     /**
-     * @return ReadPreference|null
+     * {@inheritdoc}
      */
-    private function convertReadPreference()
+    protected function convertReadPreference()
     {
-        $type = array_key_exists('type', $this->readPreference) ? $this->readPreference['type'] : null;
-        if ($type === null) {
-            return static::$slaveOkay ? new ReadPreference(ReadPreference::RP_SECONDARY_PREFERRED) : null;
+        $readPreference = parent::convertReadPreference();
+        if ($readPreference === null && static::$slaveOkay) {
+            $readPreference = new ReadPreference(ReadPreference::RP_SECONDARY_PREFERRED);
         }
 
-        switch ($type) {
-            case MongoClient::RP_PRIMARY_PREFERRED:
-                $mode = ReadPreference::RP_PRIMARY_PREFERRED;
-                break;
-            case MongoClient::RP_SECONDARY:
-                $mode = ReadPreference::RP_SECONDARY;
-                break;
-            case MongoClient::RP_SECONDARY_PREFERRED:
-                $mode = ReadPreference::RP_SECONDARY_PREFERRED;
-                break;
-            case MongoClient::RP_NEAREST:
-                $mode = ReadPreference::RP_NEAREST;
-                break;
-            default:
-                $mode = ReadPreference::RP_PRIMARY;
-        }
-
-        $tagSets = array_key_exists('tagsets', $this->readPreference) ? $this->readPreference['tagsets'] : [];
-
-        return new ReadPreference($mode, $tagSets);
+        return $readPreference;
     }
 
     /**
      * @return Cursor
      */
-    private function ensureCursor()
+    protected function ensureCursor()
     {
         if ($this->cursor === null) {
             $this->doQuery();
@@ -687,29 +424,19 @@ class MongoCursor implements Iterator
         return $this->cursor;
     }
 
-    private function errorIfOpened()
-    {
-        if ($this->cursor === null) {
-            return;
-        }
-
-        throw new MongoCursorException('cannot modify cursor after beginning iteration.');
-    }
-
     /**
-     * @return IteratorIterator
+     * @return array
      */
-    private function ensureIterator()
-    {
-        if ($this->iterator === null) {
-            $this->iterator = new IteratorIterator($this->ensureCursor());
-        }
-
-        return $this->iterator;
-    }
-
-    protected function notImplemented()
+    protected function getCursorInfo()
     {
-        throw new \Exception('Not implemented');
+        return [
+            'ns' => $this->ns,
+            'limit' => $this->limit,
+            'batchSize' => $this->batchSize,
+            'skip' => $this->skip,
+            'flags' => $this->flags,
+            'query' => $this->query,
+            'fields' => $this->projection,
+        ];
     }
 }

+ 6 - 6
lib/Mongo/MongoCursorInterface.php

@@ -19,33 +19,33 @@ interface MongoCursorInterface extends Iterator
      * @param int $batchSize
      * @return MongoCursorInterface
      */
-    function batchSize($batchSize);
+    public function batchSize($batchSize);
 
     /**
      * @return bool
      */
-    function dead();
+    public function dead();
 
     /**
      * @return array
      */
-    function info();
+    public function info();
 
     /**
      * @return array
      */
-    function getReadPreference();
+    public function getReadPreference();
 
     /**
      * @param string $read_preference
      * @param array|null $tags
      * @return MongoCursorInterface
      */
-    function setReadPreference($read_preference, $tags = null);
+    public function setReadPreference($read_preference, $tags = null);
 
     /**
      * @param int $ms
      * @return MongoCursorInterface
      */
-    function timeout($ms);
+    public function timeout($ms);
 }

+ 12 - 2
lib/Mongo/MongoDB.php

@@ -377,9 +377,19 @@ class MongoDB
      * The resulting document's structure depends on the command,
      * but most results will have the ok field to indicate success or failure and results containing an array of each of the resulting documents.
      */
-    public function command(array $data, $options)
+    public function command(array $data, $options, &$hash)
     {
-        $this->notImplemented();
+        try {
+            $cursor = new \MongoCommandCursor($this->connection, $this->name, $data);
+
+            return iterator_to_array($cursor)[0];
+        } catch (\MongoDB\Driver\Exception\RuntimeException $e) {
+            return [
+                'ok' => 0,
+                'errmsg' => $e->getMessage(),
+                'code' => $e->getCode(),
+            ];
+        }
     }
 
     /**

+ 102 - 4
tests/Alcaeus/MongoDbAdapter/MongoCollectionTest.php

@@ -33,22 +33,107 @@ class MongoCollectionTest extends TestCase
 
     public function testFindReturnsCursor()
     {
+        $this->prepareData();
         $collection = $this->getCollection();
 
-        $collection->insert(['sorter' => 1]);
-
         $this->assertInstanceOf('MongoCursor', $collection->find());
     }
 
     public function testCount()
     {
+        $this->prepareData();
+
         $collection = $this->getCollection();
 
+        $this->assertSame(3, $collection->count());
+        $this->assertSame(2, $collection->count(['foo' => 'bar']));
+    }
+
+    public function testFindOne()
+    {
+        $this->prepareData();
+
+        $document = $this->getCollection()->findOne(['foo' => 'foo'], ['_id' => false]);
+        $this->assertEquals(['foo' => 'foo'], $document);
+    }
+
+    public function testDistinct()
+    {
+        $this->prepareData();
+
+        $values = $this->getCollection()->distinct('foo');
+        $this->assertInternalType('array', $values);
+
+        sort($values);
+        $this->assertEquals(['bar', 'foo'], $values);
+    }
+
+    public function testDistinctWithQuery()
+    {
+        $this->prepareData();
+
+        $values = $this->getCollection()->distinct('foo', ['foo' => 'bar']);
+        $this->assertInternalType('array', $values);
+        $this->assertEquals(['bar'], $values);
+    }
+
+    public function testAggregate()
+    {
+        $collection = $this->getCollection();
+
+        $collection->insert(['foo' => 'bar']);
         $collection->insert(['foo' => 'bar']);
         $collection->insert(['foo' => 'foo']);
 
-        $this->assertSame(2, $collection->count());
-        $this->assertSame(1, $collection->count(['foo' => 'bar']));
+        $pipeline = [
+            [
+                '$group' => [
+                    '_id' => '$foo',
+                    'count' => [ '$sum' => 1 ],
+                ],
+            ],
+            [
+                '$sort' => ['_id' => 1]
+            ]
+        ];
+
+        $result = $collection->aggregate($pipeline);
+        $this->assertInternalType('array', $result);
+        $this->assertArrayHasKey('result', $result);
+
+        $this->assertEquals([
+            ['_id' => 'bar', 'count' => 2],
+            ['_id' => 'foo', 'count' => 1],
+        ], $result['result']);
+    }
+
+    public function testAggregateCursor()
+    {
+        $collection = $this->getCollection();
+
+        $collection->insert(['foo' => 'bar']);
+        $collection->insert(['foo' => 'bar']);
+        $collection->insert(['foo' => 'foo']);
+
+        $pipeline = [
+            [
+                '$group' => [
+                    '_id' => '$foo',
+                    'count' => [ '$sum' => 1 ],
+                ],
+            ],
+            [
+                '$sort' => ['_id' => 1]
+            ]
+        ];
+
+        $cursor = $collection->aggregateCursor($pipeline);
+        $this->assertInstanceOf('MongoCommandCursor', $cursor);
+
+        $this->assertEquals([
+            ['_id' => 'bar', 'count' => 2],
+            ['_id' => 'foo', 'count' => 1],
+        ], iterator_to_array($cursor));
     }
 
     /**
@@ -60,4 +145,17 @@ class MongoCollectionTest extends TestCase
 
         return $client->selectCollection('mongo-php-adapter', $name);
     }
+
+    /**
+     * @return \MongoCollection
+     */
+    protected function prepareData()
+    {
+        $collection = $this->getCollection();
+
+        $collection->insert(['foo' => 'bar']);
+        $collection->insert(['foo' => 'bar']);
+        $collection->insert(['foo' => 'foo']);
+        return $collection;
+    }
 }

+ 77 - 0
tests/Alcaeus/MongoDbAdapter/MongoCommandCursorTest.php

@@ -0,0 +1,77 @@
+<?php
+
+namespace Alcaeus\MongoDbAdapter\Tests;
+use MongoDB\Driver\ReadPreference;
+use MongoDB\Operation\Find;
+
+/**
+ * @author alcaeus <alcaeus@alcaeus.org>
+ */
+class MongoCommandCursorTest extends TestCase
+{
+    public function testInfo()
+    {
+        $this->prepareData();
+        $cursor = $this->getCollection()->aggregateCursor([['$match' => ['foo' => 'bar']]]);
+
+        $expected = [
+            'ns' => 'mongo-php-adapter.test',
+            'limit' => 0,
+            'batchSize' => null,
+            'skip' => 0,
+            'flags' => 0,
+            'query' => [
+                'aggregate' => 'test',
+                'pipeline' => [
+                    [
+                        '$match' => ['foo' => 'bar']
+                    ]
+                ],
+                'cursor' => new \stdClass()
+            ],
+            'fields' => null,
+            'started_iterating' => false,
+        ];
+        $this->assertEquals($expected, $cursor->info());
+
+        // Ensure cursor started iterating
+        iterator_to_array($cursor);
+
+        $expected['started_iterating'] = true;
+        $expected += [
+            'id' => '0',
+            'at' => null,
+            'numReturned' => null,
+            'server' => null,
+            'host' => 'localhost',
+            'port' => 27017,
+            'connection_type_desc' => 'STANDALONE'
+        ];
+
+        $this->assertEquals($expected, $cursor->info());
+    }
+
+    /**
+     * @param string $name
+     * @return \MongoCollection
+     */
+    protected function getCollection($name = 'test')
+    {
+        $client = new \MongoClient();
+
+        return $client->selectCollection('mongo-php-adapter', $name);
+    }
+
+    /**
+     * @return \MongoCollection
+     */
+    protected function prepareData()
+    {
+        $collection = $this->getCollection();
+
+        $collection->insert(['foo' => 'bar']);
+        $collection->insert(['foo' => 'bar']);
+        $collection->insert(['foo' => 'foo']);
+        return $collection;
+    }
+}

+ 37 - 0
tests/Alcaeus/MongoDbAdapter/MongoCursorTest.php

@@ -201,6 +201,43 @@ class MongoCursorTest extends TestCase
         return $tests;
     }
 
+    public function testCursorInfo()
+    {
+        $this->prepareData();
+
+        $collection = $this->getCollection();
+        $cursor = $collection->find(['foo' => 'bar'], ['_id' => false])->skip(1)->limit(3);
+
+        $expected = [
+            'ns' => 'mongo-php-adapter.test',
+            'limit' => 3,
+            'batchSize' => null,
+            'skip' => 1,
+            'flags' => 0,
+            'query' => ['foo' => 'bar'],
+            'fields' => ['_id' => false],
+            'started_iterating' => false,
+        ];
+
+        $this->assertSame($expected, $cursor->info());
+
+        // Ensure cursor started iterating
+        iterator_to_array($cursor);
+
+        $expected['started_iterating'] = true;
+        $expected += [
+            'id' => '0',
+            'at' => null,
+            'numReturned' => null,
+            'server' => null,
+            'host' => 'localhost',
+            'port' => 27017,
+            'connection_type_desc' => 'STANDALONE'
+        ];
+
+        $this->assertSame($expected, $cursor->info());
+    }
+
     /**
      * @param string $name
      * @return \MongoCollection

+ 18 - 0
tests/Alcaeus/MongoDbAdapter/MongoDBTest.php

@@ -23,6 +23,24 @@ class MongoDBTest extends TestCase
         $this->assertSame('mongo-php-adapter.test', (string) $collection);
     }
 
+    public function testCommand()
+    {
+        $db = $this->getDatabase();
+        $this->assertEquals(['ok' => 1], $db->command(['ping' => 1], [], $hash));
+    }
+
+    public function testCommandError()
+    {
+        $db = $this->getDatabase();
+        $expected = [
+            'ok' => 0,
+            'errmsg' => 'listDatabases may only be run against the admin database.',
+            'code' => 13,
+        ];
+
+        $this->assertEquals($expected, $db->command(['listDatabases' => 1], [], $hash));
+    }
+
     /**
      * @return \MongoDB
      */