Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update lightclient examples to use waku prod fleet and readme #3237

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 80 additions & 53 deletions examples/filter_subscriber.nim
Original file line number Diff line number Diff line change
@@ -1,30 +1,39 @@
## Example showing how a resource restricted client may
## subscribe to messages without relay
import
std/[tables, sequtils],
stew/byteutils,
stew/shims/net,
chronicles,
chronos,
confutils,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr

import chronicles, chronos, stew/byteutils, results
import waku/[common/logging, node/peer_manager, waku_core, waku_filter_v2/client]
import
waku/[
common/logging,
node/peer_manager,
waku_core,
waku_node,
waku_enr,
discovery/waku_discv5,
factory/builder,
waku_relay,
waku_filter_v2/client,
]

# careful if running pub and sub in the same machine
const wakuPort = 50000

const clusterId = 1
const shardId = @[0'u16]

const
FilterPeer =
"/ip4/34.16.1.67/tcp/30303/p2p/16Uiu2HAmDCp8XJ9z1ev18zuv8NHekAsjNyezAvmMfFEJkiharitG"
# node-01.gc-us-central1-a.waku.test.status.im on waku.test
FilterPubsubTopic = PubsubTopic("/waku/2/rs/0/0")
"/ip4/64.225.80.192/tcp/30303/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb"
FilterPubsubTopic = PubsubTopic("/waku/2/rs/1/0")
FilterContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto")

proc unsubscribe(
wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic,
) {.async.} =
notice "unsubscribing from filter"
let unsubscribeRes =
await wfc.unsubscribe(filterPeer, filterPubsubTopic, @[filterContentTopic])
if unsubscribeRes.isErr:
notice "unsubscribe request failed", err = unsubscribeRes.error
else:
notice "unsubscribe request successful"

proc messagePushHandler(
pubsubTopic: PubsubTopic, message: WakuMessage
) {.async, gcsafe.} =
Expand All @@ -35,22 +44,61 @@ proc messagePushHandler(
contentTopic = message.contentTopic,
timestamp = message.timestamp

proc maintainSubscription(
wfc: WakuFilterClient,
filterPeer: RemotePeerInfo,
filterPubsubTopic: PubsubTopic,
filterContentTopic: ContentTopic,
) {.async.} =
proc setupAndSubscribe(rng: ref HmacDrbgContext) {.async.} =
# use notice to filter all waku messaging
setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT)

notice "starting subscriber", wakuPort = wakuPort
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[]
ip = parseIpAddress("0.0.0.0")
flags = CapabilitiesBitfield.init(relay = true)

let relayShards = RelayShards.init(clusterId, shardId).valueOr:
error "Relay shards initialization failed", error = error
quit(QuitFailure)

var enrBuilder = EnrBuilder.init(nodeKey)
enrBuilder.withWakuRelaySharding(relayShards).expect(
"Building ENR with relay sharding failed"
)

let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
error "failed to create enr record", error = recordRes.error
quit(QuitFailure)
else:
recordRes.get()

var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodeKey)
builder.withRecord(record)
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
let node = builder.build().tryGet()

node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
waitFor node.mountFilterClient()

await node.start()

node.peerManager.start()

node.wakuFilterClient.registerPushHandler(messagePushHandler)

let filterPeer = parsePeerInfo(FilterPeer).get()

while true:
notice "maintaining subscription"
# First use filter-ping to check if we have an active subscription
let pingRes = await wfc.ping(filterPeer)
let pingRes = await node.wakuFilterClient.ping(filterPeer)
if pingRes.isErr():
# No subscription found. Let's subscribe.
notice "no subscription found. Sending subscribe request"

let subscribeRes =
await wfc.subscribe(filterPeer, filterPubsubTopic, @[filterContentTopic])
let subscribeRes = await node.wakuFilterClient.subscribe(
filterPeer, FilterPubsubTopic, @[FilterContentTopic]
)

if subscribeRes.isErr():
notice "subscribe request failed. Quitting.", err = subscribeRes.error
Expand All @@ -62,28 +110,7 @@ proc maintainSubscription(

await sleepAsync(60.seconds) # Subscription maintenance interval

proc setupAndSubscribe(rng: ref HmacDrbgContext) =
let filterPeer = parsePeerInfo(FilterPeer).get()

setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT)
notice "starting filter subscriber"

var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wfc = WakuFilterClient.new(pm, rng)

# Mount filter client protocol
switch.mount(wfc)

wfc.registerPushHandler(messagePushHandler)

# Start maintaining subscription
asyncSpawn maintainSubscription(
wfc, filterPeer, FilterPubsubTopic, FilterContentTopic
)

