You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

336 lines
7.6 KiB

  1. /*!
  2. * Module dependencies.
  3. */
  4. var Stream = require('stream').Stream
  5. var utils = require('./utils')
  6. var helpers = require('./queryhelpers')
  7. var K = function(k){ return k }
  8. /**
  9. * Provides a Node.js 0.8 style [ReadStream](http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream) interface for Queries.
  10. *
  11. * var stream = Model.find().stream();
  12. *
  13. * stream.on('data', function (doc) {
  14. * // do something with the mongoose document
  15. * }).on('error', function (err) {
  16. * // handle the error
  17. * }).on('close', function () {
  18. * // the stream is closed
  19. * });
  20. *
  21. *
  22. * The stream interface allows us to simply "plug-in" to other _Node.js 0.8_ style write streams.
  23. *
  24. * Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream);
  25. *
  26. * ####Valid options
  27. *
  28. * - `transform`: optional function which accepts a mongoose document. The return value of the function will be emitted on `data`.
  29. *
  30. * ####Example
  31. *
  32. * // JSON.stringify all documents before emitting
  33. * var stream = Thing.find().stream({ transform: JSON.stringify });
  34. * stream.pipe(writeStream);
  35. *
  36. * _NOTE: plugging into an HTTP response will *not* work out of the box. Those streams expect only strings or buffers to be emitted, so first formatting our documents as strings/buffers is necessary._
  37. *
  38. * _NOTE: these streams are Node.js 0.8 style read streams which differ from Node.js 0.10 style. Node.js 0.10 streams are not well tested yet and are not guaranteed to work._
  39. *
  40. * @param {Query} query
  41. * @param {Object} [options]
  42. * @inherits NodeJS Stream http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream
  43. * @event `data`: emits a single Mongoose document
  44. * @event `error`: emits when an error occurs during streaming. This will emit _before_ the `close` event.
  45. * @event `close`: emits when the stream reaches the end of the cursor or an error occurs, or the stream is manually `destroy`ed. After this event, no more events are emitted.
  46. * @api public
  47. */
  48. function QueryStream (query, options) {
  49. Stream.call(this);
  50. this.query = query;
  51. this.readable = true;
  52. this.paused = false;
  53. this._cursor = null;
  54. this._destroyed = null;
  55. this._fields = null;
  56. this._buffer = null;
  57. this._inline = T_INIT;
  58. this._running = false;
  59. this._transform = options && 'function' == typeof options.transform
  60. ? options.transform
  61. : K;
  62. // give time to hook up events
  63. var self = this;
  64. process.nextTick(function () {
  65. self._init();
  66. });
  67. }
  68. /*!
  69. * Inherit from Stream
  70. */
  71. QueryStream.prototype.__proto__ = Stream.prototype;
  72. /**
  73. * Flag stating whether or not this stream is readable.
  74. *
  75. * @property readable
  76. * @api public
  77. */
  78. QueryStream.prototype.readable;
  79. /**
  80. * Flag stating whether or not this stream is paused.
  81. *
  82. * @property paused
  83. * @api public
  84. */
  85. QueryStream.prototype.paused;
  86. // trampoline flags
  87. var T_INIT = 0;
  88. var T_IDLE = 1;
  89. var T_CONT = 2;
  90. /**
  91. * Initializes the query.
  92. *
  93. * @api private
  94. */
  95. QueryStream.prototype._init = function () {
  96. if (this._destroyed) return;
  97. var query = this.query
  98. , model = query.model
  99. , options = query._optionsForExec(model)
  100. , self = this
  101. try {
  102. query.cast(model);
  103. } catch (err) {
  104. return self.destroy(err);
  105. }
  106. self._fields = utils.clone(query._fields);
  107. options.fields = query._castFields(self._fields);
  108. model.collection.find(query._conditions, options, function (err, cursor) {
  109. if (err) return self.destroy(err);
  110. self._cursor = cursor;
  111. self._next();
  112. });
  113. }
  114. /**
  115. * Trampoline for pulling the next doc from cursor.
  116. *
  117. * @see QueryStream#__next #querystream_QueryStream-__next
  118. * @api private
  119. */
  120. QueryStream.prototype._next = function _next () {
  121. if (this.paused || this._destroyed) {
  122. return this._running = false;
  123. }
  124. this._running = true;
  125. if (this._buffer && this._buffer.length) {
  126. var arg;
  127. while (!this.paused && !this._destroyed && (arg = this._buffer.shift())) {
  128. this._onNextObject.apply(this, arg);
  129. }
  130. }
  131. // avoid stack overflows with large result sets.
  132. // trampoline instead of recursion.
  133. while (this.__next()) {}
  134. }
  135. /**
  136. * Pulls the next doc from the cursor.
  137. *
  138. * @see QueryStream#_next #querystream_QueryStream-_next
  139. * @api private
  140. */
  141. QueryStream.prototype.__next = function () {
  142. if (this.paused || this._destroyed)
  143. return this._running = false;
  144. var self = this;
  145. self._inline = T_INIT;
  146. self._cursor.nextObject(function cursorcb (err, doc) {
  147. self._onNextObject(err, doc);
  148. });
  149. // if onNextObject() was already called in this tick
  150. // return ourselves to the trampoline.
  151. if (T_CONT === this._inline) {
  152. return true;
  153. } else {
  154. // onNextObject() hasn't fired yet. tell onNextObject
  155. // that its ok to call _next b/c we are not within
  156. // the trampoline anymore.
  157. this._inline = T_IDLE;
  158. }
  159. }
  160. /**
  161. * Transforms raw `doc`s returned from the cursor into a model instance.
  162. *
  163. * @param {Error|null} err
  164. * @param {Object} doc
  165. * @api private
  166. */
  167. QueryStream.prototype._onNextObject = function _onNextObject (err, doc) {
  168. if (this._destroyed) return;
  169. if (this.paused) {
  170. this._buffer || (this._buffer = []);
  171. this._buffer.push([err, doc]);
  172. return this._running = false;
  173. }
  174. if (err) return this.destroy(err);
  175. // when doc is null we hit the end of the cursor
  176. if (!doc) {
  177. this.emit('end');
  178. return this.destroy();
  179. }
  180. var opts = this.query.options;
  181. if (!opts.populate) {
  182. return true === opts.lean
  183. ? emit(this, doc)
  184. : createAndEmit(this, doc);
  185. }
  186. var self = this;
  187. var pop = helpers.preparePopulationOptions(self.query, self.query.options);
  188. self.query.model.populate(doc, pop, function (err, doc) {
  189. if (err) return self.destroy(err);
  190. return true === opts.lean
  191. ? emit(self, doc)
  192. : createAndEmit(self, doc);
  193. })
  194. }
  195. function createAndEmit (self, doc) {
  196. var instance = new self.query.model(undefined, self._fields, true);
  197. instance.init(doc, function (err) {
  198. if (err) return self.destroy(err);
  199. emit(self, instance);
  200. });
  201. }
  202. /*!
  203. * Emit a data event and manage the trampoline state
  204. */
  205. function emit (self, doc) {
  206. self.emit('data', self._transform(doc));
  207. // trampoline management
  208. if (T_IDLE === self._inline) {
  209. // no longer in trampoline. restart it.
  210. self._next();
  211. } else {
  212. // in a trampoline. tell __next that its
  213. // ok to continue jumping.
  214. self._inline = T_CONT;
  215. }
  216. }
  217. /**
  218. * Pauses this stream.
  219. *
  220. * @api public
  221. */
  222. QueryStream.prototype.pause = function () {
  223. this.paused = true;
  224. }
  225. /**
  226. * Resumes this stream.
  227. *
  228. * @api public
  229. */
  230. QueryStream.prototype.resume = function () {
  231. this.paused = false;
  232. if (!this._cursor) {
  233. // cannot start if not initialized
  234. return;
  235. }
  236. // are we within the trampoline?
  237. if (T_INIT === this._inline) {
  238. return;
  239. }
  240. if (!this._running) {
  241. // outside QueryStream control, need manual restart
  242. return this._next();
  243. }
  244. }
  245. /**
  246. * Destroys the stream, closing the underlying cursor. No more events will be emitted.
  247. *
  248. * @param {Error} [err]
  249. * @api public
  250. */
  251. QueryStream.prototype.destroy = function (err) {
  252. if (this._destroyed) return;
  253. this._destroyed = true;
  254. this._running = false;
  255. this.readable = false;
  256. if (this._cursor) {
  257. this._cursor.close();
  258. }
  259. if (err) {
  260. this.emit('error', err);
  261. }
  262. this.emit('close');
  263. }
  264. /**
  265. * Pipes this query stream into another stream. This method is inherited from NodeJS Streams.
  266. *
  267. * ####Example:
  268. *
  269. * query.stream().pipe(writeStream [, options])
  270. *
  271. * @method pipe
  272. * @memberOf QueryStream
  273. * @see NodeJS http://nodejs.org/api/stream.html
  274. * @api public
  275. */
  276. /*!
  277. * Module exports
  278. */
  279. module.exports = exports = QueryStream;