/**
 * Module dependencies.
 */
|
|
|
|
var transports = require('./transports');
|
|
var Emitter = require('component-emitter');
|
|
var debug = require('debug')('engine.io-client:socket');
|
|
var index = require('indexof');
|
|
var parser = require('engine.io-parser');
|
|
var parseuri = require('parseuri');
|
|
var parsejson = require('parsejson');
|
|
var parseqs = require('parseqs');
|
|
|
|
/**
 * Module exports.
 */
|
|
|
|
// The `Socket` constructor function itself is the value exported by this module.
module.exports = Socket;
|
|
|
|
/**
 * Placeholder function: ignores its arguments, has no side effects and
 * returns `undefined`.
 *
 * @api private
 */

function noop () {}
|
|
|
|
/**
 * Socket constructor.
 *
 * May be called with or without `new`. Settings come from a URI string, an
 * options object, or both; values parsed out of the URI are written onto
 * `opts` (host, secure, port and, when present, query). After every instance
 * field has been assigned, the connection attempt is started via `open()`.
 *
 * @param {String|Object} uri or options
 * @param {Object} options
 * @api public
 */

function Socket (uri, opts) {
  if (!(this instanceof Socket)) return new Socket(uri, opts);

  opts = opts || {};

  // single-argument form: the first argument is the options object
  if (uri && 'object' == typeof uri) {
    opts = uri;
    uri = null;
  }

  if (uri) {
    uri = parseuri(uri);
    opts.host = uri.host;
    opts.secure = uri.protocol == 'https' || uri.protocol == 'wss';
    opts.port = uri.port;
    if (uri.query) opts.query = uri.query;
  }

  // no explicit `secure`: fall back to the page protocol when a `location`
  // global is available
  this.secure = null != opts.secure ? opts.secure :
    (global.location && 'https:' == location.protocol);

  // `host` may carry a port ("name:port"): first piece is the hostname,
  // last piece (if any) the port
  if (opts.host) {
    var pieces = opts.host.split(':');
    opts.hostname = pieces.shift();
    if (pieces.length) opts.port = pieces.pop();
  }

  this.agent = opts.agent || false;
  this.hostname = opts.hostname ||
    (global.location ? location.hostname : 'localhost');
  this.port = opts.port || (global.location && location.port ?
    location.port :
    (this.secure ? 443 : 80));
  this.query = opts.query || {};
  if ('string' == typeof this.query) this.query = parseqs.decode(this.query);
  this.upgrade = false !== opts.upgrade;
  // strip one trailing slash (if present), then append one
  this.path = (opts.path || '/engine.io').replace(/\/$/, '') + '/';
  this.forceJSONP = !!opts.forceJSONP;
  this.jsonp = false !== opts.jsonp;
  this.forceBase64 = !!opts.forceBase64;
  this.enablesXDR = !!opts.enablesXDR;
  this.timestampParam = opts.timestampParam || 't';
  this.timestampRequests = opts.timestampRequests;
  this.transports = opts.transports || ['polling', 'websocket'];
  this.readyState = '';
  this.writeBuffer = [];
  this.callbackBuffer = [];
  this.policyPort = opts.policyPort || 843;
  this.rememberUpgrade = opts.rememberUpgrade || false;
  this.binaryType = null;
  this.onlyBinaryUpgrades = opts.onlyBinaryUpgrades;

  // Must stay last: `open()` creates and opens a transport that receives this
  // socket, so every field above has to be in place before it runs.
  this.open();
}
|
|
|
|
// Constructor-level flag shared by all sockets: set to whether the transport
// in use is named 'websocket' (onOpen, probe), reset to false by onError and
// at the start of each probe, and read by open() together with
// `rememberUpgrade` to decide whether to start directly on 'websocket'.
Socket.priorWebsocketSuccess = false;
|
|
|
|
/**
 * Mix in `Emitter`.
 */
|
|
|
|
// The methods in this file call on/once/emit/removeListener on the socket;
// presumably this mixin is what adds them to the prototype — verify against
// component-emitter.
Emitter(Socket.prototype);
|
|
|
|
/**
 * Protocol version.
 *
 * @api public
 */
