Skip to content

Commit

Permalink
perf: tcp message splitting with newline delimiter
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Oct 4, 2024
1 parent c6e8bf5 commit 5ed43ca
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 13 deletions.
39 changes: 39 additions & 0 deletions lib/chunkProcessor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
const { Transform } = require('stream');
const { deserialization, SEPARATE_SYMBOL } = require('./common');

class CollectMessage extends Transform {
#buffer = '';

constructor() {
super({ objectMode: true });
}

_transform(chunk, encoding = 'utf8', callback) {
try {
this.#buffer += chunk.toString(encoding);
this.#processBuffer();
callback();
} catch (err) {
callback(err);
}
}

#processBuffer(boundary = -1) {
while ((boundary = this.#findBoundary()) !== -1) {
const message = this.#extractMessage(boundary);
const objectMessage = deserialization(message);
this.push(objectMessage);
}
}

#findBoundary() {
return this.#buffer.indexOf(SEPARATE_SYMBOL);
}

#extractMessage(boundary) {
const message = this.#buffer.slice(0, boundary);
this.#buffer = this.#buffer.slice(boundary + SEPARATE_SYMBOL.length);
return message;
}
}
module.exports = { collectMessage: new CollectMessage() };
5 changes: 3 additions & 2 deletions lib/common/serdes.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
const serialization = message => JSON.stringify(message);
const SEPARATE_SYMBOL = '\n';
const serialization = message => JSON.stringify(message) + SEPARATE_SYMBOL;
const deserialization = message => JSON.parse(message);
module.exports = { serialization, deserialization };
module.exports = { serialization, deserialization, SEPARATE_SYMBOL };
18 changes: 7 additions & 11 deletions lib/connection.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,7 @@
const net = require('net');
const {
uuid,
serialization,
createData,
deserialization,
} = require('./common');
const { uuid, serialization, createData } = require('./common');
const { HANDSHAKE } = require('./constants');
const { collectMessage } = require('./chunkProcessor');

class Connection {
NODE_ID = uuid();
Expand All @@ -24,12 +20,13 @@ class Connection {

/**@param {net.Socket} socket*/
#handleConnection(socket) {
console.log({ localPort: socket.localPort });
const connectionId = uuid();
socket.setNoDelay(true);
socket.on('close', this.#deleteConnection(connectionId));
socket.on('error', this.#errorHandler);
socket.on('end', () => console.log('end'));
socket.on('timeout', () => console.log('timeout'));
socket.on('data', this.#onData.bind(this));
socket.on('error', this.#errorHandler);
socket.pipe(collectMessage).on('data', this.#onData.bind(this));
this.#connections.set(connectionId, socket);
this.#write(connectionId, createData(HANDSHAKE, this.NODE_ID));
}
Expand All @@ -41,14 +38,13 @@ class Connection {
}

#write(connectionId, message) {
console.log(new Date().toISOString());
const socket = this.#connections.get(connectionId);
if (!socket) throw new Error(`Node does not exist ${connectionId}`);
socket.write(serialization(message));
}

#onData(message) {
console.log(this.NODE_ID, deserialization(message));
console.log(message);
}

#start(port, targetNode = null) {
Expand Down

0 comments on commit 5ed43ca

Please sign in to comment.