Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: allow autosharding nodes to get peers #1785

Merged
merged 1 commit into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 209 additions & 8 deletions packages/tests/tests/getPeers.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import type { Peer } from "@libp2p/interface/peer-store";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { LightPushCodec, waitForRemotePeer } from "@waku/core";
import {
ContentTopicInfo,
createLightNode,
Libp2pComponents,
type LightNode,
Expand All @@ -26,6 +27,7 @@ describe("getConnectedPeersForProtocolAndShard", function () {
let waku: LightNode;
let serviceNode1: NimGoNode;
let serviceNode2: NimGoNode;
const contentTopic = "/test/2/waku-light-push/utf8";

this.beforeEach(async function () {
this.timeout(15000);
Expand All @@ -51,7 +53,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
lightpush: true
lightpush: true,
relay: true
});

const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
Expand All @@ -77,12 +80,18 @@ describe("getConnectedPeersForProtocolAndShard", function () {
shards: [1]
};

const shardInfoServiceNode: ShardInfo = {
clusterId: 1,
shards: [2]
};

await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
lightpush: true
clusterId: shardInfoServiceNode.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfoServiceNode),
lightpush: true,
relay: true
});

const serviceNodeMa = await serviceNode1.getMultiaddrWithId();
Expand Down Expand Up @@ -120,7 +129,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true
lightpush: true,
relay: true
});

// and another node in the same cluster cluster as our node
Expand All @@ -129,7 +139,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true
lightpush: true,
relay: true
});

const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
Expand Down Expand Up @@ -170,7 +181,196 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true
lightpush: true,
relay: true
});

// and another node in the same cluster cluster as our node
const serviceNode2 = new NimGoNode(makeLogFileName(this) + "2");
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});

const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId();
const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId();

waku = await createLightNode({ shardInfo: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec);
await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);

const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo2
);
expect(peers.length).to.be.equal(1);
});

it("same cluster, same shard: nodes connect (autosharding)", async function () {
this.timeout(15000);

const shardInfo: ContentTopicInfo = {
clusterId: 1,
contentTopics: [contentTopic]
};

await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo),
lightpush: true,
relay: true
});

const serviceNodeMa = await serviceNode1.getMultiaddrWithId();

waku = await createLightNode({ shardInfo });
await waku.start();
await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec);
await waitForRemotePeer(waku, [Protocols.LightPush]);
const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo
);
expect(peers.length).to.be.greaterThan(0);
});

it("same cluster, different shard: nodes connect (autosharding)", async function () {
this.timeout(15000);

const shardInfo1: ContentTopicInfo = {
clusterId: 1,
contentTopics: [contentTopic]
};

const shardInfo2: ContentTopicInfo = {
clusterId: 1,
contentTopics: ["/test/5/waku-light-push/utf8"]
};

// Separate shard
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});

// Same shard
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});

const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();

waku = await createLightNode({ shardInfo: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);

await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);

const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo2
);
expect(peers.length).to.be.equal(1);
});

it("different cluster, same shard: nodes don't connect (autosharding)", async function () {
this.timeout(15000);

const shardInfo1: ContentTopicInfo = {
clusterId: 1,
contentTopics: [contentTopic]
};

const shardInfo2: ContentTopicInfo = {
clusterId: 2,
contentTopics: [contentTopic]
};

// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});

// and another node in the same cluster cluster as our node
await serviceNode2.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true,
relay: true
});

const serviceNode1Ma = await serviceNode1.getMultiaddrWithId();
const serviceNode2Ma = await serviceNode2.getMultiaddrWithId();

waku = await createLightNode({ shardInfo: shardInfo2 });
await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec);
await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec);

await waku.start();
await waitForRemotePeer(waku, [Protocols.LightPush]);

const peers = await getConnectedPeersForProtocolAndShard(
waku.libp2p.getConnections(),
waku.libp2p.peerStore,
waku.libp2p.getProtocols(),
shardInfo2
);
expect(peers.length).to.be.equal(1);
});

it("different cluster, different shard: nodes don't connect (autosharding)", async function () {
this.timeout(15000);

const shardInfo1: ContentTopicInfo = {
clusterId: 1,
contentTopics: [contentTopic]
};

const shardInfo2: ContentTopicInfo = {
clusterId: 2,
contentTopics: ["/test/5/waku-light-push/utf8"]
};

// we start one node in a separate cluster
await serviceNode1.start({
discv5Discovery: true,
peerExchange: true,
clusterId: shardInfo1.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo1),
lightpush: true,
relay: true
});

// and another node in the same cluster cluster as our node
Expand All @@ -180,7 +380,8 @@ describe("getConnectedPeersForProtocolAndShard", function () {
peerExchange: true,
clusterId: shardInfo2.clusterId,
pubsubTopic: shardInfoToPubsubTopics(shardInfo2),
lightpush: true
lightpush: true,
relay: true
});

const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId();
Expand Down
16 changes: 6 additions & 10 deletions packages/tests/tests/light-push/multiple_pubsub.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,8 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () {
filter: true,
lightpush: true,
relay: true,
pubsubTopic: [autoshardingPubsubTopic2]
pubsubTopic: [autoshardingPubsubTopic2],
clusterId: shardInfo.clusterId
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);
await waku.dial(await nwaku2.getMultiaddrWithId());
Expand Down Expand Up @@ -353,10 +354,6 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()
customContentTopic2,
clusterId
);
const contentTopicInfo: ContentTopicInfo = {
clusterId,
contentTopics: [customContentTopic1, customContentTopic2]
};
const customEncoder1 = createEncoder({
contentTopic: customContentTopic1,
pubsubTopicShardInfo: {
Expand All @@ -372,11 +369,10 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function ()

this.beforeEach(async function () {
this.timeout(15000);
[nwaku, waku] = await runNodes(
this,
[autoshardingPubsubTopic1, autoshardingPubsubTopic2],
contentTopicInfo
);
[nwaku, waku] = await runNodes(this, [
autoshardingPubsubTopic1,
autoshardingPubsubTopic2
]);
messageCollector = new MessageCollector(nwaku);
nimPeerId = await nwaku.getPeerId();
});
Expand Down
6 changes: 4 additions & 2 deletions packages/tests/tests/store/multiple_pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
await nwaku.start({
store: true,
pubsubTopic: [autoshardingPubsubTopic1, autoshardingPubsubTopic2],
relay: true
relay: true,
clusterId
});
await nwaku.ensureSubscriptionsAutosharding([
customContentTopic1,
Expand Down Expand Up @@ -287,7 +288,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () {
await nwaku2.start({
store: true,
pubsubTopic: [autoshardingPubsubTopic2],
relay: true
relay: true,
clusterId
});
await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]);

Expand Down
6 changes: 0 additions & 6 deletions packages/utils/src/libp2p/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,6 @@ export async function getConnectedPeersForProtocolAndShard(

if (supportsProtocol) {
if (shardInfo) {
//TODO: support auto-sharding
if (!("shards" in shardInfo)) {
throw new Error(
`Connections Manager only supports static sharding for now. Autosharding is not supported.`
);
}
const encodedPeerShardInfo = peer.metadata.get("shardInfo");
const peerShardInfo =
encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo);
Expand Down
Loading