|
|
|
|
// Taken from the parser; createTransport() places the same parser value in
// each transport's query as `EIO`.
Socket.protocol = parser.protocol; // this is an int
|
|
|
|
/**
 * Expose deps for legacy compatibility
 * and standalone browser access.
 *
 * `transports` and `parser` are the bindings already required at the top of
 * this file, so the objects exposed here are the very ones the socket uses.
 */

Socket.Socket = Socket;
Socket.Transport = require('./transport');
Socket.transports = transports;
Socket.parser = parser;
|
|
|
|
/**
 * Builds a transport instance of the requested kind.
 *
 * The socket's query is copied and the copy is extended with the protocol
 * revision (`EIO`), the transport name and, when the socket already has an
 * id, that id as `sid`. The copy and the socket's connection settings are
 * handed to the transport constructor registered under `name`.
 *
 * @param {String} transport name
 * @return {Transport}
 * @api private
 */

Socket.prototype.createTransport = function (name) {
  debug('creating transport "%s"', name);

  // work on a copy so the extra keys never end up in `this.query`
  var params = clone(this.query);
  params.EIO = parser.protocol;
  params.transport = name;
  if (this.id) params.sid = this.id;

  var Transport = transports[name];
  var settings = {
    agent: this.agent,
    hostname: this.hostname,
    port: this.port,
    secure: this.secure,
    path: this.path,
    query: params,
    forceJSONP: this.forceJSONP,
    jsonp: this.jsonp,
    forceBase64: this.forceBase64,
    enablesXDR: this.enablesXDR,
    timestampRequests: this.timestampRequests,
    timestampParam: this.timestampParam,
    policyPort: this.policyPort,
    socket: this
  };

  return new Transport(settings);
};
|
|
|
|
/**
 * Returns a new plain object holding a shallow copy of the own enumerable
 * properties of `obj`; inherited properties are skipped.
 *
 * The ownership check goes through `Object.prototype.hasOwnProperty` rather
 * than `obj.hasOwnProperty`, so it keeps working when `obj` has a key named
 * "hasOwnProperty" or has no prototype.
 *
 * @param {Object} object to copy
 * @return {Object} the copy
 * @api private
 */

function clone (obj) {
  var o = {};
  var owns = Object.prototype.hasOwnProperty;
  for (var i in obj) {
    if (owns.call(obj, i)) {
      o[i] = obj[i];
    }
  }
  return o;
}
|
|
|
|
/**
 * Picks the transport to start with, creates it, opens it and installs it
 * as the current transport.
 *
 * 'websocket' is chosen when `rememberUpgrade` is set, a websocket was
 * successfully used before and 'websocket' is among `this.transports`;
 * otherwise the first entry of `this.transports` is used. With no
 * transports left, an 'error' event is emitted asynchronously and nothing
 * else happens.
 *
 * If creating the chosen transport throws, that transport is removed from
 * `this.transports` and the selection is retried with the remainder.
 *
 * @api private
 */

Socket.prototype.open = function () {
  var name;
  if (this.rememberUpgrade && Socket.priorWebsocketSuccess && ~index(this.transports, 'websocket')) {
    name = 'websocket';
  } else if (0 == this.transports.length) {
    // Emit error on next tick so it can be listened to
    var self = this;
    setTimeout(function () {
      self.emit('error', 'No transports available');
    }, 0);
    return;
  } else {
    name = this.transports[0];
  }
  this.readyState = 'opening';

  // Retry with the next transport if the transport is disabled (jsonp: false)
  var transport;
  try {
    transport = this.createTransport(name);
  } catch (e) {
    // Drop the transport that actually failed. It is not necessarily the
    // first entry: a remembered 'websocket' may sit further down the list.
    var failedAt = index(this.transports, name);
    if (~failedAt) {
      this.transports.splice(failedAt, 1);
    } else {
      this.transports.shift();
    }
    this.open();
    return;
  }

  transport.open();
  this.setTransport(transport);
};
|
|
|
|
/**
 * Installs `transport` as the socket's current transport.
 *
 * A previously installed transport has all of its listeners removed first.
 * The new transport's 'drain', 'packet', 'error' and 'close' events are then
 * routed to the socket's `onDrain`, `onPacket`, `onError` and `onClose`
 * (the latter with the reason 'transport close').
 *
 * @api private
 */

