AbstractCursor.php 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. <?php
  2. /*
  3. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  4. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  5. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  6. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  7. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  8. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  9. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  10. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  11. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  12. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  13. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  14. */
  15. namespace Alcaeus\MongoDbAdapter;
  16. use Alcaeus\MongoDbAdapter\Helper\ReadPreference;
  17. use MongoDB\Collection;
  18. use MongoDB\Driver\Cursor;
  19. /**
  20. * @internal
  21. */
  22. abstract class AbstractCursor
  23. {
  24. use ReadPreference;
  25. /**
  26. * @var int
  27. */
  28. protected $batchSize = 0;
  29. /**
  30. * @var Collection
  31. */
  32. protected $collection;
  33. /**
  34. * @var \MongoClient
  35. */
  36. protected $connection;
  37. /**
  38. * @var Cursor
  39. */
  40. protected $cursor;
  41. /**
  42. * @var \MongoDB\Database
  43. */
  44. protected $db;
  45. /**
  46. * @var \Iterator
  47. */
  48. protected $iterator;
  49. /**
  50. * @var string
  51. */
  52. protected $ns;
  53. /**
  54. * @var bool
  55. */
  56. protected $startedIterating = false;
  57. /**
  58. * @var int
  59. */
  60. protected $position = 0;
  61. /**
  62. * @var array
  63. */
  64. protected $optionNames = [
  65. 'batchSize',
  66. 'readPreference',
  67. ];
  68. /**
  69. * @return Cursor
  70. */
  71. abstract protected function ensureCursor();
  72. /**
  73. * @return array
  74. */
  75. abstract protected function getCursorInfo();
  76. /**
  77. * Create a new cursor
  78. * @link http://www.php.net/manual/en/mongocursor.construct.php
  79. * @param \MongoClient $connection Database connection.
  80. * @param string $ns Full name of database and collection.
  81. */
  82. public function __construct(\MongoClient $connection, $ns)
  83. {
  84. $this->connection = $connection;
  85. $this->ns = $ns;
  86. $nsParts = explode('.', $ns);
  87. $dbName = array_shift($nsParts);
  88. $collectionName = implode('.', $nsParts);
  89. $this->db = $connection->selectDB($dbName)->getDb();
  90. if ($collectionName) {
  91. $this->collection = $connection->selectCollection($dbName, $collectionName)->getCollection();
  92. }
  93. }
  94. /**
  95. * Returns the current element
  96. * @link http://www.php.net/manual/en/mongocursor.current.php
  97. * @return array
  98. */
  99. public function current()
  100. {
  101. if (! $this->startedIterating) {
  102. return null;
  103. }
  104. $document = $this->ensureIterator()->current();
  105. if ($document !== null) {
  106. $document = TypeConverter::toLegacy($document);
  107. }
  108. return $document;
  109. }
  110. /**
  111. * Returns the current result's _id
  112. * @link http://www.php.net/manual/en/mongocursor.key.php
  113. * @return string The current result's _id as a string.
  114. */
  115. public function key()
  116. {
  117. if (! $this->startedIterating) {
  118. return null;
  119. }
  120. return $this->ensureIterator()->key();
  121. }
  122. /**
  123. * Advances the cursor to the next result, and returns that result
  124. * @link http://www.php.net/manual/en/mongocursor.next.php
  125. * @throws \MongoConnectionException
  126. * @throws \MongoCursorTimeoutException
  127. * @return array Returns the next object
  128. */
  129. public function next()
  130. {
  131. if (! $this->startedIterating) {
  132. $this->ensureIterator();
  133. $this->startedIterating = true;
  134. } else {
  135. $this->ensureIterator()->next();
  136. $this->position++;
  137. }
  138. return $this->current();
  139. }
  140. /**
  141. * Returns the cursor to the beginning of the result set
  142. * @throws \MongoConnectionException
  143. * @throws \MongoCursorTimeoutException
  144. * @return void
  145. */
  146. public function rewind()
  147. {
  148. // We can recreate the cursor to allow it to be rewound
  149. $this->reset();
  150. $this->startedIterating = true;
  151. $this->position = 0;
  152. $this->ensureIterator()->rewind();
  153. }
  154. /**
  155. * Checks if the cursor is reading a valid result.
  156. * @link http://www.php.net/manual/en/mongocursor.valid.php
  157. * @return boolean If the current result is not null.
  158. */
  159. public function valid()
  160. {
  161. if (! $this->startedIterating) {
  162. return false;
  163. }
  164. return $this->ensureIterator()->valid();
  165. }
  166. /**
  167. * Limits the number of elements returned in one batch.
  168. *
  169. * @link http://docs.php.net/manual/en/mongocursor.batchsize.php
  170. * @param int $batchSize The number of results to return per batch
  171. * @return $this Returns this cursor.
  172. */
  173. public function batchSize($batchSize)
  174. {
  175. $this->batchSize = $batchSize;
  176. return $this;
  177. }
  178. /**
  179. * Checks if there are documents that have not been sent yet from the database for this cursor
  180. * @link http://www.php.net/manual/en/mongocursor.dead.php
  181. * @return boolean Returns if there are more results that have not been sent to the client, yet.
  182. */
  183. public function dead()
  184. {
  185. return $this->ensureCursor()->isDead();
  186. }
  187. /**
  188. * @return array
  189. */
  190. public function info()
  191. {
  192. return $this->getCursorInfo() + $this->getIterationInfo();
  193. }
  194. /**
  195. * @link http://www.php.net/manual/en/mongocursor.setreadpreference.php
  196. * @param string $readPreference
  197. * @param array $tags
  198. * @return $this Returns this cursor.
  199. */
  200. public function setReadPreference($readPreference, $tags = null)
  201. {
  202. $this->setReadPreferenceFromParameters($readPreference, $tags);
  203. return $this;
  204. }
  205. /**
  206. * Sets a client-side timeout for this query
  207. * @link http://www.php.net/manual/en/mongocursor.timeout.php
  208. * @param int $ms The number of milliseconds for the cursor to wait for a response. By default, the cursor will wait forever.
  209. * @return $this Returns this cursor
  210. */
  211. public function timeout($ms)
  212. {
  213. $this->notImplemented();
  214. }
  215. /**
  216. * Applies all options set on the cursor, overwriting any options that have already been set
  217. *
  218. * @param array $optionNames Array of option names to be applied (will be read from properties)
  219. * @return array
  220. */
  221. protected function getOptions($optionNames = null)
  222. {
  223. $options = [];
  224. if ($optionNames === null) {
  225. $optionNames = $this->optionNames;
  226. }
  227. foreach ($optionNames as $option) {
  228. $converter = 'convert' . ucfirst($option);
  229. $value = method_exists($this, $converter) ? $this->$converter() : $this->$option;
  230. if ($value === null) {
  231. continue;
  232. }
  233. $options[$option] = $value;
  234. }
  235. return $options;
  236. }
  237. /**
  238. * @return \Iterator
  239. */
  240. protected function ensureIterator()
  241. {
  242. if ($this->iterator === null) {
  243. // MongoDB\Driver\Cursor needs to be wrapped into a \Generator so that a valid \Iterator with working implementations of
  244. // next, current, valid, key and rewind is returned. These methods don't work if we wrap the Cursor inside an \IteratorIterator
  245. $this->iterator = $this->wrapTraversable($this->ensureCursor());
  246. }
  247. return $this->iterator;
  248. }
  249. /**
  250. * @param \Traversable $traversable
  251. * @return \Generator
  252. */
  253. protected function wrapTraversable(\Traversable $traversable)
  254. {
  255. foreach ($traversable as $key => $value) {
  256. yield $key => $value;
  257. }
  258. }
  259. /**
  260. * @throws \MongoCursorException
  261. */
  262. protected function errorIfOpened()
  263. {
  264. if ($this->cursor === null) {
  265. return;
  266. }
  267. throw new \MongoCursorException('cannot modify cursor after beginning iteration.');
  268. }
  269. /**
  270. * @return array
  271. */
  272. protected function getIterationInfo()
  273. {
  274. $iterationInfo = [
  275. 'started_iterating' => $this->cursor !== null,
  276. ];
  277. if ($this->cursor !== null) {
  278. switch ($this->cursor->getServer()->getType()) {
  279. case \MongoDB\Driver\Server::TYPE_RS_ARBITER:
  280. $typeString = 'ARBITER';
  281. break;
  282. case \MongoDB\Driver\Server::TYPE_MONGOS:
  283. $typeString = 'MONGOS';
  284. break;
  285. case \MongoDB\Driver\Server::TYPE_RS_PRIMARY:
  286. $typeString = 'PRIMARY';
  287. break;
  288. case \MongoDB\Driver\Server::TYPE_RS_SECONDARY:
  289. $typeString = 'SECONDARY';
  290. break;
  291. default:
  292. $typeString = 'STANDALONE';
  293. }
  294. $cursorId = (string) $this->cursor->getId();
  295. $iterationInfo += [
  296. 'id' => (int) $cursorId,
  297. 'at' => $this->position,
  298. 'numReturned' => $this->position, // This can't be obtained from the new cursor
  299. 'server' => sprintf('%s:%d;-;.;%d', $this->cursor->getServer()->getHost(), $this->cursor->getServer()->getPort(), getmypid()),
  300. 'host' => $this->cursor->getServer()->getHost(),
  301. 'port' => $this->cursor->getServer()->getPort(),
  302. 'connection_type_desc' => $typeString,
  303. ];
  304. }
  305. return $iterationInfo;
  306. }
  307. /**
  308. * @throws \Exception
  309. */
  310. protected function notImplemented()
  311. {
  312. throw new \Exception('Not implemented');
  313. }
  314. /**
  315. * Clears the cursor
  316. *
  317. * This is generic but implemented as protected since it's only exposed in MongoCursor
  318. */
  319. protected function reset()
  320. {
  321. $this->startedIterating = false;
  322. $this->cursor = null;
  323. $this->iterator = null;
  324. }
  325. }