Commit 0911cb2

chore(gossipsub): cleanups (#1096)
1 parent 3ca49a2 commit 0911cb2

4 files changed: +50 -50 lines changed


libp2p/protocols/pubsub/floodsub.nim (+5 -6)

@@ -57,10 +57,10 @@ proc addSeen*(f: FloodSub, saltedId: SaltedId): bool =
 proc firstSeen*(f: FloodSub, saltedId: SaltedId): Moment =
   f.seen.addedAt(saltedId)

-proc handleSubscribe*(f: FloodSub,
-                      peer: PubSubPeer,
-                      topic: string,
-                      subscribe: bool) =
+proc handleSubscribe(f: FloodSub,
+                     peer: PubSubPeer,
+                     topic: string,
+                     subscribe: bool) =
   logScope:
     peer
     topic
@@ -106,10 +106,9 @@ method unsubscribePeer*(f: FloodSub, peer: PeerId) =
 method rpcHandler*(f: FloodSub,
                    peer: PubSubPeer,
                    data: seq[byte]) {.async.} =
-
   var rpcMsg = decodeRpcMsg(data).valueOr:
     debug "failed to decode msg from peer", peer, err = error
-    raise newException(CatchableError, "")
+    raise newException(CatchableError, "Peer msg couldn't be decoded")

   trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
   # trigger hooks

libp2p/protocols/pubsub/gossipsub.nim (+41 -37)

@@ -266,10 +266,10 @@ method unsubscribePeer*(g: GossipSub, peer: PeerId) =

   procCall FloodSub(g).unsubscribePeer(peer)

-proc handleSubscribe*(g: GossipSub,
-                      peer: PubSubPeer,
-                      topic: string,
-                      subscribe: bool) =
+proc handleSubscribe(g: GossipSub,
+                     peer: PubSubPeer,
+                     topic: string,
+                     subscribe: bool) =
   logScope:
     peer
     topic
@@ -395,9 +395,7 @@ proc validateAndRelay(g: GossipSub,

   g.floodsub.withValue(topic, peers): toSendPeers.incl(peers[])
   g.mesh.withValue(topic, peers): toSendPeers.incl(peers[])
-
-  # add direct peers
-  toSendPeers.incl(g.subscribedDirectPeers.getOrDefault(topic))
+  g.subscribedDirectPeers.withValue(topic, peers): toSendPeers.incl(peers[])

   # Don't send it to source peer, or peers that
   # sent it during validation
@@ -468,6 +466,11 @@ method rpcHandler*(g: GossipSub,
   var rpcMsg = decodeRpcMsg(data).valueOr:
     debug "failed to decode msg from peer", peer, err = error
     await rateLimit(g, peer, msgSize)
+    # Raising in the handler closes the gossipsub connection (but doesn't
+    # disconnect the peer!)
+    # TODO evaluate behaviour penalty values
+    peer.behaviourPenalty += 0.1
+
     raise newException(CatchableError, "Peer msg couldn't be decoded")

   when defined(libp2p_expensive_metrics):
@@ -477,12 +480,13 @@ method rpcHandler*(g: GossipSub,
   trace "decoded msg from peer", peer, msg = rpcMsg.shortLog
   await rateLimit(g, peer, g.messageOverhead(rpcMsg, msgSize))

-  # trigger hooks
+  # trigger hooks - these may modify the message
   peer.recvObservers(rpcMsg)

   if rpcMsg.ping.len in 1..<64 and peer.pingBudget > 0:
     g.send(peer, RPCMsg(pong: rpcMsg.ping), isHighPriority = true)
     peer.pingBudget.dec
+
   for i in 0..<min(g.topicsHigh, rpcMsg.subscriptions.len):
     template sub: untyped = rpcMsg.subscriptions[i]
     g.handleSubscribe(peer, sub.topic, sub.subscribe)
@@ -502,16 +506,14 @@ method rpcHandler*(g: GossipSub,
     if msgIdResult.isErr:
       debug "Dropping message due to failed message id generation",
         error = msgIdResult.error
-      # TODO: descore peers due to error during message validation (malicious?)
+      await g.punishInvalidMessage(peer, msg)
       continue

     let
       msgId = msgIdResult.get
       msgIdSalted = g.salt(msgId)
       topic = msg.topic

-    # addSeen adds salt to msgId to avoid
-    # remote attacking the hash function
     if g.addSeen(msgIdSalted):
       trace "Dropping already-seen message", msgId = shortLog(msgId), peer

@@ -599,25 +601,24 @@ method onTopicSubscription*(g: GossipSub, topic: string, subscribed: bool) =

     g.mesh.del(topic)

-
   # Send unsubscribe (in reverse order to sub/graft)
   procCall PubSub(g).onTopicSubscription(topic, subscribed)

 method publish*(g: GossipSub,
                 topic: string,
                 data: seq[byte]): Future[int] {.async.} =
-  # base returns always 0
-  discard await procCall PubSub(g).publish(topic, data)
-
   logScope:
     topic

-  trace "Publishing message on topic", data = data.shortLog
-
   if topic.len <= 0: # data could be 0/empty
     debug "Empty topic, skipping publish"
     return 0

+  # base returns always 0
+  discard await procCall PubSub(g).publish(topic, data)
+
+  trace "Publishing message on topic", data = data.shortLog
+
   var peers: HashSet[PubSubPeer]

   # add always direct peers
@@ -630,38 +631,39 @@ method publish*(g: GossipSub,
     # With flood publishing enabled, the mesh is used when propagating messages from other peers,
     # but a peer's own messages will always be published to all known peers in the topic, limited
     # to the amount of peers we can send it to in one heartbeat
-    var maxPeersToFlodOpt: Opt[int64]
-    if g.parameters.bandwidthEstimatebps > 0:
-      let
-        bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000 # Divisions are to convert it to Bytes per ms TODO replace with bandwidth estimate
-        msToTransmit = max(data.len div bandwidth, 1)
-      maxPeersToFlodOpt = Opt.some(max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow))
+
+    let maxPeersToFlood =
+      if g.parameters.bandwidthEstimatebps > 0:
+        let
+          bandwidth = (g.parameters.bandwidthEstimatebps) div 8 div 1000 # Divisions are to convert it to Bytes per ms TODO replace with bandwidth estimate
+          msToTransmit = max(data.len div bandwidth, 1)
+        max(g.parameters.heartbeatInterval.milliseconds div msToTransmit, g.parameters.dLow)
+      else:
+        int.high() # unlimited

     for peer in g.gossipsub.getOrDefault(topic):
-      maxPeersToFlodOpt.withValue(maxPeersToFlod):
-        if peers.len >= maxPeersToFlod: break
+      if peers.len >= maxPeersToFlood: break
+
       if peer.score >= g.parameters.publishThreshold:
         trace "publish: including flood/high score peer", peer
         peers.incl(peer)

-  if peers.len < g.parameters.dLow:
-    # not subscribed, or bad mesh, send to fanout peers
-    var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
-    if fanoutPeers.len < g.parameters.dLow:
-      g.replenishFanout(topic)
-      fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
+  elif peers.len < g.parameters.dLow:
+    # not subscribed or bad mesh, send to fanout peers
+    # when flood-publishing, fanout won't help since all potential peers have
+    # already been added

+    g.replenishFanout(topic) # Make sure fanout is populated
+
+    var fanoutPeers = g.fanout.getOrDefault(topic).toSeq()
     g.rng.shuffle(fanoutPeers)

     for fanPeer in fanoutPeers:
       peers.incl(fanPeer)
       if peers.len > g.parameters.d: break

-    # even if we couldn't publish,
-    # we still attempted to publish
-    # on the topic, so it makes sense
-    # to update the last topic publish
-    # time
+    # Attempting to publish counts as fanout send (even if the message
+    # ultimately is not sent)
     g.lastFanoutPubSub[topic] = Moment.fromNow(g.parameters.fanoutTTL)

   if peers.len == 0:
@@ -690,7 +692,9 @@ method publish*(g: GossipSub,
   trace "Created new message", msg = shortLog(msg), peers = peers.len

   if g.addSeen(g.salt(msgId)):
-    # custom msgid providers might cause this
+    # If the message was received or published recently, don't re-publish it -
+    # this might happen when not using sequence id:s and / or with a custom
+    # message id provider
     trace "Dropping already-seen message"
     return 0

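A note on the flood-publish cap introduced in the publish hunk above: maxPeersToFlood simply estimates how many copies of the message fit into one heartbeat at the configured bandwidth, floored at dLow. Below is a minimal sketch of that arithmetic in plain Nim; the concrete numbers (bandwidth estimate, heartbeat interval, message size, dLow) are illustrative assumptions, not values taken from this commit.

# Re-derivation of the maxPeersToFlood arithmetic; all constants are assumed.
let
  bandwidthEstimatebps = 100_000_000  # assumed estimate: 100 Mbit/s
  heartbeatMs = 700                   # assumed heartbeat interval in ms
  dLow = 6                            # assumed mesh low watermark
  dataLen = 1_000_000                 # assumed 1 MB message

let
  bandwidth = bandwidthEstimatebps div 8 div 1000            # bytes per ms = 12_500
  msToTransmit = max(dataLen div bandwidth, 1)               # 80 ms per copy
  maxPeersToFlood = max(heartbeatMs div msToTransmit, dLow)  # max(8, 6) = 8

echo maxPeersToFlood  # 8 flood targets for a message of this size

With bandwidthEstimatebps left at 0, the cap becomes int.high(), i.e. effectively unlimited, matching the else branch in the diff.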
libp2p/protocols/pubsub/gossipsub/behavior.nim (+4 -5)

@@ -204,7 +204,6 @@ proc getPeers(prune: ControlPrune, peer: PubSubPeer): seq[(PeerId, Option[PeerRe

   routingRecords

-
 proc handlePrune*(g: GossipSub, peer: PubSubPeer, prunes: seq[ControlPrune]) =
   for prune in prunes:
     let topic = prune.topicID
@@ -611,10 +610,10 @@ proc getGossipPeers*(g: GossipSub): Table[PubSubPeer, ControlMessage] =
       x notin gossipPeers and
       x.score >= g.parameters.gossipThreshold

-  var target = g.parameters.dLazy
-  let factor = (g.parameters.gossipFactor.float * allPeers.len.float).int
-  if factor > target:
-    target = min(factor, allPeers.len)
+  # https://github.com/libp2p/specs/blob/98c5aa9421703fc31b0833ad8860a55db15be063/pubsub/gossipsub/gossipsub-v1.1.md#adaptive-gossip-dissemination
+  let
+    factor = (g.parameters.gossipFactor.float * allPeers.len.float).int
+    target = max(g.parameters.dLazy, factor)

   if target < allPeers.len:
     g.rng.shuffle(allPeers)

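The getGossipPeers change above follows the adaptive gossip dissemination rule from the linked spec section: gossip (IHAVE) goes to max(dLazy, gossipFactor * candidates) peers. The old code clamped the factor with min(factor, allPeers.len); the new form leaves the case of fewer available peers to the existing target < allPeers.len handling visible at the end of the hunk. A small sketch of the resulting behaviour, assuming the spec defaults dLazy = 6 and gossipFactor = 0.25; the peer counts are made-up examples.

# Sketch of the adaptive gossip target; parameter defaults taken from the
# gossipsub v1.1 spec, peer counts are arbitrary examples.
proc gossipTarget(allPeers: int, dLazy = 6, gossipFactor = 0.25): int =
  let factor = (gossipFactor * allPeers.float).int
  max(dLazy, factor)

echo gossipTarget(12)   # 6  -> dLazy dominates for small candidate sets
echo gossipTarget(100)  # 25 -> the factor dominates once 0.25 * peers > dLazy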
libp2p/protocols/pubsub/gossipsub/types.nim (-2)

@@ -172,8 +172,6 @@ type
     subscribedDirectPeers*: PeerTable # directpeers that we keep alive
     backingOff*: BackoffTable # peers to backoff from when replenishing the mesh
     lastFanoutPubSub*: Table[string, Moment] # last publish time for fanout topics
-    gossip*: Table[string, seq[ControlIHave]] # pending gossip
-    control*: Table[string, ControlMessage] # pending control messages
     mcache*: MCache # messages cache
     validationSeen*: ValidationSeenTable # peers who sent us message in validation
     heartbeatFut*: Future[void] # cancellation future for heartbeat interval