Socket.prototype.setTransport = function (transport) {
  debug('setting transport %s', transport.name);
  var socket = this;

  function forwardDrain () {
    socket.onDrain();
  }

  function forwardPacket (packet) {
    socket.onPacket(packet);
  }

  function forwardError (e) {
    socket.onError(e);
  }

  function forwardClose () {
    socket.onClose('transport close');
  }

  // detach from the transport being replaced, if there is one
  var previous = this.transport;
  if (previous) {
    debug('clearing existing transport %s', previous.name);
    previous.removeAllListeners();
  }

  this.transport = transport;

  transport
    .on('drain', forwardDrain)
    .on('packet', forwardPacket)
    .on('error', forwardError)
    .on('close', forwardClose);
};
|
|
|
|
/**
 * Probes a transport as a possible replacement for the current one.
 *
 * A transport named `name` is created and opened. Once it reports 'open', a
 * `ping` packet carrying 'probe' is sent through it. If the next packet it
 * delivers is a `pong` carrying 'probe', 'upgrading' is emitted, the current
 * transport is paused and, from the pause callback, the candidate is
 * installed with `setTransport`, an `upgrade` packet is sent, 'upgrade' is
 * emitted and the write buffer is flushed. Any other reply emits
 * 'upgradeError'.
 *
 * While the probe is pending, an 'error' or 'close' on the candidate or a
 * 'close' on the socket aborts it: the candidate is closed and
 * 'upgradeError' is emitted. An 'upgrading' event for a differently named
 * transport aborts it silently.
 *
 * @param {String} transport name
 * @api private
 */

Socket.prototype.probe = function (name) {
  debug('probing transport "%s"', name);

  var self = this;
  var aborted = false;
  var candidate = this.createTransport(name, { probe: 1 });

  Socket.priorWebsocketSuccess = false;

  // Last step of a successful probe; invoked by the current transport's
  // `pause` with no regard to its arguments.
  function switchOver () {
    if (aborted) return;
    if ('closed' == self.readyState) return;
    debug('changing transport and sending upgrade packet');

    cleanup();

    self.setTransport(candidate);
    candidate.send([{ type: 'upgrade' }]);
    self.emit('upgrade', candidate);
    candidate = null;
    self.upgrading = false;
    self.flush();
  }

  // Handles the first packet the candidate delivers after the probe ping
  function onProbeReply (msg) {
    if (aborted) return;

    if ('pong' != msg.type || 'probe' != msg.data) {
      debug('probe transport "%s" failed', name);
      var err = new Error('probe error');
      err.transport = candidate.name;
      self.emit('upgradeError', err);
      return;
    }

    debug('probe transport "%s" pong', name);
    self.upgrading = true;
    self.emit('upgrading', candidate);
    // an 'upgrading' listener may have aborted this probe in the meantime
    if (!candidate) return;
    Socket.priorWebsocketSuccess = 'websocket' == candidate.name;

    debug('pausing current transport "%s"', self.transport.name);
    self.transport.pause(switchOver);
  }

  // `this` is whatever the emitter binds for the candidate's 'open' event
  function onTransportOpen () {
    if (self.onlyBinaryUpgrades) {
      // give up when the candidate lacks binary support the current
      // transport has
      if (!this.supportsBinary && self.transport.supportsBinary) aborted = true;
    }
    if (aborted) return;

    debug('probe transport "%s" opened', name);
    candidate.send([{ type: 'ping', data: 'probe' }]);
    candidate.once('packet', onProbeReply);
  }

  function freezeTransport () {
    if (aborted) return;

    // Any callback called by transport should be ignored since now
    aborted = true;

    cleanup();

    candidate.close();
    candidate = null;
  }

  // Handle any error that happens while probing
  function onerror (err) {
    var error = new Error('probe error: ' + err);
    error.transport = candidate.name;

    freezeTransport();

    debug('probe transport "%s" failed because of error: %s', name, err);

    self.emit('upgradeError', error);
  }

  function onTransportClose () {
    onerror("transport closed");
  }

  // When the socket is closed while we're probing
  function onclose () {
    onerror("socket closed");
  }

  // When the socket is upgraded while we're probing
  function onupgrade (to) {
    if (candidate && to.name != candidate.name) {
      debug('"%s" works - aborting "%s"', to.name, candidate.name);
      freezeTransport();
    }
  }

  // Remove all listeners on the transport and on self
  function cleanup () {
    candidate.removeListener('open', onTransportOpen);
    candidate.removeListener('error', onerror);
    candidate.removeListener('close', onTransportClose);
    self.removeListener('close', onclose);
    self.removeListener('upgrading', onupgrade);
  }

  candidate.once('open', onTransportOpen);
  candidate.once('error', onerror);
  candidate.once('close', onTransportClose);

  this.once('close', onclose);
  this.once('upgrading', onupgrade);

  candidate.open();
};
|
|
|
|
/**
 * Called when connection is deemed open.
 *
 * Marks the socket 'open', records whether the active transport is named
 * 'websocket', emits 'open' and flushes the write buffer. Afterwards, if
 * the socket is still open, upgrades are enabled and the active transport
 * has a `pause` member, every entry of `this.upgrades` is probed.
 *
 * @api public
 */

