Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat/receive-direct-message #9

Merged
merged 1 commit into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/chunkProcessor.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
'use strict';
const { Transform } = require('stream');
const { deserialization, SEPARATE_SYMBOL } = require('./common');

Expand Down
3 changes: 2 additions & 1 deletion lib/common/createData.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
const createData = (type, data) => ({ type, data });
module.exports = { createData };
const createMessage = (type, to, data) => ({ ...createData(type, data), to });
module.exports = { createData, createMessage };
34 changes: 18 additions & 16 deletions lib/connection.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
'use strict';
const net = require('net');
const { uuid, serialization } = require('./common');
const collectMessage = require('./chunkProcessor');
const { EventEmitter } = require('stream');

class Connection {
class Connection extends EventEmitter {
#connections = new Map();
constructor(port, targetNode) {
super();
this.#start(port, targetNode);
}

Expand All @@ -17,29 +20,35 @@ class Connection {
#handleConnection(socket) {
const connectionId = uuid();
socket.setNoDelay(false);
socket.on('close', this.#deleteConnection());
socket.on('close', () => this.#deleteConnection(connectionId));
socket.on('end', () => console.log('end'));
socket.on('timeout', () => console.log('timeout'));
socket.on('error', this.#errorHandler);
socket.on('error', () => this.#errorHandler());
socket
.pipe(collectMessage())
.on('data', message => this.#onData(connectionId, message));
this.#connections.set(connectionId, socket);
this._newConnection(connectionId);
}

#deleteConnection(connectionId) {
return error => {
this.#connections.delete(connectionId);
};
}

_send(connectionId, message) {
const socket = this.#connections.get(connectionId);
if (!socket) throw new Error(`Node does not exist ${connectionId}`);
socket.write(serialization(message));
}

close() {
for (let [connectionId, socket] of this.#connections) {
socket.close();
this.#deleteConnection(connectionId);
}
}

#deleteConnection(connectionId) {
this.#connections.delete(connectionId);
this._deleteConnection(connectionId);
}

#onData(connectionId, message) {
this._onMessage(connectionId, message);
}
Expand All @@ -50,13 +59,6 @@ class Connection {
if (targetNode) this.connect(targetNode);
}

close() {
for (let [connectionId, socket] of this.#connections) {
socket.close();
this.#deleteConnection(connectionId)();
}
}

#errorHandler(err) {
console.log(err);
}
Expand Down
3 changes: 2 additions & 1 deletion lib/constants.js
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
const HANDSHAKE = 'HANDSHAKE';
module.exports = { HANDSHAKE };
const DM = 'MESSAGE';
module.exports = { HANDSHAKE, DM };
47 changes: 38 additions & 9 deletions lib/messaging.js
Original file line number Diff line number Diff line change
@@ -1,29 +1,58 @@
const { createData, uuid } = require('./common');
'use strict';
const { createData, uuid, createMessage } = require('./common');
const Connection = require('./connection');
const { HANDSHAKE } = require('./constants');
const { HANDSHAKE, DM } = require('./constants');

class Messaging extends Connection {
NODE_ID = uuid();
neighbors = new Map();

constructor(port, targetNode) {
constructor(port, targetNode, nodeId) {
super(port, targetNode);
if (nodeId) this.NODE_ID = nodeId;
}

_newConnection(connectionId) {
this._send(connectionId, createData(HANDSHAKE, this.NODE_ID));
}

#addNeighbor(connectionId, messageObject) {
this.neighbors.set(messageObject.data, connectionId);
_onMessage(connectionId, messageObject) {
const { type, data, to } = messageObject;
this[type](data);
if (type === HANDSHAKE) {
this.#addNeighbor(connectionId, data);
}
if (type === DM) {
if (to !== this.NODE_ID) return this.send(to, data);
this.emit(DM, data);
}
}

_onMessage(connectionId, messageObject) {
if (messageObject.type == HANDSHAKE) {
console.log(messageObject);
this.#addNeighbor(connectionId, messageObject);
_deleteConnection(connectionId) {
const nodeId = this.#findNodeId(connectionId);
if (nodeId) this.neighbors.delete(connectionId);
}

send(nodeId, message) {
const connectionId = this.neighbors.get(nodeId);
if (connectionId)
return this._send(
connectionId,
createMessage({ type: DM, data: message, to: nodeId }),
);
this.neighbors.forEach(connectionId => this._send(connectionId, message));
}

#findNodeId(connectionId) {
for (let [nodeId, NodeConnectionId] of this.neighbors) {
if (connectionId === NodeConnectionId) return nodeId;
}
}

#addNeighbor(connectionId, neighbor) {
this.neighbors.set(neighbor, connectionId);
console.log(this.neighbors.get(neighbor));
}
}

module.exports = Messaging;
Loading