From 7a18dc3b48eb48f6c9ca17531f6d3c48ea7d2be5 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Tue, 16 Jan 2024 21:33:36 -0800 Subject: [PATCH] fix: allow autosharding nodes to get peers --- packages/tests/tests/getPeers.spec.ts | 27 +++++++++++++------ .../light-push/multiple_pubsub.node.spec.ts | 16 +++++------ .../tests/tests/store/multiple_pubsub.spec.ts | 6 +++-- packages/utils/src/libp2p/index.ts | 21 ++++++++++++--- 4 files changed, 46 insertions(+), 24 deletions(-) diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 956eb51705..03fd40969f 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -51,7 +51,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo), - lightpush: true + lightpush: true, + relay: true }); const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); @@ -77,12 +78,18 @@ describe("getConnectedPeersForProtocolAndShard", function () { shards: [1] }; + const shardInfoServiceNode: ShardInfo = { + clusterId: 1, + shards: [2] + }; + await serviceNode1.start({ discv5Discovery: true, peerExchange: true, - clusterId: shardInfo.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo), - lightpush: true + clusterId: shardInfoServiceNode.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfoServiceNode), + lightpush: true, + relay: true }); const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); @@ -120,7 +127,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true + lightpush: true, + relay: true }); // and another node in the same cluster cluster as our node @@ -129,7 +137,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo2.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true + lightpush: true, + relay: true }); const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); @@ -170,7 +179,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true + lightpush: true, + relay: true }); // and another node in the same cluster cluster as our node @@ -180,7 +190,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo2.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true + lightpush: true, + relay: true }); const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts index 3d4a46bc13..1f6d155dd2 100644 --- a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -290,7 +290,8 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { filter: true, lightpush: true, relay: true, - pubsubTopic: [autoshardingPubsubTopic2] + pubsubTopic: [autoshardingPubsubTopic2], + clusterId: shardInfo.clusterId }); await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); await waku.dial(await nwaku2.getMultiaddrWithId()); @@ -353,10 +354,6 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () customContentTopic2, clusterId ); - const contentTopicInfo: ContentTopicInfo = { - clusterId, - contentTopics: [customContentTopic1, customContentTopic2] - }; const customEncoder1 = createEncoder({ contentTopic: customContentTopic1, pubsubTopicShardInfo: { @@ -372,11 +369,10 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () this.beforeEach(async function () { this.timeout(15000); - [nwaku, waku] = await runNodes( - this, - [autoshardingPubsubTopic1, autoshardingPubsubTopic2], - contentTopicInfo - ); + [nwaku, waku] = await runNodes(this, [ + autoshardingPubsubTopic1, + autoshardingPubsubTopic2 + ]); messageCollector = new MessageCollector(nwaku); nimPeerId = await nwaku.getPeerId(); }); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 2d8f251a75..6f4e7a98e0 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -214,7 +214,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { await nwaku.start({ store: true, pubsubTopic: [autoshardingPubsubTopic1, autoshardingPubsubTopic2], - relay: true + relay: true, + clusterId }); await nwaku.ensureSubscriptionsAutosharding([ customContentTopic1, @@ -287,7 +288,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { await nwaku2.start({ store: true, pubsubTopic: [autoshardingPubsubTopic2], - relay: true + relay: true, + clusterId }); await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 4bc305eca2..52a9e391a3 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -4,6 +4,7 @@ import type { ShardingParams } from "@waku/interfaces"; import { bytesToUtf8 } from "../bytes/index.js"; import { decodeRelayShard } from "../common/relay_shard_codec.js"; +import { contentTopicToShardIndex } from "../index.js"; /** * Returns a pseudo-random peer that supports the given protocol. @@ -89,17 +90,29 @@ export async function getConnectedPeersForProtocolAndShard( if (supportsProtocol) { if (shardInfo) { - //TODO: support auto-sharding + let shards; if (!("shards" in shardInfo)) { - throw new Error( - `Connections Manager only supports static sharding for now. Autosharding is not supported.` + if (!("contentTopics" in shardInfo)) { + throw new Error( + "Missing configuration for static or auto sharding" + ); + } + shards = shardInfo.contentTopics.map((topic) => + contentTopicToShardIndex(topic) ); + } else { + shards = shardInfo.shards; } const encodedPeerShardInfo = peer.metadata.get("shardInfo"); const peerShardInfo = encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo); - if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) { + if ( + peerShardInfo && + shardInfo.clusterId === peerShardInfo.clusterId && + (!("contentTopics" in shardInfo) || + shards.some((shard) => peerShardInfo.shards.includes(shard))) + ) { return peer; } } else {