forked from IBM/sarama
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmockbroker_test.go
167 lines (147 loc) · 4.18 KB
/
mockbroker_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
package sarama
import (
"encoding/binary"
"errors"
"io"
"net"
"strconv"
"testing"
"time"
)
// mockBroker is a mock Kafka broker. It consists of a TCP server on a kernel-selected localhost port that
// accepts a single connection. It reads Kafka requests from that connection and answers each one with the
// next response queued via Returns (if an encoded response has a len of 0, nothing is sent for that
// request). If a latency has been set with SetLatency, the server sleeps for that long before responding.
//
// When running tests with one of these, it is strongly recommended to specify a timeout to `go test` so that if the broker hangs
// waiting for a response, the test panics.
//
// It is not necessary to prefix message length or correlation ID to your response bytes, the server does that
// automatically as a convenience.
type mockBroker struct {
	// brokerID is the ID handed to the constructor; port is the TCP port
	// parsed from the listener's bound address.
	brokerID, port int32

	// listener is the TCP listener serverLoop accepts its connection from.
	listener net.Listener
	// expectations holds the responses queued by Returns; serverLoop takes
	// one per request it reads.
	expectations chan encoder
	// stopper is closed by serverLoop when it exits; Close waits on it.
	stopper chan bool

	// latency, when positive, is slept by serverLoop before each response.
	latency time.Duration
	// t receives every error the broker reports.
	t *testing.T
}
// SetLatency sets the delay that serverLoop sleeps for after reading each request and before
// encoding its response. A value of zero or less disables the delay.
// NOTE(review): the field is written here without synchronization while serverLoop reads it from
// its own goroutine — presumably callers set it before any request is sent; confirm.
func (b *mockBroker) SetLatency(latency time.Duration) {
b.latency = latency
}
// BrokerID returns the broker ID that was passed to the constructor.
func (b *mockBroker) BrokerID() int32 {
return b.brokerID
}
// Port returns the TCP port the broker is listening on, as parsed from the listener's
// address when the broker was created.
func (b *mockBroker) Port() int32 {
return b.port
}
// Addr returns the string form of the address the broker's listener is bound to.
func (b *mockBroker) Addr() string {
return b.listener.Addr().String()
}
// Close shuts the mock broker down and waits for serverLoop to finish.
//
// If responses queued with Returns are still unconsumed, that is reported as a
// test error. Closing the expectations channel ends serverLoop's range loop, and
// the receive on stopper blocks until serverLoop has closed it on exit. Close
// must be called at most once: a second call would close an already-closed
// channel and panic.
func (b *mockBroker) Close() {
	// Read the backlog once: serverLoop may still be draining the channel, and
	// reading it twice could make the reported count disagree with the check.
	if pending := len(b.expectations); pending > 0 {
		b.t.Errorf("Not all expectations were satisfied in mockBroker with ID=%d! Still waiting on %d", b.BrokerID(), pending)
	}
	close(b.expectations)
	<-b.stopper
}
// serverLoop accepts exactly one connection and serves it until the
// expectations channel is closed or an error occurs.
//
// For every queued expectation it reads one length-prefixed request (a 4-byte
// big-endian size followed by that many bytes), optionally sleeps for the
// configured latency, encodes the expectation and writes it back prefixed with
// an 8-byte header: the response length plus four, then the four bytes found at
// offset 4 of the request body (the correlation ID being echoed). An expectation
// that encodes to zero bytes consumes a request but sends nothing.
//
// It returns true only when the loop ended because expectations was closed and
// both the connection and the listener closed cleanly. Every failure is
// reported to the test through serverError, which also closes the connection
// and the listener. stopper is closed on every exit path so Close never blocks
// forever.
func (b *mockBroker) serverLoop() (ok bool) {
	defer close(b.stopper)

	conn, err := b.listener.Accept()
	if err != nil {
		return b.serverError(err, conn)
	}

	// Both header buffers are reused across requests.
	reqHeader := make([]byte, 4)
	resHeader := make([]byte, 8)

	for expectation := range b.expectations {
		if _, err = io.ReadFull(conn, reqHeader); err != nil {
			return b.serverError(err, conn)
		}

		body := make([]byte, binary.BigEndian.Uint32(reqHeader))
		// Anything shorter than 10 bytes is rejected; this also guarantees
		// that body[4:8], read below, exists.
		if len(body) < 10 {
			return b.serverError(errors.New("Kafka request too short."), conn)
		}
		if _, err = io.ReadFull(conn, body); err != nil {
			return b.serverError(err, conn)
		}

		if b.latency > 0 {
			time.Sleep(b.latency)
		}

		// A distinct name avoids shadowing err. An encode failure is routed
		// through serverError like every other failure, so it is reported to
		// the test and the sockets are released instead of silently leaking.
		response, encodeErr := encode(expectation)
		if encodeErr != nil {
			return b.serverError(encodeErr, conn)
		}
		if len(response) == 0 {
			// Deliberately send nothing for this request.
			continue
		}

		binary.BigEndian.PutUint32(resHeader, uint32(len(response)+4))
		binary.BigEndian.PutUint32(resHeader[4:], binary.BigEndian.Uint32(body[4:]))
		if _, err = conn.Write(resHeader); err != nil {
			return b.serverError(err, conn)
		}
		if _, err = conn.Write(response); err != nil {
			return b.serverError(err, conn)
		}
	}

	if err = conn.Close(); err != nil {
		// The connection is passed as nil so serverError does not close it again.
		return b.serverError(err, nil)
	}
	if err = b.listener.Close(); err != nil {
		b.t.Error(err)
		return false
	}
	return true
}
// serverError is the shared failure path of serverLoop. It reports err on the
// test, closes conn when one is supplied, closes the listener, and reports any
// error those closes produce. It always returns false so callers can write
// `return b.serverError(...)`.
func (b *mockBroker) serverError(err error, conn net.Conn) bool {
	b.t.Error(err)

	if conn != nil {
		connCloseErr := conn.Close()
		if connCloseErr != nil {
			b.t.Error(connCloseErr)
		}
	}

	listenerCloseErr := b.listener.Close()
	if listenerCloseErr != nil {
		b.t.Error(listenerCloseErr)
	}

	return false
}
// newMockBroker launches a fake Kafka broker on "localhost:0", i.e. a localhost port chosen by
// the system. It takes a *testing.T as provided by the test framework and the broker ID the mock
// should report. It delegates to newMockBrokerAddr: a failure to set up the listener aborts the
// test via t.Fatal, while errors that occur while serving are logged to the *testing.T and the
// broker stops serving.
func newMockBroker(t *testing.T, brokerID int32) *mockBroker {
return newMockBrokerAddr(t, brokerID, "localhost:0")
}
// newMockBrokerAddr starts a mock broker listening on the given TCP address
// instead of an ephemeral localhost port. It logs the bound address, records
// the bound port, and starts serverLoop in a new goroutine before returning.
// Any failure to listen or to parse the bound address aborts the test through
// t.Fatal.
func newMockBrokerAddr(t *testing.T, brokerID int32, addr string) *mockBroker {
	listener, err := net.Listen("tcp", addr)
	if err != nil {
		t.Fatal(err)
	}

	boundAddr := listener.Addr().String()
	Logger.Printf("mockbroker/%d listening on %s\n", brokerID, boundAddr)

	_, portText, err := net.SplitHostPort(boundAddr)
	if err != nil {
		t.Fatal(err)
	}
	portNum, err := strconv.ParseInt(portText, 10, 32)
	if err != nil {
		t.Fatal(err)
	}

	broker := &mockBroker{
		brokerID:     brokerID,
		port:         int32(portNum),
		listener:     listener,
		expectations: make(chan encoder, 512),
		stopper:      make(chan bool),
		t:            t,
	}

	go broker.serverLoop()

	return broker
}
// Returns queues e as a response; serverLoop sends queued responses in order, one per request
// it reads. The queue is a channel with a buffer of 512, so this call blocks once that many
// responses are pending. It must not be called after Close, which closes the channel.
func (b *mockBroker) Returns(e encoder) {
b.expectations <- e
}