You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

310 lines
7.4 KiB

  1. /*!
  2. * Module dependencies.
  3. */
  4. var MongooseConnection = require('../../connection')
  5. , mongo = require('mongodb')
  6. , Db = mongo.Db
  7. , Server = mongo.Server
  8. , Mongos = mongo.Mongos
  9. , STATES = require('../../connectionstate')
  10. , ReplSetServers = mongo.ReplSetServers;
  /**
   * A [node-mongodb-native](https://github.com/mongodb/node-mongodb-native) connection implementation.
   *
   * Forwards every constructor argument to the base `Connection` constructor,
   * then resets the `_listening` flag, which records whether the driver event
   * listeners are currently attached to `this.db`.
   *
   * @inherits Connection
   * @api private
   */
  function NativeConnection() {
    // run the base constructor against this instance with the caller's arguments
    MongooseConnection.apply(this, arguments);

    // no driver listeners have been registered yet
    this._listening = false;
  }
  /*!
   * Inherits from Connection: instances resolve missing members through
   * `MongooseConnection.prototype`.
   */
  NativeConnection.prototype.__proto__ = MongooseConnection.prototype;

  /**
   * Expose the possible connection states on the constructor.
   * @api public
   */
  NativeConnection.STATES = STATES;
  /**
   * Opens the connection to MongoDB.
   *
   * Builds a driver `Server` from `this.host`, `this.port` and
   * `this.options.server`, wraps it in a new `Db` (using `this.name` and
   * `this.options.db`) and opens it. Listeners left on a previous `this.db`
   * are removed first.
   *
   * @param {Function} fn called with the error when opening fails, otherwise with no arguments
   * @return {Connection} this
   * @api private
   */
  NativeConnection.prototype.doOpen = function (fn) {
    var conn = this;

    // detach the handlers bound to the db instance that is about to be replaced
    if (conn.db) {
      mute(conn);
    }

    var mongoServer = new Server(conn.host, conn.port, conn.options.server);
    conn.db = new Db(conn.name, mongoServer, conn.options.db);

    conn.db.open(function onOpened (err) {
      if (err) {
        return fn(err);
      }

      // attach driver listeners before handing control back to the caller
      listen(conn);
      fn();
    });

    return conn;
  };
  /*!
   * Register listeners for important driver events and bubble them to the
   * connection. Does nothing when the listeners are already attached
   * (`conn._listening`).
   *
   * @param {Connection} conn connection whose `db` emits the driver events
   */
  function listen (conn) {
    if (conn._listening) return;
    conn._listening = true;

    var db = conn.db;

    function onDbClose () {
      // closes requested through the connection itself are ignored here
      if (conn._closeCalled) return;

      if (!conn.db.serverConfig.autoReconnect) {
        conn.onClose();
        return;
      }

      // with auto reconnect enabled the connection is only marked as
      // disconnected and `close` is emitted directly, without going
      // through `onClose()`; the `open` handler below flips it back
      conn.readyState = STATES.disconnected;
      conn.emit('close');
    }

    function onDbError (err) {
      conn.emit('error', err);
    }

    function onDbTimeout (err) {
      // timeouts surface as `error` events on the connection
      var message = err && err.err || 'connection timeout';
      conn.emit('error', new Error(message));
    }

    function onDbOpen (err, openedDb) {
      var wasDisconnected = STATES.disconnected === conn.readyState;
      if (wasDisconnected && openedDb && openedDb.databaseName) {
        conn.readyState = STATES.connected;
        conn.emit('reconnected');
      }
    }

    db.on('close', onDbClose);
    db.on('error', onDbError);
    db.on('timeout', onDbTimeout);
    db.on('open', onDbOpen);
  }
  /*!
   * Remove the driver listeners registered in `listen` (plus `fullsetup`)
   * from `conn.db` and mark the connection as no longer listening.
   *
   * @param {Connection} conn
   * @throws {Error} when `conn.db` is not set
   */
  function mute (conn) {
    var db = conn.db;
    if (!db) throw new Error('missing db');

    // every listener for these events is dropped, not only the ones added here
    ['close', 'error', 'timeout', 'open', 'fullsetup'].forEach(function (eventName) {
      db.removeAllListeners(eventName);
    });

    conn._listening = false;
  }
  /**
   * Opens a connection to a MongoDB ReplicaSet (or to mongos routers).
   *
   * One driver `Server` is created per entry of `this.hosts`, using the entry's
   * `host` (or `ipc`) and `port` (27017 when missing) together with
   * `this.options.server`. When `this.options.mongos` is truthy the servers are
   * wrapped in a `Mongos` configured with it; otherwise they are wrapped in
   * `ReplSetServers` configured with `this.options.replset`.
   *
   * The db's `fullsetup` event is re-emitted on the connection.
   *
   * @param {Function} fn called with the error when opening fails, otherwise with no arguments
   * @api private
   * @return {Connection} this
   */
  NativeConnection.prototype.doOpenSet = function (fn) {
    // detach the handlers bound to the db instance that is about to be replaced
    if (this.db) {
      mute(this);
    }

    var self = this;

    var servers = this.hosts.map(function (member) {
      var host = member.host || member.ipc;
      var port = member.port || 27017;
      return new Server(host, port, self.options.server);
    });

    var topology = this.options.mongos
      ? new Mongos(servers, this.options.mongos)
      : new ReplSetServers(servers, this.options.replset);

    this.db = new Db(this.name, topology, this.options.db);

    this.db.on('fullsetup', function () {
      self.emit('fullsetup');
    });

    this.db.open(function (err) {
      if (err) return fn(err);

      // Attach the driver listeners before invoking the callback, exactly as
      // `doOpen` does, so that a throwing callback cannot leave the connection
      // without its close/error/timeout handlers.
      listen(self);
      fn();
    });

    return this;
  };
  /**
   * Closes the connection.
   *
   * `fn`, when given, is invoked synchronously right after `db.close()` has
   * been called; it is not handed to the driver and receives no arguments.
   *
   * @param {Function} [fn]
   * @return {Connection} this
   * @api private
   */
  NativeConnection.prototype.doClose = function (fn) {
    var conn = this;

    conn.db.close();
    fn && fn();

    return conn;
  };
  /**
   * Prepares default connection options for the node-mongodb-native driver.
   *
   * _NOTE: `passed` options take precedence over connection string options._
   *
   * Precedence is decided per target option: a connection string value is only
   * applied to a section (`server`, `replset`, `db`, `auth`) when that section
   * does not already define the option. Options shared by `server` and
   * `replset` (`poolSize` and the socket options) are checked for each section
   * separately, so an explicit `replset` value is never overwritten.
   *
   * When `passed` is given, that same object is filled in and returned.
   *
   * @param {Object} passed options that were passed directly during connection
   * @param {Object} [connStrOpts] options that were passed in the connection string
   * @return {Object} the options with `db`, `auth`, `server` and `replset` sections populated
   * @throws {Error} when `db.w` is -1 or 0 and is combined with safe|fsync|journal
   * @api private
   */
  NativeConnection.prototype.parseOptions = function (passed, connStrOpts) {
    var o = passed || {};
    o.db || (o.db = {});
    o.auth || (o.auth = {});
    o.server || (o.server = {});
    o.replset || (o.replset = {});
    o.server.socketOptions || (o.server.socketOptions = {});
    o.replset.socketOptions || (o.replset.socketOptions = {});

    var opts = connStrOpts || {};

    // Applies a connection string value only when nothing was passed
    // explicitly for that exact option.
    function fallback (target, key, value) {
      if ('undefined' === typeof target[key]) {
        target[key] = value;
      }
    }

    Object.keys(opts).forEach(function (name) {
      var value = opts[name];

      switch (name) {
        case 'poolSize':
          // checked per section so an explicit `replset.poolSize` survives
          fallback(o.server, 'poolSize', value);
          fallback(o.replset, 'poolSize', value);
          break;
        case 'slaveOk':
          fallback(o.server, 'slave_ok', value);
          break;
        case 'autoReconnect':
          fallback(o.server, 'auto_reconnect', value);
          break;
        case 'ssl':
        case 'socketTimeoutMS':
        case 'connectTimeoutMS':
          // checked per section so explicit `replset.socketOptions` survive
          fallback(o.server.socketOptions, name, value);
          fallback(o.replset.socketOptions, name, value);
          break;
        case 'authdb':
        case 'authSource':
          fallback(o.auth, name, value);
          break;
        case 'retries':
        case 'reconnectWait':
        case 'rs_name':
          fallback(o.replset, name, value);
          break;
        case 'replicaSet':
          fallback(o.replset, 'rs_name', value);
          break;
        case 'readSecondary':
          fallback(o.replset, 'read_secondary', value);
          break;
        case 'nativeParser':
          fallback(o.db, 'native_parser', value);
          break;
        case 'w':
        case 'safe':
        case 'fsync':
        case 'journal':
        case 'wtimeoutMS':
          fallback(o.db, name, value);
          break;
        case 'readPreference':
          fallback(o.db, 'read_preference', value);
          break;
        case 'readPreferenceTags':
          fallback(o.db, 'read_preference_tags', value);
          break;
      }
    });

    if (!('auto_reconnect' in o.server)) {
      o.server.auto_reconnect = true;
    }

    if (!o.db.read_preference) {
      // read from primaries by default
      o.db.read_preference = 'primary';
    }

    // mongoose creates its own ObjectIds
    o.db.forceServerObjectId = false;

    // default safe using new nomenclature: only when no write concern
    // related key is present at all
    if (!('journal' in o.db || 'j' in o.db ||
          'fsync' in o.db || 'safe' in o.db || 'w' in o.db)) {
      o.db.w = 1;
    }

    validate(o);
    return o;
  };
  /*!
   * Validates the driver db options.
   *
   * Rejects a write concern where `o.db.w` is exactly -1 or 0 while any of
   * `journal`, `fsync` or `safe` is truthy.
   *
   * @param {Object} o options object with a `db` section
   * @throws {Error} on the invalid combination described above
   */
  function validate (o) {
    var db = o.db;

    // only the numeric values -1 and 0 are restricted
    var restrictedW = -1 === db.w || 0 === db.w;
    if (!restrictedW) return;

    var wantsAcknowledgement = db.journal || db.fsync || db.safe;
    if (!wantsAcknowledgement) return;

    throw new Error(
        'Invalid writeConcern: '
      + 'w set to -1 or 0 cannot be combined with safe|fsync|journal');
  }
  267. /*!
  268. * Module exports.
  269. */
  270. module.exports = NativeConnection;