Skip to content

Commit

Permalink
reintroduce and deprecate named sharding
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Dec 8, 2023
1 parent 5a4774a commit 2ab556e
Show file tree
Hide file tree
Showing 20 changed files with 862 additions and 121 deletions.
14 changes: 9 additions & 5 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ class Filter extends BaseProtocol implements IReceiver {
constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodecs.SUBSCRIBE, libp2p.components);

this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics =
options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo);

libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => {
log.error("Failed to register ", FilterCodecs.PUSH, e);
Expand All @@ -291,11 +292,14 @@ class Filter extends BaseProtocol implements IReceiver {
}

async createSubscription(
pubsubTopicShardInfo?: SingleShardInfo
pubsubTopicShardInfo?: SingleShardInfo | string
): Promise<Subscription> {
const pubsubTopic = pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic;
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic;

ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics);

Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ class LightPush extends BaseProtocol implements ILightPush {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(LightPushCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics =
options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo);
}

private async preparePushMessage(
Expand Down
31 changes: 20 additions & 11 deletions packages/core/src/lib/message/version_0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,18 +124,25 @@ export class Encoder implements IEncoder {
* messages.
*/
export function createEncoder(
{ pubsubTopicShardInfo, contentTopic, ephemeral, metaSetter }: EncoderOptions,
{
pubsubTopic,
pubsubTopicShardInfo,
contentTopic,
ephemeral,
metaSetter
}: EncoderOptions,
autosharding = false,
clusterId = 0
): Encoder {
return new Encoder(
contentTopic,
ephemeral,
autosharding
? contentTopicToPubsubTopic(contentTopic, clusterId)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
pubsubTopic ??
(autosharding
? contentTopicToPubsubTopic(contentTopic, clusterId)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic),
metaSetter
);
}
Expand Down Expand Up @@ -195,16 +202,18 @@ export class Decoder implements IDecoder<DecodedMessage> {
*/
export function createDecoder(
contentTopic: string,
pubsubTopic?: PubsubTopic,
pubsubTopicShardInfo?: SingleShardInfo,
autosharding = false,
clusterId = 0
): Decoder {
return new Decoder(
autosharding
? contentTopicToPubsubTopic(contentTopic, clusterId)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
pubsubTopic ??
(autosharding
? contentTopicToPubsubTopic(contentTopic, clusterId)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic),
contentTopic
);
}
3 changes: 2 additions & 1 deletion packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,8 @@ class Store extends BaseProtocol implements IStore {

constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(StoreCodec, libp2p.components);
this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo);
this.pubsubTopics =
options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo);
}

/**
Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/lib/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ export class WakuNode implements Waku {

constructor(
options: WakuOptions,
pubsubTopics: PubsubTopic[] = [],
libp2p: Libp2p,
pubsubShardInfo?: ShardInfo,
store?: (libp2p: Libp2p) => IStore,
Expand All @@ -64,7 +65,8 @@ export class WakuNode implements Waku {
relay?: (libp2p: Libp2p) => IRelay
) {
if (!pubsubShardInfo) {
this.pubsubTopics = [DefaultPubsubTopic];
this.pubsubTopics =
pubsubTopics.length > 0 ? pubsubTopics : [DefaultPubsubTopic];
} else {
this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo);
}
Expand Down
6 changes: 6 additions & 0 deletions packages/interfaces/src/enr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ export interface Waku2 {
export interface ShardInfo {
clusterId: number;
shards: number[];
/**
* @deprecated
*/
pubsubTopics?: string[];
contentTopics?: string[];
application?: string;
version?: string;
}

export interface IEnr extends Map<ENRKey, ENRValue> {
Expand Down
2 changes: 1 addition & 1 deletion packages/interfaces/src/filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ export interface IFilterSubscription {
export type IFilter = IReceiver &
IBaseProtocol & {
createSubscription(
pubsubTopicShardInfo?: SingleShardInfo,
pubsubTopicShardInfo?: SingleShardInfo | string,
peerId?: PeerId
): Promise<IFilterSubscription>;
};
4 changes: 4 additions & 0 deletions packages/interfaces/src/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ export interface IMetaSetter {
}

export interface EncoderOptions {
/**
* @deprecated
*/
pubsubTopic?: PubsubTopic;
pubsubTopicShardInfo?: SingleShardInfo;
/** The content topic to set on outgoing messages. */
contentTopic: string;
Expand Down
6 changes: 6 additions & 0 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { ShardInfo } from "./enr.js";
import type { CreateLibp2pOptions } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";
import { PubsubTopic } from "./misc.js";

export enum Protocols {
Relay = "relay",
Expand All @@ -22,6 +23,11 @@ export interface IBaseProtocol {
}

export type ProtocolCreateOptions = {
/**
* @deprecated
* Waku will stop supporting named sharding. Only static sharding and autosharding will be supported moving forward.
*/
pubsubTopics?: PubsubTopic[];
/**
* Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future.
* The format to specify a shard is:
Expand Down
22 changes: 15 additions & 7 deletions packages/message-encryption/src/ecies.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ class Encoder implements IEncoder {
}

export interface EncoderOptions extends BaseEncoderOptions {
/**
* @deprecated
*/
pubsubTopic?: PubsubTopic;
/** Indicates if autosharding is enabled */
autosharding?: boolean;
/** The public key to encrypt the payload for. */
Expand All @@ -106,6 +110,7 @@ export interface EncoderOptions extends BaseEncoderOptions {
*/
export function createEncoder({
autosharding = false,
pubsubTopic,
pubsubTopicShardInfo,
contentTopic,
publicKey,
Expand All @@ -114,11 +119,12 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
autosharding
? contentTopicToPubsubTopic(contentTopic)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
pubsubTopic ??
(autosharding
? contentTopicToPubsubTopic(contentTopic)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic),
contentTopic,
publicKey,
sigPrivKey,
Expand Down Expand Up @@ -206,11 +212,13 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
privateKey: Uint8Array,
pubsubTopicShardInfo?: SingleShardInfo,
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic,
autosharding = false
): Decoder {
return new Decoder(
autosharding
typeof pubsubTopicShardInfo == "string"
? DefaultPubsubTopic
: autosharding
? contentTopicToPubsubTopic(contentTopic)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
Expand Down
18 changes: 11 additions & 7 deletions packages/message-encryption/src/symmetric.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ export interface EncoderOptions extends BaseEncoderOptions {
* in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/).
*/
export function createEncoder({
pubsubTopic = DefaultPubsubTopic,
autosharding = false,
pubsubTopicShardInfo,
contentTopic,
Expand All @@ -114,11 +115,12 @@ export function createEncoder({
metaSetter
}: EncoderOptions): Encoder {
return new Encoder(
autosharding
? contentTopicToPubsubTopic(contentTopic)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic,
pubsubTopic ??
(autosharding
? contentTopicToPubsubTopic(contentTopic)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
: DefaultPubsubTopic),
contentTopic,
symKey,
sigPrivKey,
Expand Down Expand Up @@ -206,11 +208,13 @@ class Decoder extends DecoderV0 implements IDecoder<DecodedMessage> {
export function createDecoder(
contentTopic: string,
symKey: Uint8Array,
pubsubTopicShardInfo?: SingleShardInfo,
pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic,
autosharding = false
): Decoder {
return new Decoder(
autosharding
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: autosharding
? contentTopicToPubsubTopic(contentTopic)
: pubsubTopicShardInfo
? singleShardInfoToPubsubTopic(pubsubTopicShardInfo)
Expand Down
8 changes: 5 additions & 3 deletions packages/relay/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,11 @@ class Relay implements IRelay {
}

this.gossipSub = libp2p.services.pubsub as GossipSub;
this.pubsubTopics = options?.shardInfo
? new Set(shardInfoToPubsubTopics(options.shardInfo))
: new Set([DefaultPubsubTopic]);
this.pubsubTopics = new Set(
options?.shardInfo
? shardInfoToPubsubTopics(options.shardInfo)
: options?.pubsubTopics ?? [DefaultPubsubTopic]
);

if (this.gossipSub.isStarted()) {
this.subscribeToAllTopics();
Expand Down
Loading

0 comments on commit 2ab556e

Please sign in to comment.