|
|
/* eslint no-empty: 1 */
/*! * Module dependencies. */
var Stream = require('stream').Stream; var utils = require('./utils'); var helpers = require('./queryhelpers'); var K = function(k) { return k; };
/** * Provides a Node.js 0.8 style [ReadStream](http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream) interface for Queries.
* * var stream = Model.find().stream(); * * stream.on('data', function (doc) { * // do something with the mongoose document
* }).on('error', function (err) { * // handle the error
* }).on('close', function () { * // the stream is closed
* }); * * * The stream interface allows us to simply "plug-in" to other _Node.js 0.8_ style write streams. * * Model.where('created').gte(twoWeeksAgo).stream().pipe(writeStream); * * ####Valid options * * - `transform`: optional function which accepts a mongoose document. The return value of the function will be emitted on `data`. * * ####Example * * // JSON.stringify all documents before emitting
* var stream = Thing.find().stream({ transform: JSON.stringify }); * stream.pipe(writeStream); * * _NOTE: plugging into an HTTP response will *not* work out of the box. Those streams expect only strings or buffers to be emitted, so first formatting our documents as strings/buffers is necessary._ * * _NOTE: these streams are Node.js 0.8 style read streams which differ from Node.js 0.10 style. Node.js 0.10 streams are not well tested yet and are not guaranteed to work._ * * @param {Query} query * @param {Object} [options] * @inherits NodeJS Stream http://nodejs.org/docs/v0.8.21/api/stream.html#stream_readable_stream
* @event `data`: emits a single Mongoose document * @event `error`: emits when an error occurs during streaming. This will emit _before_ the `close` event. * @event `close`: emits when the stream reaches the end of the cursor or an error occurs, or the stream is manually `destroy`ed. After this event, no more events are emitted. * @api public */
function QueryStream(query, options) { Stream.call(this);
this.query = query; this.readable = true; this.paused = false; this._cursor = null; this._destroyed = null; this._fields = null; this._buffer = null; this._inline = T_INIT; this._running = false; this._transform = options && typeof options.transform === 'function' ? options.transform : K;
// give time to hook up events
var _this = this; process.nextTick(function() { _this._init(); }); }
/*! * Inherit from Stream */
QueryStream.prototype.__proto__ = Stream.prototype;
/** * Flag stating whether or not this stream is readable. * * @property readable * @api public */
QueryStream.prototype.readable;
/** * Flag stating whether or not this stream is paused. * * @property paused * @api public */
QueryStream.prototype.paused;
// trampoline flags
var T_INIT = 0; var T_IDLE = 1; var T_CONT = 2;
/** * Initializes the query. * * @api private */
QueryStream.prototype._init = function() { if (this._destroyed) { return; }
var query = this.query, model = query.model, options = query._optionsForExec(model), _this = this;
try { query.cast(model); } catch (err) { return _this.destroy(err); }
_this._fields = utils.clone(query._fields); options.fields = query._castFields(_this._fields);
model.collection.find(query._conditions, options, function(err, cursor) { if (err) { return _this.destroy(err); } _this._cursor = cursor; _this._next(); }); };
/** * Trampoline for pulling the next doc from cursor. * * @see QueryStream#__next #querystream_QueryStream-__next * @api private */
QueryStream.prototype._next = function _next() { if (this.paused || this._destroyed) { this._running = false; return this._running; }
this._running = true;
if (this._buffer && this._buffer.length) { var arg; while (!this.paused && !this._destroyed && (arg = this._buffer.shift())) { // eslint-disable-line no-cond-assign
this._onNextObject.apply(this, arg); } }
// avoid stack overflows with large result sets.
// trampoline instead of recursion.
while (this.__next()) { } };
/** * Pulls the next doc from the cursor. * * @see QueryStream#_next #querystream_QueryStream-_next * @api private */
QueryStream.prototype.__next = function() { if (this.paused || this._destroyed) { this._running = false; return this._running; }
var _this = this; _this._inline = T_INIT;
_this._cursor.nextObject(function cursorcb(err, doc) { _this._onNextObject(err, doc); });
// if onNextObject() was already called in this tick
// return ourselves to the trampoline.
if (T_CONT === this._inline) { return true; } // onNextObject() hasn't fired yet. tell onNextObject
// that its ok to call _next b/c we are not within
// the trampoline anymore.
this._inline = T_IDLE; };
/** * Transforms raw `doc`s returned from the cursor into a model instance. * * @param {Error|null} err * @param {Object} doc * @api private */
QueryStream.prototype._onNextObject = function _onNextObject(err, doc) { if (this._destroyed) { return; }
if (this.paused) { this._buffer || (this._buffer = []); this._buffer.push([err, doc]); this._running = false; return this._running; }
if (err) { return this.destroy(err); }
// when doc is null we hit the end of the cursor
if (!doc) { this.emit('end'); return this.destroy(); }
var opts = this.query._mongooseOptions;
if (!opts.populate) { return opts.lean === true ? emit(this, doc) : createAndEmit(this, null, doc); }
var _this = this; var pop = helpers.preparePopulationOptionsMQ(_this.query, _this.query._mongooseOptions);
// Hack to work around gh-3108
pop.forEach(function(option) { delete option.model; });
pop.__noPromise = true; _this.query.model.populate(doc, pop, function(err, doc) { if (err) { return _this.destroy(err); } return opts.lean === true ? emit(_this, doc) : createAndEmit(_this, pop, doc); }); };
function createAndEmit(self, populatedIds, doc) { var instance = helpers.createModel(self.query.model, doc, self._fields); var opts = populatedIds ? {populated: populatedIds} : undefined;
instance.init(doc, opts, function(err) { if (err) { return self.destroy(err); } emit(self, instance); }); }
/*! * Emit a data event and manage the trampoline state */
function emit(self, doc) { self.emit('data', self._transform(doc));
// trampoline management
if (T_IDLE === self._inline) { // no longer in trampoline. restart it.
self._next(); } else { // in a trampoline. tell __next that its
// ok to continue jumping.
self._inline = T_CONT; } }
/** * Pauses this stream. * * @api public */
QueryStream.prototype.pause = function() { this.paused = true; };
/** * Resumes this stream. * * @api public */
QueryStream.prototype.resume = function() { this.paused = false;
if (!this._cursor) { // cannot start if not initialized
return; }
// are we within the trampoline?
if (T_INIT === this._inline) { return; }
if (!this._running) { // outside QueryStream control, need manual restart
return this._next(); } };
/** * Destroys the stream, closing the underlying cursor, which emits the close event. No more events will be emitted after the close event. * * @param {Error} [err] * @api public */
QueryStream.prototype.destroy = function(err) { if (this._destroyed) { return; } this._destroyed = true; this._running = false; this.readable = false;
if (this._cursor) { this._cursor.close(); }
if (err) { this.emit('error', err); }
this.emit('close'); };
/** * Pipes this query stream into another stream. This method is inherited from NodeJS Streams. * * ####Example: * * query.stream().pipe(writeStream [, options]) * * @method pipe * @memberOf QueryStream * @see NodeJS http://nodejs.org/api/stream.html
* @api public */
/*! * Module exports */
module.exports = exports = QueryStream;
|