You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

738 lines
18 KiB

8 years ago
  1. /**
  2. * Module dependencies.
  3. */
  4. var transports = require('./transports/index');
  5. var Emitter = require('component-emitter');
  6. var debug = require('debug')('engine.io-client:socket');
  7. var index = require('indexof');
  8. var parser = require('engine.io-parser');
  9. var parseuri = require('parseuri');
  10. var parsejson = require('parsejson');
  11. var parseqs = require('parseqs');
  12. /**
  13. * Module exports.
  14. */
  15. module.exports = Socket;
  16. /**
  17. * Socket constructor.
  18. *
  19. * @param {String|Object} uri or options
  20. * @param {Object} options
  21. * @api public
  22. */
  23. function Socket (uri, opts) {
  24. if (!(this instanceof Socket)) return new Socket(uri, opts);
  25. opts = opts || {};
  26. if (uri && 'object' === typeof uri) {
  27. opts = uri;
  28. uri = null;
  29. }
  30. if (uri) {
  31. uri = parseuri(uri);
  32. opts.hostname = uri.host;
  33. opts.secure = uri.protocol === 'https' || uri.protocol === 'wss';
  34. opts.port = uri.port;
  35. if (uri.query) opts.query = uri.query;
  36. } else if (opts.host) {
  37. opts.hostname = parseuri(opts.host).host;
  38. }
  39. this.secure = null != opts.secure ? opts.secure
  40. : (global.location && 'https:' === location.protocol);
  41. if (opts.hostname && !opts.port) {
  42. // if no port is specified manually, use the protocol default
  43. opts.port = this.secure ? '443' : '80';
  44. }
  45. this.agent = opts.agent || false;
  46. this.hostname = opts.hostname ||
  47. (global.location ? location.hostname : 'localhost');
  48. this.port = opts.port || (global.location && location.port
  49. ? location.port
  50. : (this.secure ? 443 : 80));
  51. this.query = opts.query || {};
  52. if ('string' === typeof this.query) this.query = parseqs.decode(this.query);
  53. this.upgrade = false !== opts.upgrade;
  54. this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/';
  55. this.forceJSONP = !!opts.forceJSONP;
  56. this.jsonp = false !== opts.jsonp;
  57. this.forceBase64 = !!opts.forceBase64;
  58. this.enablesXDR = !!opts.enablesXDR;
  59. this.timestampParam = opts.timestampParam || 't';
  60. this.timestampRequests = opts.timestampRequests;
  61. this.transports = opts.transports || ['polling', 'websocket'];
  62. this.readyState = '';
  63. this.writeBuffer = [];
  64. this.prevBufferLen = 0;
  65. this.policyPort = opts.policyPort || 843;
  66. this.rememberUpgrade = opts.rememberUpgrade || false;
  67. this.binaryType = null;
  68. this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;
  69. this.perMessageDeflate = false !== opts.perMessageDeflate ? (opts.perMessageDeflate || {}) : false;
  70. if (true === this.perMessageDeflate) this.perMessageDeflate = {};
  71. if (this.perMessageDeflate && null == this.perMessageDeflate.threshold) {
  72. this.perMessageDeflate.threshold = 1024;
  73. }
  74. // SSL options for Node.js client
  75. this.pfx = opts.pfx || null;
  76. this.key = opts.key || null;
  77. this.passphrase = opts.passphrase || null;
  78. this.cert = opts.cert || null;
  79. this.ca = opts.ca || null;
  80. this.ciphers = opts.ciphers || null;
  81. this.rejectUnauthorized = opts.rejectUnauthorized === undefined ? null : opts.rejectUnauthorized;
  82. this.forceNode = !!opts.forceNode;
  83. // other options for Node.js client
  84. var freeGlobal = typeof global === 'object' && global;
  85. if (freeGlobal.global === freeGlobal) {
  86. if (opts.extraHeaders && Object.keys(opts.extraHeaders).length > 0) {
  87. this.extraHeaders = opts.extraHeaders;
  88. }
  89. if (opts.localAddress) {
  90. this.localAddress = opts.localAddress;
  91. }
  92. }
  93. // set on handshake
  94. this.id = null;
  95. this.upgrades = null;
  96. this.pingInterval = null;
  97. this.pingTimeout = null;
  98. // set on heartbeat
  99. this.pingIntervalTimer = null;
  100. this.pingTimeoutTimer = null;
  101. this.open();
  102. }
  103. Socket.priorWebsocketSuccess = false;
  104. /**
  105. * Mix in `Emitter`.
  106. */
  107. Emitter(Socket.prototype);
  108. /**
  109. * Protocol version.
  110. *
  111. * @api public
  112. */
  113. Socket.protocol = parser.protocol; // this is an int
  114. /**
  115. * Expose deps for legacy compatibility
  116. * and standalone browser access.
  117. */
  118. Socket.Socket = Socket;
  119. Socket.Transport = require('./transport');
  120. Socket.transports = require('./transports/index');
  121. Socket.parser = require('engine.io-parser');
  122. /**
  123. * Creates transport of the given type.
  124. *
  125. * @param {String} transport name
  126. * @return {Transport}
  127. * @api private
  128. */
  129. Socket.prototype.createTransport = function (name) {
  130. debug('creating transport "%s"', name);
  131. var query = clone(this.query);
  132. // append engine.io protocol identifier
  133. query.EIO = parser.protocol;
  134. // transport name
  135. query.transport = name;
  136. // session id if we already have one
  137. if (this.id) query.sid = this.id;
  138. var transport = new transports[name]({
  139. agent: this.agent,
  140. hostname: this.hostname,
  141. port: this.port,
  142. secure: this.secure,
  143. path: this.path,
  144. query: query,
  145. forceJSONP: this.forceJSONP,
  146. jsonp: this.jsonp,
  147. forceBase64: this.forceBase64,
  148. enablesXDR: this.enablesXDR,
  149. timestampRequests: this.timestampRequests,
  150. timestampParam: this.timestampParam,
  151. policyPort: this.policyPort,
  152. socket: this,
  153. pfx: this.pfx,
  154. key: this.key,
  155. passphrase: this.passphrase,
  156. cert: this.cert,
  157. ca: this.ca,
  158. ciphers: this.ciphers,
  159. rejectUnauthorized: this.rejectUnauthorized,
  160. perMessageDeflate: this.perMessageDeflate,
  161. extraHeaders: this.extraHeaders,
  162. forceNode: this.forceNode,
  163. localAddress: this.localAddress
  164. });
  165. return transport;
  166. };
  167. function clone (obj) {
  168. var o = {};
  169. for (var i in obj) {
  170. if (obj.hasOwnProperty(i)) {
  171. o[i] = obj[i];
  172. }
  173. }
  174. return o;
  175. }
  176. /**
  177. * Initializes transport to use and starts probe.
  178. *
  179. * @api private
  180. */
  181. Socket.prototype.open = function () {
  182. var transport;
  183. if (this.rememberUpgrade && Socket.priorWebsocketSuccess && this.transports.indexOf('websocket') !== -1) {
  184. transport = 'websocket';
  185. } else if (0 === this.transports.length) {
  186. // Emit error on next tick so it can be listened to
  187. var self = this;
  188. setTimeout(function () {
  189. self.emit('error', 'No transports available');
  190. }, 0);
  191. return;
  192. } else {
  193. transport = this.transports[0];
  194. }
  195. this.readyState = 'opening';
  196. // Retry with the next transport if the transport is disabled (jsonp: false)
  197. try {
  198. transport = this.createTransport(transport);
  199. } catch (e) {
  200. this.transports.shift();
  201. this.open();
  202. return;
  203. }
  204. transport.open();
  205. this.setTransport(transport);
  206. };
  207. /**
  208. * Sets the current transport. Disables the existing one (if any).
  209. *
  210. * @api private
  211. */
  212. Socket.prototype.setTransport = function (transport) {
  213. debug('setting transport %s', transport.name);
  214. var self = this;
  215. if (this.transport) {
  216. debug('clearing existing transport %s', this.transport.name);
  217. this.transport.removeAllListeners();
  218. }
  219. // set up transport
  220. this.transport = transport;
  221. // set up transport listeners
  222. transport
  223. .on('drain', function () {
  224. self.onDrain();
  225. })
  226. .on('packet', function (packet) {
  227. self.onPacket(packet);
  228. })
  229. .on('error', function (e) {
  230. self.onError(e);
  231. })
  232. .on('close', function () {
  233. self.onClose('transport close');
  234. });
  235. };
  236. /**
  237. * Probes a transport.
  238. *
  239. * @param {String} transport name
  240. * @api private
  241. */
  242. Socket.prototype.probe = function (name) {
  243. debug('probing transport "%s"', name);
  244. var transport = this.createTransport(name, { probe: 1 });
  245. var failed = false;
  246. var self = this;
  247. Socket.priorWebsocketSuccess = false;
  248. function onTransportOpen () {
  249. if (self.onlyBinaryUpgrades) {
  250. var upgradeLosesBinary = !this.supportsBinary && self.transport.supportsBinary;
  251. failed = failed || upgradeLosesBinary;
  252. }
  253. if (failed) return;
  254. debug('probe transport "%s" opened', name);
  255. transport.send([{ type: 'ping', data: 'probe' }]);
  256. transport.once('packet', function (msg) {
  257. if (failed) return;
  258. if ('pong' === msg.type && 'probe' === msg.data) {
  259. debug('probe transport "%s" pong', name);
  260. self.upgrading = true;
  261. self.emit('upgrading', transport);
  262. if (!transport) return;
  263. Socket.priorWebsocketSuccess = 'websocket' === transport.name;
  264. debug('pausing current transport "%s"', self.transport.name);
  265. self.transport.pause(function () {
  266. if (failed) return;
  267. if ('closed' === self.readyState) return;
  268. debug('changing transport and sending upgrade packet');
  269. cleanup();
  270. self.setTransport(transport);
  271. transport.send([{ type: 'upgrade' }]);
  272. self.emit('upgrade', transport);
  273. transport = null;
  274. self.upgrading = false;
  275. self.flush();
  276. });
  277. } else {
  278. debug('probe transport "%s" failed', name);
  279. var err = new Error('probe error');
  280. err.transport = transport.name;
  281. self.emit('upgradeError', err);
  282. }
  283. });
  284. }
  285. function freezeTransport () {
  286. if (failed) return;
  287. // Any callback called by transport should be ignored since now
  288. failed = true;
  289. cleanup();
  290. transport.close();
  291. transport = null;
  292. }
  293. // Handle any error that happens while probing
  294. function onerror (err) {
  295. var error = new Error('probe error: ' + err);
  296. error.transport = transport.name;
  297. freezeTransport();
  298. debug('probe transport "%s" failed because of error: %s', name, err);
  299. self.emit('upgradeError', error);
  300. }
  301. function onTransportClose () {
  302. onerror('transport closed');
  303. }
  304. // When the socket is closed while we're probing
  305. function onclose () {
  306. onerror('socket closed');
  307. }
  308. // When the socket is upgraded while we're probing
  309. function onupgrade (to) {
  310. if (transport && to.name !== transport.name) {
  311. debug('"%s" works - aborting "%s"', to.name, transport.name);
  312. freezeTransport();
  313. }
  314. }
  315. // Remove all listeners on the transport and on self
  316. function cleanup () {
  317. transport.removeListener('open', onTransportOpen);
  318. transport.removeListener('error', onerror);
  319. transport.removeListener('close', onTransportClose);
  320. self.removeListener('close', onclose);
  321. self.removeListener('upgrading', onupgrade);
  322. }
  323. transport.once('open', onTransportOpen);
  324. transport.once('error', onerror);
  325. transport.once('close', onTransportClose);
  326. this.once('close', onclose);
  327. this.once('upgrading', onupgrade);
  328. transport.open();
  329. };
  330. /**
  331. * Called when connection is deemed open.
  332. *
  333. * @api public
  334. */
  335. Socket.prototype.onOpen = function () {
  336. debug('socket open');
  337. this.readyState = 'open';
  338. Socket.priorWebsocketSuccess = 'websocket' === this.transport.name;
  339. this.emit('open');
  340. this.flush();
  341. // we check for `readyState` in case an `open`
  342. // listener already closed the socket
  343. if ('open' === this.readyState && this.upgrade && this.transport.pause) {
  344. debug('starting upgrade probes');
  345. for (var i = 0, l = this.upgrades.length; i < l; i++) {
  346. this.probe(this.upgrades[i]);
  347. }
  348. }
  349. };
  350. /**
  351. * Handles a packet.
  352. *
  353. * @api private
  354. */
  355. Socket.prototype.onPacket = function (packet) {
  356. if ('opening' === this.readyState || 'open' === this.readyState ||
  357. 'closing' === this.readyState) {
  358. debug('socket receive: type "%s", data "%s"', packet.type, packet.data);
  359. this.emit('packet', packet);
  360. // Socket is live - any packet counts
  361. this.emit('heartbeat');
  362. switch (packet.type) {
  363. case 'open':
  364. this.onHandshake(parsejson(packet.data));
  365. break;
  366. case 'pong':
  367. this.setPing();
  368. this.emit('pong');
  369. break;
  370. case 'error':
  371. var err = new Error('server error');
  372. err.code = packet.data;
  373. this.onError(err);
  374. break;
  375. case 'message':
  376. this.emit('data', packet.data);
  377. this.emit('message', packet.data);
  378. break;
  379. }
  380. } else {
  381. debug('packet received with socket readyState "%s"', this.readyState);
  382. }
  383. };
  384. /**
  385. * Called upon handshake completion.
  386. *
  387. * @param {Object} handshake obj
  388. * @api private
  389. */
  390. Socket.prototype.onHandshake = function (data) {
  391. this.emit('handshake', data);
  392. this.id = data.sid;
  393. this.transport.query.sid = data.sid;
  394. this.upgrades = this.filterUpgrades(data.upgrades);
  395. this.pingInterval = data.pingInterval;
  396. this.pingTimeout = data.pingTimeout;
  397. this.onOpen();
  398. // In case open handler closes socket
  399. if ('closed' === this.readyState) return;
  400. this.setPing();
  401. // Prolong liveness of socket on heartbeat
  402. this.removeListener('heartbeat', this.onHeartbeat);
  403. this.on('heartbeat', this.onHeartbeat);
  404. };
  405. /**
  406. * Resets ping timeout.
  407. *
  408. * @api private
  409. */
  410. Socket.prototype.onHeartbeat = function (timeout) {
  411. clearTimeout(this.pingTimeoutTimer);
  412. var self = this;
  413. self.pingTimeoutTimer = setTimeout(function () {
  414. if ('closed' === self.readyState) return;
  415. self.onClose('ping timeout');
  416. }, timeout || (self.pingInterval + self.pingTimeout));
  417. };
  418. /**
  419. * Pings server every `this.pingInterval` and expects response
  420. * within `this.pingTimeout` or closes connection.
  421. *
  422. * @api private
  423. */
  424. Socket.prototype.setPing = function () {
  425. var self = this;
  426. clearTimeout(self.pingIntervalTimer);
  427. self.pingIntervalTimer = setTimeout(function () {
  428. debug('writing ping packet - expecting pong within %sms', self.pingTimeout);
  429. self.ping();
  430. self.onHeartbeat(self.pingTimeout);
  431. }, self.pingInterval);
  432. };
  433. /**
  434. * Sends a ping packet.
  435. *
  436. * @api private
  437. */
  438. Socket.prototype.ping = function () {
  439. var self = this;
  440. this.sendPacket('ping', function () {
  441. self.emit('ping');
  442. });
  443. };
  444. /**
  445. * Called on `drain` event
  446. *
  447. * @api private
  448. */
  449. Socket.prototype.onDrain = function () {
  450. this.writeBuffer.splice(0, this.prevBufferLen);
  451. // setting prevBufferLen = 0 is very important
  452. // for example, when upgrading, upgrade packet is sent over,
  453. // and a nonzero prevBufferLen could cause problems on `drain`
  454. this.prevBufferLen = 0;
  455. if (0 === this.writeBuffer.length) {
  456. this.emit('drain');
  457. } else {
  458. this.flush();
  459. }
  460. };
  461. /**
  462. * Flush write buffers.
  463. *
  464. * @api private
  465. */
  466. Socket.prototype.flush = function () {
  467. if ('closed' !== this.readyState && this.transport.writable &&
  468. !this.upgrading && this.writeBuffer.length) {
  469. debug('flushing %d packets in socket', this.writeBuffer.length);
  470. this.transport.send(this.writeBuffer);
  471. // keep track of current length of writeBuffer
  472. // splice writeBuffer and callbackBuffer on `drain`
  473. this.prevBufferLen = this.writeBuffer.length;
  474. this.emit('flush');
  475. }
  476. };
  477. /**
  478. * Sends a message.
  479. *
  480. * @param {String} message.
  481. * @param {Function} callback function.
  482. * @param {Object} options.
  483. * @return {Socket} for chaining.
  484. * @api public
  485. */
  486. Socket.prototype.write =
  487. Socket.prototype.send = function (msg, options, fn) {
  488. this.sendPacket('message', msg, options, fn);
  489. return this;
  490. };
  491. /**
  492. * Sends a packet.
  493. *
  494. * @param {String} packet type.
  495. * @param {String} data.
  496. * @param {Object} options.
  497. * @param {Function} callback function.
  498. * @api private
  499. */
  500. Socket.prototype.sendPacket = function (type, data, options, fn) {
  501. if ('function' === typeof data) {
  502. fn = data;
  503. data = undefined;
  504. }
  505. if ('function' === typeof options) {
  506. fn = options;
  507. options = null;
  508. }
  509. if ('closing' === this.readyState || 'closed' === this.readyState) {
  510. return;
  511. }
  512. options = options || {};
  513. options.compress = false !== options.compress;
  514. var packet = {
  515. type: type,
  516. data: data,
  517. options: options
  518. };
  519. this.emit('packetCreate', packet);
  520. this.writeBuffer.push(packet);
  521. if (fn) this.once('flush', fn);
  522. this.flush();
  523. };
  524. /**
  525. * Closes the connection.
  526. *
  527. * @api private
  528. */
  529. Socket.prototype.close = function () {
  530. if ('opening' === this.readyState || 'open' === this.readyState) {
  531. this.readyState = 'closing';
  532. var self = this;
  533. if (this.writeBuffer.length) {
  534. this.once('drain', function () {
  535. if (this.upgrading) {
  536. waitForUpgrade();
  537. } else {
  538. close();
  539. }
  540. });
  541. } else if (this.upgrading) {
  542. waitForUpgrade();
  543. } else {
  544. close();
  545. }
  546. }
  547. function close () {
  548. self.onClose('forced close');
  549. debug('socket closing - telling transport to close');
  550. self.transport.close();
  551. }
  552. function cleanupAndClose () {
  553. self.removeListener('upgrade', cleanupAndClose);
  554. self.removeListener('upgradeError', cleanupAndClose);
  555. close();
  556. }
  557. function waitForUpgrade () {
  558. // wait for upgrade to finish since we can't send packets while pausing a transport
  559. self.once('upgrade', cleanupAndClose);
  560. self.once('upgradeError', cleanupAndClose);
  561. }
  562. return this;
  563. };
  564. /**
  565. * Called upon transport error
  566. *
  567. * @api private
  568. */
  569. Socket.prototype.onError = function (err) {
  570. debug('socket error %j', err);
  571. Socket.priorWebsocketSuccess = false;
  572. this.emit('error', err);
  573. this.onClose('transport error', err);
  574. };
  575. /**
  576. * Called upon transport close.
  577. *
  578. * @api private
  579. */
  580. Socket.prototype.onClose = function (reason, desc) {
  581. if ('opening' === this.readyState || 'open' === this.readyState || 'closing' === this.readyState) {
  582. debug('socket close with reason: "%s"', reason);
  583. var self = this;
  584. // clear timers
  585. clearTimeout(this.pingIntervalTimer);
  586. clearTimeout(this.pingTimeoutTimer);
  587. // stop event from firing again for transport
  588. this.transport.removeAllListeners('close');
  589. // ensure transport won't stay open
  590. this.transport.close();
  591. // ignore further transport communication
  592. this.transport.removeAllListeners();
  593. // set ready state
  594. this.readyState = 'closed';
  595. // clear session id
  596. this.id = null;
  597. // emit close event
  598. this.emit('close', reason, desc);
  599. // clean buffers after, so users can still
  600. // grab the buffers on `close` event
  601. self.writeBuffer = [];
  602. self.prevBufferLen = 0;
  603. }
  604. };
  605. /**
  606. * Filters upgrades, returning only those matching client transports.
  607. *
  608. * @param {Array} server upgrades
  609. * @api private
  610. *
  611. */
  612. Socket.prototype.filterUpgrades = function (upgrades) {
  613. var filteredUpgrades = [];
  614. for (var i = 0, j = upgrades.length; i < j; i++) {
  615. if (~index(this.transports, upgrades[i])) filteredUpgrades.push(upgrades[i]);
  616. }
  617. return filteredUpgrades;
  618. };