From 53e12e4df68692d01a2c81d02dfe1f93089bccef Mon Sep 17 00:00:00 2001
From: Orestis Floros
Date: Thu, 6 Mar 2025 13:23:43 +0100
Subject: [PATCH] filebeat: make deep copy before notifying of config change
 (#42992)

This prevents concurrent read & write map access.

Unrelated, but I've escalated one log line to Info to make it easier to
verify from agent logs that the ES store is being used.

Fixes #42815

(cherry picked from commit f1e42fcaf3e3d3f36e9cf7ded219650aaa6a8114)

# Conflicts:
#	filebeat/beater/filebeat.go
#	libbeat/statestore/backend/es/store.go
#	x-pack/filebeat/tests/integration/managerV2_test.go
---
 filebeat/beater/filebeat.go                   |  32 ++
 libbeat/statestore/backend/es/store.go        | 340 ++++++++++++++++++
 .../tests/integration/managerV2_test.go       | 273 ++++++++++++++
 3 files changed, 645 insertions(+)
 create mode 100644 libbeat/statestore/backend/es/store.go

diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go
index 815b6fabfde2..7d265e963733 100644
--- a/filebeat/beater/filebeat.go
+++ b/filebeat/beater/filebeat.go
@@ -291,6 +291,38 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
 	}
 	defer stateStore.Close()
 
+<<<<<<< HEAD
+=======
+	// If notifier is set, configure the listener for output configuration.
+	// The notifier passes the elasticsearch output configuration down to the Elasticsearch-backed state storage
+	// in order to allow it to fully configure itself.
+	if stateStore.notifier != nil {
+		b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
+			outCfg := conf.Namespace{}
+			if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
+				logp.Err("Failed to unpack the output config: %v", err)
+				return nil
+			}
+
+			// Create a new config with the output configuration. Since r.Config is a pointer, a copy is required to
+			// avoid concurrent map read and write.
+			// See https://github.com/elastic/beats/issues/42815
+			configCopy, err := conf.NewConfigFrom(outCfg.Config())
+			if err != nil {
+				logp.Err("Failed to create a new config from the output config: %v", err)
+				return nil
+			}
+			stateStore.notifier.Notify(configCopy)
+			return nil
+		})
+	}
+
+	err = filestream.ValidateInputIDs(config.Inputs, logp.NewLogger("input.filestream"))
+	if err != nil {
+		logp.Err("invalid filestream configuration: %+v", err)
+		return err
+	}
+>>>>>>> f1e42fcaf (filebeat: make deep copy before notifying of config change (#42992))
 	err = processLogInputTakeOver(stateStore, config)
 	if err != nil {
 		logp.Err("Failed to attempt filestream state take over: %+v", err)
diff --git a/libbeat/statestore/backend/es/store.go b/libbeat/statestore/backend/es/store.go
new file mode 100644
index 000000000000..d4a6fd130853
--- /dev/null
+++ b/libbeat/statestore/backend/es/store.go
@@ -0,0 +1,340 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package es
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/http"
+	"net/url"
+	"sync"
+	"time"
+
+	"github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
+	"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
+	"github.com/elastic/beats/v7/libbeat/statestore/backend"
+	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
+)
+
+// The current typical usage of the state storage is such that the consumer
+// of the storage fetches all the keys and caches them at the start of the beat.
+// Then the key value gets updated (Set is called) possibly frequently, so we want these operations to happen fairly fast
+// and not block waiting on Elasticsearch refresh, thus the slight trade-off for performance instead of consistency.
+// The value is not normally retrieved after a modification, so the inconsistency (potential refresh delay) is acceptable for our use cases.
+//
+// If consistency becomes a strict requirement, the storage would need to implement possibly some caching mechanism
+// that would guarantee the consistency between Set/Remove/Get/Each operations at least for a given "in-memory" instance of the storage.
+
+type store struct {
+	ctx      context.Context
+	cn       context.CancelFunc
+	log      *logp.Logger
+	name     string
+	index    string
+	notifier *Notifier
+
+	chReady chan struct{}
+	once    sync.Once
+
+	mx     sync.Mutex
+	cli    *eslegclient.Connection
+	cliErr error
+}
+
+const docType = "_doc"
+
+func openStore(ctx context.Context, log *logp.Logger, name string, notifier *Notifier) (*store, error) {
+	ctx, cn := context.WithCancel(ctx)
+	s := &store{
+		ctx:      ctx,
+		cn:       cn,
+		log:      log.With("name", name).With("backend", "elasticsearch"),
+		name:     name,
+		index:    renderIndexName(name),
+		notifier: notifier,
+		chReady:  make(chan struct{}),
+	}
+
+	chCfg := make(chan *conf.C)
+
+	unsubFn := s.notifier.Subscribe(func(c *conf.C) {
+		select {
+		case chCfg <- c:
+		case <-ctx.Done():
+		}
+	})
+
+	go s.loop(ctx, cn, unsubFn, chCfg)
+
+	return s, nil
+}
+
+func renderIndexName(name string) string {
+	return "agentless-state-" + name
+}
+
+func (s *store) waitReady() error {
+	select {
+	case <-s.ctx.Done():
+		return s.ctx.Err()
+	case <-s.chReady:
+		return s.cliErr
+	}
+}
+
+func (s *store) SetID(id string) {
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	if id == "" {
+		return
+	}
+	s.index = renderIndexName(id)
+}
+
+func (s *store) Close() error {
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	if s.cn != nil {
+		s.cn()
+	}
+
+	if s.cli != nil {
+		err := s.cli.Close()
+		s.cli = nil
+		return err
+	}
+	return nil
+}
+
+func (s *store) Has(key string) (bool, error) {
+	if err := s.waitReady(); err != nil {
+		return false, err
+	}
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	var v interface{}
+	err := s.get(key, v)
+	if err != nil {
+		if errors.Is(err, ErrKeyUnknown) {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+func (s *store) Get(key string, to interface{}) error {
+	if err := s.waitReady(); err != nil {
+		return err
+	}
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	return s.get(key, to)
+}
+
+func (s *store) get(key string, to interface{}) error {
+	status, data, err := s.cli.Request("GET", fmt.Sprintf("/%s/%s/%s", s.index, docType, url.QueryEscape(key)), "", nil, nil)
+
+	if err != nil {
+		if status == http.StatusNotFound {
+			return ErrKeyUnknown
+		}
+		return err
+	}
+
+	var qr queryResult
+	err = json.Unmarshal(data, &qr)
+	if err != nil {
+		return err
+	}
+
+	err = json.Unmarshal(qr.Source.Value, to)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+type queryResult struct {
+	Found  bool `json:"found"`
+	Source struct {
+		Value json.RawMessage `json:"v"`
+	} `json:"_source"`
+}
+
+type doc struct {
+	Value     any `struct:"v"`
+	UpdatedAt any `struct:"updated_at"`
+}
+
+type entry struct {
+	value interface{}
+}
+
+func (e entry) Decode(to interface{}) error {
+	return typeconv.Convert(to, e.value)
+}
+
+func renderRequest(val interface{}) doc {
+	return doc{
+		Value:     val,
+		UpdatedAt: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"),
+	}
+}
+
+func (s *store) Set(key string, value interface{}) error {
+	if err := s.waitReady(); err != nil {
+		return err
+	}
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	doc := renderRequest(value)
+	_, _, err := s.cli.Request("PUT", fmt.Sprintf("/%s/%s/%s", s.index, docType, url.QueryEscape(key)), "", nil, doc)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+func (s *store) Remove(key string) error {
+	if err := s.waitReady(); err != nil {
+		return err
+	}
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	_, _, err := s.cli.Delete(s.index, docType, url.QueryEscape(key), nil)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+type searchResult struct {
+	ID     string `json:"_id"`
+	Source struct {
+		Value json.RawMessage `json:"v"`
+	} `json:"_source"`
+}
+
+func (s *store) Each(fn func(string, backend.ValueDecoder) (bool, error)) error {
+	if err := s.waitReady(); err != nil {
+		return err
+	}
+
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	// Do nothing for now if the store was not initialized
+	if s.cli == nil {
+		return nil
+	}
+
+	status, result, err := s.cli.SearchURIWithBody(s.index, "", nil, map[string]any{
+		"query": map[string]any{
+			"match_all": map[string]any{},
+		},
+		"size": 1000, // TODO: we might have to do scroll if there are more than 1000 keys
+	})
+
+	if err != nil && status != http.StatusNotFound {
+		return err
+	}
+
+	if result == nil || len(result.Hits.Hits) == 0 {
+		return nil
+	}
+
+	for _, hit := range result.Hits.Hits {
+		var sres searchResult
+		err = json.Unmarshal(hit, &sres)
+		if err != nil {
+			return err
+		}
+
+		var e entry
+		err = json.Unmarshal(sres.Source.Value, &e.value)
+		if err != nil {
+			return err
+		}
+
+		key, err := url.QueryUnescape(sres.ID)
+		if err != nil {
+			return err
+		}
+
+		cont, err := fn(key, e)
+		if !cont || err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (s *store) configure(ctx context.Context, c *conf.C) {
+	s.log.Info("Configuring ES store")
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	if s.cli != nil {
+		_ = s.cli.Close()
+		s.cli = nil
+	}
+	s.cliErr = nil
+
+	cli, err := eslegclient.NewConnectedClient(ctx, c, s.name)
+	if err != nil {
+		s.log.Errorf("ES store, failed to create elasticsearch client: %v", err)
+		s.cliErr = err
+	} else {
+		s.cli = cli
+	}
+
+	// Signal store is ready
+	s.once.Do(func() {
+		close(s.chReady)
+	})
+
+}
+
+func (s *store) loop(ctx context.Context, cn context.CancelFunc, unsubFn UnsubscribeFunc, chCfg chan *conf.C) {
+	defer cn()
+
+	// Unsubscribe on exit
+	defer unsubFn()
+
+	defer s.log.Debug("ES store exit main loop")
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case cu := <-chCfg:
+			s.configure(ctx, cu)
+		}
+	}
+}
diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go
index 000fc30eea3f..3900e1ce8f25 100644
--- a/x-pack/filebeat/tests/integration/managerV2_test.go
+++ b/x-pack/filebeat/tests/integration/managerV2_test.go
@@ -830,3 +830,276 @@ func writeStartUpInfo(t *testing.T, w io.Writer, info *proto.StartUpInfo) {
 	_, err = w.Write(infoBytes)
 	require.NoError(t, err, "failed to write connection information")
 }
+<<<<<<< HEAD
+=======
+
+// Response structure for JSON
+type response struct {
+	Message   string `json:"message"`
+	Published string `json:"published"`
+}
+
+func TestHTTPJSONInputReloadUnderElasticAgentWithElasticStateStore(t *testing.T) {
+	// First things first, ensure ES is running and we can connect to it.
+	// If ES is not running, the test will timeout and the only way to know
+	// what caused it is going through Filebeat's logs.
+	integration.EnsureESIsRunning(t)
+
+	// Create a test httpjson server for httpjson input
+	h := serverHelper{t: t}
+	defer func() {
+		assert.GreaterOrEqual(t, h.called, 2, "HTTP server should be called at least twice")
+	}()
+	testServer := httptest.NewServer(http.HandlerFunc(h.handler))
+	defer testServer.Close()
+
+	inputID := "httpjson-generic-" + uuid.Must(uuid.NewV4()).String()
+	inputUnit := &proto.UnitExpected{
+		Id:             inputID,
+		Type:           proto.UnitType_INPUT,
+		ConfigStateIdx: 1,
+		State:          proto.State_HEALTHY,
+		LogLevel:       proto.UnitLogLevel_DEBUG,
+		Config: &proto.UnitExpectedConfig{
+			Id: inputID,
+			Source: tests.RequireNewStruct(map[string]any{
+				"id":      inputID,
+				"type":    "httpjson",
+				"name":    "httpjson-1",
+				"enabled": true,
+			}),
+			Type: "httpjson",
+			Name: "httpjson-1",
+			Streams: []*proto.Stream{
+				{
+					Id: inputID,
+					Source: integration.RequireNewStruct(t, map[string]any{
+						"id":             inputID,
+						"enabled":        true,
+						"type":           "httpjson",
+						"interval":       "5s",
+						"request.url":    testServer.URL,
+						"request.method": "GET",
+						"request.transforms": []any{
+							map[string]any{
+								"set": map[string]any{
+									"target":  "url.params.since",
+									"value":   "[[.cursor.published]]",
+									"default": `[[formatDate (now (parseDuration "-24h")) "RFC3339"]]`,
+								},
+							},
+						},
+						"cursor": map[string]any{
+							"published": map[string]any{
+								"value": "[[.last_event.published]]",
+							},
+						},
+					}),
+				},
+			},
+		},
+	}
+	units := [][]*proto.UnitExpected{
+		{outputUnitES(t, 1), inputUnit},
+		{outputUnitES(t, 2), inputUnit},
+	}
+
+	idx := 0
+	waiting := false
+	when := time.Now()
+
+	final := atomic.Bool{}
+	nextState := func() {
+		if waiting {
+			if time.Now().After(when) {
+				t.Log("Next state")
+				idx = (idx + 1) % len(units)
+				waiting = false
+				h.notifyChange()
+				return
+			}
+			return
+		}
+		waiting = true
+		when = time.Now().Add(10 * time.Second)
+	}
+
+	server := &mock.StubServerV2{
+		CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
+			if management.DoesStateMatch(observed, units[idx], 0) {
+				if idx < len(units)-1 {
+					nextState()
+				} else {
+					final.Store(true)
+				}
+			}
+			for _, unit := range observed.GetUnits() {
+				expected := []proto.State{proto.State_HEALTHY, proto.State_CONFIGURING, proto.State_STARTING}
+				if !waiting {
+					expected = append(expected, proto.State_STOPPING)
+				}
+				require.Containsf(t, expected, unit.GetState(), "Unit '%s' is not healthy, state: %s", unit.GetId(), unit.GetState().String())
+			}
+			return &proto.CheckinExpected{
+				Units: units[idx],
+			}
+		},
+		ActionImpl: func(response *proto.ActionResponse) error { return nil },
+	}
+
+	require.NoError(t, server.Start())
+	t.Cleanup(server.Stop)
+
+	t.Setenv("AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES", "httpjson,cel")
+	filebeat := NewFilebeat(t)
+	filebeat.RestartOnBeatOnExit = true
+	filebeat.Start(
+		"-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port),
+		"-E", "management.enabled=true",
"management.enabled=true", + "-E", "management.restart_on_output_change=true", + ) + + for _, contains := range []string{ + "Configuring ES store", + "input-cursor::openStore: prefix: httpjson inputID: " + inputID, + "input-cursor store read 0 keys", // first, no previous data exists + "input-cursor store read 1 keys", // after the restart, previous key is read + } { + checkFilebeatLogs(t, filebeat, contains) + } + + require.Eventually(t, + final.Load, + waitDeadlineOr5Min(t), + 100*time.Millisecond, + "Failed to reach the final state", + ) +} + +type serverHelper struct { + t *testing.T + lock sync.Mutex + previous time.Time + called int + stateChanged bool +} + +func (h *serverHelper) verifyTime(since time.Time) time.Time { + h.lock.Lock() + defer h.lock.Unlock() + + h.called++ + + if h.previous.IsZero() { + assert.WithinDurationf(h.t, time.Now().Add(-24*time.Hour), since, 15*time.Minute, "since should be ~24h ago") + } else { + // XXX: `since` field is expected to be equal to the last published time. However, between unit restarts, the last + // updated field might not be persisted successfully. As a workaround, we allow a larger delta between restarts. + // However, we are still checking that the `since` field is not too far in the past, like 24h ago which is the + // initial value. + assert.WithinDurationf(h.t, h.previous, since, h.getDelta(since), "since should re-use last value") + } + h.previous = time.Now() + return h.previous +} + +func (h *serverHelper) getDelta(actual time.Time) time.Duration { + const delta = 1 * time.Second + if !h.stateChanged { + return delta + } + + dt := h.previous.Sub(actual) + if dt < -delta || dt > delta { + h.stateChanged = false + return time.Minute + } + return delta +} + +func (h *serverHelper) handler(w http.ResponseWriter, r *http.Request) { + since := parseParams(h.t, r.RequestURI) + published := h.verifyTime(since) + + w.Header().Set("Content-Type", "application/json") + err := json.NewEncoder(w).Encode(response{ + Message: "Hello", + Published: published.Format(time.RFC3339), + }) + require.NoError(h.t, err) +} + +func (h *serverHelper) notifyChange() { + h.lock.Lock() + defer h.lock.Unlock() + h.stateChanged = true +} + +func parseParams(t *testing.T, uri string) time.Time { + myUrl, err := url.Parse(uri) + require.NoError(t, err) + params, err := url.ParseQuery(myUrl.RawQuery) + require.NoError(t, err) + since := params["since"] + require.NotEmpty(t, since) + sinceStr := since[0] + sinceTime, err := time.Parse(time.RFC3339, sinceStr) + require.NoError(t, err) + return sinceTime +} + +func checkFilebeatLogs(t *testing.T, filebeat *integration.BeatProc, contains string) { + t.Helper() + const tick = 100 * time.Millisecond + + require.Eventually(t, + func() bool { return filebeat.LogContains(contains) }, + waitDeadlineOr5Min(t), + tick, + fmt.Sprintf("String '%s' not found on Filebeat logs", contains), + ) +} + +// waitDeadlineOr5Min looks at the test deadline and returns a reasonable value of waiting for a condition to be met. +// The possible values are: +// - if no test deadline is set, return 5 minutes +// - if a deadline is set and there is less than 0.5 second left, return the time left +// - otherwise return the time left minus 0.5 second. 
+func waitDeadlineOr5Min(t *testing.T) time.Duration {
+	deadline, deadlineSet := t.Deadline()
+	if !deadlineSet {
+		return 5 * time.Minute
+	}
+	left := time.Until(deadline)
+	final := left - 500*time.Millisecond
+	if final <= 0 {
+		return left
+	}
+	return final
+}
+
+func outputUnitES(t *testing.T, id int) *proto.UnitExpected {
+	return &proto.UnitExpected{
+		Id:             fmt.Sprintf("output-unit-%d", id),
+		Type:           proto.UnitType_OUTPUT,
+		ConfigStateIdx: 1,
+		State:          proto.State_HEALTHY,
+		LogLevel:       proto.UnitLogLevel_DEBUG,
+		Config: &proto.UnitExpectedConfig{
+			Id:   "default",
+			Type: "elasticsearch",
+			Name: fmt.Sprintf("elasticsearch-%d", id),
+			Source: integration.RequireNewStruct(t,
+				map[string]interface{}{
+					"type":                 "elasticsearch",
+					"hosts":                []interface{}{"http://localhost:9200"},
+					"username":             "admin",
+					"password":             "testing",
+					"protocol":             "http",
+					"enabled":              true,
+					"allow_older_versions": true,
+				}),
+		},
+	}
+}
+>>>>>>> f1e42fcaf (filebeat: make deep copy before notifying of config change (#42992))
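
The heart of this change is the copy-before-notify pattern in the filebeat.go hunk: the reloader never hands the agent-owned r.Config pointer to the state-store notifier, only a freshly built copy. Below is a minimal, standalone sketch of that pattern, separate from the diff above. It assumes only the elastic-agent-libs config API that the diff itself uses; demoNotifier and onOutputReload are illustrative names, not identifiers from the beats code base.

// Sketch only: illustrates why the reloader copies the unpacked output config
// before handing it to the notifier. Not part of the patch above.
package main

import (
	"fmt"

	conf "github.com/elastic/elastic-agent-libs/config"
)

// demoNotifier stands in for the es.Notifier: it hands a *conf.C to a
// subscriber (the ES store goroutine in the real code) that keeps reading it
// after the call returns, so the config must not share underlying maps with
// the agent-managed config that keeps being mutated.
type demoNotifier struct {
	subscriber func(*conf.C)
}

func (n *demoNotifier) Notify(c *conf.C) { n.subscriber(c) }

// onOutputReload mirrors the OutputConfigReloader registered in filebeat.go:
// unpack the namespaced output and notify with a deep copy, never with the
// original pointer (see https://github.com/elastic/beats/issues/42815).
func onOutputReload(n *demoNotifier, received *conf.C) error {
	outCfg := conf.Namespace{}
	if err := received.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
		return err
	}
	// conf.NewConfigFrom builds a fresh *conf.C, so later writes to `received`
	// cannot race with readers of the notified copy.
	configCopy, err := conf.NewConfigFrom(outCfg.Config())
	if err != nil {
		return err
	}
	n.Notify(configCopy)
	return nil
}

func main() {
	received, _ := conf.NewConfigFrom(map[string]any{
		"elasticsearch": map[string]any{"hosts": []string{"http://localhost:9200"}},
	})
	n := &demoNotifier{subscriber: func(c *conf.C) {
		fmt.Println("subscriber got a copy, not the original:", c != received)
	}}
	if err := onOutputReload(n, received); err != nil {
		fmt.Println("reload failed:", err)
	}
}

Because conf.NewConfigFrom materializes fresh underlying maps, the subscriber can keep reading its copy on another goroutine while the manager continues mutating the original config, which is exactly the concurrent map read/write reported in #42815.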