From d030ace6871c3f57bfe91545715a95bab1a3c82e Mon Sep 17 00:00:00 2001 From: diy0r Date: Sat, 5 Oct 2024 18:02:14 +0500 Subject: [PATCH 1/2] fix: create a transform stream for each socket --- lib/chunkProcessor.js | 2 +- lib/connection.js | 35 +++++++++++++++++++---------------- 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/lib/chunkProcessor.js b/lib/chunkProcessor.js index 3b598fc..e0a2769 100644 --- a/lib/chunkProcessor.js +++ b/lib/chunkProcessor.js @@ -36,4 +36,4 @@ class CollectMessage extends Transform { return message; } } -module.exports = { collectMessage: new CollectMessage() }; +module.exports = () => new CollectMessage(); diff --git a/lib/connection.js b/lib/connection.js index aba15f4..824281e 100644 --- a/lib/connection.js +++ b/lib/connection.js @@ -1,14 +1,12 @@ const net = require('net'); -const { uuid, serialization, createData } = require('./common'); -const { HANDSHAKE } = require('./constants'); -const { collectMessage } = require('./chunkProcessor'); +const { uuid, serialization } = require('./common'); +const collectMessage = require('./chunkProcessor'); class Connection { - NODE_ID = uuid(); + _onConnect = null; + _onData = null; - #neighbors = new Map(); #connections = new Map(); - constructor(port, targetNode) { this.#start(port, targetNode); } @@ -26,25 +24,27 @@ class Connection { socket.on('end', () => console.log('end')); socket.on('timeout', () => console.log('timeout')); socket.on('error', this.#errorHandler); - socket.pipe(collectMessage).on('data', this.#onData.bind(this)); + socket + .pipe(collectMessage()) + .on('data', message => this.#onData(connectionId, message)); this.#connections.set(connectionId, socket); - this.#write(connectionId, createData(HANDSHAKE, this.NODE_ID)); + this._onConnect?.(connectionId); } #deleteConnection(connectionId) { - return hadError => { + return error => { this.#connections.delete(connectionId); }; } - #write(connectionId, message) { + _send(connectionId, message) { const socket = this.#connections.get(connectionId); if (!socket) throw new Error(`Node does not exist ${connectionId}`); socket.write(serialization(message)); } - #onData(message) { - console.log(message); + #onData(connectionId, message) { + this._onData?.(connectionId, message); } #start(port, targetNode = null) { @@ -53,12 +53,15 @@ class Connection { if (targetNode) this.connect(targetNode); } - #errorHandler(err) { - console.log(err); + close() { + for (let [connectionId, socket] of this.#connections) { + socket.close(); + this.#deleteConnection(connectionId); + } } - close() { - for (let [_, socket] of this.#connections) socket.destroy(); + #errorHandler(err) { + console.log(err); } } From 3e60b89d99fb6f54d581e10d513c96f027856502 Mon Sep 17 00:00:00 2001 From: diy0r Date: Sat, 5 Oct 2024 18:12:54 +0500 Subject: [PATCH 2/2] refactor: separate message handling --- lib/index.js | 4 ++-- lib/messaging.js | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) create mode 100644 lib/messaging.js diff --git a/lib/index.js b/lib/index.js index b2a7a06..b70fa1c 100644 --- a/lib/index.js +++ b/lib/index.js @@ -1,2 +1,2 @@ -const Connection = require('./connection'); -module.exports = { Connection }; +const Messaging = require('./messaging'); +module.exports = { ClapPeer: Messaging }; diff --git a/lib/messaging.js b/lib/messaging.js new file mode 100644 index 0000000..c291c5d --- /dev/null +++ b/lib/messaging.js @@ -0,0 +1,34 @@ +const { createData, uuid } = require('./common'); +const Connection = require('./connection'); +const { HANDSHAKE } = require('./constants'); + +class Messaging extends Connection { + NODE_ID = uuid(); + #neighbors = new Map(); + + constructor(port, targetNode) { + super(port, targetNode); + this.#start(); + } + + #newConnection(connectionId) { + this._send(connectionId, createData(HANDSHAKE, this.NODE_ID)); + } + + #addNeighbor(connectionId, messageObject) { + this.#neighbors.set(messageObject.data, connectionId); + } + + #onMessage(connectionId, messageObject) { + if (messageObject.type == HANDSHAKE) { + this.#addNeighbor(connectionId, messageObject); + } + } + + #start() { + this._onConnect = this.#newConnection; + this._onData = this.#onMessage; + } +} + +module.exports = Messaging;