Skip to content

Commit

Permalink
feat: receive direct message
Browse files Browse the repository at this point in the history
  • Loading branch information
DIY0R committed Oct 11, 2024
1 parent e7df3f5 commit 4a54ffa
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 27 deletions.
1 change: 1 addition & 0 deletions lib/chunkProcessor.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
'use strict';
const { Transform } = require('stream');
const { deserialization, SEPARATE_SYMBOL } = require('./common');

Expand Down
3 changes: 2 additions & 1 deletion lib/common/createData.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
const createData = (type, data) => ({ type, data });
module.exports = { createData };
const createMessage = (type, to, data) => ({ ...createData(type, data), to });
module.exports = { createData, createMessage };
34 changes: 18 additions & 16 deletions lib/connection.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
'use strict';
const net = require('net');
const { uuid, serialization } = require('./common');
const collectMessage = require('./chunkProcessor');
const { EventEmitter } = require('stream');

class Connection {
class Connection extends EventEmitter {
#connections = new Map();
constructor(port, targetNode) {
super();
this.#start(port, targetNode);
}

Expand All @@ -17,29 +20,35 @@ class Connection {
#handleConnection(socket) {
const connectionId = uuid();
socket.setNoDelay(false);
socket.on('close', this.#deleteConnection());
socket.on('close', () => this.#deleteConnection(connectionId));
socket.on('end', () => console.log('end'));
socket.on('timeout', () => console.log('timeout'));
socket.on('error', this.#errorHandler);
socket.on('error', () => this.#errorHandler());
socket
.pipe(collectMessage())
.on('data', message => this.#onData(connectionId, message));
this.#connections.set(connectionId, socket);
this._newConnection(connectionId);
}

#deleteConnection(connectionId) {
return error => {
this.#connections.delete(connectionId);
};
}

_send(connectionId, message) {
const socket = this.#connections.get(connectionId);
if (!socket) throw new Error(`Node does not exist ${connectionId}`);
socket.write(serialization(message));
}

close() {
for (let [connectionId, socket] of this.#connections) {
socket.close();
this.#deleteConnection(connectionId);
}
}

#deleteConnection(connectionId) {
this.#connections.delete(connectionId);
this._deleteConnection(connectionId);
}

#onData(connectionId, message) {
this._onMessage(connectionId, message);
}
Expand All @@ -50,13 +59,6 @@ class Connection {
if (targetNode) this.connect(targetNode);
}

close() {
for (let [connectionId, socket] of this.#connections) {
socket.close();
this.#deleteConnection(connectionId)();
}
}

#errorHandler(err) {
console.log(err);
}
Expand Down
3 changes: 2 additions & 1 deletion lib/constants.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
const HANDSHAKE = 'HANDSHAKE';
module.exports = { HANDSHAKE };
const DM = 'MESSAGE';
module.exports = { HANDSHAKE, DM };
47 changes: 38 additions & 9 deletions lib/messaging.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,58 @@
const { createData, uuid } = require('./common');
'use strict';
const { createData, uuid, createMessage } = require('./common');
const Connection = require('./connection');
const { HANDSHAKE } = require('./constants');
const { HANDSHAKE, DM } = require('./constants');

class Messaging extends Connection {
NODE_ID = uuid();
neighbors = new Map();

constructor(port, targetNode) {
constructor(port, targetNode, nodeId) {
super(port, targetNode);
if (nodeId) this.NODE_ID = nodeId;
}

_newConnection(connectionId) {
this._send(connectionId, createData(HANDSHAKE, this.NODE_ID));
}

#addNeighbor(connectionId, messageObject) {
this.neighbors.set(messageObject.data, connectionId);
_onMessage(connectionId, messageObject) {
const { type, data, to } = messageObject;
this[type](data);
if (type === HANDSHAKE) {
this.#addNeighbor(connectionId, data);
}
if (type === DM) {
if (to !== this.NODE_ID) return this.send(to, data);
this.emit(DM, data);
}
}

_onMessage(connectionId, messageObject) {
if (messageObject.type == HANDSHAKE) {
console.log(messageObject);
this.#addNeighbor(connectionId, messageObject);
_deleteConnection(connectionId) {
const nodeId = this.#findNodeId(connectionId);
if (nodeId) this.neighbors.delete(connectionId);
}

send(nodeId, message) {
const connectionId = this.neighbors.get(nodeId);
if (connectionId)
return this._send(
connectionId,
createMessage({ type: DM, data: message, to: nodeId }),
);
this.neighbors.forEach(connectionId => this._send(connectionId, message));
}

#findNodeId(connectionId) {
for (let [nodeId, NodeConnectionId] of this.neighbors) {
if (connectionId === NodeConnectionId) return nodeId;
}
}

#addNeighbor(connectionId, neighbor) {
this.neighbors.set(neighbor, connectionId);
console.log(this.neighbors.get(neighbor));
}
}

module.exports = Messaging;

0 comments on commit 4a54ffa

Please sign in to comment.