You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

391 lines
9.0 KiB

  1. /**
  2. * Module dependencies.
  3. */
  4. var qs = require('querystring')
  5. , parse = require('url').parse
  6. , readFileSync = require('fs').readFileSync
  7. , crypto = require('crypto')
  8. , base64id = require('base64id')
  9. , transports = require('./transports')
  10. , EventEmitter = require('events').EventEmitter
  11. , Socket = require('./socket')
  12. , WebSocketServer = require('ws').Server
  13. , debug = require('debug')('engine');
  14. /**
  15. * Module exports.
  16. */
  17. module.exports = Server;
  18. /**
  19. * Server constructor.
  20. *
  21. * @param {Object} options
  22. * @api public
  23. */
  24. function Server(opts){
  25. if (!(this instanceof Server)) {
  26. return new Server(opts);
  27. }
  28. this.clients = {};
  29. this.clientsCount = 0;
  30. opts = opts || {};
  31. this.pingTimeout = opts.pingTimeout || 60000;
  32. this.pingInterval = opts.pingInterval || 25000;
  33. this.upgradeTimeout = opts.upgradeTimeout || 10000;
  34. this.maxHttpBufferSize = opts.maxHttpBufferSize || 10E7;
  35. this.transports = opts.transports || Object.keys(transports);
  36. this.allowUpgrades = false !== opts.allowUpgrades;
  37. this.allowRequest = opts.allowRequest;
  38. this.cookie = false !== opts.cookie ? (opts.cookie || 'io') : false;
  39. // initialize websocket server
  40. if (~this.transports.indexOf('websocket')) {
  41. this.ws = new WebSocketServer({ noServer: true, clientTracking: false });
  42. }
  43. }
  44. /**
  45. * Protocol errors mappings.
  46. */
  47. Server.errors = {
  48. UNKNOWN_TRANSPORT: 0,
  49. UNKNOWN_SID: 1,
  50. BAD_HANDSHAKE_METHOD: 2,
  51. BAD_REQUEST: 3
  52. };
  53. Server.errorMessages = {
  54. 0: 'Transport unknown',
  55. 1: 'Session ID unknown',
  56. 2: 'Bad handshake method',
  57. 3: 'Bad request'
  58. };
  59. /**
  60. * Inherits from EventEmitter.
  61. */
  62. Server.prototype.__proto__ = EventEmitter.prototype;
  63. /**
  64. * Hash of open clients.
  65. *
  66. * @api public
  67. */
  68. Server.prototype.clients;
  69. /**
  70. * Returns a list of available transports for upgrade given a certain transport.
  71. *
  72. * @return {Array}
  73. * @api public
  74. */
  75. Server.prototype.upgrades = function(transport){
  76. if (!this.allowUpgrades) return [];
  77. return transports[transport].upgradesTo || [];
  78. };
  79. /**
  80. * Verifies a request.
  81. *
  82. * @param {http.ServerRequest}
  83. * @return {Boolean} whether the request is valid
  84. * @api private
  85. */
  86. Server.prototype.verify = function(req, upgrade, fn){
  87. // transport check
  88. var transport = req._query.transport;
  89. if (!~this.transports.indexOf(transport)) {
  90. debug('unknown transport "%s"', transport);
  91. return fn(Server.errors.UNKNOWN_TRANSPORT, false);
  92. }
  93. // sid check
  94. var sid = req._query.sid;
  95. if (sid) {
  96. if (!this.clients.hasOwnProperty(sid))
  97. return fn(Server.errors.UNKNOWN_SID, false);
  98. if (!upgrade && this.clients[sid].transport.name !== transport) {
  99. debug('bad request: unexpected transport without upgrade');
  100. return fn(Server.errors.BAD_REQUEST, false);
  101. }
  102. } else {
  103. // handshake is GET only
  104. if ('GET' != req.method) return fn(Server.errors.BAD_HANDSHAKE_METHOD, false);
  105. if (!this.allowRequest) return fn(null, true);
  106. return this.allowRequest(req, fn);
  107. }
  108. fn(null, true);
  109. };
  110. /**
  111. * Prepares a request by processing the query string.
  112. *
  113. * @api private
  114. */
  115. Server.prototype.prepare = function(req){
  116. // try to leverage pre-existing `req._query` (e.g: from connect)
  117. if (!req._query) {
  118. req._query = ~req.url.indexOf('?') ? qs.parse(parse(req.url).query) : {};
  119. }
  120. };
  121. /**
  122. * Closes all clients.
  123. *
  124. * @api public
  125. */
  126. Server.prototype.close = function(){
  127. debug('closing all open clients');
  128. for (var i in this.clients) {
  129. this.clients[i].close();
  130. }
  131. return this;
  132. };
  133. /**
  134. * Handles an Engine.IO HTTP request.
  135. *
  136. * @param {http.ServerRequest} request
  137. * @param {http.ServerResponse|http.OutgoingMessage} response
  138. * @api public
  139. */
  140. Server.prototype.handleRequest = function(req, res){
  141. debug('handling "%s" http request "%s"', req.method, req.url);
  142. this.prepare(req);
  143. req.res = res;
  144. var self = this;
  145. this.verify(req, false, function(err, success) {
  146. if (!success) {
  147. sendErrorMessage(req, res, err);
  148. return;
  149. }
  150. if (req._query.sid) {
  151. debug('setting new request for existing client');
  152. self.clients[req._query.sid].transport.onRequest(req);
  153. } else {
  154. self.handshake(req._query.transport, req);
  155. }
  156. });
  157. };
  158. /**
  159. * Sends an Engine.IO Error Message
  160. *
  161. * @param {http.ServerResponse} response
  162. * @param {code} error code
  163. * @api private
  164. */
  165. function sendErrorMessage(req, res, code) {
  166. var headers = { 'Content-Type': 'application/json' };
  167. if (req.headers.origin) {
  168. headers['Access-Control-Allow-Credentials'] = 'true';
  169. headers['Access-Control-Allow-Origin'] = req.headers.origin;
  170. } else {
  171. headers['Access-Control-Allow-Origin'] = '*';
  172. }
  173. res.writeHead(400, headers);
  174. res.end(JSON.stringify({
  175. code: code,
  176. message: Server.errorMessages[code]
  177. }));
  178. }
  179. /**
  180. * Handshakes a new client.
  181. *
  182. * @param {String} transport name
  183. * @param {Object} request object
  184. * @api private
  185. */
  186. Server.prototype.handshake = function(transport, req){
  187. var id = base64id.generateId();
  188. debug('handshaking client "%s"', id);
  189. var transportName = transport;
  190. try {
  191. var transport = new transports[transport](req);
  192. if ('polling' == transportName) {
  193. transport.maxHttpBufferSize = this.maxHttpBufferSize;
  194. }
  195. if (req._query && req._query.b64) {
  196. transport.supportsBinary = false;
  197. } else {
  198. transport.supportsBinary = true;
  199. }
  200. }
  201. catch (e) {
  202. sendErrorMessage(req, req.res, Server.errors.BAD_REQUEST);
  203. return;
  204. }
  205. var socket = new Socket(id, this, transport, req);
  206. var self = this;
  207. if (false !== this.cookie) {
  208. transport.on('headers', function(headers){
  209. headers['Set-Cookie'] = self.cookie + '=' + id;
  210. });
  211. }
  212. transport.onRequest(req);
  213. this.clients[id] = socket;
  214. this.clientsCount++;
  215. socket.once('close', function(){
  216. delete self.clients[id];
  217. self.clientsCount--;
  218. });
  219. this.emit('connection', socket);
  220. };
  221. /**
  222. * Handles an Engine.IO HTTP Upgrade.
  223. *
  224. * @api public
  225. */
  226. Server.prototype.handleUpgrade = function(req, socket, upgradeHead){
  227. this.prepare(req);
  228. var self = this;
  229. this.verify(req, true, function(err, success) {
  230. if (!success) {
  231. socket.end();
  232. return;
  233. }
  234. var head = new Buffer(upgradeHead.length);
  235. upgradeHead.copy(head);
  236. upgradeHead = null;
  237. // delegate to ws
  238. self.ws.handleUpgrade(req, socket, head, function(conn){
  239. self.onWebSocket(req, conn);
  240. });
  241. });
  242. };
  243. /**
  244. * Called upon a ws.io connection.
  245. *
  246. * @param {ws.Socket} websocket
  247. * @api private
  248. */
  249. Server.prototype.onWebSocket = function(req, socket){
  250. if (!transports[req._query.transport].prototype.handlesUpgrades) {
  251. debug('transport doesnt handle upgraded requests');
  252. socket.close();
  253. return;
  254. }
  255. // get client id
  256. var id = req._query.sid;
  257. // keep a reference to the ws.Socket
  258. req.websocket = socket;
  259. if (id) {
  260. if (!this.clients[id]) {
  261. debug('upgrade attempt for closed client');
  262. socket.close();
  263. } else if (this.clients[id].upgraded) {
  264. debug('transport had already been upgraded');
  265. socket.close();
  266. } else {
  267. debug('upgrading existing transport');
  268. var transport = new transports[req._query.transport](req);
  269. if (req._query && req._query.b64) {
  270. transport.supportsBinary = false;
  271. } else {
  272. transport.supportsBinary = true;
  273. }
  274. this.clients[id].maybeUpgrade(transport);
  275. }
  276. } else {
  277. this.handshake(req._query.transport, req);
  278. }
  279. };
  280. /**
  281. * Captures upgrade requests for a http.Server.
  282. *
  283. * @param {http.Server} server
  284. * @param {Object} options
  285. * @api public
  286. */
  287. Server.prototype.attach = function(server, options){
  288. var self = this;
  289. var options = options || {};
  290. var path = (options.path || '/engine.io').replace(/\/$/, '');
  291. var destroyUpgrade = (options.destroyUpgrade !== undefined) ? options.destroyUpgrade : true;
  292. var destroyUpgradeTimeout = options.destroyUpgradeTimeout || 1000;
  293. // normalize path
  294. path += '/';
  295. function check (req) {
  296. return path == req.url.substr(0, path.length);
  297. }
  298. // cache and clean up listeners
  299. var listeners = server.listeners('request').slice(0);
  300. server.removeAllListeners('request');
  301. server.on('close', self.close.bind(self));
  302. // add request handler
  303. server.on('request', function(req, res){
  304. if (check(req)) {
  305. debug('intercepting request for path "%s"', path);
  306. self.handleRequest(req, res);
  307. } else {
  308. for (var i = 0, l = listeners.length; i < l; i++) {
  309. listeners[i].call(server, req, res);
  310. }
  311. }
  312. });
  313. if(~self.transports.indexOf('websocket')) {
  314. server.on('upgrade', function (req, socket, head) {
  315. if (check(req)) {
  316. self.handleUpgrade(req, socket, head);
  317. } else if (false !== options.destroyUpgrade) {
  318. // default node behavior is to disconnect when no handlers
  319. // but by adding a handler, we prevent that
  320. // and if no eio thing handles the upgrade
  321. // then the socket needs to die!
  322. setTimeout(function() {
  323. if (socket.writable && socket.bytesWritten <= 0) {
  324. return socket.end();
  325. }
  326. }, options.destroyUpgradeTimeout);
  327. }
  328. });
  329. }
  330. };