Socket.prototype.onOpen = function () {
  debug('socket open');
  this.readyState = 'open';
  Socket.priorWebsocketSuccess = 'websocket' == this.transport.name;
  this.emit('open');
  this.flush();

  // an `open` listener may already have closed the socket
  if ('open' != this.readyState) return;
  if (!this.upgrade) return;
  if (!this.transport.pause) return;

  debug('starting upgrade probes');
  var total = this.upgrades.length;
  var pos = 0;
  while (pos < total) {
    this.probe(this.upgrades[pos]);
    pos++;
  }
};
|
|
|
|
/**
 * Handles a packet delivered by the current transport.
 *
 * Packets are only processed while the socket is 'opening' or 'open'. Each
 * processed packet is re-emitted as 'packet', followed by a 'heartbeat'
 * event, and then dispatched on its type:
 *
 *   - `open`: the data is parsed as JSON and passed to `onHandshake`
 *   - `pong`: the ping timer is re-armed via `setPing`
 *   - `error`: an 'error' event is emitted with the data as `code`
 *   - `message`: the data is emitted as 'data' and as 'message'
 *
 * Other types get no further treatment.
 *
 * @api private
 */

Socket.prototype.onPacket = function (packet) {
  var state = this.readyState;
  if ('opening' != state && 'open' != state) {
    debug('packet received with socket readyState "%s"', this.readyState);
    return;
  }

  debug('socket receive: type "%s", data "%s"', packet.type, packet.data);

  this.emit('packet', packet);

  // Socket is live - any packet counts
  this.emit('heartbeat');

  var type = packet.type;
  if ('open' === type) {
    this.onHandshake(parsejson(packet.data));
  } else if ('pong' === type) {
    this.setPing();
  } else if ('error' === type) {
    var err = new Error('server error');
    err.code = packet.data;
    this.emit('error', err);
  } else if ('message' === type) {
    this.emit('data', packet.data);
    this.emit('message', packet.data);
  }
};
|
|
|
|
/**
 * Called upon handshake completion.
 *
 * Emits 'handshake', stores the session id (on the socket and in the
 * current transport's query), the filtered upgrade list and the ping
 * timings, then runs `onOpen`. Unless that left the socket 'closed', the
 * ping timer is started and `onHeartbeat` is (re)registered as the single
 * 'heartbeat' listener.
 *
 * @param {Object} handshake obj
 * @api private
 */

Socket.prototype.onHandshake = function (data) {
  this.emit('handshake', data);

  var sid = data.sid;
  this.id = sid;
  this.transport.query.sid = sid;
  this.upgrades = this.filterUpgrades(data.upgrades);
  this.pingInterval = data.pingInterval;
  this.pingTimeout = data.pingTimeout;
  this.onOpen();

  // In case open handler closes socket
  if ('closed' != this.readyState) {
    this.setPing();

    // Prolong liveness of socket on heartbeat; removing first keeps the
    // handler from being registered more than once
    this.removeListener('heartbeat', this.onHeartbeat);
    this.on('heartbeat', this.onHeartbeat);
  }
};
|
|
|
|
/**
 * Resets ping timeout.
 *
 * Cancels the pending ping-timeout timer and starts a new one. When the new
 * timer fires and the socket is not already 'closed', the socket is closed
 * with the reason 'ping timeout'.
 *
 * @param {Number} timeout delay in ms; when falsy,
 *   `pingInterval + pingTimeout` is used instead
 * @api private
 */

