From b9e0f0243cfd7a4277b421e8c1184dfbc023a2d0 Mon Sep 17 00:00:00 2001 From: Orestis Floros Date: Mon, 3 Mar 2025 18:46:18 +0100 Subject: [PATCH 1/4] filebeat: make deep copy before notifying of config change Fixes #42815 --- filebeat/beater/filebeat.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index dbab9a77de5e..f4f6a5d377cb 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -329,7 +329,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error { return nil } - stateStore.notifier.Notify(outCfg.Config()) + c, err := conf.NewConfigFrom(outCfg.Config()) + if err != nil { + return nil + } + stateStore.notifier.Notify(c) return nil }) } From f8b4631e230ce25682084269f21e65486b7ddfb9 Mon Sep 17 00:00:00 2001 From: Orestis Floros Date: Mon, 3 Mar 2025 18:56:35 +0100 Subject: [PATCH 2/4] comment --- filebeat/beater/filebeat.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index f4f6a5d377cb..d1e904a89ab8 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -329,11 +329,14 @@ func (fb *Filebeat) Run(b *beat.Beat) error { return nil } - c, err := conf.NewConfigFrom(outCfg.Config()) + // Create a new config with the output configuration. Since r.Config is a pointer, a copy is required to + // avoid concurrent map read and write. + // See https://github.com/elastic/beats/issues/42815 + configCopy, err := conf.NewConfigFrom(outCfg.Config()) if err != nil { return nil } - stateStore.notifier.Notify(c) + stateStore.notifier.Notify(configCopy) return nil }) } From 2ff73a7486e0fc5f3d35fe1d1ce2a6c5b235b6e3 Mon Sep 17 00:00:00 2001 From: Orestis Floros Date: Mon, 3 Mar 2025 18:57:00 +0100 Subject: [PATCH 3/4] log --- filebeat/beater/filebeat.go | 1 + 1 file changed, 1 insertion(+) diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index d1e904a89ab8..7c9229cd09b5 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -334,6 +334,7 @@ func (fb *Filebeat) Run(b *beat.Beat) error { // See https://github.com/elastic/beats/issues/42815 configCopy, err := conf.NewConfigFrom(outCfg.Config()) if err != nil { + logp.Err("Failed to create a new config from the output config: %v", err) return nil } stateStore.notifier.Notify(configCopy) From ae73a67d57c4fa69a9ab450c1a762887c8c6b6eb Mon Sep 17 00:00:00 2001 From: Orestis Floros Date: Mon, 3 Mar 2025 18:59:05 +0100 Subject: [PATCH 4/4] info logs --- libbeat/statestore/backend/es/store.go | 2 +- x-pack/filebeat/tests/integration/managerV2_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/libbeat/statestore/backend/es/store.go b/libbeat/statestore/backend/es/store.go index fee1e0c9ba48..d4a6fd130853 100644 --- a/libbeat/statestore/backend/es/store.go +++ b/libbeat/statestore/backend/es/store.go @@ -296,7 +296,7 @@ func (s *store) Each(fn func(string, backend.ValueDecoder) (bool, error)) error } func (s *store) configure(ctx context.Context, c *conf.C) { - s.log.Debugf("Configure ES store") + s.log.Info("Configuring ES store") s.mx.Lock() defer s.mx.Unlock() diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go index 84f6ad0f54bb..a05ee01570e0 100644 --- a/x-pack/filebeat/tests/integration/managerV2_test.go +++ b/x-pack/filebeat/tests/integration/managerV2_test.go @@ -930,7 +930,7 @@ func TestHTTPJSONInputReloadUnderElasticAgentWithElasticStateStore(t *testing.T) ) for _, contains := range []string{ - "Configure ES store", + "Configuring ES store", "input-cursor::openStore: prefix: httpjson inputID: " + inputID, "input-cursor store read 0 keys", // first, no previous data exists "input-cursor store read 1 keys", // after the restart, previous key is read