Skip to content

Commit fe4ff79

Browse files
feat: message prioritization with immediate peer-published dispatch and queuing for other msgs (#1015)
1 parent aa4ebb0 commit fe4ff79

File tree

7 files changed

+196
-71
lines changed

7 files changed

+196
-71
lines changed

libp2p/protocols/pubsub/floodsub.nim

+2-2
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ method rpcHandler*(f: FloodSub,
157157

158158
# In theory, if topics are the same in all messages, we could batch - we'd
159159
# also have to be careful to only include validated messages
160-
f.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
160+
f.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
161161
trace "Forwared message to peers", peers = toSendPeers.len
162162

163163
f.updateMetrics(rpcMsg)
@@ -219,7 +219,7 @@ method publish*(f: FloodSub,
219219
return 0
220220

221221
# Try to send to all peers that are known to be interested
222-
f.broadcast(peers, RPCMsg(messages: @[msg]))
222+
f.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
223223

224224
when defined(libp2p_expensive_metrics):
225225
libp2p_pubsub_messages_published.inc(labelValues = [topic])

libp2p/protocols/pubsub/gossipsub.nim

+31-20
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =
220220
for topic, info in stats[].topicInfos.mpairs:
221221
info.firstMessageDeliveries = 0
222222

223+
pubSubPeer.stopSendNonPriorityTask()
224+
223225
procCall FloodSub(g).unsubscribePeer(peer)
224226

225227
proc handleSubscribe*(g: GossipSub,
@@ -279,31 +281,40 @@ proc handleControl(g: GossipSub, peer: PubSubPeer, control: ControlMessage) =
279281
respControl.prune.add(g.handleGraft(peer, control.graft))
280282
let messages = g.handleIWant(peer, control.iwant)
281283

282-
if
283-
respControl.prune.len > 0 or
284-
respControl.iwant.len > 0 or
285-
messages.len > 0:
286-
# iwant and prunes from here, also messages
284+
let
285+
isPruneNotEmpty = respControl.prune.len > 0
286+
isIWantNotEmpty = respControl.iwant.len > 0
287+
288+
if isPruneNotEmpty or isIWantNotEmpty:
289+
290+
if isIWantNotEmpty:
291+
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
292+
293+
if isPruneNotEmpty:
294+
for prune in respControl.prune:
295+
if g.knownTopics.contains(prune.topicId):
296+
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
297+
else:
298+
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
299+
300+
trace "sending control message", msg = shortLog(respControl), peer
301+
g.send(
302+
peer,
303+
RPCMsg(control: some(respControl)), isHighPriority = true)
287304

305+
if messages.len > 0:
288306
for smsg in messages:
289307
for topic in smsg.topicIds:
290308
if g.knownTopics.contains(topic):
291309
libp2p_pubsub_broadcast_messages.inc(labelValues = [topic])
292310
else:
293311
libp2p_pubsub_broadcast_messages.inc(labelValues = ["generic"])
294312

295-
libp2p_pubsub_broadcast_iwant.inc(respControl.iwant.len.int64)
296-
297-
for prune in respControl.prune:
298-
if g.knownTopics.contains(prune.topicId):
299-
libp2p_pubsub_broadcast_prune.inc(labelValues = [prune.topicId])
300-
else:
301-
libp2p_pubsub_broadcast_prune.inc(labelValues = ["generic"])
302-
303-
trace "sending control message", msg = shortLog(respControl), peer
313+
# iwant replies have lower priority
314+
trace "sending iwant reply messages", peer
304315
g.send(
305316
peer,
306-
RPCMsg(control: some(respControl), messages: messages))
317+
RPCMsg(messages: messages), isHighPriority = false)
307318

308319
proc validateAndRelay(g: GossipSub,
309320
msg: Message,
@@ -356,7 +367,7 @@ proc validateAndRelay(g: GossipSub,
356367
if msg.data.len > msgId.len * 10:
357368
g.broadcast(toSendPeers, RPCMsg(control: some(ControlMessage(
358369
idontwant: @[ControlIWant(messageIds: @[msgId])]
359-
))))
370+
))), isHighPriority = true)
360371

361372
for peer in toSendPeers:
362373
for heDontWant in peer.heDontWants:
@@ -370,7 +381,7 @@ proc validateAndRelay(g: GossipSub,
370381

371382
# In theory, if topics are the same in all messages, we could batch - we'd
372383
# also have to be careful to only include validated messages
373-
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]))
384+
g.broadcast(toSendPeers, RPCMsg(messages: @[msg]), isHighPriority = false)
374385
trace "forwarded message to peers", peers = toSendPeers.len, msgId, peer
375386
for topic in msg.topicIds:
376387
if topic notin g.topics: continue
@@ -441,7 +452,7 @@ method rpcHandler*(g: GossipSub,
441452
peer.recvObservers(rpcMsg)
442453

443454
if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
444-
g.send(peer, RPCMsg(pong: rpcMsg.ping))
455+
g.send(peer, RPCMsg(pong: rpcMsg.ping), isHighPriority = true)
445456
peer.pingBudget.dec
446457
for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
447458
template sub: untyped = rpcMsg.subscriptions[i]
@@ -551,7 +562,7 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =
551562
topicID: topic,
552563
peers: g.peerExchangeList(topic),
553564
backoff: g.parameters.unsubscribeBackoff.seconds.uint64)])))
554-
g.broadcast(mpeers, msg)
565+
g.broadcast(mpeers, msg, isHighPriority = true)
555566

556567
for peer in mpeers:
557568
g.pruned(peer, topic, backoff = some(g.parameters.unsubscribeBackoff))
@@ -655,7 +666,7 @@ method publish*(g: GossipSub,
655666

656667
g.mcache.put(msgId, msg)
657668

658-
g.broadcast(peers, RPCMsg(messages: @[msg]))
669+
g.broadcast(peers, RPCMsg(messages: @[msg]), isHighPriority = true)
659670

660671
if g.knownTopics.contains(topic):
661672
libp2p_pubsub_messages_published.inc(peers.len.int64, labelValues = [topic])

libp2p/protocols/pubsub/gossipsub/behavior.nim

+4-4
Original file line numberDiff line numberDiff line change
@@ -530,14 +530,14 @@ proc rebalanceMesh*(g: GossipSub, topic: string, metrics: ptr MeshMetrics = nil)
530530
# Send changes to peers after table updates to avoid stale state
531531
if grafts.len > 0:
532532
let graft = RPCMsg(control: some(ControlMessage(graft: @[ControlGraft(topicID: topic)])))
533-
g.broadcast(grafts, graft)
533+
g.broadcast(grafts, graft, isHighPriority = true)
534534
if prunes.len > 0:
535535
let prune = RPCMsg(control: some(ControlMessage(
536536
prune: @[ControlPrune(
537537
topicID: topic,
538538
peers: g.peerExchangeList(topic),
539539
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
540-
g.broadcast(prunes, prune)
540+
g.broadcast(prunes, prune, isHighPriority = true)
541541

542542
proc dropFanoutPeers*(g: GossipSub) {.raises: [].} =
543543
# drop peers that we haven't published to in
@@ -669,7 +669,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
669669
topicID: t,
670670
peers: g.peerExchangeList(t),
671671
backoff: g.parameters.pruneBackoff.seconds.uint64)])))
672-
g.broadcast(prunes, prune)
672+
g.broadcast(prunes, prune, isHighPriority = true)
673673

674674
# pass by ptr in order to both signal we want to update metrics
675675
# and as well update the struct for each topic during this iteration
@@ -691,7 +691,7 @@ proc onHeartbeat(g: GossipSub) {.raises: [].} =
691691
libp2p_pubsub_broadcast_ihave.inc(labelValues = [ihave.topicId])
692692
else:
693693
libp2p_pubsub_broadcast_ihave.inc(labelValues = ["generic"])
694-
g.send(peer, RPCMsg(control: some(control)))
694+
g.send(peer, RPCMsg(control: some(control)), isHighPriority = true)
695695

696696
g.mcache.shift() # shift the cache
697697

libp2p/protocols/pubsub/pubsub.nim

+24-8
Original file line numberDiff line numberDiff line change
@@ -138,18 +138,34 @@ method unsubscribePeer*(p: PubSub, peerId: PeerId) {.base, gcsafe.} =
138138

139139
libp2p_pubsub_peers.set(p.peers.len.int64)
140140

141-
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg) {.raises: [].} =
142-
## Attempt to send `msg` to remote peer
141+
proc send*(p: PubSub, peer: PubSubPeer, msg: RPCMsg, isHighPriority: bool) {.raises: [].} =
142+
## This procedure attempts to send a `msg` (of type `RPCMsg`) to the specified remote peer in the PubSub network.
143143
##
144+
## Parameters:
145+
## - `p`: The `PubSub` instance.
146+
## - `peer`: An instance of `PubSubPeer` representing the peer to whom the message should be sent.
147+
## - `msg`: The `RPCMsg` instance that contains the message to be sent.
148+
## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority.
149+
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
150+
## priority messages have been sent.
144151

145152
trace "sending pubsub message to peer", peer, msg = shortLog(msg)
146-
peer.send(msg, p.anonymize)
153+
asyncSpawn peer.send(msg, p.anonymize, isHighPriority)
147154

148155
proc broadcast*(
149156
p: PubSub,
150157
sendPeers: auto, # Iteratble[PubSubPeer]
151-
msg: RPCMsg) {.raises: [].} =
152-
## Attempt to send `msg` to the given peers
158+
msg: RPCMsg,
159+
isHighPriority: bool) {.raises: [].} =
160+
## This procedure attempts to send a `msg` (of type `RPCMsg`) to a specified group of peers in the PubSub network.
161+
##
162+
## Parameters:
163+
## - `p`: The `PubSub` instance.
164+
## - `sendPeers`: An iterable of `PubSubPeer` instances representing the peers to whom the message should be sent.
165+
## - `msg`: The `RPCMsg` instance that contains the message to be broadcast.
166+
## - `isHighPriority`: A boolean indicating whether the message should be treated as high priority.
167+
## High priority messages are sent immediately, while low priority messages are queued and sent only after all high
168+
## priority messages have been sent.
153169

154170
let npeers = sendPeers.len.int64
155171
for sub in msg.subscriptions:
@@ -195,19 +211,19 @@ proc broadcast*(
195211

196212
if anyIt(sendPeers, it.hasObservers):
197213
for peer in sendPeers:
198-
p.send(peer, msg)
214+
p.send(peer, msg, isHighPriority)
199215
else:
200216
# Fast path that only encodes message once
201217
let encoded = encodeRpcMsg(msg, p.anonymize)
202218
for peer in sendPeers:
203-
asyncSpawn peer.sendEncoded(encoded)
219+
asyncSpawn peer.sendEncoded(encoded, isHighPriority)
204220

205221
proc sendSubs*(p: PubSub,
206222
peer: PubSubPeer,
207223
topics: openArray[string],
208224
subscribe: bool) =
209225
## send subscriptions to remote peer
210-
p.send(peer, RPCMsg.withSubs(topics, subscribe))
226+
p.send(peer, RPCMsg.withSubs(topics, subscribe), isHighPriority = true)
211227

212228
for topic in topics:
213229
if subscribe:

0 commit comments

Comments
 (0)