Socket.prototype.onHeartbeat = function (timeout) {
  var socket = this;

  function expire () {
    if ('closed' == socket.readyState) return;
    socket.onClose('ping timeout');
  }

  clearTimeout(this.pingTimeoutTimer);
  var delay = timeout || (socket.pingInterval + socket.pingTimeout);
  socket.pingTimeoutTimer = setTimeout(expire, delay);
};
|
|
|
|
/**
 * Schedules the next ping.
 *
 * Any pending ping timer is cancelled. After `this.pingInterval` ms a ping
 * is written and the ping-timeout timer is re-armed with `this.pingTimeout`,
 * so the connection is closed unless that timer is reset in time.
 *
 * @api private
 */

Socket.prototype.setPing = function () {
  var socket = this;

  function sendPing () {
    debug('writing ping packet - expecting pong within %sms', socket.pingTimeout);
    socket.ping();
    socket.onHeartbeat(socket.pingTimeout);
  }

  clearTimeout(socket.pingIntervalTimer);
  socket.pingIntervalTimer = setTimeout(sendPing, socket.pingInterval);
};
|
|
|
|
/**
 * Queues a `ping` packet (without data or callback) through `sendPacket`.
 *
 * @api public
 */

Socket.prototype.ping = function () {
  var type = 'ping';
  this.sendPacket(type);
};
|
|
|
|
/**
 * Called on `drain` event
 *
 * Runs the callbacks belonging to the first `prevBufferLen` buffered
 * packets, drops those packets and callbacks from both buffers and resets
 * `prevBufferLen`. If packets remain they are flushed; otherwise the socket
 * emits 'drain'.
 *
 * @api private
 */

Socket.prototype.onDrain = function () {
  var pos = 0;
  while (pos < this.prevBufferLen) {
    if (this.callbackBuffer[pos]) {
      this.callbackBuffer[pos]();
    }
    pos++;
  }

  var sent = this.prevBufferLen;
  this.writeBuffer.splice(0, sent);
  this.callbackBuffer.splice(0, this.prevBufferLen);

  // setting prevBufferLen = 0 is very important
  // for example, when upgrading, upgrade packet is sent over,
  // and a nonzero prevBufferLen could cause problems on `drain`
  this.prevBufferLen = 0;

  if (0 != this.writeBuffer.length) {
    this.flush();
    return;
  }
  this.emit('drain');
};
|
|
|
|
/**
 * Flush write buffers.
 *
 * Hands the whole write buffer to the current transport, provided the
 * socket is not 'closed', the transport is writable, no upgrade is in
 * progress and there is something to send. The number of packets handed
 * over is remembered in `prevBufferLen` and 'flush' is emitted.
 *
 * @api private
 */

Socket.prototype.flush = function () {
  if ('closed' == this.readyState) return;
  if (!this.transport.writable) return;
  if (this.upgrading) return;
  if (!this.writeBuffer.length) return;

  debug('flushing %d packets in socket', this.writeBuffer.length);
  this.transport.send(this.writeBuffer);

  // keep track of current length of writeBuffer
  // splice writeBuffer and callbackBuffer on `drain`
  this.prevBufferLen = this.writeBuffer.length;
  this.emit('flush');
};
|
|
|
|
/**
 * Sends a message.
 *
 * Queues `msg` as a `message` packet; `write` is an alias of `send`.
 *
 * @param {String} message.
 * @param {Function} callback function.
 * @return {Socket} for chaining.
 * @api public
 */

Socket.prototype.send = function (msg, fn) {
  this.sendPacket('message', msg, fn);
  return this;
};

Socket.prototype.write = Socket.prototype.send;
|
|
|
|
/**
 * Sends a packet.
 *
 * Does nothing while the socket is 'closing' or 'closed'. Otherwise the
 * packet is announced through 'packetCreate', appended to the write buffer
 * (its callback to the callback buffer at the same position) and a flush is
 * attempted.
 *
 * @param {String} packet type.
 * @param {String} data.
 * @param {Function} callback function.
 * @api private
 */

