diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 956eb51705..35ffbdc461 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -4,6 +4,7 @@ import type { Peer } from "@libp2p/interface/peer-store"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { LightPushCodec, waitForRemotePeer } from "@waku/core"; import { + ContentTopicInfo, createLightNode, Libp2pComponents, type LightNode, @@ -26,6 +27,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { let waku: LightNode; let serviceNode1: NimGoNode; let serviceNode2: NimGoNode; + const contentTopic = "/test/2/waku-light-push/utf8"; this.beforeEach(async function () { this.timeout(15000); @@ -51,7 +53,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo), - lightpush: true + lightpush: true, + relay: true }); const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); @@ -77,12 +80,18 @@ describe("getConnectedPeersForProtocolAndShard", function () { shards: [1] }; + const shardInfoServiceNode: ShardInfo = { + clusterId: 1, + shards: [2] + }; + await serviceNode1.start({ discv5Discovery: true, peerExchange: true, - clusterId: shardInfo.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo), - lightpush: true + clusterId: shardInfoServiceNode.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfoServiceNode), + lightpush: true, + relay: true }); const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); @@ -120,7 +129,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true + lightpush: true, + relay: true }); // and another node in the same cluster cluster as our node @@ -129,7 +139,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo2.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true + lightpush: true, + relay: true }); const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); @@ -170,7 +181,196 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true + lightpush: true, + relay: true + }); + + // and another node in the same cluster cluster as our node + const serviceNode2 = new NimGoNode(makeLogFileName(this) + "2"); + await serviceNode2.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo2.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo2), + lightpush: true, + relay: true + }); + + const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); + const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo: shardInfo2 }); + await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec); + await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec); + await waku.start(); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo2 + ); + expect(peers.length).to.be.equal(1); + }); + + it("same cluster, same shard: nodes connect (autosharding)", async function () { + this.timeout(15000); + + const shardInfo: ContentTopicInfo = { + clusterId: 1, + contentTopics: [contentTopic] + }; + + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo), + lightpush: true, + relay: true + }); + + const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo }); + await waku.start(); + await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec); + await waitForRemotePeer(waku, [Protocols.LightPush]); + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo + ); + expect(peers.length).to.be.greaterThan(0); + }); + + it("same cluster, different shard: nodes connect (autosharding)", async function () { + this.timeout(15000); + + const shardInfo1: ContentTopicInfo = { + clusterId: 1, + contentTopics: [contentTopic] + }; + + const shardInfo2: ContentTopicInfo = { + clusterId: 1, + contentTopics: ["/test/5/waku-light-push/utf8"] + }; + + // Separate shard + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo1.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + lightpush: true, + relay: true + }); + + // Same shard + await serviceNode2.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo2.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo2), + lightpush: true, + relay: true + }); + + const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); + const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo: shardInfo2 }); + await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); + await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); + + await waku.start(); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo2 + ); + expect(peers.length).to.be.equal(1); + }); + + it("different cluster, same shard: nodes don't connect (autosharding)", async function () { + this.timeout(15000); + + const shardInfo1: ContentTopicInfo = { + clusterId: 1, + contentTopics: [contentTopic] + }; + + const shardInfo2: ContentTopicInfo = { + clusterId: 2, + contentTopics: [contentTopic] + }; + + // we start one node in a separate cluster + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo1.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + lightpush: true, + relay: true + }); + + // and another node in the same cluster cluster as our node + await serviceNode2.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo2.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo2), + lightpush: true, + relay: true + }); + + const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); + const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo: shardInfo2 }); + await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); + await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); + + await waku.start(); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo2 + ); + expect(peers.length).to.be.equal(1); + }); + + it("different cluster, different shard: nodes don't connect (autosharding)", async function () { + this.timeout(15000); + + const shardInfo1: ContentTopicInfo = { + clusterId: 1, + contentTopics: [contentTopic] + }; + + const shardInfo2: ContentTopicInfo = { + clusterId: 2, + contentTopics: ["/test/5/waku-light-push/utf8"] + }; + + // we start one node in a separate cluster + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo1.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + lightpush: true, + relay: true }); // and another node in the same cluster cluster as our node @@ -180,7 +380,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo2.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true + lightpush: true, + relay: true }); const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts index 3d4a46bc13..1f6d155dd2 100644 --- a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -290,7 +290,8 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { filter: true, lightpush: true, relay: true, - pubsubTopic: [autoshardingPubsubTopic2] + pubsubTopic: [autoshardingPubsubTopic2], + clusterId: shardInfo.clusterId }); await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); await waku.dial(await nwaku2.getMultiaddrWithId()); @@ -353,10 +354,6 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () customContentTopic2, clusterId ); - const contentTopicInfo: ContentTopicInfo = { - clusterId, - contentTopics: [customContentTopic1, customContentTopic2] - }; const customEncoder1 = createEncoder({ contentTopic: customContentTopic1, pubsubTopicShardInfo: { @@ -372,11 +369,10 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () this.beforeEach(async function () { this.timeout(15000); - [nwaku, waku] = await runNodes( - this, - [autoshardingPubsubTopic1, autoshardingPubsubTopic2], - contentTopicInfo - ); + [nwaku, waku] = await runNodes(this, [ + autoshardingPubsubTopic1, + autoshardingPubsubTopic2 + ]); messageCollector = new MessageCollector(nwaku); nimPeerId = await nwaku.getPeerId(); }); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 2d8f251a75..6f4e7a98e0 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -214,7 +214,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { await nwaku.start({ store: true, pubsubTopic: [autoshardingPubsubTopic1, autoshardingPubsubTopic2], - relay: true + relay: true, + clusterId }); await nwaku.ensureSubscriptionsAutosharding([ customContentTopic1, @@ -287,7 +288,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { await nwaku2.start({ store: true, pubsubTopic: [autoshardingPubsubTopic2], - relay: true + relay: true, + clusterId }); await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 4bc305eca2..b393b473c9 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -89,12 +89,6 @@ export async function getConnectedPeersForProtocolAndShard( if (supportsProtocol) { if (shardInfo) { - //TODO: support auto-sharding - if (!("shards" in shardInfo)) { - throw new Error( - `Connections Manager only supports static sharding for now. Autosharding is not supported.` - ); - } const encodedPeerShardInfo = peer.metadata.get("shardInfo"); const peerShardInfo = encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo);