You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

262 lines
6.4 KiB

/*!
* Module dependencies.
*/
var PromiseProvider = require('./promise_provider');
var Readable = require('stream').Readable;
var helpers = require('./queryhelpers');
var util = require('util');
/**
 * A QueryCursor is a concurrency primitive for processing query results
 * one document at a time. A QueryCursor fulfills the [Node.js streams3 API](https://strongloop.com/strongblog/whats-new-io-js-beta-streams3/),
 * in addition to several other mechanisms for loading documents from MongoDB
 * one at a time.
 *
 * Unless you're an advanced user, do **not** instantiate this class directly.
 * Use [`Query#cursor()`](/api.html#query_Query-cursor) instead.
 *
 * The driver cursor is requested as soon as the QueryCursor is constructed.
 * `this.cursor` stays `null` until the driver calls back, at which point the
 * `cursor` event is emitted.
 *
 * @param {Query} query
 * @param {Object} options query options passed to `.find()`
 * @inherits Readable
 * @event `cursor`: Emitted when the cursor is created
 * @event `error`: Emitted when an error occurred
 * @event `data`: Emitted when the stream is flowing and the next doc is ready
 * @event `end`: Emitted when the stream is exhausted
 * @api public
 */

function QueryCursor(query, options) {
  Readable.call(this, { objectMode: true });

  this.cursor = null;
  this.query = query;
  var _this = this;
  var model = query.model;
  model.collection.find(query._conditions, options, function(err, cursor) {
    if (_this._error) {
      // The cursor was marked as errored before the driver answered, so the
      // driver cursor is released right away. `cursor` may be absent when
      // `find()` itself reported an error, hence the guard.
      if (cursor) {
        cursor.close(function() {});
      }
      // Emitting `error` without a listener would throw, so only emit when
      // somebody is listening.
      _this.listeners('error').length > 0 && _this.emit('error', _this._error);
    }
    if (err) {
      return _this.emit('error', err);
    }
    // Still published after a marked error: readers waiting on the `cursor`
    // event are woken up and then see `_error`.
    _this.cursor = cursor;
    _this.emit('cursor', cursor);
  });
}

util.inherits(QueryCursor, Readable);
/*!
 * Readable API hook: fetches one document and pushes it onto the stream.
 * When the documents run out, the stream is ended and the driver cursor is
 * closed; `close` (or `error`, if closing fails) is emitted afterwards.
 */

QueryCursor.prototype._read = function() {
  var stream = this;

  // Invoked once the driver cursor has been shut down after the last doc.
  var onCursorClosed = function(closeError) {
    if (closeError) {
      return stream.emit('error', closeError);
    }
    stream.emit('close');
  };

  var onDocument = function(error, doc) {
    if (error) {
      return stream.emit('error', error);
    }
    if (doc) {
      stream.push(doc);
      return;
    }
    // No document left: signal end-of-stream first, then release the cursor.
    stream.push(null);
    return stream.cursor.close(onCursorClosed);
  };

  _next(stream, onDocument);
};
/*!
 * Flags this cursor as failed by storing `error` on `_error`, and hands the
 * cursor back so the call can be chained.
 */

QueryCursor.prototype._markError = function(error) {
  var cursor = this;
  cursor._error = error;
  return cursor;
};
/**
 * Marks this cursor as closed. Will stop streaming and subsequent calls to
 * `next()` will error.
 *
 * If the underlying driver cursor has not been created yet, closing is
 * deferred until the `cursor` event fires instead of failing on a `null`
 * cursor.
 *
 * @param {Function} callback called with the error, or with no arguments on success
 * @return {Promise} resolved once the driver cursor is closed, rejected if closing fails
 * @api public
 * @method close
 * @emits close
 * @see MongoDB driver cursor#close http://mongodb.github.io/node-mongodb-native/2.1/api/Cursor.html#close
 */

QueryCursor.prototype.close = function(callback) {
  var Promise = PromiseProvider.get();
  var _this = this;
  return new Promise.ES6(function(resolve, reject) {
    var closeDriverCursor = function() {
      _this.cursor.close(function(error) {
        if (error) {
          callback && callback(error);
          reject(error);
          // Emitting `error` without a listener would throw, so only emit
          // when somebody is listening.
          return _this.listeners('error').length > 0 &&
            _this.emit('error', error);
        }
        _this.emit('close');
        resolve();
        callback && callback();
      });
    };

    if (_this.cursor) {
      return closeDriverCursor();
    }
    // `cursor` is still null while the driver's `find()` is in flight.
    // Wait for it rather than dereferencing null, which would skip the
    // callback and leave the driver cursor open.
    _this.once('cursor', closeDriverCursor);
  });
};
/**
 * Get the next document from this cursor. Will return `null` when there are
 * no documents left.
 *
 * The outcome is reported twice: through the optional node-style `callback`
 * and through the returned promise.
 *
 * @param {Function} callback
 * @return {Promise}
 * @api public
 * @method next
 */

