@@ -21,16 +21,56 @@ import ../pubsub
21
21
logScope:
22
22
topics = " libp2p gossipsub"
23
23
24
- declareGauge (libp2p_gossipsub_peers_scores, " the scores of the peers in gossipsub" , labels = [" agent" ])
25
- declareCounter (libp2p_gossipsub_bad_score_disconnection, " the number of peers disconnected by gossipsub" , labels = [" agent" ])
26
- declareGauge (libp2p_gossipsub_peers_score_firstMessageDeliveries, " Detailed gossipsub scoring metric" , labels = [" agent" ])
27
- declareGauge (libp2p_gossipsub_peers_score_meshMessageDeliveries, " Detailed gossipsub scoring metric" , labels = [" agent" ])
28
- declareGauge (libp2p_gossipsub_peers_score_meshFailurePenalty, " Detailed gossipsub scoring metric" , labels = [" agent" ])
29
- declareGauge (libp2p_gossipsub_peers_score_invalidMessageDeliveries, " Detailed gossipsub scoring metric" , labels = [" agent" ])
30
- declareGauge (libp2p_gossipsub_peers_score_appScore, " Detailed gossipsub scoring metric" , labels = [" agent" ])
31
- declareGauge (libp2p_gossipsub_peers_score_behaviourPenalty, " Detailed gossipsub scoring metric" , labels = [" agent" ])
32
- declareGauge (libp2p_gossipsub_peers_score_colocationFactor, " Detailed gossipsub scoring metric" , labels = [" agent" ])
33
- declarePublicCounter (libp2p_gossipsub_peers_rate_limit_hits, " The number of times peers were above their rate limit" , labels = [" agent" ])
24
+ declareGauge (
25
+ libp2p_gossipsub_peers_scores,
26
+ " the scores of the peers in gossipsub" ,
27
+ labels = [" agent" ],
28
+ )
29
+ declareCounter (
30
+ libp2p_gossipsub_bad_score_disconnection,
31
+ " the number of peers disconnected by gossipsub" ,
32
+ labels = [" agent" ],
33
+ )
34
+ declareGauge (
35
+ libp2p_gossipsub_peers_score_firstMessageDeliveries,
36
+ " Detailed gossipsub scoring metric" ,
37
+ labels = [" agent" ],
38
+ )
39
+ declareGauge (
40
+ libp2p_gossipsub_peers_score_meshMessageDeliveries,
41
+ " Detailed gossipsub scoring metric" ,
42
+ labels = [" agent" ],
43
+ )
44
+ declareGauge (
45
+ libp2p_gossipsub_peers_score_meshFailurePenalty,
46
+ " Detailed gossipsub scoring metric" ,
47
+ labels = [" agent" ],
48
+ )
49
+ declareGauge (
50
+ libp2p_gossipsub_peers_score_invalidMessageDeliveries,
51
+ " Detailed gossipsub scoring metric" ,
52
+ labels = [" agent" ],
53
+ )
54
+ declareGauge (
55
+ libp2p_gossipsub_peers_score_appScore,
56
+ " Detailed gossipsub scoring metric" ,
57
+ labels = [" agent" ],
58
+ )
59
+ declareGauge (
60
+ libp2p_gossipsub_peers_score_behaviourPenalty,
61
+ " Detailed gossipsub scoring metric" ,
62
+ labels = [" agent" ],
63
+ )
64
+ declareGauge (
65
+ libp2p_gossipsub_peers_score_colocationFactor,
66
+ " Detailed gossipsub scoring metric" ,
67
+ labels = [" agent" ],
68
+ )
69
+ declarePublicCounter (
70
+ libp2p_gossipsub_peers_rate_limit_hits,
71
+ " The number of times peers were above their rate limit" ,
72
+ labels = [" agent" ],
73
+ )
34
74
35
75
proc init * (_: type [TopicParams ]): TopicParams =
36
76
TopicParams (
@@ -50,36 +90,42 @@ proc init*(_: type[TopicParams]): TopicParams =
50
90
meshFailurePenaltyWeight: - 1.0 ,
51
91
meshFailurePenaltyDecay: 0.5 ,
52
92
invalidMessageDeliveriesWeight: - 1.0 ,
53
- invalidMessageDeliveriesDecay: 0.5
93
+ invalidMessageDeliveriesDecay: 0.5 ,
54
94
)
55
95
56
96
proc withPeerStats * (
57
97
g: GossipSub ,
58
98
peerId: PeerId ,
59
- action: proc (stats: var PeerStats ) {.gcsafe , raises : [].}) =
99
+ action: proc (stats: var PeerStats ) {.gcsafe , raises : [].},
100
+ ) =
60
101
# # Add or update peer statistics for a particular peer id - the statistics
61
102
# # are retained across multiple connections until they expire
62
- g.peerStats.withValue (peerId, stats) do :
103
+ g.peerStats.withValue (peerId, stats):
63
104
action (stats[])
64
105
do :
65
- action (g.peerStats.mgetOrPut (peerId, PeerStats (
66
- expire: Moment .now () + g.parameters.retainScore
67
- )))
106
+ action (
107
+ g.peerStats.mgetOrPut (
108
+ peerId, PeerStats (expire: Moment .now () + g.parameters.retainScore)
109
+ )
110
+ )
68
111
69
112
func `/` (a, b: Duration ): float64 =
70
113
let
71
114
fa = float64 (a.nanoseconds)
72
115
fb = float64 (b.nanoseconds)
73
116
fa / fb
74
117
75
- func byScore * (x,y: PubSubPeer ): int = system.cmp (x.score, y.score)
118
+ func byScore * (x, y: PubSubPeer ): int =
119
+ system.cmp (x.score, y.score)
76
120
77
121
proc colocationFactor (g: GossipSub , peer: PubSubPeer ): float64 =
78
- let address = peer.address.valueOr: return 0.0
122
+ let
123
+ address =
124
+ peer.address.valueOr:
125
+ return 0.0
79
126
80
127
g.peersInIP.mgetOrPut (address, initHashSet [PeerId ]()).incl (peer.peerId)
81
- let
82
- ipPeers = g.peersInIP.getOrDefault (address).len ().float64
128
+ let ipPeers = g.peersInIP.getOrDefault (address).len ().float64
83
129
if ipPeers > g.parameters.ipColocationFactorThreshold:
84
130
trace " colocationFactor over threshold" , peer, address, ipPeers
85
131
let over = ipPeers - g.parameters.ipColocationFactorThreshold
@@ -97,7 +143,7 @@ proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
97
143
98
144
proc disconnectIfBadScorePeer * (g: GossipSub , peer: PubSubPeer , score: float64 ) =
99
145
if g.parameters.disconnectBadPeers and score < g.parameters.graylistThreshold and
100
- peer.peerId notin g.parameters.directPeers:
146
+ peer.peerId notin g.parameters.directPeers:
101
147
debug " disconnecting bad score peer" , peer, score = peer.score
102
148
asyncSpawn (g.disconnectPeer (peer))
103
149
libp2p_gossipsub_bad_score_disconnection.inc (labelValues = [peer.getAgent ()])
@@ -112,7 +158,7 @@ proc updateScores*(g: GossipSub) = # avoid async
112
158
113
159
for peerId, stats in g.peerStats.mpairs:
114
160
let peer = g.peers.getOrDefault (peerId)
115
- if isNil (peer) or not (peer.connected):
161
+ if isNil (peer) or not (peer.connected):
116
162
if now > stats.expire:
117
163
evicting.add (peerId)
118
164
trace " evicted peer from memory" , peer = peerId
@@ -149,32 +195,48 @@ proc updateScores*(g: GossipSub) = # avoid async
149
195
else :
150
196
info.meshMessageDeliveriesActive = false
151
197
152
- topicScore += info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight
198
+ topicScore +=
199
+ info.firstMessageDeliveries * topicParams.firstMessageDeliveriesWeight
153
200
trace " p2" , peer, p2 = info.firstMessageDeliveries, topic, topicScore
154
201
155
202
if info.meshMessageDeliveriesActive:
156
203
if info.meshMessageDeliveries < topicParams.meshMessageDeliveriesThreshold:
157
- let deficit = topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries
204
+ let
205
+ deficit =
206
+ topicParams.meshMessageDeliveriesThreshold - info.meshMessageDeliveries
158
207
let p3 = deficit * deficit
159
208
trace " p3" , peer, p3, topic, topicScore
160
209
topicScore += p3 * topicParams.meshMessageDeliveriesWeight
161
210
162
211
topicScore += info.meshFailurePenalty * topicParams.meshFailurePenaltyWeight
163
212
trace " p3b" , peer, p3b = info.meshFailurePenalty, topic, topicScore
164
213
165
- topicScore += info.invalidMessageDeliveries * info.invalidMessageDeliveries * topicParams.invalidMessageDeliveriesWeight
166
- trace " p4" , p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries, topic, topicScore
214
+ topicScore +=
215
+ info.invalidMessageDeliveries * info.invalidMessageDeliveries *
216
+ topicParams.invalidMessageDeliveriesWeight
217
+ trace " p4" ,
218
+ p4 = info.invalidMessageDeliveries * info.invalidMessageDeliveries,
219
+ topic,
220
+ topicScore
167
221
168
222
trace " updated peer topic's scores" , peer, topic, info, topicScore
169
223
170
224
score += topicScore * topicParams.topicWeight
171
225
172
226
# Score metrics
173
227
let agent = peer.getAgent ()
174
- libp2p_gossipsub_peers_score_firstMessageDeliveries.inc (info.firstMessageDeliveries, labelValues = [agent])
175
- libp2p_gossipsub_peers_score_meshMessageDeliveries.inc (info.meshMessageDeliveries, labelValues = [agent])
176
- libp2p_gossipsub_peers_score_meshFailurePenalty.inc (info.meshFailurePenalty, labelValues = [agent])
177
- libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc (info.invalidMessageDeliveries, labelValues = [agent])
228
+ libp2p_gossipsub_peers_score_firstMessageDeliveries.inc (
229
+ info.firstMessageDeliveries, labelValues = [agent]
230
+ )
231
+ libp2p_gossipsub_peers_score_meshMessageDeliveries.inc (
232
+ info.meshMessageDeliveries, labelValues = [agent]
233
+ )
234
+ libp2p_gossipsub_peers_score_meshFailurePenalty.inc (
235
+ info.meshFailurePenalty, labelValues = [agent]
236
+ )
237
+ libp2p_gossipsub_peers_score_invalidMessageDeliveries.inc (
238
+ info.invalidMessageDeliveries, labelValues = [agent]
239
+ )
178
240
179
241
# Score decay
180
242
info.firstMessageDeliveries *= topicParams.firstMessageDeliveriesDecay
@@ -199,18 +261,22 @@ proc updateScores*(g: GossipSub) = # avoid async
199
261
200
262
score += peer.appScore * g.parameters.appSpecificWeight
201
263
202
-
203
264
# The value of the parameter is the square of the counter and is mixed with a negative weight.
204
- score += peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
265
+ score +=
266
+ peer.behaviourPenalty * peer.behaviourPenalty * g.parameters.behaviourPenaltyWeight
205
267
206
268
let colocationFactor = g.colocationFactor (peer)
207
269
score += colocationFactor * g.parameters.ipColocationFactorWeight
208
270
209
271
# Score metrics
210
272
let agent = peer.getAgent ()
211
273
libp2p_gossipsub_peers_score_appScore.inc (peer.appScore, labelValues = [agent])
212
- libp2p_gossipsub_peers_score_behaviourPenalty.inc (peer.behaviourPenalty, labelValues = [agent])
213
- libp2p_gossipsub_peers_score_colocationFactor.inc (colocationFactor, labelValues = [agent])
274
+ libp2p_gossipsub_peers_score_behaviourPenalty.inc (
275
+ peer.behaviourPenalty, labelValues = [agent]
276
+ )
277
+ libp2p_gossipsub_peers_score_colocationFactor.inc (
278
+ colocationFactor, labelValues = [agent]
279
+ )
214
280
215
281
# decay behaviourPenalty
216
282
peer.behaviourPenalty *= g.parameters.behaviourPenaltyDecay
@@ -240,15 +306,19 @@ proc scoringHeartbeat*(g: GossipSub) {.async.} =
240
306
trace " running scoring heartbeat" , instance = cast [int ](g)
241
307
g.updateScores ()
242
308
243
- proc punishInvalidMessage * (g: GossipSub , peer: PubSubPeer , msg: Message ) {.async .} =
309
+ proc punishInvalidMessage * (g: GossipSub , peer: PubSubPeer , msg: Message ) {.async .} =
244
310
let uselessAppBytesNum = msg.data.len
245
311
peer.overheadRateLimitOpt.withValue (overheadRateLimit):
246
312
if not overheadRateLimit.tryConsume (uselessAppBytesNum):
247
- debug " Peer sent invalid message and it's above rate limit" , peer, uselessAppBytesNum
248
- libp2p_gossipsub_peers_rate_limit_hits.inc (labelValues = [peer.getAgent ()]) # let's just measure at the beginning for test purposes.
313
+ debug " Peer sent invalid message and it's above rate limit" ,
314
+ peer, uselessAppBytesNum
315
+ libp2p_gossipsub_peers_rate_limit_hits.inc (labelValues = [peer.getAgent ()])
316
+ # let's just measure at the beginning for test purposes.
249
317
if g.parameters.disconnectPeerAboveRateLimit:
250
318
await g.disconnectPeer (peer)
251
- raise newException (PeerRateLimitError , " Peer disconnected because it's above rate limit." )
319
+ raise newException (
320
+ PeerRateLimitError , " Peer disconnected because it's above rate limit."
321
+ )
252
322
253
323
let topic = msg.topicId
254
324
# TODO : REVIEW
@@ -263,34 +333,31 @@ proc addCapped*[T](stat: var T, diff, cap: T) =
263
333
stat += min (diff, cap - stat)
264
334
265
335
proc rewardDelivered * (
266
- g: GossipSub ,
267
- peer: PubSubPeer ,
268
- topics: openArray [string ],
269
- first: bool ,
270
- delay = ZeroDuration ,
336
+ g: GossipSub , peer: PubSubPeer , topic: string , first: bool , delay = ZeroDuration
271
337
) =
272
- for tt in topics:
273
- let t = tt
274
- if t notin g.topics:
275
- continue
338
+ let t = topic
339
+ if t notin g.topics:
340
+ return
276
341
277
- let tt = t
342
+ let topic = t
278
343
279
- let topicParams = g.topicParams.mgetOrPut (t, TopicParams .init ())
280
- # if in mesh add more delivery score
344
+ let topicParams = g.topicParams.mgetOrPut (t, TopicParams .init ())
345
+ # if in mesh add more delivery score
281
346
282
- if delay > topicParams.meshMessageDeliveriesWindow:
283
- # Too old
284
- continue
347
+ if delay > topicParams.meshMessageDeliveriesWindow:
348
+ # Too old
349
+ return
285
350
286
- g.withPeerStats (peer.peerId) do (stats: var PeerStats ):
287
- stats.topicInfos.withValue (tt, tstats):
288
- if first:
289
- tstats[].firstMessageDeliveries.addCapped (
290
- 1 , topicParams.firstMessageDeliveriesCap)
291
-
292
- if tstats[].inMesh:
293
- tstats[].meshMessageDeliveries.addCapped (
294
- 1 , topicParams.meshMessageDeliveriesCap)
295
- do : # make sure we don't lose this information
296
- stats.topicInfos[tt] = TopicInfo (meshMessageDeliveries: 1 )
351
+ g.withPeerStats (peer.peerId) do (stats: var PeerStats ):
352
+ stats.topicInfos.withValue (topic, tstats):
353
+ if first:
354
+ tstats[].firstMessageDeliveries.addCapped (
355
+ 1 , topicParams.firstMessageDeliveriesCap
356
+ )
357
+
358
+ if tstats[].inMesh:
359
+ tstats[].meshMessageDeliveries.addCapped (
360
+ 1 , topicParams.meshMessageDeliveriesCap
361
+ )
362
+ do :
363
+ stats.topicInfos[topic] = TopicInfo (meshMessageDeliveries: 1 )
0 commit comments