From 5ed43ca7476fc4083a300ae9aba3d81f85341490 Mon Sep 17 00:00:00 2001 From: diy0r Date: Fri, 4 Oct 2024 13:05:55 +0500 Subject: [PATCH] perf: tcp message splitting with newline delimiter --- lib/chunkProcessor.js | 39 +++++++++++++++++++++++++++++++++++++++ lib/common/serdes.js | 5 +++-- lib/connection.js | 18 +++++++----------- 3 files changed, 49 insertions(+), 13 deletions(-) create mode 100644 lib/chunkProcessor.js diff --git a/lib/chunkProcessor.js b/lib/chunkProcessor.js new file mode 100644 index 0000000..3b598fc --- /dev/null +++ b/lib/chunkProcessor.js @@ -0,0 +1,39 @@ +const { Transform } = require('stream'); +const { deserialization, SEPARATE_SYMBOL } = require('./common'); + +class CollectMessage extends Transform { + #buffer = ''; + + constructor() { + super({ objectMode: true }); + } + + _transform(chunk, encoding = 'utf8', callback) { + try { + this.#buffer += chunk.toString(encoding); + this.#processBuffer(); + callback(); + } catch (err) { + callback(err); + } + } + + #processBuffer(boundary = -1) { + while ((boundary = this.#findBoundary()) !== -1) { + const message = this.#extractMessage(boundary); + const objectMessage = deserialization(message); + this.push(objectMessage); + } + } + + #findBoundary() { + return this.#buffer.indexOf(SEPARATE_SYMBOL); + } + + #extractMessage(boundary) { + const message = this.#buffer.slice(0, boundary); + this.#buffer = this.#buffer.slice(boundary + SEPARATE_SYMBOL.length); + return message; + } +} +module.exports = { collectMessage: new CollectMessage() }; diff --git a/lib/common/serdes.js b/lib/common/serdes.js index c68e56d..46681fc 100644 --- a/lib/common/serdes.js +++ b/lib/common/serdes.js @@ -1,3 +1,4 @@ -const serialization = message => JSON.stringify(message); +const SEPARATE_SYMBOL = '\n'; +const serialization = message => JSON.stringify(message) + SEPARATE_SYMBOL; const deserialization = message => JSON.parse(message); -module.exports = { serialization, deserialization }; +module.exports = { serialization, deserialization, SEPARATE_SYMBOL }; diff --git a/lib/connection.js b/lib/connection.js index be19713..aba15f4 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -1,11 +1,7 @@ const net = require('net'); -const { - uuid, - serialization, - createData, - deserialization, -} = require('./common'); +const { uuid, serialization, createData } = require('./common'); const { HANDSHAKE } = require('./constants'); +const { collectMessage } = require('./chunkProcessor'); class Connection { NODE_ID = uuid(); @@ -24,12 +20,13 @@ class Connection { /**@param {net.Socket} socket*/ #handleConnection(socket) { - console.log({ localPort: socket.localPort }); const connectionId = uuid(); + socket.setNoDelay(true); socket.on('close', this.#deleteConnection(connectionId)); - socket.on('error', this.#errorHandler); + socket.on('end', () => console.log('end')); socket.on('timeout', () => console.log('timeout')); - socket.on('data', this.#onData.bind(this)); + socket.on('error', this.#errorHandler); + socket.pipe(collectMessage).on('data', this.#onData.bind(this)); this.#connections.set(connectionId, socket); this.#write(connectionId, createData(HANDSHAKE, this.NODE_ID)); } @@ -41,14 +38,13 @@ class Connection { } #write(connectionId, message) { - console.log(new Date().toISOString()); const socket = this.#connections.get(connectionId); if (!socket) throw new Error(`Node does not exist ${connectionId}`); socket.write(serialization(message)); } #onData(message) { - console.log(this.NODE_ID, deserialization(message)); + console.log(message); } #start(port, targetNode = null) {