You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

367 lines
8.1 KiB

  1. /* eslint no-empty: 1 */
  2. /*!
  3. * Module dependencies.
  4. */
  5. var Stream = require('stream').Stream;
  6. var utils = require('./utils');
  7. var helpers = require('./queryhelpers');
  8. var K = function(k) {
  9. return k;
  10. };
  11. /**
  12. * Provides a Node.js 0.8 style [ReadStream](http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream) interface for Queries.
  13. *
  14. * var stream = Model.find().stream();
  15. *
  16. * stream.on('data', function (doc) {
  17. * // do something with the mongoose document
  18. * }).on('error', function (err) {
  19. * // handle the error
  20. * }).on('close', function () {
  21. * // the stream is closed
  22. * });
  23. *
  24. *
  25. * The stream interface allows us to simply "plug-in" to other _Node.js 0.8_ style write streams.
  26. *
  27. * Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);
  28. *
  29. * ####Valid options
  30. *
  31. * - `transform`: optional function which accepts a mongoose document. The return value of the function will be emitted on `data`.
  32. *
  33. * ####Example
  34. *
  35. * // JSON.stringify all documents before emitting
  36. * var stream = Thing.find().stream({ transform: JSON.stringify });
  37. * stream.pipe(writeStream);
  38. *
  39. * _NOTE: plugging into an HTTP response will *not* work out of the box. Those streams expect only strings or buffers to be emitted, so first formatting our documents as strings/buffers is necessary._
  40. *
  41. * _NOTE: these streams are Node.js 0.8 style read streams which differ from Node.js 0.10 style. Node.js 0.10 streams are not well tested yet and are not guaranteed to work._
  42. *
  43. * @param {Query} query
  44. * @param {Object} [options]
  45. * @inherits NodeJS Stream http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream
  46. * @event `data`: emits a single Mongoose document
  47. * @event `error`: emits when an error occurs during streaming. This will emit _before_ the `close` event.
  48. * @event `close`: emits when the stream reaches the end of the cursor or an error occurs, or the stream is manually `destroy`ed. After this event, no more events are emitted.
  49. * @api public
  50. */
  51. function QueryStream(query, options) {
  52. Stream.call(this);
  53. this.query = query;
  54. this.readable = true;
  55. this.paused = false;
  56. this._cursor = null;
  57. this._destroyed = null;
  58. this._fields = null;
  59. this._buffer = null;
  60. this._inline = T_INIT;
  61. this._running = false;
  62. this._transform = options && typeof options.transform === 'function'
  63. ? options.transform
  64. : K;
  65. // give time to hook up events
  66. var _this = this;
  67. process.nextTick(function() {
  68. _this._init();
  69. });
  70. }
  71. /*!
  72. * Inherit from Stream
  73. */
  74. QueryStream.prototype.__proto__ = Stream.prototype;
  75. /**
  76. * Flag stating whether or not this stream is readable.
  77. *
  78. * @property readable
  79. * @api public
  80. */
  81. QueryStream.prototype.readable;
  82. /**
  83. * Flag stating whether or not this stream is paused.
  84. *
  85. * @property paused
  86. * @api public
  87. */
  88. QueryStream.prototype.paused;
  89. // trampoline flags
  90. var T_INIT = 0;
  91. var T_IDLE = 1;
  92. var T_CONT = 2;
  93. /**
  94. * Initializes the query.
  95. *
  96. * @api private
  97. */
  98. QueryStream.prototype._init = function() {
  99. if (this._destroyed) {
  100. return;
  101. }
  102. var query = this.query,
  103. model = query.model,
  104. options = query._optionsForExec(model),
  105. _this = this;
  106. try {
  107. query.cast(model);
  108. } catch (err) {
  109. return _this.destroy(err);
  110. }
  111. _this._fields = utils.clone(query._fields);
  112. options.fields = query._castFields(_this._fields);
  113. model.collection.find(query._conditions, options, function(err, cursor) {
  114. if (err) {
  115. return _this.destroy(err);
  116. }
  117. _this._cursor = cursor;
  118. _this._next();
  119. });
  120. };
  121. /**
  122. * Trampoline for pulling the next doc from cursor.
  123. *
  124. * @see QueryStream#__next #querystream_QueryStream-__next
  125. * @api private
  126. */
  127. QueryStream.prototype._next = function _next() {
  128. if (this.paused || this._destroyed) {
  129. this._running = false;
  130. return this._running;
  131. }
  132. this._running = true;
  133. if (this._buffer && this._buffer.length) {
  134. var arg;
  135. while (!this.paused && !this._destroyed && (arg = this._buffer.shift())) { // eslint-disable-line no-cond-assign
  136. this._onNextObject.apply(this, arg);
  137. }
  138. }
  139. // avoid stack overflows with large result sets.
  140. // trampoline instead of recursion.
  141. while (this.__next()) {
  142. }
  143. };
  144. /**
  145. * Pulls the next doc from the cursor.
  146. *
  147. * @see QueryStream#_next #querystream_QueryStream-_next
  148. * @api private
  149. */
  150. QueryStream.prototype.__next = function() {
  151. if (this.paused || this._destroyed) {
  152. this._running = false;
  153. return this._running;
  154. }
  155. var _this = this;
  156. _this._inline = T_INIT;
  157. _this._cursor.nextObject(function cursorcb(err, doc) {
  158. _this._onNextObject(err, doc);
  159. });
  160. // if onNextObject() was already called in this tick
  161. // return ourselves to the trampoline.
  162. if (T_CONT === this._inline) {
  163. return true;
  164. }
  165. // onNextObject() hasn't fired yet. tell onNextObject
  166. // that its ok to call _next b/c we are not within
  167. // the trampoline anymore.
  168. this._inline = T_IDLE;
  169. };
  170. /**
  171. * Transforms raw `doc`s returned from the cursor into a model instance.
  172. *
  173. * @param {Error|null} err
  174. * @param {Object} doc
  175. * @api private
  176. */
  177. QueryStream.prototype._onNextObject = function _onNextObject(err, doc) {
  178. if (this._destroyed) {
  179. return;
  180. }
  181. if (this.paused) {
  182. this._buffer || (this._buffer = []);
  183. this._buffer.push([err, doc]);
  184. this._running = false;
  185. return this._running;
  186. }
  187. if (err) {
  188. return this.destroy(err);
  189. }
  190. // when doc is null we hit the end of the cursor
  191. if (!doc) {
  192. this.emit('end');
  193. return this.destroy();
  194. }
  195. var opts = this.query._mongooseOptions;
  196. if (!opts.populate) {
  197. return opts.lean === true ?
  198. emit(this, doc) :
  199. createAndEmit(this, null, doc);
  200. }
  201. var _this = this;
  202. var pop = helpers.preparePopulationOptionsMQ(_this.query, _this.query._mongooseOptions);
  203. // Hack to work around gh-3108
  204. pop.forEach(function(option) {
  205. delete option.model;
  206. });
  207. pop.__noPromise = true;
  208. _this.query.model.populate(doc, pop, function(err, doc) {
  209. if (err) {
  210. return _this.destroy(err);
  211. }
  212. return opts.lean === true ?
  213. emit(_this, doc) :
  214. createAndEmit(_this, pop, doc);
  215. });
  216. };
  217. function createAndEmit(self, populatedIds, doc) {
  218. var instance = helpers.createModel(self.query.model, doc, self._fields);
  219. var opts = populatedIds ?
  220. {populated: populatedIds} :
  221. undefined;
  222. instance.init(doc, opts, function(err) {
  223. if (err) {
  224. return self.destroy(err);
  225. }
  226. emit(self, instance);
  227. });
  228. }
  229. /*!
  230. * Emit a data event and manage the trampoline state
  231. */
  232. function emit(self, doc) {
  233. self.emit('data', self._transform(doc));
  234. // trampoline management
  235. if (T_IDLE === self._inline) {
  236. // no longer in trampoline. restart it.
  237. self._next();
  238. } else {
  239. // in a trampoline. tell __next that its
  240. // ok to continue jumping.
  241. self._inline = T_CONT;
  242. }
  243. }
  244. /**
  245. * Pauses this stream.
  246. *
  247. * @api public
  248. */
  249. QueryStream.prototype.pause = function() {
  250. this.paused = true;
  251. };
  252. /**
  253. * Resumes this stream.
  254. *
  255. * @api public
  256. */
  257. QueryStream.prototype.resume = function() {
  258. this.paused = false;
  259. if (!this._cursor) {
  260. // cannot start if not initialized
  261. return;
  262. }
  263. // are we within the trampoline?
  264. if (T_INIT === this._inline) {
  265. return;
  266. }
  267. if (!this._running) {
  268. // outside QueryStream control, need manual restart
  269. return this._next();
  270. }
  271. };
  272. /**
  273. * Destroys the stream, closing the underlying cursor, which emits the close event. No more events will be emitted after the close event.
  274. *
  275. * @param {Error} [err]
  276. * @api public
  277. */
  278. QueryStream.prototype.destroy = function(err) {
  279. if (this._destroyed) {
  280. return;
  281. }
  282. this._destroyed = true;
  283. this._running = false;
  284. this.readable = false;
  285. if (this._cursor) {
  286. this._cursor.close();
  287. }
  288. if (err) {
  289. this.emit('error', err);
  290. }
  291. this.emit('close');
  292. };
  293. /**
  294. * Pipes this query stream into another stream. This method is inherited from NodeJS Streams.
  295. *
  296. * ####Example:
  297. *
  298. * query.stream().pipe(writeStream [, options])
  299. *
  300. * @method pipe
  301. * @memberOf QueryStream
  302. * @see NodeJS http://nodejs.org/api/stream.html
  303. * @api public
  304. */
  305. /*!
  306. * Module exports
  307. */
  308. module.exports = exports = QueryStream;