Skip to content

Commit

Permalink
fix: allow autosharding nodes to get peers (#1785)
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner authored Jan 17, 2024
1 parent 941b02b commit 4dd451b
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 26 deletions.
217 changes: 209 additions & 8 deletions packages/tests/tests/getPeers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Peer } from "@libp2p/interface/peer-store";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { LightPushCodec, waitForRemotePeer } from "@waku/core";
import {
ContentTopicInfo,
createLightNode,
Libp2pComponents,
type LightNode,
Expand All @@ -26,6 +27,7 @@ describe("getConnectedPeersForProtocolAndShard", function () {
let waku: LightNode;
let serviceNode1: NimGoNode;
let serviceNode2: NimGoNode;
const contentTopic = "/test/2/waku-light-push/utf8";

this.beforeEach(async function () {
this.timeout(15000);
Expand All @@ -51,7 +53,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
lightpush: true
lightpush: true,
relay: true
});

const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
Expand All @@ -77,12 +80,18 @@ describe("getConnectedPeersForProtocolAndShard", function () {
shards: [1]
};

const shardInfoServiceNode: ShardInfo = {
clusterId: 1,
shards: [2]
};

await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
lightpush: true
clusterId: shardInfoServiceNode.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfoServiceNode),
lightpush: true,
relay: true
});

const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
Expand Down Expand Up @@ -120,7 +129,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true
lightpush: true,
relay: true
});

// and another node in the same cluster cluster as our node
Expand All @@ -129,7 +139,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true
lightpush: true,
relay: true
});

const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
Expand Down Expand Up @@ -170,7 +181,196 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true
lightpush: true,
relay: true
});

// and another node in the same cluster cluster as our node
const serviceNode2 = new NimGoNode(makeLogFileName(this) + "2");
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});

const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId();
const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId();

waku = await createLightNode({ shardInfo: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec);
await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);

const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo2
);
expect(peers.length).to.be.equal(1);
});

it("same cluster, same shard: nodes connect (autosharding)", async function () {
this.timeout(15000);

const shardInfo: ContentTopicInfo = {
clusterId: 1,
contentTopics: [contentTopic]
};

await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
lightpush: true,
relay: true
});

const serviceNodeMa = await serviceNode1.getMultiaddrWithId();

waku = await createLightNode({ shardInfo });
await waku.start();
await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec);
await waitForRemotePeer(waku, [Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo
);
expect(peers.length).to.be.greaterThan(0);
});

it("same cluster, different shard: nodes connect (autosharding)", async function () {
this.timeout(15000);

const shardInfo1: ContentTopicInfo = {
clusterId: 1,
contentTopics: [contentTopic]
};

const shardInfo2: ContentTopicInfo = {
clusterId: 1,
contentTopics: ["/test/5/waku-light-push/utf8"]
};

// Separate shard
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});

// Same shard
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});

const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();

waku = await createLightNode({ shardInfo: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);

await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);

const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo2
);
expect(peers.length).to.be.equal(1);
});

it("different cluster, same shard: nodes don't connect (autosharding)", async function () {
this.timeout(15000);

const shardInfo1: ContentTopicInfo = {
clusterId: 1,
contentTopics: [contentTopic]
};

const shardInfo2: ContentTopicInfo = {
clusterId: 2,
contentTopics: [contentTopic]
};

// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});

// and another node in the same cluster cluster as our node
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});

const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();

waku = await createLightNode({ shardInfo: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);

await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);

const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo2
);
expect(peers.length).to.be.equal(1);
});

it("different cluster, different shard: nodes don't connect (autosharding)", async function () {
this.timeout(15000);

const shardInfo1: ContentTopicInfo = {
clusterId: 1,
contentTopics: [contentTopic]
};

const shardInfo2: ContentTopicInfo = {
clusterId: 2,
contentTopics: ["/test/5/waku-light-push/utf8"]
};

// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});

// and another node in the same cluster cluster as our node
Expand All @@ -180,7 +380,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true
lightpush: true,
relay: true
});

const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId();
Expand Down
16 changes: 6 additions & 10 deletions packages/tests/tests/light-push/multiple_pubsub.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [autoshardingPubsubTopic2]
pubsubTopic: [autoshardingPubsubTopic2],
clusterId: shardInfo.clusterId
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
await waku.dial(await nwaku2.getMultiaddrWithId());
Expand Down Expand Up @@ -353,10 +354,6 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
customContentTopic2,
clusterId
);
const contentTopicInfo: ContentTopicInfo = {
clusterId,
contentTopics: [customContentTopic1, customContentTopic2]
};
const customEncoder1 = createEncoder({
contentTopic: customContentTopic1,
pubsubTopicShardInfo: {
Expand All @@ -372,11 +369,10 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()

this.beforeEach(async function () {
this.timeout(15000);
[nwaku, waku] = await runNodes(
this,
[autoshardingPubsubTopic1, autoshardingPubsubTopic2],
contentTopicInfo
);
[nwaku, waku] = await runNodes(this, [
autoshardingPubsubTopic1,
autoshardingPubsubTopic2
]);
messageCollector = new MessageCollector(nwaku);
nimPeerId = await nwaku.getPeerId();
});
Expand Down
6 changes: 4 additions & 2 deletions packages/tests/tests/store/multiple_pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
await nwaku.start({
store: true,
pubsubTopic: [autoshardingPubsubTopic1, autoshardingPubsubTopic2],
relay: true
relay: true,
clusterId
});
await nwaku.ensureSubscriptionsAutosharding([
customContentTopic1,
Expand Down Expand Up @@ -287,7 +288,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
await nwaku2.start({
store: true,
pubsubTopic: [autoshardingPubsubTopic2],
relay: true
relay: true,
clusterId
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);

Expand Down
6 changes: 0 additions & 6 deletions packages/utils/src/libp2p/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ export async function getConnectedPeersForProtocolAndShard(

if (supportsProtocol) {
if (shardInfo) {
//TODO: support auto-sharding
if (!("shards" in shardInfo)) {
throw new Error(
`Connections Manager only supports static sharding for now. Autosharding is not supported.`
);
}
const encodedPeerShardInfo = peer.metadata.get("shardInfo");
const peerShardInfo =
encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo);
Expand Down

0 comments on commit 4dd451b

Please sign in to comment.