Skip to content

Commit 325562d

Browse files
committed
refactor: updated idle callback logic from controllerbus
Rewrote some resolvers to be simpler & more robust using the newer patterns from controllerbus. Signed-off-by: Christian Stewart <christian@aperture.us>
1 parent d90acff commit 325562d

File tree

9 files changed

+78
-191
lines changed

9 files changed

+78
-191
lines changed

daemon/api/api_peer_info.go

+13-53
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@ package bifrost_api
22

33
import (
44
"context"
5+
"slices"
6+
"strings"
57

68
"github.com/aperturerobotics/bifrost/peer"
79
peer_api "github.com/aperturerobotics/bifrost/peer/api"
810
"github.com/aperturerobotics/controllerbus/bus"
9-
"github.com/aperturerobotics/controllerbus/directive"
1011
"github.com/pkg/errors"
1112
)
1213

@@ -24,64 +25,23 @@ func (a *API) GetPeerInfo(
2425
}
2526
}
2627

27-
subCtx, subCtxCancel := context.WithCancel(ctx)
28-
defer subCtxCancel()
29-
30-
resp := &peer_api.GetPeerInfoResponse{}
31-
di, dir, err := a.bus.AddDirective(
32-
peer.NewGetPeer(peerID),
33-
bus.NewCallbackHandler(func(v directive.AttachedValue) {
34-
pi, err := peer_api.NewPeerInfo(v.GetValue().(peer.Peer))
35-
if err != nil {
36-
return
37-
}
38-
resp.LocalPeers = append(resp.LocalPeers, pi)
39-
}, func(v directive.AttachedValue) {
40-
p := v.GetValue().(peer.Peer)
41-
pi, err := peer_api.NewPeerInfo(p)
42-
if err != nil {
43-
return
44-
}
45-
for i, r := range resp.LocalPeers {
46-
if pi.PeerId == r.PeerId {
47-
resp.LocalPeers[i] = resp.LocalPeers[len(resp.LocalPeers)-1]
48-
resp.LocalPeers[len(resp.LocalPeers)-1] = nil
49-
resp.LocalPeers = resp.LocalPeers[:len(resp.LocalPeers)-1]
50-
break
51-
}
52-
}
53-
}, func() {
54-
subCtxCancel()
55-
}),
56-
)
28+
vals, _, ref, err := bus.ExecCollectValues[peer.GetPeerValue](ctx, a.bus, peer.NewGetPeer(peerID), false, nil)
5729
if err != nil {
5830
return nil, err
5931
}
60-
defer dir.Release()
32+
ref.Release()
6133

62-
errCh := make(chan error, 1)
63-
rcb := di.AddIdleCallback(func(errs []error) {
64-
if len(errs) != 0 {
65-
select {
66-
case errCh <- errs[0]:
67-
return
68-
default:
69-
}
70-
}
71-
72-
subCtxCancel()
73-
})
74-
if rcb != nil {
75-
defer rcb()
34+
resp := &peer_api.GetPeerInfoResponse{}
35+
for _, val := range vals {
36+
resp.LocalPeers = append(resp.LocalPeers, peer_api.NewPeerInfo(val))
7637
}
7738

78-
select {
79-
case <-ctx.Done():
80-
return nil, ctx.Err()
81-
case err := <-errCh:
82-
return nil, err
83-
case <-subCtx.Done():
84-
}
39+
slices.SortFunc(resp.LocalPeers, func(a, b *peer_api.PeerInfo) int {
40+
return strings.Compare(a.GetPeerId(), b.GetPeerId())
41+
})
42+
resp.LocalPeers = slices.CompactFunc(resp.LocalPeers, func(a, b *peer_api.PeerInfo) bool {
43+
return a.GetPeerId() == b.GetPeerId()
44+
})
8545

8646
return resp, nil
8747
}

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/aperturerobotics/bifrost
33
go 1.22
44

