|
|
'use strict';
/*! * ws: a node.js websocket client * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com> * MIT Licensed */
var url = require('url') , util = require('util') , http = require('http') , https = require('https') , crypto = require('crypto') , stream = require('stream') , Ultron = require('ultron') , Options = require('options') , Sender = require('./Sender') , Receiver = require('./Receiver') , SenderHixie = require('./Sender.hixie') , ReceiverHixie = require('./Receiver.hixie') , Extensions = require('./Extensions') , PerMessageDeflate = require('./PerMessageDeflate') , EventEmitter = require('events').EventEmitter;
/** * Constants */
// Default protocol version
var protocolVersion = 13;
// Close timeout
var closeTimeout = 30 * 1000; // Allow 30 seconds to terminate the connection cleanly
/** * WebSocket implementation * * @constructor * @param {String} address Connection address. * @param {String|Array} protocols WebSocket protocols. * @param {Object} options Additional connection options. * @api public */ function WebSocket(address, protocols, options) { if (this instanceof WebSocket === false) { return new WebSocket(address, protocols, options); }
EventEmitter.call(this);
if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) { // accept the "options" Object as the 2nd argument
options = protocols; protocols = null; }
if ('string' === typeof protocols) { protocols = [ protocols ]; }
if (!Array.isArray(protocols)) { protocols = []; }
this._socket = null; this._ultron = null; this._closeReceived = false; this.bytesReceived = 0; this.readyState = null; this.supports = {}; this.extensions = {}; this._binaryType = 'nodebuffer';
if (Array.isArray(address)) { initAsServerClient.apply(this, address.concat(options)); } else { initAsClient.apply(this, [address, protocols, options]); } }
/** * Inherits from EventEmitter. */ util.inherits(WebSocket, EventEmitter);
/** * Ready States */ ["CONNECTING", "OPEN", "CLOSING", "CLOSED"].forEach(function each(state, index) { WebSocket.prototype[state] = WebSocket[state] = index; });
/** * Gracefully closes the connection, after sending a description message to the server * * @param {Object} data to be sent to the server * @api public */ WebSocket.prototype.close = function close(code, data) { if (this.readyState === WebSocket.CLOSED) return;
if (this.readyState === WebSocket.CONNECTING) { this.readyState = WebSocket.CLOSED; return; }
if (this.readyState === WebSocket.CLOSING) { if (this._closeReceived && this._isServer) { this.terminate(); } return; }
var self = this; try { this.readyState = WebSocket.CLOSING; this._closeCode = code; this._closeMessage = data; var mask = !this._isServer; this._sender.close(code, data, mask, function(err) { if (err) self.emit('error', err);
if (self._closeReceived && self._isServer) { self.terminate(); } else { // ensure that the connection is cleaned up even when no response of closing handshake.
clearTimeout(self._closeTimer); self._closeTimer = setTimeout(cleanupWebsocketResources.bind(self, true), closeTimeout); } }); } catch (e) { this.emit('error', e); } };
/** * Pause the client stream * * @api public */ WebSocket.prototype.pause = function pauser() { if (this.readyState !== WebSocket.OPEN) throw new Error('not opened');
return this._socket.pause(); };
/** * Sends a ping * * @param {Object} data to be sent to the server * @param {Object} Members - mask: boolean, binary: boolean * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open * @api public */ WebSocket.prototype.ping = function ping(data, options, dontFailWhenClosed) { if (this.readyState !== WebSocket.OPEN) { if (dontFailWhenClosed === true) return; throw new Error('not opened'); }
options = options || {};
if (typeof options.mask === 'undefined') options.mask = !this._isServer;
this._sender.ping(data, options); };
/** * Sends a pong * * @param {Object} data to be sent to the server * @param {Object} Members - mask: boolean, binary: boolean * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open * @api public */ WebSocket.prototype.pong = function(data, options, dontFailWhenClosed) { if (this.readyState !== WebSocket.OPEN) { if (dontFailWhenClosed === true) return; throw new Error('not opened'); }
options = options || {};
if (typeof options.mask === 'undefined') options.mask = !this._isServer;
this._sender.pong(data, options); };
/** * Resume the client stream * * @api public */ WebSocket.prototype.resume = function resume() { if (this.readyState !== WebSocket.OPEN) throw new Error('not opened');
return this._socket.resume(); };
/** * Sends a piece of data * * @param {Object} data to be sent to the server * @param {Object} Members - mask: boolean, binary: boolean, compress: boolean * @param {function} Optional callback which is executed after the send completes * @api public */
WebSocket.prototype.send = function send(data, options, cb) { if (typeof options === 'function') { cb = options; options = {}; }
if (this.readyState !== WebSocket.OPEN) { if (typeof cb === 'function') cb(new Error('not opened')); else throw new Error('not opened'); return; }
if (!data) data = ''; if (this._queue) { var self = this; this._queue.push(function() { self.send(data, options, cb); }); return; }
options = options || {}; options.fin = true;
if (typeof options.binary === 'undefined') { options.binary = (data instanceof ArrayBuffer || data instanceof Buffer || data instanceof Uint8Array || data instanceof Uint16Array || data instanceof Uint32Array || data instanceof Int8Array || data instanceof Int16Array || data instanceof Int32Array || data instanceof Float32Array || data instanceof Float64Array); }
if (typeof options.mask === 'undefined') options.mask = !this._isServer; if (typeof options.compress === 'undefined') options.compress = true; if (!this.extensions[PerMessageDeflate.extensionName]) { options.compress = false; }
var readable = typeof stream.Readable === 'function' ? stream.Readable : stream.Stream;
if (data instanceof readable) { startQueue(this); var self = this;
sendStream(this, data, options, function send(error) { process.nextTick(function tock() { executeQueueSends(self); });
if (typeof cb === 'function') cb(error); }); } else { this._sender.send(data, options, cb); } };
/** * Streams data through calls to a user supplied function * * @param {Object} Members - mask: boolean, binary: boolean, compress: boolean * @param {function} 'function (error, send)' which is executed on successive ticks of which send is 'function (data, final)'. * @api public */ WebSocket.prototype.stream = function stream(options, cb) { if (typeof options === 'function') { cb = options; options = {}; }
var self = this;
if (typeof cb !== 'function') throw new Error('callback must be provided');
if (this.readyState !== WebSocket.OPEN) { if (typeof cb === 'function') cb(new Error('not opened')); else throw new Error('not opened'); return; }
if (this._queue) { this._queue.push(function () { self.stream(options, cb); }); return; }
options = options || {};
if (typeof options.mask === 'undefined') options.mask = !this._isServer; if (typeof options.compress === 'undefined') options.compress = true; if (!this.extensions[PerMessageDeflate.extensionName]) { options.compress = false; }
startQueue(this);
function send(data, final) { try { if (self.readyState !== WebSocket.OPEN) throw new Error('not opened'); options.fin = final === true; self._sender.send(data, options); if (!final) process.nextTick(cb.bind(null, null, send)); else executeQueueSends(self); } catch (e) { if (typeof cb === 'function') cb(e); else { delete self._queue; self.emit('error', e); } } }
process.nextTick(cb.bind(null, null, send)); };
/** * Immediately shuts down the connection * * @api public */ WebSocket.prototype.terminate = function terminate() { if (this.readyState === WebSocket.CLOSED) return;
if (this._socket) { this.readyState = WebSocket.CLOSING;
// End the connection
try { this._socket.end(); } catch (e) { // Socket error during end() call, so just destroy it right now
cleanupWebsocketResources.call(this, true); return; }
// Add a timeout to ensure that the connection is completely
// cleaned up within 30 seconds, even if the clean close procedure
// fails for whatever reason
// First cleanup any pre-existing timeout from an earlier "terminate" call,
// if one exists. Otherwise terminate calls in quick succession will leak timeouts
// and hold the program open for `closeTimout` time.
if (this._closeTimer) { clearTimeout(this._closeTimer); } this._closeTimer = setTimeout(cleanupWebsocketResources.bind(this, true), closeTimeout); } else if (this.readyState === WebSocket.CONNECTING) { cleanupWebsocketResources.call(this, true); } };
/** * Expose bufferedAmount * * @api public */ Object.defineProperty(WebSocket.prototype, 'bufferedAmount', { get: function get() { var amount = 0; if (this._socket) { amount = this._socket.bufferSize || 0; } return amount; } });
/** * Expose binaryType * * This deviates from the W3C interface since ws doesn't support the required * default "blob" type (instead we define a custom "nodebuffer" type). * * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
* @api public */ Object.defineProperty(WebSocket.prototype, 'binaryType', { get: function get() { return this._binaryType; }, set: function set(type) { if (type === 'arraybuffer' || type === 'nodebuffer') this._binaryType = type; else throw new SyntaxError('unsupported binaryType: must be either "nodebuffer" or "arraybuffer"'); } });
/** * Emulates the W3C Browser based WebSocket interface using function members. * * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
* @api public */ ['open', 'error', 'close', 'message'].forEach(function(method) { Object.defineProperty(WebSocket.prototype, 'on' + method, { /** * Returns the current listener * * @returns {Mixed} the set function or undefined * @api public */ get: function get() { var listener = this.listeners(method)[0]; return listener ? (listener._listener ? listener._listener : listener) : undefined; },
/** * Start listening for events * * @param {Function} listener the listener * @returns {Mixed} the set function or undefined * @api public */ set: function set(listener) { this.removeAllListeners(method); this.addEventListener(method, listener); } }); });
/** * Emulates the W3C Browser based WebSocket interface using addEventListener. * * @see https://developer.mozilla.org/en/DOM/element.addEventListener
* @see http://dev.w3.org/html5/websockets/#the-websocket-interface
* @api public */ WebSocket.prototype.addEventListener = function(method, listener) { var target = this;
function onMessage (data, flags) { if (flags.binary && this.binaryType === 'arraybuffer') data = new Uint8Array(data).buffer; listener.call(target, new MessageEvent(data, !!flags.binary, target)); }
function onClose (code, message) { listener.call(target, new CloseEvent(code, message, target)); }
function onError (event) { event.type = 'error'; event.target = target; listener.call(target, event); }
function onOpen () { listener.call(target, new OpenEvent(target)); }
if (typeof listener === 'function') { if (method === 'message') { // store a reference so we can return the original function from the
// addEventListener hook
onMessage._listener = listener; this.on(method, onMessage); } else if (method === 'close') { // store a reference so we can return the original function from the
// addEventListener hook
onClose._listener = listener; this.on(method, onClose); } else if (method === 'error') { // store a reference so we can return the original function from the
// addEventListener hook
onError._listener = listener; this.on(method, onError); } else if (method === 'open') { // store a reference so we can return the original function from the
// addEventListener hook
onOpen._listener = listener; this.on(method, onOpen); } else { this.on(method, listener); } } };
module.exports = WebSocket; module.exports.buildHostHeader = buildHostHeader
/** * W3C MessageEvent * * @see http://www.w3.org/TR/html5/comms.html
* @constructor * @api private */ function MessageEvent(dataArg, isBinary, target) { this.type = 'message'; this.data = dataArg; this.target = target; this.binary = isBinary; // non-standard.
}
/** * W3C CloseEvent * * @see http://www.w3.org/TR/html5/comms.html
* @constructor * @api private */ function CloseEvent(code, reason, target) { this.type = 'close'; this.wasClean = (typeof code === 'undefined' || code === 1000); this.code = code; this.reason = reason; this.target = target; }
/** * W3C OpenEvent * * @see http://www.w3.org/TR/html5/comms.html
* @constructor * @api private */ function OpenEvent(target) { this.type = 'open'; this.target = target; }
// Append port number to Host header, only if specified in the url
// and non-default
function buildHostHeader(isSecure, hostname, port) { var headerHost = hostname; if (hostname) { if ((isSecure && (port != 443)) || (!isSecure && (port != 80))){ headerHost = headerHost + ':' + port; } } return headerHost; }
/** * Entirely private apis, * which may or may not be bound to a sepcific WebSocket instance. */ function initAsServerClient(req, socket, upgradeHead, options) { options = new Options({ protocolVersion: protocolVersion, protocol: null, extensions: {}, maxPayload: 0 }).merge(options);
// expose state properties
this.protocol = options.value.protocol; this.protocolVersion = options.value.protocolVersion; this.extensions = options.value.extensions; this.supports.binary = (this.protocolVersion !== 'hixie-76'); this.upgradeReq = req; this.readyState = WebSocket.CONNECTING; this._isServer = true; this.maxPayload = options.value.maxPayload; // establish connection
if (options.value.protocolVersion === 'hixie-76') { establishConnection.call(this, ReceiverHixie, SenderHixie, socket, upgradeHead); } else { establishConnection.call(this, Receiver, Sender, socket, upgradeHead); } }
function initAsClient(address, protocols, options) { options = new Options({ origin: null, protocolVersion: protocolVersion, host: null, headers: null, protocol: protocols.join(','), agent: null,
// ssl-related options
pfx: null, key: null, passphrase: null, cert: null, ca: null, ciphers: null, rejectUnauthorized: null, perMessageDeflate: true, localAddress: null }).merge(options);
if (options.value.protocolVersion !== 8 && options.value.protocolVersion !== 13) { throw new Error('unsupported protocol version'); }
// verify URL and establish http class
var serverUrl = url.parse(address); var isUnixSocket = serverUrl.protocol === 'ws+unix:'; if (!serverUrl.host && !isUnixSocket) throw new Error('invalid url'); var isSecure = serverUrl.protocol === 'wss:' || serverUrl.protocol === 'https:'; var httpObj = isSecure ? https : http; var port = serverUrl.port || (isSecure ? 443 : 80); var auth = serverUrl.auth;
// prepare extensions
var extensionsOffer = {}; var perMessageDeflate; if (options.value.perMessageDeflate) { perMessageDeflate = new PerMessageDeflate(typeof options.value.perMessageDeflate !== true ? options.value.perMessageDeflate : {}, false); extensionsOffer[PerMessageDeflate.extensionName] = perMessageDeflate.offer(); }
// expose state properties
this._isServer = false; this.url = address; this.protocolVersion = options.value.protocolVersion; this.supports.binary = (this.protocolVersion !== 'hixie-76');
// begin handshake
var key = new Buffer(options.value.protocolVersion + '-' + Date.now()).toString('base64'); var shasum = crypto.createHash('sha1'); shasum.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'); var expectedServerKey = shasum.digest('base64');
var agent = options.value.agent;
var headerHost = buildHostHeader(isSecure, serverUrl.hostname, port)
var requestOptions = { port: port, host: serverUrl.hostname, headers: { 'Connection': 'Upgrade', 'Upgrade': 'websocket', 'Host': headerHost, 'Sec-WebSocket-Version': options.value.protocolVersion, 'Sec-WebSocket-Key': key } };
// If we have basic auth.
if (auth) { requestOptions.headers.Authorization = 'Basic ' + new Buffer(auth).toString('base64'); }
if (options.value.protocol) { requestOptions.headers['Sec-WebSocket-Protocol'] = options.value.protocol; }
if (options.value.host) { requestOptions.headers.Host = options.value.host; }
if (options.value.headers) { for (var header in options.value.headers) { if (options.value.headers.hasOwnProperty(header)) { requestOptions.headers[header] = options.value.headers[header]; } } }
if (Object.keys(extensionsOffer).length) { requestOptions.headers['Sec-WebSocket-Extensions'] = Extensions.format(extensionsOffer); }
if (options.isDefinedAndNonNull('pfx') || options.isDefinedAndNonNull('key') || options.isDefinedAndNonNull('passphrase') || options.isDefinedAndNonNull('cert') || options.isDefinedAndNonNull('ca') || options.isDefinedAndNonNull('ciphers') || options.isDefinedAndNonNull('rejectUnauthorized')) {
if (options.isDefinedAndNonNull('pfx')) requestOptions.pfx = options.value.pfx; if (options.isDefinedAndNonNull('key')) requestOptions.key = options.value.key; if (options.isDefinedAndNonNull('passphrase')) requestOptions.passphrase = options.value.passphrase; if (options.isDefinedAndNonNull('cert')) requestOptions.cert = options.value.cert; if (options.isDefinedAndNonNull('ca')) requestOptions.ca = options.value.ca; if (options.isDefinedAndNonNull('ciphers')) requestOptions.ciphers = options.value.ciphers; if (options.isDefinedAndNonNull('rejectUnauthorized')) requestOptions.rejectUnauthorized = options.value.rejectUnauthorized;
if (!agent) { // global agent ignores client side certificates
agent = new httpObj.Agent(requestOptions); } }
requestOptions.path = serverUrl.path || '/';
if (agent) { requestOptions.agent = agent; }
if (isUnixSocket) { requestOptions.socketPath = serverUrl.pathname; }
if (options.value.localAddress) { requestOptions.localAddress = options.value.localAddress; }
if (options.value.origin) { if (options.value.protocolVersion < 13) requestOptions.headers['Sec-WebSocket-Origin'] = options.value.origin; else requestOptions.headers.Origin = options.value.origin; }
var self = this; var req = httpObj.request(requestOptions);
req.on('error', function onerror(error) { self.emit('error', error); cleanupWebsocketResources.call(self, error); });
req.once('response', function response(res) { var error;
if (!self.emit('unexpected-response', req, res)) { error = new Error('unexpected server response (' + res.statusCode + ')'); req.abort(); self.emit('error', error); }
cleanupWebsocketResources.call(self, error); });
req.once('upgrade', function upgrade(res, socket, upgradeHead) { if (self.readyState === WebSocket.CLOSED) { // client closed before server accepted connection
self.emit('close'); self.removeAllListeners(); socket.end(); return; }
var serverKey = res.headers['sec-websocket-accept']; if (typeof serverKey === 'undefined' || serverKey !== expectedServerKey) { self.emit('error', 'invalid server key'); self.removeAllListeners(); socket.end(); return; }
var serverProt = res.headers['sec-websocket-protocol']; var protList = (options.value.protocol || "").split(/, */); var protError = null;
if (!options.value.protocol && serverProt) { protError = 'server sent a subprotocol even though none requested'; } else if (options.value.protocol && !serverProt) { protError = 'server sent no subprotocol even though requested'; } else if (serverProt && protList.indexOf(serverProt) === -1) { protError = 'server responded with an invalid protocol'; }
if (protError) { self.emit('error', protError); self.removeAllListeners(); socket.end(); return; } else if (serverProt) { self.protocol = serverProt; }
var serverExtensions = Extensions.parse(res.headers['sec-websocket-extensions']); if (perMessageDeflate && serverExtensions[PerMessageDeflate.extensionName]) { try { perMessageDeflate.accept(serverExtensions[PerMessageDeflate.extensionName]); } catch (err) { self.emit('error', 'invalid extension parameter'); self.removeAllListeners(); socket.end(); return; } self.extensions[PerMessageDeflate.extensionName] = perMessageDeflate; }
establishConnection.call(self, Receiver, Sender, socket, upgradeHead);
// perform cleanup on http resources
req.removeAllListeners(); req = null; agent = null; });
req.end(); this.readyState = WebSocket.CONNECTING; }
function establishConnection(ReceiverClass, SenderClass, socket, upgradeHead) { var ultron = this._ultron = new Ultron(socket) , called = false , self = this;
socket.setTimeout(0); socket.setNoDelay(true);
this._receiver = new ReceiverClass(this.extensions,this.maxPayload); this._socket = socket;
// socket cleanup handlers
ultron.on('end', cleanupWebsocketResources.bind(this)); ultron.on('close', cleanupWebsocketResources.bind(this)); ultron.on('error', cleanupWebsocketResources.bind(this));
// ensure that the upgradeHead is added to the receiver
function firstHandler(data) { if (called || self.readyState === WebSocket.CLOSED) return;
called = true; socket.removeListener('data', firstHandler); ultron.on('data', realHandler);
if (upgradeHead && upgradeHead.length > 0) { realHandler(upgradeHead); upgradeHead = null; }
if (data) realHandler(data); }
// subsequent packets are pushed straight to the receiver
function realHandler(data) { self.bytesReceived += data.length; self._receiver.add(data); }
ultron.on('data', firstHandler);
// if data was passed along with the http upgrade,
// this will schedule a push of that on to the receiver.
// this has to be done on next tick, since the caller
// hasn't had a chance to set event handlers on this client
// object yet.
process.nextTick(firstHandler);
// receiver event handlers
self._receiver.ontext = function ontext(data, flags) { flags = flags || {};
self.emit('message', data, flags); };
self._receiver.onbinary = function onbinary(data, flags) { flags = flags || {};
flags.binary = true; self.emit('message', data, flags); };
self._receiver.onping = function onping(data, flags) { flags = flags || {};
self.pong(data, { mask: !self._isServer, binary: flags.binary === true }, true);
self.emit('ping', data, flags); };
self._receiver.onpong = function onpong(data, flags) { self.emit('pong', data, flags || {}); };
self._receiver.onclose = function onclose(code, data, flags) { flags = flags || {};
self._closeReceived = true; self.close(code, data); };
self._receiver.onerror = function onerror(reason, errorCode) { // close the connection when the receiver reports a HyBi error code
self.close(typeof errorCode !== 'undefined' ? errorCode : 1002, ''); self.emit('error', (reason instanceof Error) ? reason : (new Error(reason))); };
// finalize the client
this._sender = new SenderClass(socket, this.extensions); this._sender.on('error', function onerror(error) { self.close(1002, ''); self.emit('error', error); });
this.readyState = WebSocket.OPEN; this.emit('open'); }
function startQueue(instance) { instance._queue = instance._queue || []; }
function executeQueueSends(instance) { var queue = instance._queue; if (typeof queue === 'undefined') return;
delete instance._queue; for (var i = 0, l = queue.length; i < l; ++i) { queue[i](); } }
function sendStream(instance, stream, options, cb) { stream.on('data', function incoming(data) { if (instance.readyState !== WebSocket.OPEN) { if (typeof cb === 'function') cb(new Error('not opened')); else { delete instance._queue; instance.emit('error', new Error('not opened')); } return; }
options.fin = false; instance._sender.send(data, options); });
stream.on('end', function end() { if (instance.readyState !== WebSocket.OPEN) { if (typeof cb === 'function') cb(new Error('not opened')); else { delete instance._queue; instance.emit('error', new Error('not opened')); } return; }
options.fin = true; instance._sender.send(null, options);
if (typeof cb === 'function') cb(null); }); }
function cleanupWebsocketResources(error) { if (this.readyState === WebSocket.CLOSED) return;
this.readyState = WebSocket.CLOSED;
clearTimeout(this._closeTimer); this._closeTimer = null;
// If the connection was closed abnormally (with an error), or if
// the close control frame was not received then the close code
// must default to 1006.
if (error || !this._closeReceived) { this._closeCode = 1006; } this.emit('close', this._closeCode || 1000, this._closeMessage || '');
if (this._socket) { if (this._ultron) this._ultron.destroy(); this._socket.on('error', function onerror() { try { this.destroy(); } catch (e) {} });
try { if (!error) this._socket.end(); else this._socket.destroy(); } catch (e) { /* Ignore termination errors */ }
this._socket = null; this._ultron = null; }
if (this._sender) { this._sender.removeAllListeners(); this._sender = null; }
if (this._receiver) { this._receiver.cleanup(); this._receiver = null; }
if (this.extensions[PerMessageDeflate.extensionName]) { this.extensions[PerMessageDeflate.extensionName].cleanup(); }
this.extensions = null;
this.removeAllListeners(); this.on('error', function onerror() {}); // catch all errors after this
delete this._queue; }
|