You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

591 lines
16 KiB

  1. /*!
  2. * ws: a node.js websocket client
  3. * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
  4. * MIT Licensed
  5. */
  6. var util = require('util')
  7. , Validation = require('./Validation').Validation
  8. , ErrorCodes = require('./ErrorCodes')
  9. , BufferPool = require('./BufferPool')
  10. , bufferUtil = require('./BufferUtil').BufferUtil;
  /**
   * Node version 0.4 and 0.6 compatibility
   *
   * True when `process.version` starts with "v0.4". Consulted once, at load
   * time, to pick the implementation of `Receiver.prototype.allocateFromPool`.
   */
  var isNodeV4 = /^v0\.4/.exec(process.version) !== null;
  /**
   * HyBi Receiver implementation
   *
   * Builds the parser state, two buffer pools (one used while a fragmented
   * message is in progress, one for everything else), installs no-op event
   * callbacks and arms the parser to wait for the first two header bytes.
   */
  function Receiver () {
    // Both pools are configured identically but keep independent running
    // averages. NOTE(review): the two callbacks are presumably BufferPool's
    // grow and shrink strategies -- confirm against ./BufferPool.
    function createPool() {
      var prevUsed = -1;
      return new BufferPool(1024, function(db, length) {
        return db.used + length;
      }, function(db) {
        if (prevUsed >= 0) prevUsed = (prevUsed + db.used) / 2;
        else prevUsed = db.used;
        return prevUsed;
      });
    }

    // memory pool for fragmented messages
    this.fragmentedBufferPool = createPool();
    // memory pool for unfragmented messages
    this.unfragmentedBufferPool = createPool();

    this.state = {
      activeFragmentedOperation: null,
      lastFragment: false,
      masked: false,
      opcode: 0,
      fragmentedOperation: false
    };
    this.overflow = [];
    this.headerBuffer = new Buffer(10);
    this.expectOffset = 0;
    this.expectBuffer = null;
    this.expectHandler = null;
    this.currentMessage = [];

    // every frame starts with a two byte header
    this.expectHeader(2, this.processPacket);
    this.dead = false;

    // event callbacks default to (distinct) no-ops until the owner replaces them
    var callbackNames = ['onerror', 'ontext', 'onbinary', 'onclose', 'onping', 'onpong'];
    for (var i = 0; i < callbackNames.length; ++i) {
      this[callbackNames[i]] = function() {};
    }
  }
  59. module.exports = Receiver;
  /**
   * Add new data to the parser.
   *
   * Copies as much of `data` as the currently expected buffer can take, parks
   * any remainder on the overflow list, and then keeps invoking the pending
   * handler for as long as the expected buffer is completely filled (handlers
   * re-arm the parser through expectHeader / expectData, which drain the
   * overflow list themselves).
   *
   * Data arriving after cleanup() is ignored: cleanup() nulls the overflow
   * list and all buffers, so touching them would throw.
   *
   * @param {Buffer} data chunk received from the socket
   * @api public
   */
  Receiver.prototype.add = function(data) {
    if (this.dead) return;
    var dataLength = data.length;
    if (dataLength == 0) return;
    if (this.expectBuffer == null) {
      // nothing is being waited for right now; keep the chunk for later
      this.overflow.push(data);
      return;
    }
    var toRead = Math.min(dataLength, this.expectBuffer.length - this.expectOffset);
    fastCopy(toRead, data, this.expectBuffer, this.expectOffset);
    this.expectOffset += toRead;
    if (toRead < dataLength) {
      this.overflow.push(data.slice(toRead));
    }
    while (this.expectBuffer && this.expectOffset == this.expectBuffer.length) {
      // clear the expectation before calling the handler, since the handler
      // normally installs the next one
      var bufferForHandler = this.expectBuffer;
      this.expectBuffer = null;
      this.expectOffset = 0;
      this.expectHandler.call(this, bufferForHandler);
    }
  };
  /**
   * Releases all resources used by the receiver.
   *
   * Marks the receiver as dead and drops every reference it holds: buffers,
   * pools, parser state and event callbacks.
   *
   * @api public
   */
  Receiver.prototype.cleanup = function() {
    this.dead = true;
    var released = [
      'overflow', 'headerBuffer', 'expectBuffer', 'expectHandler',
      'unfragmentedBufferPool', 'fragmentedBufferPool',
      'state', 'currentMessage',
      'onerror', 'ontext', 'onbinary', 'onclose', 'onping', 'onpong'
    ];
    for (var i = 0; i < released.length; ++i) {
      this[released[i]] = null;
    }
  };
  /**
   * Waits for a certain amount of header bytes to be available, then fires a callback.
   *
   * The destination is a slice of the shared header buffer. Bytes already
   * parked on the overflow list are consumed immediately (taken from the end
   * of the list); the handler itself is fired by `add` once the destination
   * is full. A zero length calls the handler right away with `null`.
   *
   * @param {Number} length number of header bytes to wait for
   * @param {Function} handler invoked with the filled buffer
   * @api private
   */
  Receiver.prototype.expectHeader = function(length, handler) {
    if (length == 0) {
      handler(null);
      return;
    }
    var sliceStart = this.expectOffset;
    this.expectBuffer = this.headerBuffer.slice(sliceStart, sliceStart + length);
    this.expectHandler = handler;
    for (var remaining = length; remaining > 0 && this.overflow.length > 0; ) {
      var chunk = this.overflow.pop();
      var take = Math.min(chunk.length, remaining);
      // put back whatever this expectation does not need
      if (remaining < chunk.length) this.overflow.push(chunk.slice(remaining));
      fastCopy(take, chunk, this.expectBuffer, this.expectOffset);
      this.expectOffset += take;
      remaining -= take;
    }
  };
  /**
   * Waits for a certain amount of data bytes to be available, then fires a callback.
   *
   * The destination comes from allocateFromPool, using the fragmented pool
   * while `state.fragmentedOperation` is set. Bytes already parked on the
   * overflow list are consumed immediately (taken from the end of the list);
   * the handler itself is fired by `add` once the destination is full.
   * A zero length calls the handler right away with `null`.
   *
   * @param {Number} length number of payload bytes to wait for
   * @param {Function} handler invoked with the filled buffer
   * @api private
   */
  Receiver.prototype.expectData = function(length, handler) {
    if (length == 0) {
      handler(null);
      return;
    }
    this.expectBuffer = this.allocateFromPool(length, this.state.fragmentedOperation);
    this.expectHandler = handler;
    for (var remaining = length; remaining > 0 && this.overflow.length > 0; ) {
      var chunk = this.overflow.pop();
      var take = Math.min(chunk.length, remaining);
      // put back whatever this expectation does not need
      if (remaining < chunk.length) this.overflow.push(chunk.slice(remaining));
      fastCopy(take, chunk, this.expectBuffer, this.expectOffset);
      this.expectOffset += take;
      remaining -= take;
    }
  };
  /**
   * Allocates memory from the buffer pool.
   *
   * When `isNodeV4` is set the pools are bypassed and a plain Buffer is
   * allocated; otherwise the buffer comes from the fragmented or the
   * unfragmented pool depending on `isFragmented`.
   *
   * @param {Number} length number of bytes needed
   * @param {Boolean} isFragmented selects the fragmented pool when truthy
   * @api private
   */
  Receiver.prototype.allocateFromPool = isNodeV4
    ? function(length) { return new Buffer(length); }
    : function(length, isFragmented) {
        var pool = isFragmented ? this.fragmentedBufferPool : this.unfragmentedBufferPool;
        return pool.get(length);
      };
  /**
   * Start processing a new packet.
   *
   * Decodes the two byte frame header: checks the reserved bits, records the
   * FIN and MASK flags, resolves the effective opcode (continuation frames
   * inherit the opcode of the fragmented message in progress) and hands the
   * header over to the matching opcode handler's `start`.
   *
   * @param {Buffer} data the two header bytes
   * @api private
   */
  Receiver.prototype.processPacket = function (data) {
    var firstByte = data[0];
    if ((firstByte & 0x70) != 0) {
      this.error('reserved fields must be empty', 1002);
      return;
    }
    var state = this.state;
    state.lastFragment = (firstByte & 0x80) == 0x80;
    state.masked = (data[1] & 0x80) == 0x80;
    var opcode = firstByte & 0xf;
    if (opcode === 0) {
      // continuation frame
      state.fragmentedOperation = true;
      state.opcode = state.activeFragmentedOperation;
      if (state.opcode != 1 && state.opcode != 2) {
        this.error('continuation frame cannot follow current opcode', 1002);
        return;
      }
    }
    else {
      if (opcode < 3 && state.activeFragmentedOperation != null) {
        this.error('data frames after the initial data frame must have opcode 0', 1002);
        return;
      }
      state.opcode = opcode;
      // a frame without FIN opens a fragmented operation
      state.fragmentedOperation = !state.lastFragment;
      if (state.fragmentedOperation) state.activeFragmentedOperation = opcode;
    }
    var handler = opcodes[state.opcode];
    if (typeof handler == 'undefined') {
      this.error('no handler for opcode ' + state.opcode, 1002);
      return;
    }
    handler.start.call(this, data);
  };
  /**
   * End processing a packet.
   *
   * Resets the relevant buffer pool, clears the pending expectation, closes
   * the fragmented operation if its final fragment was just handled, and
   * re-arms the parser for the next two byte frame header.
   *
   * Opcode handlers call this right after invoking user callbacks. Such a
   * callback may have called cleanup(), which nulls `state` and the pools,
   * so nothing is touched once the receiver is dead.
   *
   * @api private
   */
  Receiver.prototype.endPacket = function() {
    if (this.dead) return;
    // NOTE(review): the meaning of the boolean passed to reset() is defined
    // by BufferPool and not visible here.
    if (!this.state.fragmentedOperation) this.unfragmentedBufferPool.reset(true);
    else if (this.state.lastFragment) this.fragmentedBufferPool.reset(false);
    this.expectOffset = 0;
    this.expectBuffer = null;
    this.expectHandler = null;
    if (this.state.lastFragment && this.state.opcode === this.state.activeFragmentedOperation) {
      // end current fragmented operation
      this.state.activeFragmentedOperation = null;
    }
    this.state.lastFragment = false;
    this.state.opcode = this.state.activeFragmentedOperation != null ? this.state.activeFragmentedOperation : 0;
    this.state.masked = false;
    this.expectHeader(2, this.processPacket);
  };
  /**
   * Reset the parser state.
   *
   * Restores the initial frame state, resets both buffer pools and discards
   * the pending expectation, the overflow list and any partially collected
   * message. Does nothing once the receiver is dead. No new expectation is
   * installed here.
   *
   * @api private
   */
  Receiver.prototype.reset = function() {
    if (this.dead) return;
    this.state = {
      activeFragmentedOperation: null,
      lastFragment: false,
      masked: false,
      opcode: 0,
      fragmentedOperation: false
    };
    var pools = [this.fragmentedBufferPool, this.unfragmentedBufferPool];
    for (var i = 0; i < pools.length; ++i) {
      pools[i].reset(true);
    }
    this.expectOffset = 0;
    this.expectBuffer = null;
    this.expectHandler = null;
    this.overflow = [];
    this.currentMessage = [];
  };
  /**
   * Unmask received data.
   *
   * Applies `mask` to `buf` in place through bufferUtil.unmask when both are
   * present.
   *
   * @param {Buffer} mask masking key, or null when the frame is not masked
   * @param {Buffer} buf payload, or null for an empty payload
   * @param {Boolean} binary return the buffer itself instead of a string
   * @return {Buffer|String} `buf` when `binary` is truthy, otherwise the
   *   payload decoded as utf8 ('' when there is no payload)
   * @api private
   */
  Receiver.prototype.unmask = function (mask, buf, binary) {
    var hasPayload = buf != null;
    if (hasPayload && mask != null) bufferUtil.unmask(buf, mask);
    if (binary) return buf;
    if (!hasPayload) return '';
    return buf.toString('utf8');
  };
  /**
   * Concatenates a list of buffers.
   *
   * Allocates one buffer as large as all inputs together and lets
   * bufferUtil.merge fill it.
   *
   * @param {Array} buffers list of Buffer objects
   * @return {Buffer} newly allocated buffer with the merged content
   * @api private
   */
  Receiver.prototype.concatBuffers = function(buffers) {
    var totalLength = 0;
    var index = buffers.length;
    while (index-- > 0) {
      totalLength += buffers[index].length;
    }
    var merged = new Buffer(totalLength);
    bufferUtil.merge(merged, buffers);
    return merged;
  };
  /**
   * Handles an error
   *
   * Resets the parser first and then reports the problem through the
   * `onerror` callback.
   *
   * @param {String} reason human readable description
   * @param {Number} protocolErrorCode close code handed to `onerror`
   * @return {Receiver} this
   * @api private
   */
  Receiver.prototype.error = function (reason, protocolErrorCode) {
    var receiver = this;
    receiver.reset();
    receiver.onerror(reason, protocolErrorCode);
    return receiver;
  };
  273. /**
  274. * Buffer utilities
  275. */
  /**
   * Reads a big-endian unsigned 16 bit integer.
   * Must be invoked with the buffer as `this`, e.g. readUInt16BE.call(buf, 0).
   *
   * @param {Number} start index of the most significant byte
   * @return {Number}
   */
  function readUInt16BE(start) {
    var highByte = this[start] << 8;
    var lowByte = this[start+1];
    return highByte + lowByte;
  }
  /**
   * Reads a big-endian unsigned 32 bit integer.
   * Must be invoked with the buffer as `this`, e.g. readUInt32BE.call(buf, 0).
   *
   * @param {Number} start index of the most significant byte
   * @return {Number} value in the range 0 .. 4294967295
   */
  function readUInt32BE(start) {
    // `<<` produces a signed 32 bit result, so a top byte >= 0x80 would come
    // out negative; `>>> 0` reinterprets it as unsigned before adding the rest.
    return ((this[start]<<24) >>> 0) +
      (this[start+1]<<16) +
      (this[start+2]<<8) +
      this[start+3];
  }
  /**
   * Copies the first `length` bytes of `srcBuffer` into `dstBuffer`, starting
   * at `dstOffset`. Lengths 1 to 16 are copied byte by byte; every other
   * length is delegated to `srcBuffer.copy`.
   *
   * @param {Number} length number of bytes to copy
   * @param {Buffer} srcBuffer source, read from index 0
   * @param {Buffer} dstBuffer destination
   * @param {Number} dstOffset index in the destination of the first byte
   */
  function fastCopy(length, srcBuffer, dstBuffer, dstOffset) {
    var isShort = typeof length === 'number' &&
      length >= 1 && length <= 16 && length % 1 === 0;
    if (!isShort) {
      srcBuffer.copy(dstBuffer, dstOffset, 0, length);
      return;
    }
    // highest index first, the first byte last
    for (var index = length - 1; index >= 1; --index) {
      dstBuffer[dstOffset+index] = srcBuffer[index];
    }
    dstBuffer[dstOffset] = srcBuffer[0];
  }
  /**
   * Opcode handlers
   *
   * Maps a frame opcode to an object with three functions, each called with
   * the receiver as `this`:
   *
   *   start(data)        - receives the two header bytes and decodes the length
   *   getData(length)    - reads the optional 4 byte mask and the payload
   *   finish(mask, data) - unmasks the payload and fires the receiver callback
   *
   * `start` and `getData` were repeated verbatim for every opcode and are now
   * produced by the small factories below. `finish` stops as soon as a user
   * callback has cleaned up the receiver, since cleanup() nulls the state
   * that the following endPacket() would read.
   */
  var opcodes = (function() {
    var handlers = {};

    // Builds `start` for data frames (text, binary): 7 bit, 16 bit or 64 bit length.
    function dataStart(key) {
      return function(data) {
        var self = this;
        // decode length
        var firstLength = data[1] & 0x7f;
        if (firstLength < 126) {
          handlers[key].getData.call(self, firstLength);
        }
        else if (firstLength == 126) {
          self.expectHeader(2, function(data) {
            handlers[key].getData.call(self, readUInt16BE.call(data, 0));
          });
        }
        else {
          // firstLength == 127: the length is sent as a 64 bit integer
          self.expectHeader(8, function(data) {
            if (readUInt32BE.call(data, 0) != 0) {
              self.error('packets with length spanning more than 32 bit is currently not supported', 1008);
              return;
            }
            handlers[key].getData.call(self, readUInt32BE.call(data, 4));
          });
        }
      };
    }

    // Builds `start` for control frames: never fragmented, at most 125 payload bytes.
    function controlStart(key, fragmentedReason) {
      return function(data) {
        var self = this;
        if (self.state.lastFragment == false) {
          self.error(fragmentedReason, 1002);
          return;
        }
        // decode length
        var firstLength = data[1] & 0x7f;
        if (firstLength < 126) {
          handlers[key].getData.call(self, firstLength);
        }
        else {
          self.error('control frames cannot have more than 125 bytes of data', 1002);
        }
      };
    }

    // Builds `getData`: reads the mask when the frame is masked, then the payload.
    function payloadReader(key) {
      return function(length) {
        var self = this;
        if (self.state.masked) {
          self.expectHeader(4, function(data) {
            var mask = data;
            self.expectData(length, function(data) {
              handlers[key].finish.call(self, mask, data);
            });
          });
        }
        else {
          self.expectData(length, function(data) {
            handlers[key].finish.call(self, null, data);
          });
        }
      };
    }

    // Builds `finish` for ping and pong: hands the payload to the named callback.
    function controlFinish(callbackName) {
      return function(mask, data) {
        this[callbackName](this.unmask(mask, data, true), {masked: this.state.masked, binary: true});
        // the callback may have called cleanup()
        if (this.dead) return;
        this.endPacket();
      };
    }

    // text
    handlers['1'] = {
      start: dataStart('1'),
      getData: payloadReader('1'),
      finish: function(mask, data) {
        var packet = this.unmask(mask, data, true);
        if (packet != null) this.currentMessage.push(packet);
        if (this.state.lastFragment) {
          var messageBuffer = this.concatBuffers(this.currentMessage);
          if (!Validation.isValidUTF8(messageBuffer)) {
            this.error('invalid utf8 sequence', 1007);
            return;
          }
          this.ontext(messageBuffer.toString('utf8'), {masked: this.state.masked, buffer: messageBuffer});
          // the callback may have called cleanup()
          if (this.dead) return;
          this.currentMessage = [];
        }
        this.endPacket();
      }
    };

    // binary
    handlers['2'] = {
      start: dataStart('2'),
      getData: payloadReader('2'),
      finish: function(mask, data) {
        var packet = this.unmask(mask, data, true);
        if (packet != null) this.currentMessage.push(packet);
        if (this.state.lastFragment) {
          var messageBuffer = this.concatBuffers(this.currentMessage);
          this.onbinary(messageBuffer, {masked: this.state.masked, buffer: messageBuffer});
          // the callback may have called cleanup()
          if (this.dead) return;
          this.currentMessage = [];
        }
        this.endPacket();
      }
    };

    // close
    handlers['8'] = {
      start: controlStart('8', 'fragmented close is not supported'),
      getData: payloadReader('8'),
      finish: function(mask, data) {
        var self = this;
        data = self.unmask(mask, data, true);
        if (data && data.length == 1) {
          self.error('close packets with data must be at least two bytes long', 1002);
          return;
        }
        // an empty close frame is reported with code 1000
        var code = data && data.length > 1 ? readUInt16BE.call(data, 0) : 1000;
        if (!ErrorCodes.isValidErrorCode(code)) {
          self.error('invalid error code', 1002);
          return;
        }
        var message = '';
        if (data && data.length > 2) {
          var messageBuffer = data.slice(2);
          if (!Validation.isValidUTF8(messageBuffer)) {
            self.error('invalid utf8 sequence', 1007);
            return;
          }
          message = messageBuffer.toString('utf8');
        }
        self.onclose(code, message, {masked: self.state.masked});
        // reset() is a no-op when the callback has cleaned up the receiver
        self.reset();
      }
    };

    // ping
    handlers['9'] = {
      start: controlStart('9', 'fragmented ping is not supported'),
      getData: payloadReader('9'),
      finish: controlFinish('onping')
    };

    // pong
    handlers['10'] = {
      start: controlStart('10', 'fragmented pong is not supported'),
      getData: payloadReader('10'),
      finish: controlFinish('onpong')
    };

    return handlers;
  })();