55
require (
6-
github.com/aperturerobotics/controllerbus v0.38.1-0.20240404200601-a5d6ce4d901a // latest
6+
github.com/aperturerobotics/controllerbus v0.39.0 // latest
77
github.com/aperturerobotics/entitygraph v0.8.0
88
github.com/aperturerobotics/starpc v0.28.0 // latest
99
github.com/aperturerobotics/timestamp v0.8.2

go.sum

+2-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ github.com/aperturerobotics/bifrost-nats-client v1.10.1-0.20200831103200-24c3d04
66
github.com/aperturerobotics/bifrost-nats-client v1.10.1-0.20200831103200-24c3d0464e58/go.mod h1:ougcjYEZDYV8pVtaNbA5sgYDukkYHyKtSsW/T3B13j0=
77
github.com/aperturerobotics/bifrost-nats-server/v2 v2.1.8-0.20221228081037-b7c2df0c151f h1:bmScByQNGDPPy9T+zdwu816XaCbFtD5UDyqZMRiHJ80=
88
github.com/aperturerobotics/bifrost-nats-server/v2 v2.1.8-0.20221228081037-b7c2df0c151f/go.mod h1:kIcZtLpq4UIZzOqduYLm1mYU1nuMBtN6XuDCtQ21QT8=
9-
github.com/aperturerobotics/controllerbus v0.38.1-0.20240404200601-a5d6ce4d901a h1:Q/ie1VPcbX3wHe5EaEVRid5kfoY/ickF7AT8c4U4x3s=
10-
github.com/aperturerobotics/controllerbus v0.38.1-0.20240404200601-a5d6ce4d901a/go.mod h1:bdu9olnPmETpX29HupgamOCAD/lOHo0pyWbk3CfasVs=
9+
github.com/aperturerobotics/controllerbus v0.39.0 h1:ddBcxItjGJjYWt3MazfrJIcD5McERWZvpBa7tMrH8CM=
10+
github.com/aperturerobotics/controllerbus v0.39.0/go.mod h1:63kbiV8sosp0mYO2vuUXOjYnUf04DScQM96Rq3nCWwo=
1111
github.com/aperturerobotics/entitygraph v0.8.0 h1:1tEsGPAk4kzJRLrwxn69JZbQreSOkqHgr3Ue5BGpE0M=
1212
github.com/aperturerobotics/entitygraph v0.8.0/go.mod h1:F9CfzYNTnIj9d7z79+2l0owNqsuVfz/U2cTPZbEcFTw=
1313
github.com/aperturerobotics/logrus v1.9.4-0.20240119050608-13332fb58195 h1:uyeD1J23j/kFiCFO7rx+GQA4tCqOEy3IJyMK4f6vamE=

package.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
"prettier": "^3.1.1",
5151
"ts-poet": "6.7.0",
5252
"ts-proto": "^1.171.0",
53-
"typescript": "^5.1.6"
53+
"typescript": "^5.4.4"
5454
},
5555
"dependencies": {
5656
"@aperturerobotics/ts-proto-common-types": "^0.3.0",

peer/api/api.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ import (
55
)
66

77
// NewPeerInfo builds peer info from a peer.
8-
func NewPeerInfo(p peer.Peer) (*PeerInfo, error) {
8+
func NewPeerInfo(p peer.Peer) *PeerInfo {
99
pi := &PeerInfo{}
1010
pi.PeerId = peer.IDB58Encode(p.GetPeerID())
11-
return pi, nil
11+
return pi
1212
}

rpc/access/client-resolver.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ ClientLoop:
108108
valID, _ = handler.AddValue(val)
109109
}
110110
if resp.GetIdle() {
111-
handler.MarkIdle()
111+
handler.MarkIdle(true)
112112
}
113113
}
114114
}

rpc/access/server.go

