Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: investigate interop test failures #1792

Merged
merged 10 commits into from
Jan 18, 2024
4 changes: 4 additions & 0 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ class Filter extends BaseProtocol implements IReceiver {
})
)[0];

if (!peer) {
throw new Error("No peer found to initiate subscription.");
}

const subscription =
this.getActiveSubscription(pubsubTopic, peer.id.toString()) ??
this.setActiveSubscription(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,10 +386,14 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () {

this.beforeEach(async function () {
this.timeout(15000);
[nwaku, waku] = await runNodes(this, [
customPubsubTopic1,
customPubsubTopic2
]);
[nwaku, waku] = await runNodes(
this,
[customPubsubTopic1, customPubsubTopic2],
{
clusterId: 3,
shards: [1, 2]
}
);
subscription = await waku.filter.createSubscription(customPubsubTopic1);
messageCollector = new MessageCollector();
});
Expand Down
22 changes: 16 additions & 6 deletions packages/tests/tests/filter/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,12 +79,22 @@ export async function runNodes(
log.error("jswaku node failed to start:", error);
}

if (waku) {
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
await nwaku.ensureSubscriptions(pubsubTopics);
return [nwaku, waku];
} else {
if (!waku) {
throw new Error("Failed to initialize waku");
}

await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
await nwaku.ensureSubscriptions(pubsubTopics);

const wakuConnections = waku.libp2p.getConnections();
const nwakuPeers = await nwaku.peers();

if (wakuConnections.length < 1 || nwakuPeers.length < 1) {
throw new Error(
`Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and nwaku: ${nwakuPeers.length}`
);
}

return [nwaku, waku];
}
30 changes: 12 additions & 18 deletions packages/tests/tests/store/multiple_pubsub.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -342,13 +342,22 @@ describe("Waku Store (named sharding), custom pubsub topic", function () {
nwaku = new ServiceNode(makeLogFileName(this));
await nwaku.start({
store: true,
pubsubTopic: [customShardedPubsubTopic1, customShardedPubsubTopic2],
relay: true
relay: true,
pubsubTopic: [customShardedPubsubTopic1, customShardedPubsubTopic2]
});
await nwaku.ensureSubscriptions([
customShardedPubsubTopic1,
customShardedPubsubTopic2
]);

waku = await startAndConnectLightNode(
nwaku,
[customShardedPubsubTopic1, customShardedPubsubTopic2],
{
clusterId: 3,
shards: [1, 2]
}
);
});

afterEach(async function () {
Expand All @@ -363,10 +372,7 @@ describe("Waku Store (named sharding), custom pubsub topic", function () {
customContentTopic1,
customShardedPubsubTopic1
);
waku = await startAndConnectLightNode(nwaku, [
customShardedPubsubTopic1,
customShardedPubsubTopic2
]);

const messages = await processQueriedMessages(
waku,
[customDecoder1],
Expand Down Expand Up @@ -397,11 +403,6 @@ describe("Waku Store (named sharding), custom pubsub topic", function () {
customShardedPubsubTopic2
);

waku = await startAndConnectLightNode(nwaku, [
customShardedPubsubTopic1,
customShardedPubsubTopic2
]);

const customMessages = await processQueriedMessages(
waku,
[customDecoder1],
Expand Down Expand Up @@ -451,13 +452,6 @@ describe("Waku Store (named sharding), custom pubsub topic", function () {
customShardedPubsubTopic2
);

waku = await createLightNode({
staticNoiseKey: NOISE_KEY_1,
pubsubTopics: [customShardedPubsubTopic1, customShardedPubsubTopic2]
});
await waku.start();

await waku.dial(await nwaku.getMultiaddrWithId());
await waku.dial(await nwaku2.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);

Expand Down
17 changes: 14 additions & 3 deletions packages/tests/tests/store/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,27 @@ export async function startAndConnectLightNode(
shardInfo?: ShardingParams
): Promise<LightNode> {
const waku = await createLightNode({
pubsubTopics: shardInfo ? undefined : pubsubTopics,
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
...((pubsubTopics.length !== 1 ||
pubsubTopics[0] !== DefaultPubsubTopic) && {
shardInfo: shardInfo
}),
pubsubTopics: shardInfo ? undefined : pubsubTopics,
staticNoiseKey: NOISE_KEY_1
})
});
await waku.start();
await waku.dial(await instance.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);

const wakuConnections = waku.libp2p.getConnections();
const nwakuPeers = await instance.peers();

if (wakuConnections.length < 1 || nwakuPeers.length < 1) {
throw new Error(
`Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and nwaku: ${nwakuPeers.length}`
);
}

log.info("Waku node created");
return waku;
}
Expand Down
Loading