AbstractCursor.php 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. <?php
  2. /*
  3. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  4. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  5. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  6. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  7. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  8. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  9. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  10. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  11. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  12. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  13. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  14. */
  15. namespace Alcaeus\MongoDbAdapter;
  16. use Alcaeus\MongoDbAdapter\Helper\ReadPreference;
  17. use MongoDB\Collection;
  18. use MongoDB\Driver\Cursor;
  19. /**
  20. * @internal
  21. */
  22. abstract class AbstractCursor
  23. {
  24. use ReadPreference;
  25. /**
  26. * @var int
  27. */
  28. protected $batchSize;
  29. /**
  30. * @var Collection
  31. */
  32. protected $collection;
  33. /**
  34. * @var \MongoClient
  35. */
  36. protected $connection;
  37. /**
  38. * @var Cursor
  39. */
  40. protected $cursor;
  41. /**
  42. * @var \MongoDB\Database
  43. */
  44. protected $db;
  45. /**
  46. * @var \Iterator
  47. */
  48. protected $iterator;
  49. /**
  50. * @var string
  51. */
  52. protected $ns;
  53. /**
  54. * @var bool
  55. */
  56. protected $startedIterating = false;
  57. /**
  58. * @var array
  59. */
  60. protected $optionNames = [
  61. 'batchSize',
  62. 'readPreference',
  63. ];
  64. /**
  65. * @return Cursor
  66. */
  67. abstract protected function ensureCursor();
  68. /**
  69. * @return array
  70. */
  71. abstract protected function getCursorInfo();
  72. /**
  73. * Create a new cursor
  74. * @link http://www.php.net/manual/en/mongocursor.construct.php
  75. * @param \MongoClient $connection Database connection.
  76. * @param string $ns Full name of database and collection.
  77. */
  78. public function __construct(\MongoClient $connection, $ns)
  79. {
  80. $this->connection = $connection;
  81. $this->ns = $ns;
  82. $nsParts = explode('.', $ns);
  83. $dbName = array_shift($nsParts);
  84. $collectionName = implode('.', $nsParts);
  85. $this->db = $connection->selectDB($dbName)->getDb();
  86. if ($collectionName) {
  87. $this->collection = $connection->selectCollection($dbName, $collectionName)->getCollection();
  88. }
  89. }
  90. /**
  91. * Returns the current element
  92. * @link http://www.php.net/manual/en/mongocursor.current.php
  93. * @return array
  94. */
  95. public function current()
  96. {
  97. $this->startedIterating = true;
  98. $document = $this->ensureIterator()->current();
  99. if ($document !== null) {
  100. $document = TypeConverter::toLegacy($document);
  101. }
  102. return $document;
  103. }
  104. /**
  105. * Returns the current result's _id
  106. * @link http://www.php.net/manual/en/mongocursor.key.php
  107. * @return string The current result's _id as a string.
  108. */
  109. public function key()
  110. {
  111. return $this->ensureIterator()->key();
  112. }
  113. /**
  114. * Advances the cursor to the next result, and returns that result
  115. * @link http://www.php.net/manual/en/mongocursor.next.php
  116. * @throws \MongoConnectionException
  117. * @throws \MongoCursorTimeoutException
  118. * @return array Returns the next object
  119. */
  120. public function next()
  121. {
  122. if (!$this->startedIterating) {
  123. $this->ensureIterator();
  124. $this->startedIterating = true;
  125. } else {
  126. $this->ensureIterator()->next();
  127. }
  128. return $this->current();
  129. }
  130. /**
  131. * Returns the cursor to the beginning of the result set
  132. * @throws \MongoConnectionException
  133. * @throws \MongoCursorTimeoutException
  134. * @return void
  135. */
  136. public function rewind()
  137. {
  138. // We can recreate the cursor to allow it to be rewound
  139. $this->reset();
  140. $this->startedIterating = true;
  141. $this->ensureIterator()->rewind();
  142. }
  143. /**
  144. * Checks if the cursor is reading a valid result.
  145. * @link http://www.php.net/manual/en/mongocursor.valid.php
  146. * @return boolean If the current result is not null.
  147. */
  148. public function valid()
  149. {
  150. return $this->ensureIterator()->valid();
  151. }
  152. /**
  153. * Limits the number of elements returned in one batch.
  154. *
  155. * @link http://docs.php.net/manual/en/mongocursor.batchsize.php
  156. * @param int $batchSize The number of results to return per batch
  157. * @return $this Returns this cursor.
  158. */
  159. public function batchSize($batchSize)
  160. {
  161. $this->batchSize = $batchSize;
  162. return $this;
  163. }
  164. /**
  165. * Checks if there are documents that have not been sent yet from the database for this cursor
  166. * @link http://www.php.net/manual/en/mongocursor.dead.php
  167. * @return boolean Returns if there are more results that have not been sent to the client, yet.
  168. */
  169. public function dead()
  170. {
  171. return $this->ensureCursor()->isDead();
  172. }
  173. /**
  174. * @return array
  175. */
  176. public function info()
  177. {
  178. return $this->getCursorInfo() + $this->getIterationInfo();
  179. }
  180. /**
  181. * @link http://www.php.net/manual/en/mongocursor.setreadpreference.php
  182. * @param string $readPreference
  183. * @param array $tags
  184. * @return $this Returns this cursor.
  185. */
  186. public function setReadPreference($readPreference, $tags = null)
  187. {
  188. $this->setReadPreferenceFromParameters($readPreference, $tags);
  189. return $this;
  190. }
  191. /**
  192. * Sets a client-side timeout for this query
  193. * @link http://www.php.net/manual/en/mongocursor.timeout.php
  194. * @param int $ms The number of milliseconds for the cursor to wait for a response. By default, the cursor will wait forever.
  195. * @return $this Returns this cursor
  196. */
  197. public function timeout($ms)
  198. {
  199. $this->notImplemented();
  200. }
  201. /**
  202. * Applies all options set on the cursor, overwriting any options that have already been set
  203. *
  204. * @param array $optionNames Array of option names to be applied (will be read from properties)
  205. * @return array
  206. */
  207. protected function getOptions($optionNames = null)
  208. {
  209. $options = [];
  210. if ($optionNames === null) {
  211. $optionNames = $this->optionNames;
  212. }
  213. foreach ($optionNames as $option) {
  214. $converter = 'convert' . ucfirst($option);
  215. $value = method_exists($this, $converter) ? $this->$converter() : $this->$option;
  216. if ($value === null) {
  217. continue;
  218. }
  219. $options[$option] = $value;
  220. }
  221. return $options;
  222. }
  223. /**
  224. * @return \Iterator
  225. */
  226. protected function ensureIterator()
  227. {
  228. if ($this->iterator === null) {
  229. // MongoDB\Driver\Cursor needs to be wrapped into a \Generator so that a valid \Iterator with working implementations of
  230. // next, current, valid, key and rewind is returned. These methods don't work if we wrap the Cursor inside an \IteratorIterator
  231. $this->iterator = $this->wrapTraversable($this->ensureCursor());
  232. }
  233. return $this->iterator;
  234. }
  235. /**
  236. * @param \Traversable $traversable
  237. * @return \Generator
  238. */
  239. protected function wrapTraversable(\Traversable $traversable)
  240. {
  241. foreach ($traversable as $key => $value) {
  242. yield $key => $value;
  243. }
  244. }
  245. /**
  246. * @throws \MongoCursorException
  247. */
  248. protected function errorIfOpened()
  249. {
  250. if ($this->cursor === null) {
  251. return;
  252. }
  253. throw new \MongoCursorException('cannot modify cursor after beginning iteration.');
  254. }
  255. /**
  256. * @return array
  257. */
  258. protected function getIterationInfo()
  259. {
  260. $iterationInfo = [
  261. 'started_iterating' => $this->cursor !== null,
  262. ];
  263. if ($this->cursor !== null) {
  264. switch ($this->cursor->getServer()->getType()) {
  265. case \MongoDB\Driver\Server::TYPE_RS_ARBITER:
  266. $typeString = 'ARBITER';
  267. break;
  268. case \MongoDB\Driver\Server::TYPE_MONGOS:
  269. $typeString = 'MONGOS';
  270. break;
  271. case \MongoDB\Driver\Server::TYPE_RS_PRIMARY:
  272. $typeString = 'PRIMARY';
  273. break;
  274. case \MongoDB\Driver\Server::TYPE_RS_SECONDARY:
  275. $typeString = 'SECONDARY';
  276. break;
  277. default:
  278. $typeString = 'STANDALONE';
  279. }
  280. $iterationInfo += [
  281. 'id' => (string) $this->cursor->getId(),
  282. 'at' => null, // @todo Complete info for cursor that is iterating
  283. 'numReturned' => null, // @todo Complete info for cursor that is iterating
  284. 'server' => sprintf('%s:%d;-;.;%d', $this->cursor->getServer()->getHost(), $this->cursor->getServer()->getPort(), getmypid()),
  285. 'host' => $this->cursor->getServer()->getHost(),
  286. 'port' => $this->cursor->getServer()->getPort(),
  287. 'connection_type_desc' => $typeString,
  288. ];
  289. }
  290. return $iterationInfo;
  291. }
  292. /**
  293. * @throws \Exception
  294. */
  295. protected function notImplemented()
  296. {
  297. throw new \Exception('Not implemented');
  298. }
  299. /**
  300. * Clears the cursor
  301. *
  302. * This is generic but implemented as protected since it's only exposed in MongoCursor
  303. */
  304. protected function reset()
  305. {
  306. $this->startedIterating = false;
  307. $this->cursor = null;
  308. $this->iterator = null;
  309. }
  310. }