Skip to content

Commit 920e114

Browse files
committed
fix(webrtc): improve link dialer implementation
Signed-off-by: Christian Stewart <christian@aperture.us>
1 parent c97e12d commit 920e114

File tree

2 files changed

+43
-23
lines changed

2 files changed

+43
-23
lines changed

transport/webrtc/session.go

+11-9
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
transport_quic "github.com/aperturerobotics/bifrost/transport/common/quic"
1313
"github.com/aperturerobotics/bifrost/util/rwc"
1414
"github.com/aperturerobotics/util/broadcast"
15-
"github.com/aperturerobotics/util/ccontainer"
1615
"github.com/aperturerobotics/util/keyed"
1716
"github.com/aperturerobotics/util/routine"
1817
"github.com/aperturerobotics/util/scrub"
@@ -49,8 +48,9 @@ type sessionTracker struct {
4948
xmitRoutine *routine.StateRoutineContainer[*outgoingSignal]
5049
// linkRoutine is the routine that manages the Quic link when the session dcOpen.
5150
linkRoutine *routine.StateRoutineContainer[datachannel.ReadWriteCloser]
52-
// linkCtr contains the current link
53-
linkCtr *ccontainer.CContainer[*transport_quic.Link]
51+
// link contains the current link, if any
52+
// w.bcast is broadcasted when this changes
53+
link *transport_quic.Link
5454
}
5555

5656
// newSessionTracker constructs a new sessionTracker.
@@ -71,7 +71,6 @@ func (w *WebRTC) newSessionTracker(peerIDStr string) (keyed.Routine, *sessionTra
7171
}
7272

7373
sess.errCh = make(chan error, 1)
74-
sess.linkCtr = ccontainer.NewCContainer[*transport_quic.Link](nil)
7574

7675
sess.linkRoutine = routine.NewStateRoutineContainer[datachannel.ReadWriteCloser](
7776
func(t1, t2 datachannel.ReadWriteCloser) bool { return t1 == t2 },
@@ -192,11 +191,11 @@ func (s *sessionTracker) executeLink(ctx context.Context, dcRwc datachannel.Read
192191
if wasClosed.Swap(true) {
193192
return
194193
}
195-
s.linkCtr.SwapValue(func(v *transport_quic.Link) *transport_quic.Link {
196-
if v == nextLink {
197-
return nil
194+
s.w.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
195+
if s.link == nextLink {
196+
s.link = nil
197+
broadcast()
198198
}
199-
return v
200199
})
201200
go s.w.handler.HandleLinkLost(nextLink)
202201
_ = dcRwc.Close()
@@ -218,7 +217,10 @@ func (s *sessionTracker) executeLink(ctx context.Context, dcRwc datachannel.Read
218217
}
219218

220219
// Link established.
221-
s.linkCtr.SetValue(nextLink)
220+
s.w.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
221+
s.link = nextLink
222+
broadcast()
223+
})
222224
s.w.handler.HandleLinkEstablished(nextLink)
223225

224226
// Cleanup link on exit

transport/webrtc/webrtc.go

+32-14
Original file line numberDiff line numberDiff line change
@@ -234,27 +234,45 @@ func (w *WebRTC) DialPeer(
234234
return false, nil
235235
}
236236

237-
// Add the session reference.
238-
ref, tkr, existed, err := w.addSessionTrackerRef(peerIDStr)
237+
var ref *keyed.KeyedRef[string, *sessionTracker]
238+
var waitCh <-chan struct{}
239+
var tkr *sessionTracker
240+
var lnk *transport_quic.Link
241+
242+
w.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
243+
// Add the session reference.
244+
var existed bool
245+
ref, tkr, existed, err = w.addSessionTrackerRef(peerIDStr)
246+
// Notify signal handlers if it didn't exist
247+
if err == nil && !existed {
248+
broadcast()
249+
}
250+
if tkr != nil {
251+
lnk = tkr.link
252+
}
253+
waitCh = getWaitCh()
254+
})
255+
if ref != nil {
256+
defer ref.Release()
257+
}
239258
if err != nil {
240259
return false, err
241260
}
242-
defer ref.Release()
243261

244-
// Notify signal handlers if it didn't exist
245-
if !existed {
246-
w.bcast.Broadcast()
247-
}
262+
// Wait for the link to be established
263+
for lnk == nil {
264+
select {
265+
case <-ctx.Done():
266+
return false, context.Canceled
267+
case <-waitCh:
268+
}
248269

249-
// Wait for the session to be established.
250-
// TODO: what if the session tracker is disposed?
251-
_, err = tkr.linkCtr.WaitValue(ctx, nil)
252-
if err != nil {
253-
return false, err
270+
w.bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
271+
lnk = tkr.link
272+
waitCh = getWaitCh()
273+
})
254274
}
255275

256-
// Release the ref
257-
ref.Release()
258276
return false, nil
259277
}
260278

0 commit comments

Comments
 (0)