Skip to content

Commit 3643a8f

Browse files
committed
Add docs / parameter checks
1 parent b585727 commit 3643a8f

File tree

6 files changed

+56
-17
lines changed

6 files changed

+56
-17
lines changed

libbeat/docs/queueconfig.asciidoc

+23
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,17 @@ queue.mem:
6767
flush.timeout: 5s
6868
------------------------------------------------------------------------------
6969

70+
Here is an alternate configuration that measures queue size in bytes rather
71+
than event count. In this case, the output must set `bulk_max_bytes`
72+
instead of `bulk_max_size` to control the batch size:
73+
74+
[source,yaml]
75+
------------------------------------------------------------------------------
76+
queue.mem:
77+
bytes: 32MB
78+
flush.timeout: 10s
79+
------------------------------------------------------------------------------
80+
7081
[float]
7182
=== Configuration options
7283

@@ -80,6 +91,16 @@ Number of events the queue can store.
8091

8192
The default value is 3200 events.
8293

94+
[float]
95+
[[queue-mem-bytes-option]]
96+
===== `bytes`
97+
98+
Number of bytes the queue can store. This option is only available for outputs
99+
that support byte-based event buffers (currently just the Elasticsearch output).
100+
The queue should set either `events` or `bytes` but not both.
101+
102+
The default is 0, indicating the queue should use the `events` limit instead.
103+
83104
[float]
84105
[[queue-mem-flush-min-events-option]]
85106
===== `flush.min_events`
@@ -92,6 +113,8 @@ publishing.
92113
If 0 or 1, sets the maximum number of events per batch to half the queue size, and sets
93114
the queue to synchronous mode (equivalent to `flush.timeout` of 0).
94115

116+
This value is ignored when `bytes` is set.
117+
95118
The default value is 1600.
96119

97120
[float]

