Skip to content

Commit b585727

Browse files
committed
fix merge + tests
1 parent 5aa42eb commit b585727

File tree

2 files changed

+14
-5
lines changed

2 files changed

+14
-5
lines changed

libbeat/publisher/queue/memqueue/runloop.go

+11-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package memqueue
1919

2020
import (
21-
"fmt"
2221
"time"
2322

2423
"github.com/elastic/beats/v7/libbeat/common/fifo"
@@ -153,8 +152,9 @@ func (l *runLoop) runIteration() {
153152
select {
154153
case <-l.broker.closeChan:
155154
l.closing = true
156-
// Get requests are handled immediately during shutdown
155+
// Get and push requests are handled immediately during shutdown
157156
l.maybeUnblockGetRequest()
157+
l.maybeUnblockPushRequests()
158158

159159
case <-l.broker.ctx.Done():
160160
// The queue is fully shut down, do nothing
@@ -205,11 +205,10 @@ func (l *runLoop) getRequestShouldBlock(req *getRequest) bool {
205205
// limit) or if we have at least the requested number available.
206206
if l.broker.useByteLimits() {
207207
availableBytes := l.byteCount - l.consumedByteCount
208-
return req.byteCount <= 0 || availableBytes >= req.byteCount
208+
return req.byteCount > 0 && availableBytes < req.byteCount
209209
}
210210
availableEntries := l.eventCount - l.consumedEventCount
211-
fmt.Printf("hi fae, getRequestShouldBlock for %v entries while there are %v available\n", req.entryCount, availableEntries)
212-
return req.entryCount <= 0 || availableEntries >= req.entryCount
211+
return req.entryCount > 0 && availableEntries < req.entryCount
213212
}
214213

215214
// Respond to the given get request without blocking or waiting for more events
@@ -316,7 +315,14 @@ func (l *runLoop) canFitPushRequest(req pushRequest) bool {
316315
func (l *runLoop) maybeUnblockPushRequests() {
317316
for !l.pendingPushRequests.Empty() {
318317
req := l.pendingPushRequests.First()
318+
if l.closing {
319+
// If the queue is closing, reject all pending requests
320+
req.resp <- false
321+
continue
322+
}
319323
if !l.canFitPushRequest(req) {
324+
// We're out of space, the rest of the blocked requests will have
325+
// to wait.
320326
break
321327
}
322328
l.doInsert(req)

libbeat/publisher/queue/memqueue/runloop_test.go

+3
Original file line numberDiff line numberDiff line change
@@ -124,10 +124,12 @@ func TestObserverAddEvent(t *testing.T) {
124124
rl := &runLoop{
125125
observer: queue.NewQueueObserver(reg),
126126
buf: newCircularBuffer(100),
127+
broker: &broker{},
127128
}
128129
request := pushRequest{
129130
event: publisher.Event{},
130131
eventSize: 123,
132+
resp: make(chan bool, 1),
131133
}
132134
rl.doInsert(request)
133135
assertRegistryUint(t, reg, "queue.added.events", 1, "Queue insert should report added event")
@@ -143,6 +145,7 @@ func TestObserverConsumeEvents(t *testing.T) {
143145
observer: queue.NewQueueObserver(reg),
144146
buf: newCircularBuffer(bufSize),
145147
eventCount: 50,
148+
broker: &broker{},
146149
}
147150
// Initialize the queue entries to a test byte size
148151
for i := 0; i < bufSize; i++ {

0 commit comments

Comments
 (0)