diff --git a/package-lock.json b/package-lock.json index 3f4bb5b45d..e43daa1dd9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26279,6 +26279,7 @@ "@noble/hashes": "^1.3.2", "@waku/enr": "^0.0.19", "@waku/interfaces": "0.0.20", + "@waku/message-hash": "0.1.9", "@waku/proto": "0.0.5", "@waku/utils": "0.0.13", "debug": "^4.3.4", @@ -26641,6 +26642,7 @@ "license": "MIT OR Apache-2.0", "dependencies": { "@noble/hashes": "^1.3.2", + "@waku/interfaces": "0.0.20", "chai": "^4.3.10", "debug": "^4.3.4", "uint8arrays": "^4.0.4" @@ -26650,7 +26652,6 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.20", "cspell": "^7.3.2", "npm-run-all": "^4.1.5", "rollup": "^4.6.0" @@ -29779,6 +29780,7 @@ "@waku/build-utils": "*", "@waku/enr": "^0.0.19", "@waku/interfaces": "0.0.20", + "@waku/message-hash": "0.1.9", "@waku/proto": "0.0.5", "@waku/utils": "0.0.13", "chai": "^4.3.10", diff --git a/packages/core/package.json b/packages/core/package.json index 3ad4bbbaf2..e13373bff9 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -75,6 +75,7 @@ "@noble/hashes": "^1.3.2", "@waku/enr": "^0.0.19", "@waku/interfaces": "0.0.20", + "@waku/message-hash": "0.1.9", "@waku/proto": "0.0.5", "@waku/utils": "0.0.13", "debug": "^4.3.4", diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index bed4d8fc89..3e7c03005e 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -26,6 +26,7 @@ import { StreamManager } from "./stream_manager.js"; export class BaseProtocol implements IBaseProtocol { public readonly addLibp2pEventListener: Libp2p["addEventListener"]; public readonly removeLibp2pEventListener: Libp2p["removeEventListener"]; + readonly NUM_PEERS_TO_USE = 3; protected streamManager: StreamManager; constructor( diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 06caa77b51..3686c477b2 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -11,13 +11,13 @@ import type { IProtoMessage, IReceiver, Libp2p, - PeerIdStr, ProtocolCreateOptions, PubsubTopic, SingleShardInfo, Unsubscribe } from "@waku/interfaces"; import { DefaultPubsubTopic } from "@waku/interfaces"; +import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; import { ensurePubsubTopicIsConfigured, @@ -50,10 +50,14 @@ export const FilterCodecs = { PUSH: "/vac/waku/filter-push/2.0.0-beta1" }; +/** + * A subscription object refers to a subscription to a given pubsub topic. + */ class Subscription { - private readonly peer: Peer; + readonly peers: Peer[]; private readonly pubsubTopic: PubsubTopic; private newStream: (peer: Peer) => Promise; + readonly receivedMessagesHashStr: string[] = []; private subscriptionCallbacks: Map< ContentTopic, @@ -62,10 +66,10 @@ class Subscription { constructor( pubsubTopic: PubsubTopic, - remotePeer: Peer, + remotePeers: Peer[], newStream: (peer: Peer) => Promise ) { - this.peer = remotePeer; + this.peers = remotePeers; this.pubsubTopic = pubsubTopic; this.newStream = newStream; this.subscriptionCallbacks = new Map(); @@ -89,51 +93,64 @@ class Subscription { const decodersGroupedByCT = groupByContentTopic(decodersArray); const contentTopics = Array.from(decodersGroupedByCT.keys()); - const stream = await this.newStream(this.peer); + const promises = this.peers.map(async (peer) => { + const stream = await this.newStream(peer); - const request = FilterSubscribeRpc.createSubscribeRequest( - this.pubsubTopic, - contentTopics - ); - - try { - const res = await pipe( - [request.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) + const request = FilterSubscribeRpc.createSubscribeRequest( + this.pubsubTopic, + contentTopics ); - if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` + try { + const res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) ); - } - const { statusCode, requestId, statusDesc } = - FilterSubscribeResponse.decode(res[0].slice()); + if (!res || !res.length) { + throw Error( + `No response received for request ${request.requestId}: ${res}` + ); + } - if (statusCode < 200 || statusCode >= 300) { + const { statusCode, requestId, statusDesc } = + FilterSubscribeResponse.decode(res[0].slice()); + + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + ); + } + + log.info( + "Subscribed to peer ", + peer.id.toString(), + "for content topics", + contentTopics + ); + } catch (e) { throw new Error( - `Filter subscribe request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + "Error subscribing to peer: " + + peer.id.toString() + + " for content topics: " + + contentTopics + + ": " + + e ); } + }); + const results = await Promise.allSettled(promises); - log.info( - "Subscribed to peer ", - this.peer.id.toString(), - "for content topics", - contentTopics - ); - } catch (e) { - throw new Error( - "Error subscribing to peer: " + - this.peer.id.toString() + - " for content topics: " + - contentTopics + - ": " + - e + const errors = results.filter((result) => result.status === "rejected"); + + if (errors && errors.length) { + // TODO: handle renewing faulty peers with new peers + log.warn( + "Some subscriptions failed. These will be refreshed with new peers", + errors ); } @@ -155,104 +172,166 @@ class Subscription { } async unsubscribe(contentTopics: ContentTopic[]): Promise { - const stream = await this.newStream(this.peer); - const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest( - this.pubsubTopic, - contentTopics - ); + const promises = this.peers.map(async (peer) => { + const stream = await this.newStream(peer); + const unsubscribeRequest = FilterSubscribeRpc.createUnsubscribeRequest( + this.pubsubTopic, + contentTopics + ); - try { - await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink); - } catch (error) { - throw new Error("Error subscribing: " + error); - } + try { + await pipe([unsubscribeRequest.encode()], lp.encode, stream.sink); + } catch (error) { + throw new Error("Error subscribing: " + error); + } - contentTopics.forEach((contentTopic: string) => { - this.subscriptionCallbacks.delete(contentTopic); + contentTopics.forEach((contentTopic: string) => { + this.subscriptionCallbacks.delete(contentTopic); + }); }); - } - async ping(): Promise { - const stream = await this.newStream(this.peer); + const results = await Promise.allSettled(promises); - const request = FilterSubscribeRpc.createSubscriberPingRequest(); + const errors = results.filter((result) => result.status === "rejected"); - try { - const res = await pipe( - [request.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) + if (errors && errors.length) { + // TODO: handle renewing faulty peers with new peers + log.warn( + "Some subscriptions failed. These will be refreshed with new peers", + errors ); + } + } - if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` + async ping(): Promise { + const promises = this.peers.map(async (peer) => { + const stream = await this.newStream(peer); + + const request = FilterSubscribeRpc.createSubscriberPingRequest(); + + try { + const res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) ); - } - const { statusCode, requestId, statusDesc } = - FilterSubscribeResponse.decode(res[0].slice()); + if (!res || !res.length) { + throw Error( + `No response received for request ${request.requestId}: ${res}` + ); + } - if (statusCode < 200 || statusCode >= 300) { - throw new Error( - `Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}` - ); + const { statusCode, requestId, statusDesc } = + FilterSubscribeResponse.decode(res[0].slice()); + + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter ping request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + ); + } + + log.info(`Ping successful for peer ${peer.id.toString()}`); + } catch (error) { + log.error("Error pinging: ", error); + throw new Error("Error pinging: " + error); } + }); + + const results = await Promise.allSettled(promises); - log.info("Ping successful"); - } catch (error) { - log.error("Error pinging: ", error); - throw new Error("Error pinging: " + error); + const errors = results.filter((result) => result.status === "rejected"); + + if (errors && errors.length) { + // TODO: handle renewing faulty peers with new peers + log.warn( + "Some subscriptions failed. These will be refreshed with new peers", + errors + ); } } async unsubscribeAll(): Promise { - const stream = await this.newStream(this.peer); - - const request = FilterSubscribeRpc.createUnsubscribeAllRequest( - this.pubsubTopic - ); + const promises = this.peers.map(async (peer) => { + const stream = await this.newStream(peer); - try { - const res = await pipe( - [request.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) + const request = FilterSubscribeRpc.createUnsubscribeAllRequest( + this.pubsubTopic ); - if (!res || !res.length) { - throw Error( - `No response received for request ${request.requestId}: ${res}` + try { + const res = await pipe( + [request.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) ); - } - const { statusCode, requestId, statusDesc } = - FilterSubscribeResponse.decode(res[0].slice()); + if (!res || !res.length) { + throw Error( + `No response received for request ${request.requestId}: ${res}` + ); + } + + const { statusCode, requestId, statusDesc } = + FilterSubscribeResponse.decode(res[0].slice()); + + if (statusCode < 200 || statusCode >= 300) { + throw new Error( + `Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + ); + } - if (statusCode < 200 || statusCode >= 300) { + this.subscriptionCallbacks.clear(); + log.info( + `Unsubscribed from all content topics for pubsub topic ${this.pubsubTopic}` + ); + } catch (error) { throw new Error( - `Filter unsubscribe all request ${requestId} failed with status code ${statusCode}: ${statusDesc}` + "Error unsubscribing from all content topics: " + error ); } + }); + + const results = await Promise.allSettled(promises); - this.subscriptionCallbacks.clear(); - log.info("Unsubscribed from all content topics"); - } catch (error) { - throw new Error("Error unsubscribing from all content topics: " + error); + const errors = results.filter((result) => result.status === "rejected"); + + if (errors && errors.length) { + // TODO: handle renewing faulty peers with new peers + log.warn( + "Some subscriptions failed. These will be refreshed with new peers", + errors + ); } } async processMessage(message: WakuMessage): Promise { - const contentTopic = message.contentTopic; + const hashedMessageStr = messageHashStr( + this.pubsubTopic, + message as IProtoMessage + ); + if (this.receivedMessagesHashStr.includes(hashedMessageStr)) { + log.info("Message already received, skipping"); + return; + } + this.receivedMessagesHashStr.push(hashedMessageStr); + + const { contentTopic } = message; const subscriptionCallback = this.subscriptionCallbacks.get(contentTopic); if (!subscriptionCallback) { log.error("No subscription callback available for ", contentTopic); return; } + log.info( + "Processing message with content topic ", + contentTopic, + " on pubsub topic ", + this.pubsubTopic + ); await pushMessage(subscriptionCallback, this.pubsubTopic, message); } } @@ -260,21 +339,18 @@ class Subscription { class Filter extends BaseProtocol implements IReceiver { private readonly pubsubTopics: PubsubTopic[] = []; private activeSubscriptions = new Map(); - private readonly NUM_PEERS_PROTOCOL = 1; private getActiveSubscription( - pubsubTopic: PubsubTopic, - peerIdStr: PeerIdStr + pubsubTopic: PubsubTopic ): Subscription | undefined { - return this.activeSubscriptions.get(`${pubsubTopic}_${peerIdStr}`); + return this.activeSubscriptions.get(pubsubTopic); } private setActiveSubscription( pubsubTopic: PubsubTopic, - peerIdStr: PeerIdStr, subscription: Subscription ): Subscription { - this.activeSubscriptions.set(`${pubsubTopic}_${peerIdStr}`, subscription); + this.activeSubscriptions.set(pubsubTopic, subscription); return subscription; } @@ -290,6 +366,12 @@ class Filter extends BaseProtocol implements IReceiver { this.activeSubscriptions = new Map(); } + /** + * Creates a new subscription to the given pubsub topic. + * The subscription is made to multiple peers for decentralization. + * @param pubsubTopicShardInfo The pubsub topic to subscribe to. + * @returns The subscription object. + */ async createSubscription( pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic ): Promise { @@ -302,19 +384,20 @@ class Filter extends BaseProtocol implements IReceiver { //TODO: get a relevant peer for the topic/shard // https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230 - const peer = ( - await this.getPeers({ - maxBootstrapPeers: 1, - numPeers: this.NUM_PEERS_PROTOCOL - }) - )[0]; + const peers = await this.getPeers({ + maxBootstrapPeers: 1, + numPeers: this.NUM_PEERS_TO_USE + }); + log.info( + `Creating filter subscription with ${peers.length} peers: `, + peers.map((peer) => peer.id.toString()) + ); const subscription = - this.getActiveSubscription(pubsubTopic, peer.id.toString()) ?? + this.getActiveSubscription(pubsubTopic) ?? this.setActiveSubscription( pubsubTopic, - peer.id.toString(), - new Subscription(pubsubTopic, peer, this.getStream.bind(this, peer)) + new Subscription(pubsubTopic, peers, this.getStream.bind(this)) ); return subscription; @@ -361,8 +444,11 @@ class Filter extends BaseProtocol implements IReceiver { } private onRequest(streamData: IncomingStreamData): void { + const { connection, stream } = streamData; + const { remotePeer } = connection; + log.info(`Received message from ${remotePeer.toString()}`); try { - pipe(streamData.stream, lp.decode, async (source) => { + pipe(stream, lp.decode, async (source) => { for await (const bytes of source) { const response = FilterPushRpc.decode(bytes.slice()); @@ -378,11 +464,7 @@ class Filter extends BaseProtocol implements IReceiver { return; } - const peerIdStr = streamData.connection.remotePeer.toString(); - const subscription = this.getActiveSubscription( - pubsubTopic, - peerIdStr - ); + const subscription = this.getActiveSubscription(pubsubTopic); if (!subscription) { log.error( diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 773e033565..002c27ad55 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -45,7 +45,6 @@ type PreparePushMessageResult = */ class LightPush extends BaseProtocol implements ILightPush { private readonly pubsubTopics: PubsubTopic[]; - private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(LightPushCodec, libp2p.components); @@ -111,7 +110,7 @@ class LightPush extends BaseProtocol implements ILightPush { //TODO: get a relevant peer for the topic/shard const peers = await this.getPeers({ maxBootstrapPeers: 1, - numPeers: this.NUM_PEERS_PROTOCOL + numPeers: this.NUM_PEERS_TO_USE }); if (!peers.length) { @@ -177,6 +176,8 @@ class LightPush extends BaseProtocol implements ILightPush { }); const results = await Promise.allSettled(promises); + + // TODO: handle renewing faulty peers with new peers const errors = results .filter( ( diff --git a/packages/message-hash/src/index.ts b/packages/message-hash/src/index.ts index ac7982e5b6..4bf9842e19 100644 --- a/packages/message-hash/src/index.ts +++ b/packages/message-hash/src/index.ts @@ -1,6 +1,6 @@ import { sha256 } from "@noble/hashes/sha256"; import type { IProtoMessage } from "@waku/interfaces"; -import { concat, utf8ToBytes } from "@waku/utils/bytes"; +import { bytesToUtf8, concat, utf8ToBytes } from "@waku/utils/bytes"; /** * Deterministic Message Hashing as defined in @@ -27,3 +27,12 @@ export function messageHash( } return sha256(bytes); } + +export function messageHashStr( + pubsubTopic: string, + message: IProtoMessage +): string { + const hash = messageHash(pubsubTopic, message); + const hashStr = bytesToUtf8(hash); + return hashStr; +} diff --git a/packages/tests/src/index.ts b/packages/tests/src/index.ts index d56386239b..c219913417 100644 --- a/packages/tests/src/index.ts +++ b/packages/tests/src/index.ts @@ -13,3 +13,4 @@ export * from "./node/node.js"; export * from "./teardown.js"; export * from "./message_collector.js"; export * from "./utils.js"; +export * from "./waitForConnections.js"; diff --git a/packages/tests/src/waitForConnections.ts b/packages/tests/src/waitForConnections.ts new file mode 100644 index 0000000000..b0fc063214 --- /dev/null +++ b/packages/tests/src/waitForConnections.ts @@ -0,0 +1,20 @@ +import type { LightNode } from "@waku/interfaces"; +export async function waitForConnections( + numPeers: number, + waku: LightNode +): Promise { + let connectionsLen = waku.libp2p.getConnections().length; + if (connectionsLen >= numPeers) { + return; + } + await new Promise((resolve) => { + const cb = (): void => { + connectionsLen++; + if (connectionsLen >= numPeers) { + waku.libp2p.removeEventListener("peer:identify", cb); + resolve(); + } + }; + waku.libp2p.addEventListener("peer:identify", cb); + }); +} diff --git a/packages/tests/tests/filter/redundant-subscribe.node.spec.ts b/packages/tests/tests/filter/redundant-subscribe.node.spec.ts new file mode 100644 index 0000000000..9573972255 --- /dev/null +++ b/packages/tests/tests/filter/redundant-subscribe.node.spec.ts @@ -0,0 +1,61 @@ +import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { DefaultPubsubTopic } from "@waku/interfaces"; +import { expect } from "chai"; + +import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js"; + +import { + messagePayload, + messageText, + runMultipleNodes, + TestContentTopic, + TestDecoder, + TestEncoder +} from "./utils.js"; + +describe("Waku Filter V2: Subscribe: Redundant", function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(100000); + let waku: LightNode; + let serviceNodes: NimGoNode[]; + let subscription: IFilterSubscription; + let messageCollector: MessageCollector; + + this.beforeEach(async function () { + this.timeout(15000); + [serviceNodes, waku] = await runMultipleNodes( + this, + [DefaultPubsubTopic], + undefined, + 3 + ); + subscription = await waku.filter.createSubscription(); + messageCollector = new MessageCollector(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await tearDownNodes(serviceNodes, waku); + }); + + it("Subscribe and receive messages via lightPush", async function () { + expect(waku.libp2p.getConnections()).has.length(3); + + await subscription.subscribe([TestDecoder], messageCollector.callback); + + await waku.lightPush.send(TestEncoder, messagePayload); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + + // either of the service nodes need to have the message + await Promise.race( + serviceNodes.map(async (node) => + expect(await node.messages()).to.have.length(1) + ) + ); + }); +}); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index c7a929cfa2..669f30c03a 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -3,6 +3,7 @@ import { DefaultPubsubTopic, IFilterSubscription, LightNode, + ProtocolCreateOptions, Protocols, ShardingParams } from "@waku/interfaces"; @@ -60,7 +61,7 @@ export async function runNodes( { retries: 3 } ); - const waku_options = { + const waku_options: ProtocolCreateOptions = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, pubsubTopics: shardInfo ? undefined : pubsubTopics, @@ -88,3 +89,63 @@ export async function runNodes( throw new Error("Failed to initialize waku"); } } + +export async function runMultipleNodes( + context: Context, + //TODO: change this to use `ShardInfo` instead of `string[]` + pubsubTopics: string[], + shardInfo?: ShardingParams, + numServiceNodes = 1 +): Promise<[NimGoNode[], LightNode]> { + // create numServiceNodes nodes + const serviceNodes = [...Array(numServiceNodes).keys()].map( + (num) => new NimGoNode(makeLogFileName(context) + `-${num}`) + ); + + await Promise.all( + serviceNodes.map(async (nwaku) => { + await nwaku.start( + { + filter: true, + lightpush: true, + relay: true, + pubsubTopic: pubsubTopics + }, + { retries: 3 } + ); + return nwaku; + }) + ); + + const waku_options: ProtocolCreateOptions = { + staticNoiseKey: NOISE_KEY_1, + libp2p: { + addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } + }, + pubsubTopics: shardInfo ? undefined : pubsubTopics, + ...((pubsubTopics.length !== 1 || + pubsubTopics[0] !== DefaultPubsubTopic) && { + shardInfo: shardInfo + }) + }; + + log.info("Starting js waku node with :", JSON.stringify(waku_options)); + let waku: LightNode | undefined; + try { + waku = await createLightNode(waku_options); + await waku.start(); + } catch (error) { + log.error("jswaku node failed to start:", error); + } + + if (waku) { + for (const node of serviceNodes) { + await waku.dial(await node.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + await node.ensureSubscriptions(pubsubTopics); + } + return [serviceNodes, waku]; + } else { + throw new Error("Failed to initialize waku"); + } +}