-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathinproc_test.go
154 lines (136 loc) · 3.63 KB
/
inproc_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package inproc
import (
"context"
"testing"
"github.com/aperturerobotics/bifrost/link"
"github.com/aperturerobotics/bifrost/peer"
"github.com/aperturerobotics/bifrost/stream"
stream_echo "github.com/aperturerobotics/bifrost/stream/echo"
"github.com/aperturerobotics/bifrost/testbed"
"github.com/aperturerobotics/bifrost/transport/common/dialer"
transport_controller "github.com/aperturerobotics/bifrost/transport/controller"
"github.com/aperturerobotics/controllerbus/bus"
"github.com/aperturerobotics/controllerbus/controller/loader"
"github.com/aperturerobotics/controllerbus/controller/resolver"
"github.com/aperturerobotics/controllerbus/directive"
"github.com/sirupsen/logrus"
)
// buildTestbed builds a new testbed for udp.
func buildTestbed(t *testing.T, ctx context.Context) (*testbed.Testbed, *logrus.Entry) {
log := logrus.New()
log.SetLevel(logrus.DebugLevel)
le := logrus.NewEntry(log)
tb, err := testbed.NewTestbed(ctx, le, testbed.TestbedOpts{})
if err != nil {
t.Fatal(err.Error())
}
tb.StaticResolver.AddFactory(NewFactory(tb.Bus))
return tb, le
}
func execPeer(ctx context.Context, t *testing.T, tb *testbed.Testbed, conf *Config) (
*transport_controller.Controller,
*Inproc,
directive.Reference,
) {
peerId, err := peer.IDFromPrivateKey(tb.PrivKey)
if err != nil {
t.Fatal(err.Error())
}
if conf == nil {
conf = &Config{}
}
conf.TransportPeerId = peerId.String()
tpci1, _, tp1Ref, err := loader.WaitExecControllerRunning(
ctx,
tb.Bus,
resolver.NewLoadControllerWithConfig(conf),
nil,
)
if err != nil {
t.Fatal(err.Error())
}
tpc1 := tpci1.(*transport_controller.Controller)
tpt1, err := tpc1.GetTransport(ctx)
if err != nil {
t.Fatal(err.Error())
}
return tpc1, tpt1.(*Inproc), tp1Ref
}
// TestEstablishLink tests creating a UDP link with two in-memory nodes.
func TestEstablishLink(t *testing.T) {
ctx, ctxCancel := context.WithCancel(context.Background())
defer ctxCancel()
tb1, le1 := buildTestbed(t, ctx)
le1 = le1.WithField("testbed", 0)
tb2, le2 := buildTestbed(t, ctx)
le2 = le2.WithField("testbed", 1)
tpc1, tp1, tp1Ref := execPeer(ctx, t, tb1, nil)
peerId1 := tp1.GetPeerID()
defer tp1Ref.Release()
tpc2, tp2, tp2Ref := execPeer(ctx, t, tb2, &Config{
Dialers: map[string]*dialer.DialerOpts{
peerId1.String(): {
Address: tp1.LocalAddr().String(),
},
},
})
peerId2 := tp2.GetPeerID()
defer tp2Ref.Release()
le1.Infof("constructed peer 1 with id %s", peerId1.String())
le2.Infof("constructed peer 2 with id %s", peerId2.String())
tp2.ConnectToInproc(ctx, tp1)
tp1.ConnectToInproc(ctx, tp2)
// Attempt to open a link between them.
lnk2to1, _, lnk1Ref, err := bus.ExecOneOff(
ctx,
tb2.Bus,
link.NewEstablishLinkWithPeer("", peerId1),
nil,
nil,
)
if err != nil {
t.Fatal(err.Error())
}
defer lnk1Ref.Release()
le1.Infof(
"opened link from 2 -> 1 with id %v",
lnk2to1.GetValue().(link.Link).GetUUID(),
)
msv1, _, ms1Ref, err := bus.ExecOneOff(
ctx,
tb1.Bus,
link.NewOpenStreamWithPeer(
stream_echo.DefaultProtocolID,
peerId1,
peerId2,
0,
stream.OpenOpts{},
),
nil,
nil,
)
if err != nil {
t.Fatal(err.Error())
}
defer ms1Ref.Release()
mns1 := msv1.GetValue().(link.MountedStream)
ms1 := mns1.GetStream()
data := []byte("testing 1234")
_, err = ms1.Write(data)
if err != nil {
t.Fatal(err.Error())
}
outData := make([]byte, len(data)*2)
on, oe := ms1.Read(outData)
if oe != nil {
t.Fatal(oe.Error())
}
if on != len(data) {
t.Fatalf("length incorrect received %v != %v", on, len(data))
}
outData = outData[:on]
le1.Infof("echoed data successfully: %v", string(outData))
ms1Ref.Release()
_ = tpc1
_ = tpc2
}