libbeat/publisher/pipeline/client_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,12 @@ func TestClient(t *testing.T) {
8888
l := logp.L()
8989

9090
// a small in-memory queue with a very short flush interval
91-
q := memqueue.NewQueue(l, nil, memqueue.Settings{
91+
q, err := memqueue.NewQueue(l, nil, memqueue.Settings{
9292
Events: 5,
9393
MaxGetRequest: 1,
9494
FlushTimeout: time.Millisecond,
9595
}, 5, nil)
96+
require.NoError(t, err, "Queue creation must succeed")
9697

9798
// model a processor that we're going to make produce errors after
9899
p := &testProcessor{}
@@ -201,7 +202,8 @@ func TestClientWaitClose(t *testing.T) {
201202
}
202203
logp.TestingSetup()
203204

204-
q := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0, nil)
205+
q, err := memqueue.NewQueue(logp.L(), nil, memqueue.Settings{Events: 1}, 0, nil)
206+
require.NoError(t, err, "Queue creation must succeed")
205207
pipeline := makePipeline(Settings{}, q)
206208
defer pipeline.Close()
207209

libbeat/publisher/pipeline/controller.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,9 @@ func (c *outputController) createQueueIfNeeded(outGrp outputs.Group) {
286286
if err != nil {
287287
logger.Errorf("queue creation failed, falling back to default memory queue, check your queue configuration")
288288
s, _ := memqueue.SettingsForUserConfig(nil)
289-
queue = memqueue.NewQueue(logger, queueObserver, s, c.inputQueueSize, outGrp.EncoderFactory)
289+
// Memqueue creation can only fail when it's configured for byte-based limits,
290+
// so we don't need to handle the fallback error.
291+
queue, _ = memqueue.NewQueue(logger, queueObserver, s, c.inputQueueSize, outGrp.EncoderFactory)
290292
}
291293
c.queue = queue
292294

libbeat/publisher/queue/memqueue/broker.go

+16-9
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package memqueue
1919

2020
import (
2121
"context"
22+
"errors"
2223
"io"
2324
"time"
2425

@@ -142,7 +143,7 @@ func FactoryForSettings(settings Settings) queue.QueueFactory {
142143
inputQueueSize int,
143144
encoderFactory queue.EncoderFactory,
144145
) (queue.Queue, error) {
145-
return NewQueue(logger, observer, settings, inputQueueSize, encoderFactory), nil
146+
return NewQueue(logger, observer, settings, inputQueueSize, encoderFactory)
146147
}
147148
}
148149

@@ -155,14 +156,16 @@ func NewQueue(
155156
settings Settings,
156157
inputQueueSize int,
157158
encoderFactory queue.EncoderFactory,
158-
) *broker {
159-
b := newQueue(logger, observer, settings, inputQueueSize, encoderFactory)
159+
) (*broker, error) {
160+
b, err := newQueue(logger, observer, settings, inputQueueSize, encoderFactory)
160161

161-
// Start the queue workers
162-
go b.runLoop.run()
163-
go b.ackLoop.run()
162+
if err == nil {
163+
// Start the queue workers
164+
go b.runLoop.run()
165+
go b.ackLoop.run()
166+
}
164167

165-
return b
168+
return b, err
166169
}
167170

168171
// newQueue does most of the work of creating a queue from the given
@@ -175,7 +178,7 @@ func newQueue(
175178
settings Settings,
176179
inputQueueSize int,
177180
encoderFactory queue.EncoderFactory,
178-
) *broker {
181+
) (*broker, error) {
179182
if observer == nil {
180183
observer = queue.NewQueueObserver(nil)
181184
}
@@ -190,6 +193,10 @@ func newQueue(
190193
settings.MaxGetRequest = (settings.Events + 1) / 2
191194
}
192195

196+
if settings.Bytes > 0 && encoderFactory == nil {
197+
return nil, errors.New("queue.mem.bytes is set but the output doesn't support byte-based event buffers")
198+
}
199+
193200
// Can't request more than the full queue
194201
if settings.Events > 0 && settings.MaxGetRequest > settings.Events {
195202
settings.MaxGetRequest = settings.Events
@@ -222,7 +229,7 @@ func newQueue(
222229

223230
observer.MaxEvents(settings.Events)
224231

225-
return b
232+
return b, nil
226233
}
227234

228235
func (b *broker) Close() error {

libbeat/publisher/queue/memqueue/queue_test.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,13 @@ func TestProduceConsumer(t *testing.T) {
8686
// than 2 events to it, p.Publish will block, once we call q.Close,
8787
// we ensure the 3rd event was not successfully published.
8888
func TestProducerDoesNotBlockWhenQueueClosed(t *testing.T) {
89-
q := NewQueue(nil, nil,
89+
q, err := NewQueue(nil, nil,
9090
Settings{
9191
Events: 2, // Queue size
9292
MaxGetRequest: 1, // make sure the queue won't buffer events
9393
FlushTimeout: time.Millisecond,
9494
}, 0, nil)
95+
require.NoError(t, err, "Queue creation must succeed")
9596

9697
p := q.Producer(queue.ProducerConfig{
9798
// We do not read from the queue, so the callbacks are never called
@@ -156,12 +157,13 @@ func TestProducerClosePreservesEventCount(t *testing.T) {
156157

157158
var activeEvents atomic.Int64
158159

159-
q := NewQueue(nil, nil,
160+
q, err := NewQueue(nil, nil,
160161
Settings{
161162
Events: 3, // Queue size
162163
MaxGetRequest: 2,
163164
FlushTimeout: 10 * time.Millisecond,
164165
}, 1, nil)
166+
require.NoError(t, err, "Queue creation must succeed")
165167

166168
p := q.Producer(queue.ProducerConfig{
167169
ACK: func(count int) {
@@ -229,11 +231,12 @@ func TestProducerClosePreservesEventCount(t *testing.T) {
229231

230232
func makeTestQueue(sz, minEvents int, flushTimeout time.Duration) queuetest.QueueFactory {
231233
return func(_ *testing.T) queue.Queue {
232-
return NewQueue(nil, nil, Settings{
234+
q, _ := NewQueue(nil, nil, Settings{
233235
Events: sz,
234236
MaxGetRequest: minEvents,
235237
FlushTimeout: flushTimeout,
236238
}, 0, nil)
239+
return q
237240
}
238241
}
239242

libbeat/publisher/queue/memqueue/runloop_test.go

+4-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) {
3838
// available. This test verifies that Get requests that can be completely
3939
// filled do not wait for the flush timer.
4040

41-
broker := newQueue(
41+
broker, err := newQueue(
4242
logp.NewLogger("testing"),
4343
nil,
4444
Settings{
@@ -47,6 +47,7 @@ func TestFlushSettingsDoNotBlockFullBatches(t *testing.T) {
4747
FlushTimeout: 10 * time.Second,
4848
},
4949
10, nil)
50+
require.NoError(t, err, "Queue creation must succeed")
5051

5152
producer := newProducer(broker, nil, nil)
5253
rl := broker.runLoop
@@ -77,7 +78,7 @@ func TestFlushSettingsBlockPartialBatches(t *testing.T) {
7778
// there are enough events. This one uses the same setup to confirm that
7879
// Get requests are delayed if there aren't enough events.
7980

80-
broker := newQueue(
81+
broker, err := newQueue(
8182
logp.NewLogger("testing"),
8283
nil,
8384
Settings{
@@ -86,6 +87,7 @@ func TestFlushSettingsBlockPartialBatches(t *testing.T) {
8687
FlushTimeout: 10 * time.Second,
8788
},
8889
10, nil)
90+
require.NoError(t, err, "Queue creation must succeed")
8991

9092
producer := newProducer(broker, nil, nil)
9193
rl := broker.runLoop

0 commit comments

Comments
 (0)