From a5238ee8afd9d068acac34f04eef936ca9797db8 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Wed, 18 Dec 2024 11:01:57 -0500 Subject: [PATCH 1/3] Add upstream deadlock warning to the logstash output (#41960) Add an internal timeout in the Logstash output so that it logs an error when published events stop making progress for an extended time, which can indicate that the Logstash host is silently deadlocked. (cherry picked from commit 0e62bf8f03efc8bf5f408ec02aae3b0835b4c38c) # Conflicts: # libbeat/outputs/logstash/async.go # libbeat/outputs/logstash/sync.go --- libbeat/outputs/logstash/async.go | 51 ++++++++---- libbeat/outputs/logstash/deadlock.go | 95 +++++++++++++++++++++++ libbeat/outputs/logstash/deadlock_test.go | 51 ++++++++++++ libbeat/outputs/logstash/sync.go | 11 ++- 4 files changed, 189 insertions(+), 19 deletions(-) create mode 100644 libbeat/outputs/logstash/deadlock.go create mode 100644 libbeat/outputs/logstash/deadlock_test.go diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index f196d137b88e..83fd31f8180e 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -46,13 +46,14 @@ type asyncClient struct { } type msgRef struct { - client *asyncClient - count atomic.Uint32 - batch publisher.Batch - slice []publisher.Event - err error - win *window - batchSize int + client *asyncClient + count atomic.Uint32 + batch publisher.Batch + slice []publisher.Event + err error + win *window + batchSize int + deadlockListener *deadlockListener } func newAsyncClient( @@ -146,6 +147,7 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { } ref := &msgRef{ +<<<<<<< HEAD client: c, count: atomic.MakeUint32(1), batch: batch, @@ -153,6 +155,15 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { batchSize: len(events), win: c.win, err: nil, +======= + client: c, + batch: batch, + slice: events, + batchSize: len(events), + win: c.win, + err: nil, + deadlockListener: newDeadlockListener(c.log, logstashDeadlockTimeout), +>>>>>>> 0e62bf8f0 (Add upstream deadlock warning to the logstash output (#41960)) } defer ref.dec() @@ -229,6 +240,7 @@ func (c *asyncClient) getClient() *v2.AsyncClient { return client } +<<<<<<< HEAD func (r *msgRef) callback(seq uint32, err error) { if err != nil { r.fail(seq, err) @@ -239,24 +251,29 @@ func (r *msgRef) callback(seq uint32, err error) { func (r *msgRef) done(n uint32) { r.client.observer.Acked(int(n)) +======= +func (r *msgRef) callback(n uint32, err error) { + r.client.observer.AckedEvents(int(n)) +>>>>>>> 0e62bf8f0 (Add upstream deadlock warning to the logstash output (#41960)) r.slice = r.slice[n:] - if r.win != nil { - r.win.tryGrowWindow(r.batchSize) - } - r.dec() -} - -func (r *msgRef) fail(n uint32, err error) { + r.deadlockListener.ack(int(n)) if r.err == nil { r.err = err } - r.slice = r.slice[n:] + // If publishing is windowed, update the window size. if r.win != nil { - r.win.shrinkWindow() + if err != nil { + r.win.shrinkWindow() + } else { + r.win.tryGrowWindow(r.batchSize) + } } +<<<<<<< HEAD r.client.observer.Acked(int(n)) +======= +>>>>>>> 0e62bf8f0 (Add upstream deadlock warning to the logstash output (#41960)) r.dec() } @@ -266,6 +283,8 @@ func (r *msgRef) dec() { return } + r.deadlockListener.close() + if L := len(r.slice); L > 0 { r.client.observer.Failed(L) } diff --git a/libbeat/outputs/logstash/deadlock.go b/libbeat/outputs/logstash/deadlock.go new file mode 100644 index 000000000000..9a291baeda02 --- /dev/null +++ b/libbeat/outputs/logstash/deadlock.go @@ -0,0 +1,95 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logstash + +import ( + "time" + + "github.com/elastic/elastic-agent-libs/logp" +) + +type deadlockListener struct { + log *logp.Logger + timeout time.Duration + ticker *time.Ticker + + ackChan chan int + + doneChan chan struct{} +} + +const logstashDeadlockTimeout = 5 * time.Minute + +func newDeadlockListener(log *logp.Logger, timeout time.Duration) *deadlockListener { + if timeout <= 0 { + return nil + } + r := &deadlockListener{ + log: log, + timeout: timeout, + ticker: time.NewTicker(timeout), + + ackChan: make(chan int), + doneChan: make(chan struct{}), + } + go r.run() + return r +} + +func (r *deadlockListener) run() { + defer r.ticker.Stop() + defer close(r.doneChan) + for { + select { + case n, ok := <-r.ackChan: + if !ok { + // Listener has been closed + return + } + if n > 0 { + // If progress was made, reset the countdown. + r.ticker.Reset(r.timeout) + } + case <-r.ticker.C: + // No progress was made within the timeout, log error so users + // know there is likely a problem with the upstream host + r.log.Errorf("Logstash batch hasn't reported progress in the last %v, the Logstash host may be stalled. This problem can be prevented by configuring Logstash to use PipelineBusV1 or by upgrading Logstash to 8.17+, for details see https://github.com/elastic/logstash/issues/16657", r.timeout) + return + } + } +} + +func (r *deadlockListener) ack(n int) { + if r == nil { + return + } + // Send the new ack to the run loop, unless it has already shut down in + // which case it can be safely ignored. + select { + case r.ackChan <- n: + case <-r.doneChan: + } +} + +func (r *deadlockListener) close() { + if r == nil { + return + } + // Signal the run loop to shut down + close(r.ackChan) +} diff --git a/libbeat/outputs/logstash/deadlock_test.go b/libbeat/outputs/logstash/deadlock_test.go new file mode 100644 index 000000000000..15c3716b9971 --- /dev/null +++ b/libbeat/outputs/logstash/deadlock_test.go @@ -0,0 +1,51 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package logstash + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/logp" +) + +func TestDeadlockListener(t *testing.T) { + const timeout = 5 * time.Millisecond + log := logp.NewLogger("test") + listener := newDeadlockListener(log, timeout) + + // Verify that the listener doesn't trigger when receiving regular acks + for i := 0; i < 5; i++ { + time.Sleep(timeout / 2) + listener.ack(1) + } + select { + case <-listener.doneChan: + require.Fail(t, "Deadlock listener should not trigger unless there is no progress for the configured time interval") + case <-time.After(timeout / 2): + } + + // Verify that the listener does trigger when the acks stop + select { + case <-time.After(timeout): + require.Fail(t, "Deadlock listener should trigger when there is no progress for the configured time interval") + case <-listener.doneChan: + } +} diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index 22e133db906c..f46924f229b6 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -113,6 +113,8 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { return nil } + deadlockListener := newDeadlockListener(c.log, logstashDeadlockTimeout) + defer deadlockListener.close() for len(events) > 0 { // check if we need to reconnect if c.ticker != nil { @@ -146,14 +148,16 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { n, len(events), c.Host()) events = events[n:] +<<<<<<< HEAD st.Acked(n) +======= + st.AckedEvents(n) + deadlockListener.ack(n) +>>>>>>> 0e62bf8f0 (Add upstream deadlock warning to the logstash output (#41960)) if err != nil { // return batch to pipeline before reporting/counting error batch.RetryEvents(events) - if c.win != nil { - c.win.shrinkWindow() - } _ = c.Close() c.log.Errorf("Failed to publish events caused by: %+v", err) @@ -182,6 +186,7 @@ func (c *syncClient) publishWindowed(events []publisher.Event) (int, error) { n, err := c.sendEvents(events) if err != nil { + c.win.shrinkWindow() return n, err } From 92d281da116adccbe49a0591046e126c7bed7c55 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 19 Dec 2024 14:46:53 -0500 Subject: [PATCH 2/3] fix merge --- libbeat/outputs/logstash/async.go | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index 83fd31f8180e..e36b3c4c28bb 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -147,15 +147,6 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { } ref := &msgRef{ -<<<<<<< HEAD - client: c, - count: atomic.MakeUint32(1), - batch: batch, - slice: events, - batchSize: len(events), - win: c.win, - err: nil, -======= client: c, batch: batch, slice: events, @@ -163,7 +154,6 @@ func (c *asyncClient) Publish(_ context.Context, batch publisher.Batch) error { win: c.win, err: nil, deadlockListener: newDeadlockListener(c.log, logstashDeadlockTimeout), ->>>>>>> 0e62bf8f0 (Add upstream deadlock warning to the logstash output (#41960)) } defer ref.dec() From 01adc32239969a6b26c09678d5b76e298c8c7d4d Mon Sep 17 00:00:00 2001 From: Julien Lind Date: Mon, 30 Dec 2024 15:37:01 +0100 Subject: [PATCH 3/3] resolve mergify conflict --- libbeat/outputs/logstash/async.go | 19 ------------------- libbeat/outputs/logstash/sync.go | 4 ---- 2 files changed, 23 deletions(-) diff --git a/libbeat/outputs/logstash/async.go b/libbeat/outputs/logstash/async.go index e36b3c4c28bb..697771061e87 100644 --- a/libbeat/outputs/logstash/async.go +++ b/libbeat/outputs/logstash/async.go @@ -230,21 +230,8 @@ func (c *asyncClient) getClient() *v2.AsyncClient { return client } -<<<<<<< HEAD -func (r *msgRef) callback(seq uint32, err error) { - if err != nil { - r.fail(seq, err) - } else { - r.done(seq) - } -} - -func (r *msgRef) done(n uint32) { - r.client.observer.Acked(int(n)) -======= func (r *msgRef) callback(n uint32, err error) { r.client.observer.AckedEvents(int(n)) ->>>>>>> 0e62bf8f0 (Add upstream deadlock warning to the logstash output (#41960)) r.slice = r.slice[n:] r.deadlockListener.ack(int(n)) if r.err == nil { @@ -258,12 +245,6 @@ func (r *msgRef) callback(n uint32, err error) { r.win.tryGrowWindow(r.batchSize) } } -<<<<<<< HEAD - - r.client.observer.Acked(int(n)) - -======= ->>>>>>> 0e62bf8f0 (Add upstream deadlock warning to the logstash output (#41960)) r.dec() } diff --git a/libbeat/outputs/logstash/sync.go b/libbeat/outputs/logstash/sync.go index f46924f229b6..48549bfc1f09 100644 --- a/libbeat/outputs/logstash/sync.go +++ b/libbeat/outputs/logstash/sync.go @@ -148,12 +148,8 @@ func (c *syncClient) Publish(_ context.Context, batch publisher.Batch) error { n, len(events), c.Host()) events = events[n:] -<<<<<<< HEAD - st.Acked(n) -======= st.AckedEvents(n) deadlockListener.ack(n) ->>>>>>> 0e62bf8f0 (Add upstream deadlock warning to the logstash output (#41960)) if err != nil { // return batch to pipeline before reporting/counting error batch.RetryEvents(events)