Skip to content

Commit 48a3ac0

Browse files
authored
{.async: (raises).} for MultistreamSelect (#1066)
1 parent 49a92e5 commit 48a3ac0

File tree

5 files changed

+132
-53
lines changed

5 files changed

+132
-53
lines changed

libp2p/multistream.nim

+77-35
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Nim-LibP2P
2-
# Copyright (c) 2023 Status Research & Development GmbH
2+
# Copyright (c) 2023-2024 Status Research & Development GmbH
33
# Licensed under either of
44
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
55
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -45,15 +45,18 @@ proc new*(T: typedesc[MultistreamSelect]): T =
4545
)
4646

4747
template validateSuffix(str: string): untyped =
48-
if str.endsWith("\n"):
49-
str.removeSuffix("\n")
50-
else:
51-
raise newException(MultiStreamError, "MultistreamSelect failed, malformed message")
52-
53-
proc select*(_: MultistreamSelect | type MultistreamSelect,
54-
conn: Connection,
55-
proto: seq[string]):
56-
Future[string] {.async.} =
48+
if str.endsWith("\n"):
49+
str.removeSuffix("\n")
50+
else:
51+
raise (ref MultiStreamError)(msg:
52+
"MultistreamSelect failed, malformed message")
53+
54+
proc select*(
55+
_: MultistreamSelect | type MultistreamSelect,
56+
conn: Connection,
57+
proto: seq[string]
58+
): Future[string] {.async: (raises: [
59+
CancelledError, LPStreamError, MultiStreamError]).} =
5760
trace "initiating handshake", conn, codec = Codec
5861
## select a remote protocol
5962
await conn.writeLp(Codec & "\n") # write handshake
@@ -66,7 +69,7 @@ proc select*(_: MultistreamSelect | type MultistreamSelect,
6669

6770
if s != Codec:
6871
notice "handshake failed", conn, codec = s
69-
raise newException(MultiStreamError, "MultistreamSelect handshake failed")
72+
raise (ref MultiStreamError)(msg: "MultistreamSelect handshake failed")
7073
else:
7174
trace "multistream handshake success", conn
7275

@@ -98,19 +101,29 @@ proc select*(_: MultistreamSelect | type MultistreamSelect,
98101
# No alternatives, fail
99102
return ""
100103

101-
proc select*(_: MultistreamSelect | type MultistreamSelect,
102-
conn: Connection,
103-
proto: string): Future[bool] {.async.} =
104+
proc select*(
105+
_: MultistreamSelect | type MultistreamSelect,
106+
conn: Connection,
107+
proto: string
108+
): Future[bool] {.async: (raises: [
109+
CancelledError, LPStreamError, MultiStreamError]).} =
104110
if proto.len > 0:
105-
return (await MultistreamSelect.select(conn, @[proto])) == proto
111+
(await MultistreamSelect.select(conn, @[proto])) == proto
106112
else:
107-
return (await MultistreamSelect.select(conn, @[])) == Codec
113+
(await MultistreamSelect.select(conn, @[])) == Codec
108114

109-
proc select*(m: MultistreamSelect, conn: Connection): Future[bool] =
115+
proc select*(
116+
m: MultistreamSelect,
117+
conn: Connection
118+
): Future[bool] {.async: (raises: [
119+
CancelledError, LPStreamError, MultiStreamError], raw: true).} =
110120
m.select(conn, "")
111121

112-
proc list*(m: MultistreamSelect,
113-
conn: Connection): Future[seq[string]] {.async.} =
122+
proc list*(
123+
m: MultistreamSelect,
124+
conn: Connection
125+
): Future[seq[string]] {.async: (raises: [
126+
CancelledError, LPStreamError, MultiStreamError]).} =
114127
## list remote protos requests on connection
115128
if not await m.select(conn):
116129
return
@@ -126,12 +139,13 @@ proc list*(m: MultistreamSelect,
126139
result = list
127140

128141
proc handle*(
129-
_: type MultistreamSelect,
130-
conn: Connection,
131-
protos: seq[string],
132-
matchers = newSeq[Matcher](),
133-
active: bool = false,
134-
): Future[string] {.async.} =
142+
_: type MultistreamSelect,
143+
conn: Connection,
144+
protos: seq[string],
145+
matchers = newSeq[Matcher](),
146+
active: bool = false
147+
): Future[string] {.async: (raises: [
148+
CancelledError, LPStreamError, MultiStreamError]).} =
135149
trace "Starting multistream negotiation", conn, handshaked = active
136150
var handshaked = active
137151
while not conn.atEof:
@@ -140,8 +154,8 @@ proc handle*(
140154

141155
if not handshaked and ms != Codec:
142156
debug "expected handshake message", conn, instead=ms
143-
raise newException(CatchableError,
144-
"MultistreamSelect handling failed, invalid first message")
157+
raise (ref MultiStreamError)(msg:
158+
"MultistreamSelect handling failed, invalid first message")
145159

146160
trace "handle: got request", conn, ms
147161
if ms.len() <= 0:
@@ -172,26 +186,30 @@ proc handle*(
172186
trace "no handlers", conn, protocol = ms
173187
await conn.writeLp(Na)
174188

175-
proc handle*(m: MultistreamSelect, conn: Connection, active: bool = false) {.async.} =
189+
proc handle*(
190+
m: MultistreamSelect,
191+
conn: Connection,
192+
active: bool = false) {.async: (raises: [CancelledError]).} =
176193
trace "Starting multistream handler", conn, handshaked = active
177194
var
178195
protos: seq[string]
179196
matchers: seq[Matcher]
180197
for h in m.handlers:
181-
if not isNil(h.match):
198+
if h.match != nil:
182199
matchers.add(h.match)
183200
for proto in h.protos:
184201
protos.add(proto)
185202

186203
try:
187204
let ms = await MultistreamSelect.handle(conn, protos, matchers, active)
188205
for h in m.handlers:
189-
if (not isNil(h.match) and h.match(ms)) or h.protos.contains(ms):
206+
if (h.match != nil and h.match(ms)) or h.protos.contains(ms):
190207
trace "found handler", conn, protocol = ms
191208

192209
var protocolHolder = h
193210
let maxIncomingStreams = protocolHolder.protocol.maxIncomingStreams
194-
if protocolHolder.openedStreams.getOrDefault(conn.peerId) >= maxIncomingStreams:
211+
if protocolHolder.openedStreams.getOrDefault(conn.peerId) >=
212+
maxIncomingStreams:
195213
debug "Max streams for protocol reached, blocking new stream",
196214
conn, protocol = ms, maxIncomingStreams
197215
return
@@ -242,8 +260,32 @@ proc addHandler*(m: MultistreamSelect,
242260
protocol: protocol,
243261
match: matcher))
244262

245-
proc start*(m: MultistreamSelect) {.async.} =
246-
await allFutures(m.handlers.mapIt(it.protocol.start()))
263+
proc start*(m: MultistreamSelect) {.async: (raises: [CancelledError]).} =
264+
let
265+
handlers = m.handlers
266+
futs = handlers.mapIt(it.protocol.start())
267+
try:
268+
await allFutures(futs)
269+
for fut in futs:
270+
await fut
271+
except CancelledError as exc:
272+
var pending: seq[Future[void].Raising([])]
273+
for i, fut in futs:
274+
if not fut.finished:
275+
pending.add noCancel fut.cancelAndWait()
276+
elif fut.completed:
277+
pending.add handlers[i].protocol.stop()
278+
else:
279+
static: doAssert typeof(fut).E is (CancelledError,)
280+
await noCancel allFutures(pending)
281+
raise exc
282+
247283

248-
proc stop*(m: MultistreamSelect) {.async.} =
249-
await allFutures(m.handlers.mapIt(it.protocol.stop()))
284+
proc stop*(m: MultistreamSelect) {.async: (raises: []).} =
285+
# Nim 1.6.18: Using `mapIt` results in a seq of `.Raising([CancelledError])`
286+
var futs = newSeqOfCap[Future[void].Raising([])](m.handlers.len)
287+
for it in m.handlers:
288+
futs.add it.protocol.stop()
289+
await noCancel allFutures(futs)
290+
for fut in futs:
291+
await fut

libp2p/protocols/connectivity/relay/relay.nim

+13-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Nim-LibP2P
2-
# Copyright (c) 2023 Status Research & Development GmbH
2+
# Copyright (c) 2023-2024 Status Research & Development GmbH
33
# Licensed under either of
44
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
55
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -361,17 +361,25 @@ proc deletesReservation(r: Relay) {.async.} =
361361
if n > r.rsvp[k]:
362362
r.rsvp.del(k)
363363

364-
method start*(r: Relay) {.async.} =
364+
method start*(
365+
r: Relay
366+
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
367+
let fut = newFuture[void]()
368+
fut.complete()
365369
if not r.reservationLoop.isNil:
366370
warn "Starting relay twice"
367-
return
371+
return fut
368372
r.reservationLoop = r.deletesReservation()
369373
r.started = true
374+
fut
370375

371-
method stop*(r: Relay) {.async.} =
376+
method stop*(r: Relay): Future[void] {.async: (raises: [], raw: true).} =
377+
let fut = newFuture[void]()
378+
fut.complete()
372379
if r.reservationLoop.isNil:
373380
warn "Stopping relay without starting it"
374-
return
381+
return fut
375382
r.started = false
376383
r.reservationLoop.cancel()
377384
r.reservationLoop = nil
385+
fut

libp2p/protocols/protocol.nim

+14-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Nim-LibP2P
2-
# Copyright (c) 2023 Status Research & Development GmbH
2+
# Copyright (c) 2023-2024 Status Research & Development GmbH
33
# Licensed under either of
44
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
55
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -31,8 +31,19 @@ type
3131
maxIncomingStreams: Opt[int]
3232

3333
method init*(p: LPProtocol) {.base, gcsafe.} = discard
34-
method start*(p: LPProtocol) {.async, base.} = p.started = true
35-
method stop*(p: LPProtocol) {.async, base.} = p.started = false
34+
35+
method start*(
36+
p: LPProtocol) {.async: (raises: [CancelledError], raw: true), base.} =
37+
let fut = newFuture[void]()
38+
fut.complete()
39+
p.started = true
40+
fut
41+
42+
method stop*(p: LPProtocol) {.async: (raises: [], raw: true), base.} =
43+
let fut = newFuture[void]()
44+
fut.complete()
45+
p.started = false
46+
fut
3647

3748
proc maxIncomingStreams*(p: LPProtocol): int =
3849
p.maxIncomingStreams.get(DefaultMaxIncomingStreams)

libp2p/protocols/pubsub/gossipsub.nim

+15-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Nim-LibP2P
2-
# Copyright (c) 2023 Status Research & Development GmbH
2+
# Copyright (c) 2023-2024 Status Research & Development GmbH
33
# Licensed under either of
44
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
55
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -701,30 +701,40 @@ proc maintainDirectPeers(g: GossipSub) {.async.} =
701701
for id, addrs in g.parameters.directPeers:
702702
await g.addDirectPeer(id, addrs)
703703

704-
method start*(g: GossipSub) {.async.} =
704+
method start*(
705+
g: GossipSub
706+
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
707+
let fut = newFuture[void]()
708+
fut.complete()
709+
705710
trace "gossipsub start"
706711

707712
if not g.heartbeatFut.isNil:
708713
warn "Starting gossipsub twice"
709-
return
714+
return fut
710715

711716
g.heartbeatFut = g.heartbeat()
712717
g.scoringHeartbeatFut = g.scoringHeartbeat()
713718
g.directPeersLoop = g.maintainDirectPeers()
714719
g.started = true
720+
fut
721+
722+
method stop*(g: GossipSub): Future[void] {.async: (raises: [], raw: true).} =
723+
let fut = newFuture[void]()
724+
fut.complete()
715725

716-
method stop*(g: GossipSub) {.async.} =
717726
trace "gossipsub stop"
718727
g.started = false
719728
if g.heartbeatFut.isNil:
720729
warn "Stopping gossipsub without starting it"
721-
return
730+
return fut
722731

723732
# stop heartbeat interval
724733
g.directPeersLoop.cancel()
725734
g.scoringHeartbeatFut.cancel()
726735
g.heartbeatFut.cancel()
727736
g.heartbeatFut = nil
737+
fut
728738

729739
method initPubSub*(g: GossipSub)
730740
{.raises: [InitializationError].} =

libp2p/protocols/rendezvous.nim

+13-5
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
# Nim-LibP2P
2-
# Copyright (c) 2023 Status Research & Development GmbH
2+
# Copyright (c) 2023-2024 Status Research & Development GmbH
33
# Licensed under either of
44
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
55
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
@@ -678,17 +678,25 @@ proc deletesRegister(rdv: RendezVous) {.async.} =
678678
libp2p_rendezvous_registered.set(int64(total))
679679
libp2p_rendezvous_namespaces.set(int64(rdv.namespaces.len))
680680

681-
method start*(rdv: RendezVous) {.async.} =
681+
method start*(
682+
rdv: RendezVous
683+
): Future[void] {.async: (raises: [CancelledError], raw: true).} =
684+
let fut = newFuture[void]()
685+
fut.complete()
682686
if not rdv.registerDeletionLoop.isNil:
683687
warn "Starting rendezvous twice"
684-
return
688+
return fut
685689
rdv.registerDeletionLoop = rdv.deletesRegister()
686690
rdv.started = true
691+
fut
687692

688-
method stop*(rdv: RendezVous) {.async.} =
693+
method stop*(rdv: RendezVous): Future[void] {.async: (raises: [], raw: true).} =
694+
let fut = newFuture[void]()
695+
fut.complete()
689696
if rdv.registerDeletionLoop.isNil:
690697
warn "Stopping rendezvous without starting it"
691-
return
698+
return fut
692699
rdv.started = false
693700
rdv.registerDeletionLoop.cancel()
694701
rdv.registerDeletionLoop = nil
702+
fut

0 commit comments

Comments
 (0)