+14-2
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ func (s *AccessRpcServiceServer) LookupRpcService(
4242
var sendQueue []*LookupRpcServiceResponse
4343
var disposed bool
4444
var resErr error
45+
var resIdle bool
4546

4647
serverID := req.GetServerId()
4748
if s.serverIdCb != nil {
@@ -98,18 +99,23 @@ func (s *AccessRpcServiceServer) LookupRpcService(
9899
}
99100
defer ref.Release()
100101

101-
defer di.AddIdleCallback(func(resErrs []error) {
102+
defer di.AddIdleCallback(func(isIdle bool, resErrs []error) {
102103
bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
103104
if resErr == nil {
104105
for _, err := range resErrs {
105106
if err != nil {
106107
resErr = err
108+
broadcast()
107109
break
108110
}
109111
}
110112
}
113+
if isIdle == resIdle {
114+
return
115+
}
116+
resIdle = isIdle
111117
sendQueue = append(sendQueue, &LookupRpcServiceResponse{
112-
Idle: true,
118+
Idle: isIdle,
113119
})
114120
broadcast()
115121
})
@@ -124,11 +130,17 @@ func (s *AccessRpcServiceServer) LookupRpcService(
124130

125131
var currSendQueue []*LookupRpcServiceResponse
126132
var currDisposed bool
133+
var currResErr error
134+
var currIdle bool
127135
bcast.HoldLock(func(broadcast func(), getWaitCh func() <-chan struct{}) {
128136
waitCh = getWaitCh()
129137
currSendQueue, currDisposed = sendQueue, disposed
138+
currResErr, currIdle = resErr, resIdle
130139
sendQueue = nil
131140
})
141+
if currIdle && currResErr != nil && currResErr != context.Canceled {
142+
return currResErr
143+
}
132144
for _, msg := range currSendQueue {
133145
if err := strm.Send(msg); err != nil {
134146
return err

tptaddr/controller/res-establish-link.go

+40-125
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,11 @@ package tptaddr_controller
22

33
import (
44
"context"
5-
"sync"
6-
"sync/atomic"
75

86
"github.com/aperturerobotics/bifrost/link"
97
"github.com/aperturerobotics/bifrost/tptaddr"
108
"github.com/aperturerobotics/controllerbus/bus"
119
"github.com/aperturerobotics/controllerbus/directive"
12-
"github.com/aperturerobotics/util/broadcast"
1310
)
1411

1512
// establishLinkResolver resolves establishLink directives
@@ -40,143 +37,61 @@ func (c *Controller) resolveEstablishLinkWithPeer(
4037
// The resolver will not be retried after returning an error.
4138
// Values will be maintained from the previous call.
4239
func (o *establishLinkResolver) Resolve(ctx context.Context, handler directive.ResolverHandler) error {
43-
var mtx sync.Mutex
44-
var bcast broadcast.Broadcast
45-
var lookupIdle bool
46-
var incomingDialers []string
47-
var delDialers []string
48-
4940
// Create LookupTptAddr directive.
5041
// When a new address is added, add a directive to dial that address.
51-
lookupDi, lookupRef, err := o.c.bus.AddDirective(
42+
di, ref, err := bus.ExecWatchTransformEffect[tptaddr.LookupTptAddrValue, tptaddr.DialTptAddr](
43+
ctx,
44+
func(ctx context.Context, val directive.TypedAttachedValue[tptaddr.LookupTptAddrValue]) (tptaddr.DialTptAddr, bool, error) {
45+
return tptaddr.NewDialTptAddr(val.GetValue(), o.dir.EstablishLinkSourcePeerId(), o.dir.EstablishLinkTargetPeerId()), true, nil
46+
},
47+
func(val directive.TransformedAttachedValue[tptaddr.LookupTptAddrValue, tptaddr.DialTptAddr]) func() {
48+
// spawn a new resolver for this
49+
// note: we won't return any values to the directive. we expect the link controller to do this.
50+
return handler.AddResolver(directive.NewTransformResolver[struct{}](
51+
o.c.bus,
52+
val.GetTransformedValue(),
53+
func(ctx context.Context, val directive.AttachedValue) (rval struct{}, rel func(), ok bool, err error) {
54+
return struct{}{}, nil, false, nil
55+
},
56+
), nil)
57+
},
58+
o.c.bus,
5259
tptaddr.NewLookupTptAddr(o.dir.EstablishLinkTargetPeerId()),
53-
bus.NewCallbackHandler(func(av directive.AttachedValue) {
54-
val, ok := av.GetValue().(tptaddr.LookupTptAddrValue)
55-
if !ok || val == "" {
56-
return
57-
}
58-
mtx.Lock()
59-
incomingDialers = append(incomingDialers, val)
60-
bcast.Broadcast()
61-
mtx.Unlock()
62-
}, func(av directive.AttachedValue) {
63-
val, ok := av.GetValue().(tptaddr.LookupTptAddrValue)
64-
if !ok || val == "" {
65-
return
66-
}
67-
mtx.Lock()
68-
delDialers = append(delDialers, val)
69-
bcast.Broadcast()
70-
mtx.Unlock()
71-
}, nil),
7260
)
7361
if err != nil {
7462
return err
7563
}
76-
defer lookupRef.Release()
64+
defer ref.Release()
7765

78-
// handle lookup becoming idle
79-
defer lookupDi.AddIdleCallback(func(resErrs []error) {
80-
mtx.Lock()
81-
if !lookupIdle {
82-
lookupIdle = true
83-
bcast.Broadcast()
66+
// mark idle based on the lookup tpt addr value directive
67+
// handle any non-nil resolver errors if isIdle
68+
errCh := make(chan error, 1)
69+
handleErr := func(err error) {
70+
select {
71+
case errCh <- err:
72+
default:
8473
}
85-
mtx.Unlock()
86-
})()
87-
88-
// The below fields are controlled by the below loop.
89-
type dialerInfo struct {
90-
// n is the number of references
91-
n int
92-
// relRef releases the reference
93-
relRef func()
94-
// idle indicates this dialer is idle
95-
idle atomic.Bool
9674
}
97-
dialers := make(map[string]*dialerInfo)
98-
99-
defer func() {
100-
mtx.Lock()
101-
for k, dialer := range dialers {
102-
if dialer.relRef != nil {
103-
dialer.relRef()
104-
}
105-
delete(dialers, k)
106-
}
107-
mtx.Unlock()
108-
}()
109-
110-
for {
111-
mtx.Lock()
112-
wait := bcast.GetWaitCh()
113-
for i := range incomingDialers {
114-
incomingTptAddr := incomingDialers[i]
115-
info := dialers[incomingTptAddr]
116-
if info == nil {
117-
info = &dialerInfo{n: 1}
118-
dialers[incomingTptAddr] = info
119-
dialInst, dialRef, err := o.c.bus.AddDirective(tptaddr.NewDialTptAddr(
120-
incomingTptAddr,
121-
o.dir.EstablishLinkSourcePeerId(),
122-
o.dir.EstablishLinkTargetPeerId(),
123-
), nil)
75+
defer di.AddIdleCallback(func(isIdle bool, resolverErrs []error) {
76+
if isIdle {
77+
for _, err := range resolverErrs {
12478
if err != nil {
125-
o.c.le.WithError(err).Warn("unable to dial transport address")
126-
} else {
127-
relIdleCb := dialInst.AddIdleCallback(func(_ []error) {
128-
if !info.idle.Swap(true) {
129-
bcast.Broadcast()
130-
}
131-
})
132-
info.relRef = func() {
133-
relIdleCb()
134-
dialRef.Release()
135-
}
79+
handleErr(err)
80+
return
13681
}
137-
} else {
138-
info.n++
13982
}
14083
}
141-
incomingDialers = nil
142-
143-
for i := range delDialers {
144-
delDialerID := delDialers[i]
145-
info, ok := dialers[delDialerID]
146-
if !ok {
147-
continue
148-
}
149-
info.n--
150-
if info.n <= 0 {
151-
if info.relRef != nil {
152-
info.relRef()
153-
}
154-
info.idle.Store(true)
155-
delete(dialers, delDialerID)
156-
}
157-
}
158-
delDialers = nil
159-
160-
// check if everything is idle
161-
if lookupIdle {
162-
allDialersIdle := true
163-
for _, dialer := range dialers {
164-
if !dialer.idle.Load() {
165-
allDialersIdle = false
166-
break
167-
}
168-
}
169-
if allDialersIdle {
170-
handler.MarkIdle()
171-
}
172-
}
173-
mtx.Unlock()
84+
handler.MarkIdle(isIdle)
85+
})()
86+
defer di.AddDisposeCallback(func() {
87+
handleErr(bus.ErrDirectiveDisposed)
88+
})()
17489

175-
select {
176-
case <-ctx.Done():
177-
return context.Canceled
178-
case <-wait:
179-
}
90+
select {
91+
case <-ctx.Done():
92+
return context.Canceled
93+
case err := <-errCh:
94+
return err
18095
}
18196
}
18297

0 commit comments

Comments
 (0)