when isMainModule:
let rng = newRng()
setupAndSubscribe(rng)
let rng = crypto.newRng()
asyncSpawn setupAndSubscribe(rng)
runForever()
128 changes: 89 additions & 39 deletions examples/lightpush_publisher.nim
Original file line number Diff line number Diff line change
@@ -1,57 +1,107 @@
## Example showing how a resource restricted client may
## use lightpush to publish messages without relay
import
std/[tables, times, sequtils],
stew/byteutils,
stew/shims/net,
chronicles,
results,
chronos,
confutils,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr

import chronicles, chronos, stew/byteutils, results
import waku/[common/logging, node/peer_manager, waku_core, waku_lightpush/client]
import
waku/[
common/logging,
node/peer_manager,
waku_core,
waku_node,
waku_enr,
discovery/waku_discv5,
factory/builder,
]

proc now*(): Timestamp =
getNanosecondTime(getTime().toUnixFloat())

# careful if running pub and sub in the same machine
const wakuPort = 60000

const clusterId = 1
const shardId = @[0'u16]

const
LightpushPeer =
"/ip4/178.128.141.171/tcp/30303/p2p/16Uiu2HAkykgaECHswi3YKJ5dMLbq2kPVCo89fcyTd38UcQD6ej5W"
# node-01.do-ams3.waku.test.status.im on waku.test
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/0/0")
"/ip4/64.225.80.192/tcp/30303/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb"
LightpushPubsubTopic = PubsubTopic("/waku/2/rs/1/0")
LightpushContentTopic = ContentTopic("/examples/1/light-pubsub-example/proto")

proc publishMessages(
wlc: WakuLightpushClient,
lightpushPeer: RemotePeerInfo,
lightpushPubsubTopic: PubsubTopic,
lightpushContentTopic: ContentTopic,
) {.async.} =
proc setupAndPublish(rng: ref HmacDrbgContext) {.async.} =
# use notice to filter all waku messaging
setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT)

notice "starting publisher", wakuPort = wakuPort
let
nodeKey = crypto.PrivateKey.random(Secp256k1, rng[]).get()
ip = parseIpAddress("0.0.0.0")
flags = CapabilitiesBitfield.init(relay = true)

let relayShards = RelayShards.init(clusterId, shardId).valueOr:
error "Relay shards initialization failed", error = error
quit(QuitFailure)

var enrBuilder = EnrBuilder.init(nodeKey)
enrBuilder.withWakuRelaySharding(relayShards).expect(
"Building ENR with relay sharding failed"
)

let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
error "failed to create enr record", error = recordRes.error
quit(QuitFailure)
else:
recordRes.get()

var builder = WakuNodeBuilder.init()
builder.withNodeKey(nodeKey)
builder.withRecord(record)
builder.withNetworkConfigurationDetails(ip, Port(wakuPort)).tryGet()
let node = builder.build().tryGet()

node.mountMetadata(clusterId).expect("failed to mount waku metadata protocol")
node.mountLightPushClient()

await node.start()
node.peerManager.start()

notice "publisher service started"
while true:
let text = "hi there i'm a lightpush publisher"
let text = "hi there i'm a publisher"
let message = WakuMessage(
payload: toBytes(text), # content of the message
contentTopic: lightpushContentTopic, # content topic to publish to
contentTopic: LightpushContentTopic, # content topic to publish to
ephemeral: true, # tell store nodes to not store it
timestamp: getNowInNanosecondTime(),
timestamp: now(),
) # current timestamp

let wlpRes = await wlc.publish(lightpushPubsubTopic, message, lightpushPeer)

if wlpRes.isOk():
notice "published message using lightpush", message = message
else:
notice "failed to publish message using lightpush", err = wlpRes.error()

await sleepAsync(5000) # Publish every 5 seconds

proc setupAndPublish(rng: ref HmacDrbgContext) =
let lightpushPeer = parsePeerInfo(LightpushPeer).get()
let lightpushPeer = parsePeerInfo(LightpushPeer).get()

setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT)
notice "starting lightpush publisher"
let res =
await node.lightpushPublish(some(LightpushPubsubTopic), message, lightpushPeer)

var
switch = newStandardSwitch()
pm = PeerManager.new(switch)
wlc = WakuLightpushClient.new(pm, rng)
if res.isOk:
notice "published message",
text = text,
timestamp = message.timestamp,
psTopic = LightpushPubsubTopic,
contentTopic = LightpushContentTopic
else:
error "failed to publish message", error = res.error

# Start maintaining subscription
asyncSpawn publishMessages(
wlc, lightpushPeer, LightpushPubsubTopic, LightpushContentTopic
)
await sleepAsync(5000)

when isMainModule:
let rng = newRng()
setupAndPublish(rng)
let rng = crypto.newRng()
asyncSpawn setupAndPublish(rng)
runForever()