Socket.prototype.sendPacket = function (type, data, fn) {
  var state = this.readyState;
  var accepting = 'closing' != state && 'closed' != state;
  if (!accepting) return;

  var packet = { type: type, data: data };
  this.emit('packetCreate', packet);
  this.writeBuffer.push(packet);
  this.callbackBuffer.push(fn);
  this.flush();
};
|
|
|
|
/**
 * Closes the connection.
 *
 * Only acts on an 'opening' or 'open' socket, which is switched to
 * 'closing'. The actual shutdown (`onClose('forced close')` followed by
 * closing the transport) happens right away, or is postponed:
 *
 *   - until the next 'drain' event while packets are still buffered
 *   - until 'upgrade' or 'upgradeError' while an upgrade is in progress
 *
 * @return {Socket} for chaining.
 * @api private
 */

Socket.prototype.close = function () {
  if ('opening' != this.readyState && 'open' != this.readyState) return this;

  this.readyState = 'closing';

  var self = this;

  function shutdown () {
    self.onClose('forced close');
    debug('socket closing - telling transport to close');
    self.transport.close();
  }

  function afterUpgrade () {
    self.removeListener('upgrade', afterUpgrade);
    self.removeListener('upgradeError', afterUpgrade);
    shutdown();
  }

  // Shuts down now, or once the pending upgrade has settled: we can't send
  // packets while pausing a transport
  function shutdownWhenSettled (socket) {
    if (socket.upgrading) {
      self.once('upgrade', afterUpgrade);
      self.once('upgradeError', afterUpgrade);
    } else {
      shutdown();
    }
  }

  if (this.writeBuffer.length) {
    // the listener inspects `this` as bound by the emitter
    this.once('drain', function () {
      shutdownWhenSettled(this);
    });
  } else {
    shutdownWhenSettled(this);
  }

  return this;
};
|
|
|
|
/**
 * Called upon transport error
 *
 * Clears the remembered websocket success, re-emits the error on the socket
 * and then closes the socket with the reason 'transport error', passing the
 * error along as description.
 *
 * @api private
 */

Socket.prototype.onError = function (err) {
  var reason = 'transport error';
  debug('socket error %j', err);
  Socket.priorWebsocketSuccess = false;
  this.emit('error', err);
  this.onClose(reason, err);
};
|
|
|
|
/**
 * Called upon transport close.
 *
 * Acts only while the socket is 'opening', 'open' or 'closing'. Both ping
 * timers are cleared, the current transport is closed and stripped of its
 * listeners, the socket becomes 'closed', its id is reset and 'close' is
 * emitted with `reason` and `desc`. The write and callback buffers are
 * emptied on a later tick.
 *
 * @param {String} reason
 * @param {*} desc extra detail forwarded to 'close' listeners
 * @api private
 */

Socket.prototype.onClose = function (reason, desc) {
  var state = this.readyState;
  if ('opening' != state && 'open' != state && 'closing' != state) return;

  debug('socket close with reason: "%s"', reason);
  var socket = this;

  function resetBuffers () {
    socket.writeBuffer = [];
    socket.callbackBuffer = [];
    socket.prevBufferLen = 0;
  }

  // clear timers
  clearTimeout(this.pingIntervalTimer);
  clearTimeout(this.pingTimeoutTimer);

  // clean buffers in next tick, so developers can still
  // grab the buffers on `close` event
  setTimeout(resetBuffers, 0);

  // stop event from firing again for transport
  this.transport.removeAllListeners('close');

  // ensure transport won't stay open
  this.transport.close();

  // ignore further transport communication
  this.transport.removeAllListeners();

  // set ready state
  this.readyState = 'closed';

  // clear session id
  this.id = null;

  // emit close event
  this.emit('close', reason, desc);
};
|
|
|
|
/**
 * Filters upgrades, returning only those matching client transports.
 *
 * The result is a new array that keeps the order of `upgrades` and contains
 * each entry that is also present in `this.transports`.
 *
 * @param {Array} server upgrades
 * @return {Array} upgrades this client can use
 * @api private
 */

Socket.prototype.filterUpgrades = function (upgrades) {
  var usable = [];
  var total = upgrades.length;
  var pos = 0;
  while (pos < total) {
    var offered = upgrades[pos];
    if (-1 !== index(this.transports, offered)) usable.push(offered);
    pos++;
  }
  return usable;
};
|