QueryCursor.prototype.next = function(callback) {
  var cursor = this;
  var ES6Promise = PromiseProvider.get().ES6;

  return new ES6Promise(function(resolve, reject) {
    _next(cursor, function(error, doc) {
      if (!error) {
        if (callback) {
          callback(null, doc);
        }
        resolve(doc);
        return;
      }
      if (callback) {
        callback(error);
      }
      return reject(error);
    });
  });
};
/**
 * Execute `fn` for every document in the cursor. If `fn` returns a promise,
 * will wait for the promise to resolve before iterating on to the next one.
 * Returns a promise that resolves when done.
 *
 * Iteration stops at the first failure: an error from the cursor, an
 * exception thrown by `fn`, or a rejected promise returned by `fn`. That
 * error is passed to `callback` and rejects the returned promise.
 *
 * @param {Function} fn
 * @param {Function} [callback] executed when all docs have been processed
 * @return {Promise}
 * @api public
 * @method eachAsync
 */

QueryCursor.prototype.eachAsync = function(fn, callback) {
  var Promise = PromiseProvider.get();
  var _this = this;

  // A throw or rejection may carry no reason (e.g. `reject()`). Passing that
  // falsy value on would be read as "no error" and iteration would continue,
  // so substitute a real Error.
  var toError = function(reason, message) {
    return reason || new Error(message);
  };

  var handleNextResult = function(doc, callback) {
    var promise;
    try {
      promise = fn(doc);
    } catch (error) {
      // Route synchronous failures through the same path as rejections so
      // that both `callback` and the returned promise are notified.
      return callback(
        toError(error, '`eachAsync()` function threw without error'));
    }
    if (promise && typeof promise.then === 'function') {
      promise.then(
        function() { callback(null); },
        function(error) {
          callback(
            toError(error, '`eachAsync()` promise rejected without error'));
        });
    } else {
      callback(null);
    }
  };

  // Pulls one document, hands it to `fn`, and recurses until the cursor is
  // exhausted or something fails.
  var iterate = function(callback) {
    return _next(_this, function(error, doc) {
      if (error) {
        return callback(error);
      }
      if (!doc) {
        return callback(null);
      }
      handleNextResult(doc, function(error) {
        if (error) {
          return callback(error);
        }
        iterate(callback);
      });
    });
  };

  return new Promise.ES6(function(resolve, reject) {
    iterate(function(error) {
      if (error) {
        callback && callback(error);
        return reject(error);
      }
      callback && callback(null);
      return resolve();
    });
  });
};
/*!
 * Pulls the next raw document off the driver cursor and turns it into what
 * the query asked for: populated if `populate` is set, a plain object when
 * `lean` is `true`, a mongoose document otherwise.
 *
 * - A cursor that was marked as errored reports that error on the next tick.
 * - If the driver cursor is not available yet, waits for the `cursor` event.
 * - An exhausted cursor yields `callback(null, null)`.
 */

function _next(ctx, callback) {
  if (ctx._error) {
    return process.nextTick(function() {
      callback(ctx._error);
    });
  }

  if (!ctx.cursor) {
    // The driver has not handed over a cursor yet; retry once it has.
    ctx.once('cursor', function() {
      _next(ctx, callback);
    });
    return;
  }

  ctx.cursor.next(function(error, rawDoc) {
    if (error) {
      return callback(error);
    }
    if (!rawDoc) {
      return callback(null, null);
    }

    var mongooseOptions = ctx.query._mongooseOptions;

    // Hands the result over, hydrating it first unless the query is lean.
    var deliver = function(result, populatedPaths) {
      if (mongooseOptions.lean === true) {
        return callback(null, result);
      }
      return _create(ctx, result, populatedPaths, callback);
    };

    if (!mongooseOptions.populate) {
      return deliver(rawDoc, null);
    }

    var populateOptions = helpers.preparePopulationOptionsMQ(ctx.query,
      mongooseOptions);
    populateOptions.forEach(function(entry) {
      delete entry.model;
    });
    populateOptions.__noPromise = true;

    ctx.query.model.populate(rawDoc, populateOptions, function(err, populated) {
      if (err) {
        return callback(err);
      }
      return deliver(populated, populateOptions);
    });
  });
}
/*!
 * Hydrates a raw document: builds a model instance for it via
 * `helpers.createModel()` and initialises that instance from `doc`.
 * `populatedIds`, when given, is forwarded to `init()` as `{ populated }`.
 * Calls `cb(err)` on failure, `cb(null, instance)` on success.
 */

function _create(ctx, doc, populatedIds, cb) {
  var query = ctx.query;
  var instance = helpers.createModel(query.model, doc, query._fields);

  var initOptions;
  if (populatedIds) {
    initOptions = { populated: populatedIds };
  }

  var onInit = function(err) {
    if (!err) {
      cb(null, instance);
      return;
    }
    return cb(err);
  };

  instance.init(doc, initOptions, onInit);
}
// Expose the QueryCursor constructor as this module's export.
module.exports = QueryCursor;