You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

683 lines
16 KiB

  1. /**
  2. * Module dependencies.
  3. */
  4. var transports = require('./transports');
  5. var Emitter = require('component-emitter');
  6. var debug = require('debug')('engine.io-client:socket');
  7. var index = require('indexof');
  8. var parser = require('engine.io-parser');
  9. var parseuri = require('parseuri');
  10. var parsejson = require('parsejson');
  11. var parseqs = require('parseqs');
  12. /**
  13. * Module exports.
  14. */
  15. module.exports = Socket;
  /**
   * Empty function: ignores its arguments and returns `undefined`.
   *
   * @api private
   */
  function noop() {
    // intentionally empty
  }
  /**
   * Socket constructor.
   *
   * May be called with or without `new`. Accepts either `(uri, opts)` or a
   * single options object. When a uri string is given it is parsed and its
   * host, port, query and protocol overwrite the matching fields on `opts`.
   *
   * The connection is opened as the very last step, after every instance
   * field has been assigned, so nothing triggered from `open()` can observe
   * a partially initialised socket.
   *
   * @param {String|Object} uri or options
   * @param {Object} options
   * @api public
   */
  function Socket(uri, opts){
    if (!(this instanceof Socket)) return new Socket(uri, opts);

    opts = opts || {};

    // single-argument form: the first argument is the options object
    if (uri && 'object' == typeof uri) {
      opts = uri;
      uri = null;
    }

    if (uri) {
      uri = parseuri(uri);
      opts.host = uri.host;
      opts.secure = uri.protocol == 'https' || uri.protocol == 'wss';
      opts.port = uri.port;
      if (uri.query) opts.query = uri.query;
    }

    // explicit option wins; otherwise derive from the page location if any
    this.secure = null != opts.secure ? opts.secure :
      (global.location && 'https:' == location.protocol);

    // `host` may carry a port ("example.com:8080"); split it apart
    if (opts.host) {
      var pieces = opts.host.split(':');
      opts.hostname = pieces.shift();
      if (pieces.length) opts.port = pieces.pop();
    }

    this.agent = opts.agent || false;
    this.hostname = opts.hostname ||
      (global.location ? location.hostname : 'localhost');
    this.port = opts.port || (global.location && location.port ?
      location.port :
      (this.secure ? 443 : 80));
    this.query = opts.query || {};
    if ('string' == typeof this.query) this.query = parseqs.decode(this.query);
    this.upgrade = false !== opts.upgrade;
    // strip one trailing slash (if present) and always end with exactly one
    this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/';
    this.forceJSONP = !!opts.forceJSONP;
    this.jsonp = false !== opts.jsonp;
    this.forceBase64 = !!opts.forceBase64;
    this.enablesXDR = !!opts.enablesXDR;
    this.timestampParam = opts.timestampParam || 't';
    this.timestampRequests = opts.timestampRequests;
    this.transports = opts.transports || ['polling', 'websocket'];
    this.readyState = '';
    this.writeBuffer = [];
    this.callbackBuffer = [];
    this.policyPort = opts.policyPort || 843;
    this.rememberUpgrade = opts.rememberUpgrade || false;
    this.binaryType = null;
    this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;

    // must stay last: see the note in the header comment
    this.open();
  }
  /**
   * Shared flag, initially `false`. Other code in this file sets it to
   * whether the transport in use is named "websocket" and reads it in
   * `open()` together with the `rememberUpgrade` option.
   */
  Socket.priorWebsocketSuccess = false;

  /**
   * Give every Socket instance the `Emitter` methods.
   */
  Emitter(Socket.prototype);

  /**
   * Protocol version, taken from the parser (an int).
   *
   * @api public
   */
  Socket.protocol = parser.protocol;

  /**
   * Dependencies exposed on the constructor for legacy compatibility
   * and standalone browser access.
   */
  Socket.Socket = Socket;
  Socket.Transport = require('./transport');
  Socket.transports = require('./transports');
  Socket.parser = require('engine.io-parser');
  /**
   * Builds a transport instance of the requested type.
   *
   * The transport receives a copy of the socket's query extended with the
   * protocol version (`EIO`), the transport name and, when the socket
   * already has one, the session id (`sid`). Any error raised while
   * constructing the transport propagates to the caller.
   *
   * @param {String} transport name
   * @return {Transport}
   * @api private
   */
  Socket.prototype.createTransport = function (name) {
    debug('creating transport "%s"', name);

    // work on a copy so `this.query` itself is left untouched
    var params = clone(this.query);
    params.EIO = parser.protocol;
    params.transport = name;
    if (this.id) params.sid = this.id;

    return new transports[name]({
      agent: this.agent,
      hostname: this.hostname,
      port: this.port,
      secure: this.secure,
      path: this.path,
      query: params,
      forceJSONP: this.forceJSONP,
      jsonp: this.jsonp,
      forceBase64: this.forceBase64,
      enablesXDR: this.enablesXDR,
      timestampRequests: this.timestampRequests,
      timestampParam: this.timestampParam,
      policyPort: this.policyPort,
      socket: this
    });
  };
  /**
   * Returns a new plain object holding a shallow copy of the own
   * enumerable properties of `obj`. Inherited properties are skipped.
   *
   * The ownership check goes through `Object.prototype.hasOwnProperty` so
   * that it also works for objects without a prototype and for objects
   * that carry their own `hasOwnProperty` key (e.g. a parsed query string
   * containing `hasOwnProperty=...`).
   *
   * @param {Object} obj
   * @return {Object} shallow copy
   * @api private
   */
  function clone (obj) {
    var o = {};
    for (var i in obj) {
      if (Object.prototype.hasOwnProperty.call(obj, i)) {
        o[i] = obj[i];
      }
    }
    return o;
  }
  /**
   * Picks the transport to start with, creates it and opens it.
   *
   * Selection order:
   *  - "websocket" when `rememberUpgrade` is set, a previous websocket
   *    connection succeeded and "websocket" is among `this.transports`;
   *  - otherwise the first entry of `this.transports`.
   *
   * With no transports left, an `error` event carrying the string
   * 'No transports available' is emitted on the next tick and nothing
   * else happens.
   *
   * If creating the chosen transport throws (e.g. the transport is
   * disabled), that transport is removed from `this.transports` and the
   * selection is retried with what remains.
   *
   * @api private
   */
  Socket.prototype.open = function () {
    var name;
    if (this.rememberUpgrade && Socket.priorWebsocketSuccess && ~index(this.transports, 'websocket')) {
      name = 'websocket';
    } else if (0 == this.transports.length) {
      // Emit error on next tick so it can be listened to
      var self = this;
      setTimeout(function() {
        self.emit('error', 'No transports available');
      }, 0);
      return;
    } else {
      name = this.transports[0];
    }
    this.readyState = 'opening';

    var transport;
    try {
      transport = this.createTransport(name);
    } catch (e) {
      debug('could not create transport "%s": %s', name, e);
      // Drop the transport that actually failed. It is not necessarily the
      // first entry: the remembered "websocket" may sit further down.
      var failedAt = index(this.transports, name);
      if (~failedAt) this.transports.splice(failedAt, 1);
      this.open();
      return;
    }

    transport.open();
    this.setTransport(transport);
  };
  /**
   * Makes `transport` the socket's current transport.
   *
   * All listeners are removed from the transport being replaced (if
   * there is one), then the new transport's `drain`, `packet`, `error`
   * and `close` events are routed to the socket's handlers.
   *
   * @param {Transport} transport
   * @api private
   */
  Socket.prototype.setTransport = function(transport){
    debug('setting transport %s', transport.name);
    var socket = this;

    var previous = this.transport;
    if (previous) {
      debug('clearing existing transport %s', previous.name);
      previous.removeAllListeners();
    }

    this.transport = transport;

    function forwardDrain() {
      socket.onDrain();
    }
    function forwardPacket(packet) {
      socket.onPacket(packet);
    }
    function forwardError(e) {
      socket.onError(e);
    }
    function forwardClose() {
      socket.onClose('transport close');
    }

    transport
      .on('drain', forwardDrain)
      .on('packet', forwardPacket)
      .on('error', forwardError)
      .on('close', forwardClose);
  };
  /**
   * Probes a transport as an upgrade candidate.
   *
   * A new transport of the given type is opened next to the current one.
   * Once it is open a `ping`/`probe` packet is sent over it; a matching
   * `pong`/`probe` reply starts the upgrade: the socket emits `upgrading`,
   * the current transport is paused and, when the pause completes, the
   * candidate becomes the socket's transport, an `upgrade` packet is sent
   * and `upgrade` is emitted. Any other reply emits `upgradeError`.
   *
   * An error or close on the candidate, a close of the socket, or another
   * candidate reaching `upgrading` first aborts this probe.
   *
   * @param {String} transport name
   * @api private
   */
  Socket.prototype.probe = function (name) {
    debug('probing transport "%s"', name);
    var self = this;
    var candidate = this.createTransport(name, { probe: 1 });
    var aborted = false;

    Socket.priorWebsocketSuccess = false;

    // Reply to the probe ping arrived on the candidate
    function handleProbeReply(msg) {
      if (aborted) return;

      if ('pong' != msg.type || 'probe' != msg.data) {
        debug('probe transport "%s" failed', name);
        var err = new Error('probe error');
        err.transport = candidate.name;
        self.emit('upgradeError', err);
        return;
      }

      debug('probe transport "%s" pong', name);
      self.upgrading = true;
      self.emit('upgrading', candidate);
      // an `upgrading` listener may have discarded the candidate
      if (!candidate) return;
      Socket.priorWebsocketSuccess = 'websocket' == candidate.name;

      debug('pausing current transport "%s"', self.transport.name);
      self.transport.pause(function () {
        if (aborted || 'closed' == self.readyState) return;
        debug('changing transport and sending upgrade packet');

        detach();

        self.setTransport(candidate);
        candidate.send([{ type: 'upgrade' }]);
        self.emit('upgrade', candidate);
        candidate = null;
        self.upgrading = false;
        self.flush();
      });
    }

    // Candidate transport opened: `this` is whatever the emitter binds
    function handleProbeOpen(){
      if (self.onlyBinaryUpgrades) {
        var losesBinary = !this.supportsBinary && self.transport.supportsBinary;
        aborted = aborted || losesBinary;
      }
      if (aborted) return;

      debug('probe transport "%s" opened', name);
      candidate.send([{ type: 'ping', data: 'probe' }]);
      candidate.once('packet', handleProbeReply);
    }

    // Abandon the candidate for good
    function abandon() {
      if (aborted) return;

      // Any callback called by transport should be ignored since now
      aborted = true;

      detach();

      candidate.close();
      candidate = null;
    }

    // Handle any error that happens while probing
    function handleProbeError(err) {
      var error = new Error('probe error: ' + err);
      error.transport = candidate.name;

      abandon();

      debug('probe transport "%s" failed because of error: %s', name, err);

      self.emit('upgradeError', error);
    }

    function handleProbeClose(){
      handleProbeError("transport closed");
    }

    // When the socket is closed while we're probing
    function handleSocketClose(){
      handleProbeError("socket closed");
    }

    // When the socket is upgraded while we're probing
    function handleOtherUpgrade(to){
      if (candidate && to.name != candidate.name) {
        debug('"%s" works - aborting "%s"', to.name, candidate.name);
        abandon();
      }
    }

    // Remove all listeners on the transport and on self
    function detach(){
      candidate.removeListener('open', handleProbeOpen);
      candidate.removeListener('error', handleProbeError);
      candidate.removeListener('close', handleProbeClose);
      self.removeListener('close', handleSocketClose);
      self.removeListener('upgrading', handleOtherUpgrade);
    }

    candidate.once('open', handleProbeOpen);
    candidate.once('error', handleProbeError);
    candidate.once('close', handleProbeClose);

    this.once('close', handleSocketClose);
    this.once('upgrading', handleOtherUpgrade);

    candidate.open();
  };
  /**
   * Marks the socket as open.
   *
   * Sets `readyState` to 'open', records whether the current transport is
   * named "websocket", emits `open`, flushes the write buffer and, if the
   * socket is still open, upgrades are enabled and the current transport
   * has a `pause` member, probes every entry of `this.upgrades`.
   *
   * @api public
   */
  Socket.prototype.onOpen = function () {
    debug('socket open');
    this.readyState = 'open';
    Socket.priorWebsocketSuccess = 'websocket' == this.transport.name;
    this.emit('open');
    this.flush();

    // `readyState` is re-checked because an `open` listener may already
    // have closed the socket
    var shouldProbe = 'open' == this.readyState && this.upgrade && this.transport.pause;
    if (!shouldProbe) return;

    debug('starting upgrade probes');
    var total = this.upgrades.length;
    for (var k = 0; k < total; k++) {
      this.probe(this.upgrades[k]);
    }
  };
  /**
   * Handles a packet received from the transport.
   *
   * Packets are only processed while the socket is 'opening' or 'open'.
   * Each processed packet is re-emitted as `packet` and followed by a
   * `heartbeat` event, then dispatched on its type:
   *  - `open`:    the parsed data is handed to `onHandshake`
   *  - `pong`:    the ping timer is re-armed
   *  - `error`:   an Error('server error') with `code` set to the packet
   *               data is emitted as `error`
   *  - `message`: the data is emitted as `data` and then `message`
   * Other types are ignored after the two generic events.
   *
   * @param {Object} packet
   * @api private
   */
  Socket.prototype.onPacket = function (packet) {
    var accepting = 'opening' == this.readyState || 'open' == this.readyState;
    if (!accepting) {
      debug('packet received with socket readyState "%s"', this.readyState);
      return;
    }

    debug('socket receive: type "%s", data "%s"', packet.type, packet.data);

    this.emit('packet', packet);

    // Socket is live - any packet counts
    this.emit('heartbeat');

    var kind = packet.type;
    if ('open' === kind) {
      this.onHandshake(parsejson(packet.data));
    } else if ('pong' === kind) {
      this.setPing();
    } else if ('error' === kind) {
      var err = new Error('server error');
      err.code = packet.data;
      this.emit('error', err);
    } else if ('message' === kind) {
      this.emit('data', packet.data);
      this.emit('message', packet.data);
    }
  };
  /**
   * Completes the handshake.
   *
   * Emits `handshake`, stores the session id on the socket and in the
   * current transport's query, keeps the upgrades that pass
   * `filterUpgrades`, records the ping interval and timeout, and calls
   * `onOpen`. Unless the socket got closed during `onOpen`, the ping
   * timer is started and `onHeartbeat` is (re)registered as the single
   * `heartbeat` listener.
   *
   * @param {Object} handshake obj
   * @api private
   */
  Socket.prototype.onHandshake = function (data) {
    this.emit('handshake', data);

    var sid = data.sid;
    this.id = sid;
    this.transport.query.sid = sid;

    this.upgrades = this.filterUpgrades(data.upgrades);
    this.pingInterval = data.pingInterval;
    this.pingTimeout = data.pingTimeout;

    this.onOpen();

    // In case open handler closes socket
    if ('closed' != this.readyState) {
      this.setPing();

      // Prolong liveness of socket on heartbeat; removing first avoids
      // registering the handler twice
      this.removeListener('heartbeat', this.onHeartbeat);
      this.on('heartbeat', this.onHeartbeat);
    }
  };
  /**
   * Restarts the ping-timeout timer.
   *
   * When the timer fires and the socket is not already closed, the socket
   * is closed with reason 'ping timeout'.
   *
   * @param {Number} timeout delay in ms; when falsy,
   *   `pingInterval + pingTimeout` is used instead
   * @api private
   */
  Socket.prototype.onHeartbeat = function (timeout) {
    var socket = this;
    clearTimeout(socket.pingTimeoutTimer);

    var delay = timeout || (socket.pingInterval + socket.pingTimeout);
    socket.pingTimeoutTimer = setTimeout(function onPingTimeout() {
      if ('closed' != socket.readyState) {
        socket.onClose('ping timeout');
      }
    }, delay);
  };
  /**
   * Schedules the next ping.
   *
   * Any pending ping timer is cancelled. After `this.pingInterval` ms a
   * ping is sent and the ping-timeout timer is restarted with
   * `this.pingTimeout`.
   *
   * @api private
   */
  Socket.prototype.setPing = function () {
    var socket = this;

    function sendPing() {
      debug('writing ping packet - expecting pong within %sms', socket.pingTimeout);
      socket.ping();
      socket.onHeartbeat(socket.pingTimeout);
    }

    clearTimeout(socket.pingIntervalTimer);
    socket.pingIntervalTimer = setTimeout(sendPing, socket.pingInterval);
  };
  /**
   * Queues a `ping` packet (no data, no callback) via `sendPacket`.
   *
   * @api public
   */
  Socket.prototype.ping = function () {
    var packetType = 'ping';
    this.sendPacket(packetType);
  };
  /**
   * Called on `drain` event.
   *
   * The first `prevBufferLen` entries of the write and callback buffers
   * belong to the batch that was just flushed. They are removed from both
   * buffers *before* their callbacks run, so a callback that sends more
   * data neither re-flushes packets that were already written nor has its
   * own callback invoked as part of this batch.
   *
   * Afterwards `drain` is emitted if nothing is left to write; otherwise
   * the remaining packets are flushed.
   *
   * @api private
   */
  Socket.prototype.onDrain = function() {
    var sent = this.prevBufferLen || 0;

    // detach the finished batch first (see above)
    var callbacks = this.callbackBuffer.splice(0, sent);
    this.writeBuffer.splice(0, sent);

    // setting prevBufferLen = 0 is very important
    // for example, when upgrading, upgrade packet is sent over,
    // and a nonzero prevBufferLen could cause problems on `drain`
    this.prevBufferLen = 0;

    for (var i = 0; i < callbacks.length; i++) {
      if (callbacks[i]) {
        callbacks[i]();
      }
    }

    if (this.writeBuffer.length == 0) {
      this.emit('drain');
    } else {
      this.flush();
    }
  };
  /**
   * Hands the whole write buffer to the transport.
   *
   * Nothing happens when the socket is closed, the transport is not
   * writable, an upgrade is in progress or the buffer is empty. After
   * sending, the buffer length is remembered in `prevBufferLen` (the
   * buffers themselves are trimmed on `drain`) and `flush` is emitted.
   *
   * @api private
   */
  Socket.prototype.flush = function () {
    if ('closed' == this.readyState) return;
    if (!this.transport.writable) return;
    if (this.upgrading) return;
    if (!this.writeBuffer.length) return;

    debug('flushing %d packets in socket', this.writeBuffer.length);
    this.transport.send(this.writeBuffer);

    // keep track of current length of writeBuffer
    // splice writeBuffer and callbackBuffer on `drain`
    this.prevBufferLen = this.writeBuffer.length;
    this.emit('flush');
  };
  /**
   * Queues a `message` packet. Available as both `send` and `write`.
   *
   * @param {String} message.
   * @param {Function} callback function.
   * @return {Socket} for chaining.
   * @api public
   */
  Socket.prototype.send = function (msg, fn) {
    this.sendPacket('message', msg, fn);
    return this;
  };
  Socket.prototype.write = Socket.prototype.send;
  /**
   * Queues a packet and attempts to flush.
   *
   * Ignored while the socket is 'closing' or 'closed'. Otherwise the
   * packet is emitted as `packetCreate`, appended to the write buffer
   * (with `fn` stored at the same position in the callback buffer) and
   * `flush` is called.
   *
   * @param {String} packet type.
   * @param {String} data.
   * @param {Function} callback function.
   * @api private
   */
  Socket.prototype.sendPacket = function (type, data, fn) {
    var state = this.readyState;
    if ('closing' == state || 'closed' == state) return;

    var packet = { type: type, data: data };
    this.emit('packetCreate', packet);

    this.writeBuffer.push(packet);
    this.callbackBuffer.push(fn);

    this.flush();
  };
  /**
   * Closes the connection.
   *
   * Only acts while the socket is 'opening' or 'open'; the state then
   * becomes 'closing'. The actual close is postponed until buffered
   * packets have drained and/or a running upgrade has ended (either
   * `upgrade` or `upgradeError`).
   *
   * @return {Socket} for chaining.
   * @api private
   */
  Socket.prototype.close = function () {
    if ('opening' != this.readyState && 'open' != this.readyState) {
      return this;
    }

    this.readyState = 'closing';

    var self = this;

    function finish() {
      self.onClose('forced close');
      debug('socket closing - telling transport to close');
      self.transport.close();
    }

    function finishAfterUpgrade() {
      self.removeListener('upgrade', finishAfterUpgrade);
      self.removeListener('upgradeError', finishAfterUpgrade);
      finish();
    }

    function deferUntilUpgraded() {
      // wait for upgrade to finish since we can't send packets while pausing a transport
      self.once('upgrade', finishAfterUpgrade);
      self.once('upgradeError', finishAfterUpgrade);
    }

    if (this.writeBuffer.length) {
      this.once('drain', function() {
        if (this.upgrading) {
          deferUntilUpgraded();
        } else {
          finish();
        }
      });
    } else if (this.upgrading) {
      deferUntilUpgraded();
    } else {
      finish();
    }

    return this;
  };
  /**
   * Called upon transport error.
   *
   * Resets the remembered websocket success, emits `error` with the given
   * error and closes the socket with reason 'transport error'.
   *
   * @param {Error} err
   * @api private
   */
  Socket.prototype.onError = function (err) {
    debug('socket error %j', err);

    Socket.priorWebsocketSuccess = false;

    this.emit('error', err);
    this.onClose('transport error', err);
  };
  /**
   * Tears the socket down.
   *
   * Only acts while the socket is 'opening', 'open' or 'closing'. Ping
   * timers are cleared, the transport is closed and stripped of its
   * listeners, the state becomes 'closed', the session id is dropped and
   * `close` is emitted with `reason` and `desc`. The write and callback
   * buffers are emptied on the next tick.
   *
   * @param {String} reason
   * @param {*} desc optional detail passed through to `close` listeners
   * @api private
   */
  Socket.prototype.onClose = function (reason, desc) {
    var state = this.readyState;
    if ('opening' != state && 'open' != state && 'closing' != state) return;

    debug('socket close with reason: "%s"', reason);
    var socket = this;

    // clear timers
    clearTimeout(this.pingIntervalTimer);
    clearTimeout(this.pingTimeoutTimer);

    // clean buffers in next tick, so developers can still
    // grab the buffers on `close` event
    setTimeout(function resetBuffers() {
      socket.writeBuffer = [];
      socket.callbackBuffer = [];
      socket.prevBufferLen = 0;
    }, 0);

    var transport = this.transport;

    // stop event from firing again for transport
    transport.removeAllListeners('close');

    // ensure transport won't stay open
    transport.close();

    // ignore further transport communication
    transport.removeAllListeners();

    // set ready state
    this.readyState = 'closed';

    // clear session id
    this.id = null;

    // emit close event
    this.emit('close', reason, desc);
  };
  /**
   * Filters upgrades, returning only those matching client transports.
   *
   * The order of `upgrades` is preserved. A missing list (`undefined` or
   * `null`, e.g. a handshake without an `upgrades` field) yields an empty
   * array instead of throwing.
   *
   * @param {Array} server upgrades
   * @return {Array} upgrades that are also in `this.transports`
   * @api private
   *
   */
  Socket.prototype.filterUpgrades = function (upgrades) {
    var filteredUpgrades = [];
    if (!upgrades) return filteredUpgrades;
    for (var i = 0, j = upgrades.length; i<j; i++) {
      if (~index(this.transports, upgrades[i])) filteredUpgrades.push(upgrades[i]);
    }
    return filteredUpgrades;
  };