From 911f5272c53e5af1ce117a8301403a7a818d0adb Mon Sep 17 00:00:00 2001
From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com>
Date: Wed, 29 Jan 2025 12:36:16 +0000
Subject: [PATCH] [8.x](backport #41446) [filebeat] Elasticsearch state storage
for httpjson and cel inputs (#42451)
This enables Elasticsearch as State Store Backend for Security Integrations for
the Agentless solution.
The scope of this change was narrowed down to supporting only `httpjson` inputs
in order to support Okta integration for the initial release. All the other
integrations inputs still use the file storage as before.
This is a short term solution for the state storage for k8s.
The feature currently can only be enabled with the
`AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES` env var.
The existing code relied on the inputs state storage to be fully configurable
before the main beat managers runs. The change delays the configuration of
`httpjson` input to the time when the actual configuration is received from the
Agent.
Example of the state storage index content for Okta integration:
```
{
"took": 6,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "agentless-state-httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959",
"_id": "httpjson::httpjson-okta.system-028ecf4b-babe-44c6-939e-9e3096af6959::https://dev-36006609.okta.com/api/v1/logs",
"_seq_no": 39,
"_primary_term": 1,
"_score": 1,
"_source": {
"v": {
"ttl": 1800000000000,
"updated": "2024-10-24T20:21:22.032Z",
"cursor": {
"published": "2024-10-24T20:19:53.542Z"
}
}
}
}
]
}
}
```
The naming convention for all state store indices is the `agentless-state-` prefix,
since the expectation for agentless is that there is only one agent per policy and
the agents are ephemeral.
Closes https://github.com/elastic/security-team/issues/11101
(cherry picked from commit 8180f23fb5ad183ca7d5da0489789b8900c625c4)
Co-authored-by: Aleksandr Maus
Co-authored-by: Orestis Floros
---
filebeat/beater/filebeat.go | 34 +-
filebeat/beater/store.go | 46 ++-
filebeat/features/features.go | 59 +++
filebeat/features/features_test.go | 86 +++++
filebeat/input/filestream/environment_test.go | 8 +-
filebeat/input/filestream/input_test.go | 2 +-
.../internal/input-logfile/manager.go | 2 +-
.../internal/input-logfile/store.go | 2 +-
.../internal/input-logfile/store_test.go | 2 +-
filebeat/input/journald/environment_test.go | 2 +-
.../input/journald/input_filtering_test.go | 2 +-
filebeat/input/v2/input-cursor/manager.go | 59 +--
filebeat/input/v2/input-cursor/store.go | 65 ++--
filebeat/input/v2/input-cursor/store_test.go | 6 +-
filebeat/registrar/registrar.go | 4 +-
libbeat/statestore/backend/backend.go | 6 +-
libbeat/statestore/backend/es/error.go | 24 ++
libbeat/statestore/backend/es/notifier.go | 77 ++++
.../statestore/backend/es/notifier_test.go | 211 +++++++++++
libbeat/statestore/backend/es/registry.go | 53 +++
libbeat/statestore/backend/es/store.go | 340 ++++++++++++++++++
libbeat/statestore/backend/memlog/store.go | 4 +
libbeat/statestore/mock_test.go | 3 +
libbeat/statestore/store.go | 4 +
libbeat/statestore/storetest/storetest.go | 4 +
x-pack/filebeat/input/awss3/states.go | 2 +-
x-pack/filebeat/input/awss3/states_test.go | 2 +-
.../input/salesforce/input_manager_test.go | 2 +-
.../tests/integration/managerV2_test.go | 326 ++++++++++++++---
29 files changed, 1313 insertions(+), 124 deletions(-)
create mode 100644 filebeat/features/features.go
create mode 100644 filebeat/features/features_test.go
create mode 100644 libbeat/statestore/backend/es/error.go
create mode 100644 libbeat/statestore/backend/es/notifier.go
create mode 100644 libbeat/statestore/backend/es/notifier_test.go
create mode 100644 libbeat/statestore/backend/es/registry.go
create mode 100644 libbeat/statestore/backend/es/store.go
diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go
index 060c4702dad3..e3d29c2e61c1 100644
--- a/filebeat/beater/filebeat.go
+++ b/filebeat/beater/filebeat.go
@@ -40,6 +40,7 @@ import (
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/cfgwarn"
+ "github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
"github.com/elastic/beats/v7/libbeat/management"
"github.com/elastic/beats/v7/libbeat/monitoring/inputmon"
@@ -80,7 +81,9 @@ type Filebeat struct {
type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin
type StateStore interface {
- Access() (*statestore.Store, error)
+ // Access returns the storage registry depending on the type. This is needed for the Elasticsearch state store which
+ // is guarded by the feature.IsElasticsearchStateStoreEnabledForInput(typ) check.
+ Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}
@@ -299,13 +302,36 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
return err
}
- stateStore, err := openStateStore(b.Info, logp.NewLogger("filebeat"), config.Registry)
+	// Use a cancellable context hooked up to the beat.done channel
+ ctx, cn := context.WithCancel(context.Background())
+ go func() {
+ <-fb.done
+ cn()
+ }()
+
+ stateStore, err := openStateStore(ctx, b.Info, logp.NewLogger("filebeat"), config.Registry)
if err != nil {
logp.Err("Failed to open state store: %+v", err)
return err
}
defer stateStore.Close()
+ // If notifier is set, configure the listener for output configuration
+ // The notifier passes the elasticsearch output configuration down to the Elasticsearch backed state storage
+	// in order to allow it to be fully configured
+ if stateStore.notifier != nil {
+ b.OutputConfigReloader = reload.ReloadableFunc(func(r *reload.ConfigWithMeta) error {
+ outCfg := conf.Namespace{}
+ if err := r.Config.Unpack(&outCfg); err != nil || outCfg.Name() != "elasticsearch" {
+ logp.Err("Failed to unpack the output config: %v", err)
+ return nil
+ }
+
+ stateStore.notifier.Notify(outCfg.Config())
+ return nil
+ })
+ }
+
err = processLogInputTakeOver(stateStore, config)
if err != nil {
logp.Err("Failed to attempt filestream state take over: %+v", err)
@@ -351,6 +377,8 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
defer func() {
_ = inputTaskGroup.Stop()
}()
+
+ // Store needs to be fully configured at this point
if err := v2InputLoader.Init(&inputTaskGroup); err != nil {
logp.Err("Failed to initialize the input managers: %v", err)
return err
@@ -535,7 +563,7 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {
return nil
}
- store, err := stateStore.Access()
+ store, err := stateStore.Access("")
if err != nil {
return fmt.Errorf("Failed to access state when attempting take over: %w", err)
}
diff --git a/filebeat/beater/store.go b/filebeat/beater/store.go
index 745c507d6e5d..a32e248aba85 100644
--- a/filebeat/beater/store.go
+++ b/filebeat/beater/store.go
@@ -18,11 +18,15 @@
package beater
import (
+ "context"
"time"
"github.com/elastic/beats/v7/filebeat/config"
+ "github.com/elastic/beats/v7/filebeat/features"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
+ "github.com/elastic/beats/v7/libbeat/statestore/backend"
+ "github.com/elastic/beats/v7/libbeat/statestore/backend/es"
"github.com/elastic/beats/v7/libbeat/statestore/backend/memlog"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/paths"
@@ -30,12 +34,31 @@ import (
type filebeatStore struct {
registry *statestore.Registry
+ esRegistry *statestore.Registry
storeName string
cleanInterval time.Duration
+
+ // Notifies the Elasticsearch store about configuration change
+ // which is available only after the beat runtime manager connects to the Agent
+ // and receives the output configuration
+ notifier *es.Notifier
}
-func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
- memlog, err := memlog.New(logger, memlog.Settings{
+func openStateStore(ctx context.Context, info beat.Info, logger *logp.Logger, cfg config.Registry) (*filebeatStore, error) {
+ var (
+ reg backend.Registry
+ err error
+
+ esreg *es.Registry
+ notifier *es.Notifier
+ )
+
+ if features.IsElasticsearchStateStoreEnabled() {
+ notifier = es.NewNotifier()
+ esreg = es.New(ctx, logger, notifier)
+ }
+
+ reg, err = memlog.New(logger, memlog.Settings{
Root: paths.Resolve(paths.Data, cfg.Path),
FileMode: cfg.Permissions,
})
@@ -43,18 +66,29 @@ func openStateStore(info beat.Info, logger *logp.Logger, cfg config.Registry) (*
return nil, err
}
- return &filebeatStore{
- registry: statestore.NewRegistry(memlog),
+ store := &filebeatStore{
+ registry: statestore.NewRegistry(reg),
storeName: info.Beat,
cleanInterval: cfg.CleanInterval,
- }, nil
+ notifier: notifier,
+ }
+
+ if esreg != nil {
+ store.esRegistry = statestore.NewRegistry(esreg)
+ }
+
+ return store, nil
}
func (s *filebeatStore) Close() {
s.registry.Close()
}
-func (s *filebeatStore) Access() (*statestore.Store, error) {
+// Access returns the storage registry depending on the type. Default is the file store.
+func (s *filebeatStore) Access(typ string) (*statestore.Store, error) {
+ if features.IsElasticsearchStateStoreEnabledForInput(typ) && s.esRegistry != nil {
+ return s.esRegistry.Get(s.storeName)
+ }
return s.registry.Get(s.storeName)
}
diff --git a/filebeat/features/features.go b/filebeat/features/features.go
new file mode 100644
index 000000000000..803aa5b5bdeb
--- /dev/null
+++ b/filebeat/features/features.go
@@ -0,0 +1,59 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package features
+
+import (
+ "os"
+ "strings"
+)
+
+// List of input types Elasticsearch state store is enabled for
+var esTypesEnabled map[string]struct{}
+
+var isESEnabled bool
+
+func init() {
+ initFromEnv("AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES")
+}
+
+func initFromEnv(envName string) {
+ esTypesEnabled = make(map[string]struct{})
+
+ arr := strings.Split(os.Getenv(envName), ",")
+ for _, e := range arr {
+ k := strings.TrimSpace(e)
+ if k != "" {
+ esTypesEnabled[k] = struct{}{}
+ }
+ }
+ isESEnabled = len(esTypesEnabled) > 0
+}
+
+// IsElasticsearchStateStoreEnabled returns true if feature is enabled for agentless
+func IsElasticsearchStateStoreEnabled() bool {
+ return isESEnabled
+}
+
+// IsElasticsearchStateStoreEnabledForInput returns true if the provided input type uses Elasticsearch for state storage if the Elasticsearch state store feature is enabled
+func IsElasticsearchStateStoreEnabledForInput(inputType string) bool {
+ if IsElasticsearchStateStoreEnabled() {
+ _, ok := esTypesEnabled[inputType]
+ return ok
+ }
+ return false
+}
diff --git a/filebeat/features/features_test.go b/filebeat/features/features_test.go
new file mode 100644
index 000000000000..00702ae379e3
--- /dev/null
+++ b/filebeat/features/features_test.go
@@ -0,0 +1,86 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package features
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func Test_initFromEnv(t *testing.T) {
+ const envName = "TEST_AGENTLESS_ENV"
+
+ t.Run("Without setting env", func(t *testing.T) {
+ // default init
+ assert.False(t, IsElasticsearchStateStoreEnabled())
+ assert.Empty(t, esTypesEnabled)
+ assert.False(t, IsElasticsearchStateStoreEnabledForInput("xxx"))
+
+ // init from env
+ initFromEnv(envName)
+ assert.False(t, IsElasticsearchStateStoreEnabled())
+ assert.Empty(t, esTypesEnabled)
+ assert.False(t, IsElasticsearchStateStoreEnabledForInput("xxx"))
+ })
+
+ tests := []struct {
+ name string
+ value string
+ wantEnabled bool
+ wantContains []string
+ }{
+ {
+ name: "Empty",
+ value: "",
+ wantEnabled: false,
+ wantContains: nil,
+ },
+ {
+ name: "Single value",
+ value: "xxx",
+ wantEnabled: true,
+ wantContains: []string{"xxx"},
+ },
+ {
+ name: "Multiple values",
+ value: "xxx,yyy",
+ wantEnabled: true,
+ wantContains: []string{"xxx", "yyy"},
+ },
+ {
+ name: "Multiple values with spaces",
+ value: ",,, , xxx , yyy, ,,,,",
+ wantEnabled: true,
+ wantContains: []string{"xxx", "yyy"},
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ t.Setenv(envName, tt.value)
+ initFromEnv(envName)
+
+ assert.Equal(t, tt.wantEnabled, IsElasticsearchStateStoreEnabled())
+ for _, contain := range tt.wantContains {
+ assert.Contains(t, esTypesEnabled, contain)
+ assert.True(t, IsElasticsearchStateStoreEnabledForInput(contain))
+ }
+ assert.Len(t, esTypesEnabled, len(tt.wantContains))
+ })
+ }
+}
diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go
index a4dfd2c15fe3..0254420a0bf0 100644
--- a/filebeat/input/filestream/environment_test.go
+++ b/filebeat/input/filestream/environment_test.go
@@ -194,7 +194,7 @@ func (e *inputTestingEnvironment) abspath(filename string) string {
}
func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
- inputStore, _ := e.stateStore.Access()
+ inputStore, _ := e.stateStore.Access("")
actual := 0
err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) {
@@ -331,7 +331,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str
e.t.Fatalf("cannot stat file when cheking for offset: %+v", err)
}
- inputStore, _ := e.stateStore.Access()
+ inputStore, _ := e.stateStore.Access("")
id := getIDFromPath(filepath, inputID, fi)
var entry registryEntry
@@ -352,7 +352,7 @@ func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expect
}
func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, error) {
- inputStore, _ := e.stateStore.Access()
+ inputStore, _ := e.stateStore.Access("")
var entry registryEntry
err := inputStore.Get(key, &entry)
@@ -553,7 +553,7 @@ func (s *testInputStore) Close() {
s.registry.Close()
}
-func (s *testInputStore) Access() (*statestore.Store, error) {
+func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}
diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go
index 735ea0d0ffe7..970c6c7e7a56 100644
--- a/filebeat/input/filestream/input_test.go
+++ b/filebeat/input/filestream/input_test.go
@@ -247,7 +247,7 @@ func (s *testStore) Close() {
s.registry.Close()
}
-func (s *testStore) Access() (*statestore.Store, error) {
+func (s *testStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filestream-benchmark")
}
diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go
index c65ccb5e3089..ccac725b3d1b 100644
--- a/filebeat/input/filestream/internal/input-logfile/manager.go
+++ b/filebeat/input/filestream/internal/input-logfile/manager.go
@@ -88,7 +88,7 @@ const globalInputID = ".global"
// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
- Access() (*statestore.Store, error)
+ Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}
diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go
index 85f40d1f3a33..9a65f0cd011d 100644
--- a/filebeat/input/filestream/internal/input-logfile/store.go
+++ b/filebeat/input/filestream/internal/input-logfile/store.go
@@ -144,7 +144,7 @@ var closeStore = (*store).close
func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) {
ok := false
- persistentStore, err := statestore.Access()
+ persistentStore, err := statestore.Access("")
if err != nil {
return nil, err
}
diff --git a/filebeat/input/filestream/internal/input-logfile/store_test.go b/filebeat/input/filestream/internal/input-logfile/store_test.go
index 2d4f98b5d29b..ac77fc2c2942 100644
--- a/filebeat/input/filestream/internal/input-logfile/store_test.go
+++ b/filebeat/input/filestream/internal/input-logfile/store_test.go
@@ -508,7 +508,7 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore {
func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts }
func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod }
-func (ts testStateStore) Access() (*statestore.Store, error) {
+func (ts testStateStore) Access(string) (*statestore.Store, error) {
if ts.Store == nil {
return nil, errors.New("no store configured")
}
diff --git a/filebeat/input/journald/environment_test.go b/filebeat/input/journald/environment_test.go
index 57f75163e926..9ea77d017d15 100644
--- a/filebeat/input/journald/environment_test.go
+++ b/filebeat/input/journald/environment_test.go
@@ -139,7 +139,7 @@ func (s *testInputStore) Close() {
s.registry.Close()
}
-func (s *testInputStore) Access() (*statestore.Store, error) {
+func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}
diff --git a/filebeat/input/journald/input_filtering_test.go b/filebeat/input/journald/input_filtering_test.go
index 220f71e2d9ba..9464016d40dd 100644
--- a/filebeat/input/journald/input_filtering_test.go
+++ b/filebeat/input/journald/input_filtering_test.go
@@ -256,7 +256,7 @@ func TestInputSeek(t *testing.T) {
env := newInputTestingEnvironment(t)
if testCase.cursor != "" {
- store, _ := env.stateStore.Access()
+ store, _ := env.stateStore.Access("")
tmp := map[string]any{}
if err := json.Unmarshal([]byte(testCase.cursor), &tmp); err != nil {
t.Fatal(err)
diff --git a/filebeat/input/v2/input-cursor/manager.go b/filebeat/input/v2/input-cursor/manager.go
index 1d5578a71223..f8d86054054e 100644
--- a/filebeat/input/v2/input-cursor/manager.go
+++ b/filebeat/input/v2/input-cursor/manager.go
@@ -21,11 +21,11 @@ import (
"context"
"errors"
"fmt"
- "sync"
"time"
"github.com/elastic/go-concert/unison"
+ "github.com/elastic/beats/v7/filebeat/features"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/statestore"
conf "github.com/elastic/elastic-agent-libs/config"
@@ -63,9 +63,9 @@ type InputManager struct {
// that will be used to collect events from each source.
Configure func(cfg *conf.C) ([]Source, Input, error)
- initOnce sync.Once
- initErr error
- store *store
+ initedFull bool
+ initErr error
+ store *store
}
// Source describe a source the input can collect data from.
@@ -82,25 +82,38 @@ var (
// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
- Access() (*statestore.Store, error)
+ Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}
-func (cim *InputManager) init() error {
- cim.initOnce.Do(func() {
- if cim.DefaultCleanTimeout <= 0 {
- cim.DefaultCleanTimeout = 30 * time.Minute
- }
+// init initializes the state store
+// This function is called from:
+// 1. InputManager::Init on beat start
+// 2. InputManager::Create when the input is initialized with configuration
+// When Elasticsearch state storage is used for the input it will be only fully configured on InputManager::Create,
+// so skip reading the state from the storage on InputManager::Init in this case
+func (cim *InputManager) init(inputID string) error {
+ if cim.initedFull {
+ return nil
+ }
- log := cim.Logger.With("input_type", cim.Type)
- var store *store
- store, cim.initErr = openStore(log, cim.StateStore, cim.Type)
- if cim.initErr != nil {
- return
- }
+ if cim.DefaultCleanTimeout <= 0 {
+ cim.DefaultCleanTimeout = 30 * time.Minute
+ }
- cim.store = store
- })
+ log := cim.Logger.With("input_type", cim.Type)
+ var store *store
+ useES := features.IsElasticsearchStateStoreEnabledForInput(cim.Type)
+ fullInit := !useES || inputID != ""
+ store, cim.initErr = openStore(log, cim.StateStore, cim.Type, inputID, fullInit)
+ if cim.initErr != nil {
+ return cim.initErr
+ }
+
+ cim.store = store
+ if fullInit {
+ cim.initedFull = true
+ }
return cim.initErr
}
@@ -108,7 +121,7 @@ func (cim *InputManager) init() error {
// Init starts background processes for deleting old entries from the
// persistent store if mode is ModeRun.
func (cim *InputManager) Init(group unison.Group) error {
- if err := cim.init(); err != nil {
+ if err := cim.init(""); err != nil {
return err
}
@@ -143,10 +156,6 @@ func (cim *InputManager) shutdown() {
// Create builds a new v2.Input using the provided Configure function.
// The Input will run a go-routine per source that has been configured.
func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
- if err := cim.init(); err != nil {
- return nil, err
- }
-
settings := struct {
ID string `config:"id"`
CleanInactive time.Duration `config:"clean_inactive"`
@@ -155,6 +164,10 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
return nil, err
}
+ if err := cim.init(settings.ID); err != nil {
+ return nil, err
+ }
+
sources, inp, err := cim.Configure(config)
if err != nil {
return nil, err
diff --git a/filebeat/input/v2/input-cursor/store.go b/filebeat/input/v2/input-cursor/store.go
index a53bc77a79f9..936735946b0b 100644
--- a/filebeat/input/v2/input-cursor/store.go
+++ b/filebeat/input/v2/input-cursor/store.go
@@ -127,16 +127,18 @@ type (
// hook into store close for testing purposes
var closeStore = (*store).close
-func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) {
+func openStore(log *logp.Logger, statestore StateStore, prefix string, inputID string, fullInit bool) (*store, error) {
ok := false
- persistentStore, err := statestore.Access()
+ log.Debugf("input-cursor::openStore: prefix: %v inputID: %s", prefix, inputID)
+ persistentStore, err := statestore.Access(prefix)
if err != nil {
return nil, err
}
defer cleanup.IfNot(&ok, func() { persistentStore.Close() })
+ persistentStore.SetID(inputID)
- states, err := readStates(log, persistentStore, prefix)
+ states, err := readStates(log, persistentStore, prefix, fullInit)
if err != nil {
return nil, err
}
@@ -283,41 +285,44 @@ func (r *resource) stateSnapshot() state {
}
}
-func readStates(log *logp.Logger, store *statestore.Store, prefix string) (*states, error) {
+func readStates(log *logp.Logger, store *statestore.Store, prefix string, fullInit bool) (*states, error) {
keyPrefix := prefix + "::"
states := &states{
table: map[string]*resource{},
}
- err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) {
- if !strings.HasPrefix(key, keyPrefix) {
- return true, nil
- }
+ if fullInit {
+ err := store.Each(func(key string, dec statestore.ValueDecoder) (bool, error) {
+ if !strings.HasPrefix(key, keyPrefix) {
+ return true, nil
+ }
+
+ var st state
+ if err := dec.Decode(&st); err != nil {
+ log.Errorf("Failed to read registry state for '%v', cursor state will be ignored. Error was: %+v",
+ key, err)
+ return true, nil
+ }
+
+ resource := &resource{
+ key: key,
+ stored: true,
+ lock: unison.MakeMutex(),
+ internalInSync: true,
+ internalState: stateInternal{
+ TTL: st.TTL,
+ Updated: st.Updated,
+ },
+ cursor: st.Cursor,
+ }
+ states.table[resource.key] = resource
- var st state
- if err := dec.Decode(&st); err != nil {
- log.Errorf("Failed to read regisry state for '%v', cursor state will be ignored. Error was: %+v",
- key, err)
return true, nil
+ })
+ log.Debugf("input-cursor store read %d keys", len(states.table))
+ if err != nil {
+ return nil, err
}
-
- resource := &resource{
- key: key,
- stored: true,
- lock: unison.MakeMutex(),
- internalInSync: true,
- internalState: stateInternal{
- TTL: st.TTL,
- Updated: st.Updated,
- },
- cursor: st.Cursor,
- }
- states.table[resource.key] = resource
-
- return true, nil
- })
- if err != nil {
- return nil, err
}
return states, nil
}
diff --git a/filebeat/input/v2/input-cursor/store_test.go b/filebeat/input/v2/input-cursor/store_test.go
index fc1d57fac3ee..b7fbba9c8ad6 100644
--- a/filebeat/input/v2/input-cursor/store_test.go
+++ b/filebeat/input/v2/input-cursor/store_test.go
@@ -52,7 +52,7 @@ func TestStore_OpenClose(t *testing.T) {
})
t.Run("fail if persistent store can not be accessed", func(t *testing.T) {
- _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test")
+ _, err := openStore(logp.NewLogger("test"), testStateStore{}, "test", "", true)
require.Error(t, err)
})
@@ -240,7 +240,7 @@ func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *sto
persistentStore = createSampleStore(t, nil)
}
- store, err := openStore(logp.NewLogger("test"), persistentStore, prefix)
+ store, err := openStore(logp.NewLogger("test"), persistentStore, prefix, "", true)
if err != nil {
t.Fatalf("failed to open the store")
}
@@ -267,7 +267,7 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore {
func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts }
func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod }
-func (ts testStateStore) Access() (*statestore.Store, error) {
+func (ts testStateStore) Access(_ string) (*statestore.Store, error) {
if ts.Store == nil {
return nil, errors.New("no store configured")
}
diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go
index 133a4fbdd695..8982c6d58609 100644
--- a/filebeat/registrar/registrar.go
+++ b/filebeat/registrar/registrar.go
@@ -55,7 +55,7 @@ type successLogger interface {
}
type StateStore interface {
- Access() (*statestore.Store, error)
+ Access(typ string) (*statestore.Store, error)
}
var (
@@ -72,7 +72,7 @@ const fileStatePrefix = "filebeat::logs::"
// New creates a new Registrar instance, updating the registry file on
// `file.State` updates. New fails if the file can not be opened or created.
func New(stateStore StateStore, out successLogger, flushTimeout time.Duration) (*Registrar, error) {
- store, err := stateStore.Access()
+ store, err := stateStore.Access("")
if err != nil {
return nil, err
}
diff --git a/libbeat/statestore/backend/backend.go b/libbeat/statestore/backend/backend.go
index c40d8515977d..c58ad173a3b1 100644
--- a/libbeat/statestore/backend/backend.go
+++ b/libbeat/statestore/backend/backend.go
@@ -42,7 +42,7 @@ type Store interface {
Close() error
// Has checks if the key exists. No error must be returned if the key does
- // not exists, but the bool return must be false.
+ // not exist, but the bool return must be false.
// An error return value must indicate internal errors only. The store is
// assumed to be in a 'bad' but recoverable state if 'Has' fails.
Has(key string) (bool, error)
@@ -68,4 +68,8 @@ type Store interface {
// is assumed to be invalidated once fn returns
// The loop shall return if fn returns an error or false.
Each(fn func(string, ValueDecoder) (bool, error)) error
+
+	// SetID sets the store ID when the full input configuration is acquired.
+ // This is needed in order to support Elasticsearch state store naming convention based on the input ID.
+ SetID(id string)
}
diff --git a/libbeat/statestore/backend/es/error.go b/libbeat/statestore/backend/es/error.go
new file mode 100644
index 000000000000..df8b1a734d6f
--- /dev/null
+++ b/libbeat/statestore/backend/es/error.go
@@ -0,0 +1,24 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package es
+
+import "errors"
+
+var (
+ ErrKeyUnknown = errors.New("key unknown")
+)
diff --git a/libbeat/statestore/backend/es/notifier.go b/libbeat/statestore/backend/es/notifier.go
new file mode 100644
index 000000000000..153883cf18f8
--- /dev/null
+++ b/libbeat/statestore/backend/es/notifier.go
@@ -0,0 +1,77 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package es
+
+import (
+ "sync"
+
+ conf "github.com/elastic/elastic-agent-libs/config"
+)
+
+type OnConfigUpdateFunc func(c *conf.C)
+type UnsubscribeFunc func()
+
+type Notifier struct {
+ mx sync.Mutex
+
+ lastConfig *conf.C
+ listeners map[int]OnConfigUpdateFunc
+ id int
+}
+
+func NewNotifier() *Notifier {
+ return &Notifier{
+ listeners: make(map[int]OnConfigUpdateFunc),
+ id: 0,
+ }
+}
+
+// Subscribe adds a listener to the notifier. The listener will be called when Notify is called.
+// Each OnConfigUpdateFunc is called asynchronously in a separate goroutine in each Notify call.
+//
+// Returns an UnsubscribeFunc that can be used to remove the listener.
+//
+// Note: Subscribe will call the listener with the last config that was passed to Notify.
+func (n *Notifier) Subscribe(fn OnConfigUpdateFunc) UnsubscribeFunc {
+ n.mx.Lock()
+ defer n.mx.Unlock()
+
+ id := n.id
+ n.id++
+ n.listeners[id] = fn
+
+ if n.lastConfig != nil {
+ go fn(n.lastConfig)
+ }
+
+ return func() {
+ n.mx.Lock()
+ defer n.mx.Unlock()
+ delete(n.listeners, id)
+ }
+}
+
+func (n *Notifier) Notify(c *conf.C) {
+ n.mx.Lock()
+ defer n.mx.Unlock()
+ n.lastConfig = c
+
+ for _, listener := range n.listeners {
+ go listener(c)
+ }
+}
diff --git a/libbeat/statestore/backend/es/notifier_test.go b/libbeat/statestore/backend/es/notifier_test.go
new file mode 100644
index 000000000000..290508411ab3
--- /dev/null
+++ b/libbeat/statestore/backend/es/notifier_test.go
@@ -0,0 +1,211 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package es
+
+import (
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ conf "github.com/elastic/elastic-agent-libs/config"
+)
+
+// createTestConfigs builds n configs of the form {"id": i} and sanity-checks
+// that each one round-trips its id value.
+func createTestConfigs(t *testing.T, n int) []*conf.C {
+	var res []*conf.C
+	for i := 0; i < n; i++ {
+		c, err := conf.NewConfigFrom(map[string]any{"id": i})
+		require.NoError(t, err)
+		require.NotNil(t, c)
+
+		id, err := c.Int("id", -1)
+		require.NoError(t, err, "sanity check: id is stored")
+		require.Equal(t, int64(i), id, "sanity check: id is correct")
+
+		res = append(res, c)
+	}
+	return res
+}
+
+// wgWait waits for wg to finish for up to one second, failing the test on
+// timeout so a stuck goroutine cannot hang the whole suite.
+func wgWait(t *testing.T, wg *sync.WaitGroup) {
+	t.Helper()
+	const timeout = 1 * time.Second
+
+	done := make(chan struct{})
+	go func() {
+		defer close(done)
+		wg.Wait()
+	}()
+
+	select {
+	case <-time.After(timeout):
+		require.Fail(t, "timeout waiting for WaitGroup")
+	case <-done:
+	}
+}
+
+// TestSanity verifies that createTestConfigs is deterministic (same n gives
+// equal configs) and position-sensitive (different n or index differs).
+func TestSanity(t *testing.T) {
+	assert.Equal(t, createTestConfigs(t, 5), createTestConfigs(t, 5))
+	assert.NotEqual(t, createTestConfigs(t, 4), createTestConfigs(t, 5))
+	assert.NotEqual(t, createTestConfigs(t, 5)[3], createTestConfigs(t, 5)[4])
+}
+
+// TestSubscribeAndNotify checks that every Notify reaches every subscriber,
+// and that a subscriber added after the fact still receives the last config.
+// The WaitGroup counts are pre-added before each Notify/Subscribe because
+// listeners run on their own goroutines.
+func TestSubscribeAndNotify(t *testing.T) {
+	notifier := NewNotifier()
+
+	var (
+		wg             sync.WaitGroup
+		mx             sync.Mutex
+		receivedFirst  []*conf.C
+		receivedSecond []*conf.C
+	)
+
+	unsubFirst := notifier.Subscribe(func(c *conf.C) {
+		defer wg.Done()
+		mx.Lock()
+		defer mx.Unlock()
+		receivedFirst = append(receivedFirst, c)
+	})
+	defer unsubFirst()
+
+	unsubSecond := notifier.Subscribe(func(c *conf.C) {
+		defer wg.Done()
+		mx.Lock()
+		defer mx.Unlock()
+		receivedSecond = append(receivedSecond, c)
+	})
+	defer unsubSecond()
+
+	const totalNotifications = 3
+
+	configs := createTestConfigs(t, totalNotifications)
+
+	// Two subscribers, each expected to see every notification.
+	wg.Add(totalNotifications * 2)
+	for _, config := range configs {
+		notifier.Notify(config)
+	}
+
+	// Delivery order is not guaranteed, hence ElementsMatch, not Equal.
+	wgWait(t, &wg)
+	assert.ElementsMatch(t, configs, receivedFirst)
+	assert.ElementsMatch(t, configs, receivedSecond)
+
+	// Receive old config: a late subscriber gets exactly one replay call
+	// with the last config passed to Notify.
+	wg.Add(1)
+	notifier.Subscribe(func(c *conf.C) {
+		defer wg.Done()
+		mx.Lock()
+		defer mx.Unlock()
+	})
+	wgWait(t, &wg)
+}
+
+// TestUnsubscribe checks that a listener removed via its UnsubscribeFunc
+// receives no further notifications while remaining listeners still do.
+func TestUnsubscribe(t *testing.T) {
+	var (
+		wg                            sync.WaitGroup
+		mx                            sync.Mutex
+		receivedFirst, receivedSecond []*conf.C
+	)
+
+	notifier := NewNotifier()
+
+	unsubFirst := notifier.Subscribe(func(c *conf.C) {
+		defer wg.Done()
+		mx.Lock()
+		defer mx.Unlock()
+		receivedFirst = append(receivedFirst, c)
+	})
+	defer unsubFirst()
+
+	unsubSecond := notifier.Subscribe(func(c *conf.C) {
+		defer wg.Done()
+		mx.Lock()
+		defer mx.Unlock()
+		receivedSecond = append(receivedSecond, c)
+	})
+	defer unsubSecond()
+
+	const totalNotifications = 3
+
+	configs := createTestConfigs(t, totalNotifications)
+
+	// Unsubscribe first; calling unsubFirst again via defer later is harmless.
+	unsubFirst()
+
+	// Notify: only the second subscriber is counted in the WaitGroup now.
+	wg.Add(totalNotifications)
+	for _, config := range configs {
+		notifier.Notify(config)
+	}
+
+	wgWait(t, &wg)
+	assert.Empty(t, receivedFirst)
+	assert.ElementsMatch(t, configs, receivedSecond)
+}
+
+// TestConcurrentSubscribeAndNotify subscribes from many goroutines at once
+// (guarding the shared unsubFns slice with its own mutex), then verifies a
+// single Notify reaches every one of them exactly once.
+func TestConcurrentSubscribeAndNotify(t *testing.T) {
+	notifier := NewNotifier()
+
+	var (
+		wg, wgSub sync.WaitGroup
+		mx, mxSub sync.Mutex
+		received  []*conf.C
+		unsubFns  []UnsubscribeFunc
+	)
+
+	// Concurrent subscribers
+	const count = 10
+	wgSub.Add(count)
+	wg.Add(count)
+	for i := 0; i < count; i++ {
+		go func() {
+			defer wgSub.Done()
+			mxSub.Lock()
+			defer mxSub.Unlock()
+			unsub := notifier.Subscribe(func(c *conf.C) {
+				defer wg.Done()
+				mx.Lock()
+				defer mx.Unlock()
+				received = append(received, c)
+			})
+			unsubFns = append(unsubFns, unsub)
+		}()
+	}
+	defer func() {
+		for _, unsubFn := range unsubFns {
+			unsubFn()
+		}
+	}()
+
+	// Wait for all subscribers to be added
+	wgWait(t, &wgSub)
+
+	// Notify
+	c := createTestConfigs(t, 1)[0]
+	notifier.Notify(c)
+
+	// Wait for all
+	wgWait(t, &wg)
+	// Every listener received the same *conf.C pointer, so plain Equal works
+	// even though delivery order is nondeterministic.
+	expected := make([]*conf.C, count)
+	for i := 0; i < count; i++ {
+		expected[i] = c
+	}
+	assert.Equal(t, expected, received)
+}
diff --git a/libbeat/statestore/backend/es/registry.go b/libbeat/statestore/backend/es/registry.go
new file mode 100644
index 000000000000..42ef973a2bbf
--- /dev/null
+++ b/libbeat/statestore/backend/es/registry.go
@@ -0,0 +1,53 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package es
+
+import (
+ "context"
+ "sync"
+
+ "github.com/elastic/beats/v7/libbeat/statestore/backend"
+ "github.com/elastic/elastic-agent-libs/logp"
+)
+
+// Registry is a statestore backend that persists store state in
+// Elasticsearch. Stores it opens are configured asynchronously through the
+// shared Notifier once an output config arrives from the agent.
+type Registry struct {
+	ctx context.Context // parent context bounding the lifetime of opened stores
+
+	log *logp.Logger
+	mx  sync.Mutex // serializes Access calls
+
+	notifier *Notifier // source of ES connection configs for opened stores
+}
+
+// New creates an Elasticsearch-backed statestore registry. Stores opened
+// through it are bound to ctx and configured lazily via notifier.
+func New(ctx context.Context, log *logp.Logger, notifier *Notifier) *Registry {
+	r := Registry{
+		ctx:      ctx,
+		log:      log,
+		notifier: notifier,
+	}
+	return &r
+}
+
+// Access opens the named store. The returned store is usable immediately but
+// its operations block until the first ES configuration arrives (see
+// store.waitReady).
+func (r *Registry) Access(name string) (backend.Store, error) {
+	r.mx.Lock()
+	defer r.mx.Unlock()
+	return openStore(r.ctx, r.log, name, r.notifier)
+}
+
+// Close is a no-op: individual stores own their connections and are closed
+// separately; the registry holds no resources of its own.
+func (r *Registry) Close() error {
+	return nil
+}
diff --git a/libbeat/statestore/backend/es/store.go b/libbeat/statestore/backend/es/store.go
new file mode 100644
index 000000000000..fee1e0c9ba48
--- /dev/null
+++ b/libbeat/statestore/backend/es/store.go
@@ -0,0 +1,340 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package es
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/http"
+ "net/url"
+ "sync"
+ "time"
+
+ "github.com/elastic/beats/v7/libbeat/common/transform/typeconv"
+ "github.com/elastic/beats/v7/libbeat/esleg/eslegclient"
+ "github.com/elastic/beats/v7/libbeat/statestore/backend"
+ conf "github.com/elastic/elastic-agent-libs/config"
+ "github.com/elastic/elastic-agent-libs/logp"
+)
+
+// The current typical usage of the state storage is such that the consumer
+// of the storage fetches all the keys and caches them at the start of the beat.
+// Then the key value gets updated (Set is called) possibly frequently, so we want these operations to happen fairly fast
+// and not block waiting on Elasticsearch refresh, thus the slight trade-off for performance instead of consistency.
+// The value is not normally retrieved after a modification, so the inconsistency (potential refresh delay) is acceptable for our use cases.
+//
+// If consistency becomes a strict requirement, the storage would need to implement possibly some caching mechanism
+// that would guarantee the consistency between Set/Remove/Get/Each operations at least for a given "in-memory" instance of the storage.
+
+// store is a single named statestore persisted in an Elasticsearch index.
+// It becomes usable only after the first configuration arrives through the
+// Notifier; all accessors block on waitReady until then.
+type store struct {
+	ctx      context.Context
+	cn       context.CancelFunc // cancels ctx; stops the config loop
+	log      *logp.Logger
+	name     string // statestore name, also passed to the ES client
+	index    string // target index, derived from name (see renderIndexName)
+	notifier *Notifier
+
+	chReady chan struct{} // closed after the first configure attempt completes
+	once    sync.Once     // guards closing chReady exactly once
+
+	mx     sync.Mutex
+	cli    *eslegclient.Connection // nil until configured, and after Close
+	cliErr error                   // last configuration error; surfaced by waitReady
+}
+
+// docType is the fixed document-type path segment used in ES request URLs.
+const docType = "_doc"
+
+// openStore creates a store for name and starts its configuration loop.
+// The store subscribes to notifier; configs are forwarded to the loop via
+// chCfg so that (re)configuration happens on a single goroutine. openStore
+// never fails today but keeps the error return for interface symmetry.
+func openStore(ctx context.Context, log *logp.Logger, name string, notifier *Notifier) (*store, error) {
+	ctx, cn := context.WithCancel(ctx)
+	s := &store{
+		ctx:      ctx,
+		cn:       cn,
+		log:      log.With("name", name).With("backend", "elasticsearch"),
+		name:     name,
+		index:    renderIndexName(name),
+		notifier: notifier,
+		chReady:  make(chan struct{}),
+	}
+
+	chCfg := make(chan *conf.C)
+
+	// The ctx.Done case prevents the notifier's delivery goroutine from
+	// leaking if the store is closed before the loop drains chCfg.
+	unsubFn := s.notifier.Subscribe(func(c *conf.C) {
+		select {
+		case chCfg <- c:
+		case <-ctx.Done():
+		}
+	})
+
+	go s.loop(ctx, cn, unsubFn, chCfg)
+
+	return s, nil
+}
+
+func renderIndexName(name string) string {
+ return "agentless-state-" + name
+}
+
+// waitReady blocks until the store has attempted its first configuration
+// (chReady closed by configure) or the store's context is canceled. It
+// returns the configuration error, if any, so callers fail fast when the
+// ES client could not be created.
+func (s *store) waitReady() error {
+	select {
+	case <-s.ctx.Done():
+		return s.ctx.Err()
+	case <-s.chReady:
+		return s.cliErr
+	}
+}
+
+// SetID re-derives the store's target index from the given id (typically
+// the input id supplied by the agent). An empty id leaves the index as-is.
+func (s *store) SetID(id string) {
+	if id == "" {
+		return
+	}
+	s.mx.Lock()
+	s.index = renderIndexName(id)
+	s.mx.Unlock()
+}
+
+// Close cancels the store's context (stopping the config loop and
+// unblocking any waitReady callers) and closes the ES client if one was
+// created. Safe to call even if the store never became ready.
+func (s *store) Close() error {
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	if s.cn != nil {
+		s.cn()
+	}
+
+	if s.cli != nil {
+		err := s.cli.Close()
+		s.cli = nil
+		return err
+	}
+	return nil
+}
+
+// Has reports whether key exists in the store. It blocks until the store is
+// configured (or its context is canceled) and treats ErrKeyUnknown as a
+// clean "not present" rather than an error.
+func (s *store) Has(key string) (bool, error) {
+	if err := s.waitReady(); err != nil {
+		return false, err
+	}
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	// The destination must be a pointer: passing the nil interface value
+	// itself would make json.Unmarshal fail with InvalidUnmarshalError
+	// even when the key exists, so Has would never report true.
+	var v interface{}
+	err := s.get(key, &v)
+	if err != nil {
+		if errors.Is(err, ErrKeyUnknown) {
+			return false, nil
+		}
+		return false, err
+	}
+	return true, nil
+}
+
+// Get decodes the value stored under key into to (which must be a pointer).
+// Returns ErrKeyUnknown if the key does not exist. Blocks until the store
+// is configured.
+func (s *store) Get(key string, to interface{}) error {
+	if err := s.waitReady(); err != nil {
+		return err
+	}
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	return s.get(key, to)
+}
+
+// get fetches the document for key from the store's index and decodes its
+// "v" field into to. A 404 from ES is translated to ErrKeyUnknown.
+// Caller must hold s.mx and have passed waitReady (s.cli must be non-nil).
+func (s *store) get(key string, to interface{}) error {
+	path := fmt.Sprintf("/%s/%s/%s", s.index, docType, url.QueryEscape(key))
+	status, data, err := s.cli.Request("GET", path, "", nil, nil)
+	if err != nil {
+		if status == http.StatusNotFound {
+			return ErrKeyUnknown
+		}
+		return err
+	}
+
+	var qr queryResult
+	if err = json.Unmarshal(data, &qr); err != nil {
+		return err
+	}
+	return json.Unmarshal(qr.Source.Value, to)
+}
+
+// queryResult is the subset of an ES get-document response we care about:
+// the raw "v" field of the source, decoded lazily by the caller.
+type queryResult struct {
+	Found  bool `json:"found"`
+	Source struct {
+		Value json.RawMessage `json:"v"`
+	} `json:"_source"`
+}
+
+// doc is the document body written on Set.
+// NOTE(review): the tags use `struct:` (structform/typeconv), not `json:` —
+// this assumes the ES client's request encoder honors structform tags;
+// confirm against eslegclient's body encoding.
+type doc struct {
+	Value     any `struct:"v"`
+	UpdatedAt any `struct:"updated_at"`
+}
+
+// entry wraps a decoded source value so it can be handed to Each callbacks
+// as a backend.ValueDecoder.
+type entry struct {
+	value interface{}
+}
+
+// Decode converts the cached raw value into to via typeconv.
+func (e entry) Decode(to interface{}) error {
+	return typeconv.Convert(to, e.value)
+}
+
+// renderRequest builds the document body for a Set, stamping the current
+// UTC time (millisecond precision, Z suffix) as updated_at.
+func renderRequest(val interface{}) doc {
+	return doc{
+		Value:     val,
+		UpdatedAt: time.Now().UTC().Format("2006-01-02T15:04:05.000Z"),
+	}
+}
+
+// Set upserts value under key. Writes do not wait for an index refresh;
+// see the consistency trade-off note at the top of this file.
+func (s *store) Set(key string, value interface{}) error {
+	if err := s.waitReady(); err != nil {
+		return err
+	}
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	body := renderRequest(value)
+	path := fmt.Sprintf("/%s/%s/%s", s.index, docType, url.QueryEscape(key))
+	_, _, err := s.cli.Request("PUT", path, "", nil, body)
+	return err
+}
+
+// Remove deletes the document stored under key. Blocks until the store is
+// configured.
+func (s *store) Remove(key string) error {
+	if err := s.waitReady(); err != nil {
+		return err
+	}
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	_, _, err := s.cli.Delete(s.index, docType, url.QueryEscape(key), nil)
+	return err
+}
+
+// searchResult is one hit of a search response: the document id (the
+// URL-escaped store key) plus the raw "v" field of its source.
+type searchResult struct {
+	ID     string `json:"_id"`
+	Source struct {
+		Value json.RawMessage `json:"v"`
+	} `json:"_source"`
+}
+
+// Each iterates all key/value pairs in the store's index, invoking fn for
+// each until fn returns false or an error. A missing index (404) is treated
+// as an empty store. Keys are URL-unescaped from document ids before being
+// passed to fn.
+func (s *store) Each(fn func(string, backend.ValueDecoder) (bool, error)) error {
+	if err := s.waitReady(); err != nil {
+		return err
+	}
+
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	// Do nothing for now if the store was not initialized
+	// NOTE(review): after a nil-error waitReady the other accessors assume
+	// s.cli != nil; this extra guard looks defensive — confirm whether it
+	// is reachable.
+	if s.cli == nil {
+		return nil
+	}
+
+	status, result, err := s.cli.SearchURIWithBody(s.index, "", nil, map[string]any{
+		"query": map[string]any{
+			"match_all": map[string]any{},
+		},
+		"size": 1000, // TODO: we might have to do scroll if there are more than 1000 keys
+	})
+
+	// A 404 means the index doesn't exist yet: iterate over nothing.
+	if err != nil && status != http.StatusNotFound {
+		return err
+	}
+
+	if result == nil || len(result.Hits.Hits) == 0 {
+		return nil
+	}
+
+	for _, hit := range result.Hits.Hits {
+		var sres searchResult
+		err = json.Unmarshal(hit, &sres)
+		if err != nil {
+			return err
+		}
+
+		var e entry
+		err = json.Unmarshal(sres.Source.Value, &e.value)
+		if err != nil {
+			return err
+		}
+
+		// Document ids were QueryEscape'd on write; reverse that here.
+		key, err := url.QueryUnescape(sres.ID)
+		if err != nil {
+			return err
+		}
+
+		cont, err := fn(key, e)
+		if !cont || err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+// configure (re)creates the ES client from a freshly received config. Any
+// previous client is closed first. On failure the error is stashed in
+// cliErr so waitReady surfaces it to callers. The first call — success or
+// failure — closes chReady, unblocking all pending store operations.
+func (s *store) configure(ctx context.Context, c *conf.C) {
+	s.log.Debugf("Configure ES store")
+	s.mx.Lock()
+	defer s.mx.Unlock()
+
+	if s.cli != nil {
+		_ = s.cli.Close() // best-effort close of the stale connection
+		s.cli = nil
+	}
+	s.cliErr = nil
+
+	cli, err := eslegclient.NewConnectedClient(ctx, c, s.name)
+	if err != nil {
+		s.log.Errorf("ES store, failed to create elasticsearch client: %v", err)
+		s.cliErr = err
+	} else {
+		s.cli = cli
+	}
+
+	// Signal store is ready
+	s.once.Do(func() {
+		close(s.chReady)
+	})
+
+}
+
+// loop is the store's single configuration goroutine: it applies each
+// config received on chCfg until ctx is canceled. On exit it cancels the
+// store context and unsubscribes from the notifier so no further configs
+// are delivered.
+func (s *store) loop(ctx context.Context, cn context.CancelFunc, unsubFn UnsubscribeFunc, chCfg chan *conf.C) {
+	defer cn()
+
+	// Unsubscribe on exit
+	defer unsubFn()
+
+	defer s.log.Debug("ES store exit main loop")
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case cu := <-chCfg:
+			s.configure(ctx, cu)
+		}
+	}
+}
diff --git a/libbeat/statestore/backend/memlog/store.go b/libbeat/statestore/backend/memlog/store.go
index 5bd6aac77fdf..67b94862262c 100644
--- a/libbeat/statestore/backend/memlog/store.go
+++ b/libbeat/statestore/backend/memlog/store.go
@@ -276,6 +276,10 @@ func (m *memstore) Remove(key string) bool {
return true
}
+// SetID is a no-op for the memlog backend; only the Elasticsearch backend
+// uses the id (to derive its index name).
+func (s *store) SetID(_ string) {
+	// NOOP
+}
+
func (e entry) Decode(to interface{}) error {
return typeconv.Convert(to, e.value)
}
diff --git a/libbeat/statestore/mock_test.go b/libbeat/statestore/mock_test.go
index 165243bcd02d..9cc220df3ddd 100644
--- a/libbeat/statestore/mock_test.go
+++ b/libbeat/statestore/mock_test.go
@@ -93,3 +93,6 @@ func (m *mockStore) Each(fn func(string, backend.ValueDecoder) (bool, error)) er
args := m.Called(fn)
return args.Error(0)
}
+
+// SetID satisfies the backend.Store interface; the mock records nothing.
+func (m *mockStore) SetID(_ string) {
+}
diff --git a/libbeat/statestore/store.go b/libbeat/statestore/store.go
index c204fcde8f51..875ba43e870c 100644
--- a/libbeat/statestore/store.go
+++ b/libbeat/statestore/store.go
@@ -61,6 +61,10 @@ func newStore(shared *sharedStore) *Store {
}
}
+// SetID forwards the store id to the backend, which may use it to namespace
+// its persistence (the ES backend derives its index name from it; file
+// backends ignore it).
+func (s *Store) SetID(id string) {
+	s.shared.backend.SetID(id)
+}
+
// Close deactivates the current store. No new transacation can be generated.
// Already active transaction will continue to function until Closed.
// The backing store will be closed once all stores and active transactions have been closed.
diff --git a/libbeat/statestore/storetest/storetest.go b/libbeat/statestore/storetest/storetest.go
index 69f065e9bdc8..d43ce9bd1e42 100644
--- a/libbeat/statestore/storetest/storetest.go
+++ b/libbeat/statestore/storetest/storetest.go
@@ -213,3 +213,7 @@ func (s *MapStore) Each(fn func(string, backend.ValueDecoder) (bool, error)) err
func (d valueUnpacker) Decode(to interface{}) error {
return typeconv.Convert(to, d.from)
}
+
+// SetID is a no-op for the in-memory test store.
+func (s *MapStore) SetID(_ string) {
+	// NOOP
+}
diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go
index 2bfb9f29cd8b..fe5d36016eb8 100644
--- a/x-pack/filebeat/input/awss3/states.go
+++ b/x-pack/filebeat/input/awss3/states.go
@@ -35,7 +35,7 @@ type states struct {
// newStates generates a new states registry.
func newStates(log *logp.Logger, stateStore beater.StateStore, listPrefix string) (*states, error) {
- store, err := stateStore.Access()
+ store, err := stateStore.Access("")
if err != nil {
return nil, fmt.Errorf("can't access persistent store: %w", err)
}
diff --git a/x-pack/filebeat/input/awss3/states_test.go b/x-pack/filebeat/input/awss3/states_test.go
index fa604ed08d96..d15590493ee5 100644
--- a/x-pack/filebeat/input/awss3/states_test.go
+++ b/x-pack/filebeat/input/awss3/states_test.go
@@ -32,7 +32,7 @@ func (s *testInputStore) Close() {
_ = s.registry.Close()
}
-func (s *testInputStore) Access() (*statestore.Store, error) {
+func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}
diff --git a/x-pack/filebeat/input/salesforce/input_manager_test.go b/x-pack/filebeat/input/salesforce/input_manager_test.go
index 8b73763f93fa..fc69f9180401 100644
--- a/x-pack/filebeat/input/salesforce/input_manager_test.go
+++ b/x-pack/filebeat/input/salesforce/input_manager_test.go
@@ -34,7 +34,7 @@ func makeTestStore(data map[string]interface{}) *statestore.Store {
type stateStore struct{}
-func (stateStore) Access() (*statestore.Store, error) {
+func (stateStore) Access(_ string) (*statestore.Store, error) {
return makeTestStore(map[string]interface{}{"hello": "world"}), nil
}
func (stateStore) CleanupInterval() time.Duration { return time.Duration(0) }
diff --git a/x-pack/filebeat/tests/integration/managerV2_test.go b/x-pack/filebeat/tests/integration/managerV2_test.go
index dc935d719507..84f6ad0f54bb 100644
--- a/x-pack/filebeat/tests/integration/managerV2_test.go
+++ b/x-pack/filebeat/tests/integration/managerV2_test.go
@@ -14,6 +14,9 @@ import (
"fmt"
"io"
"math"
+ "net/http"
+ "net/http/httptest"
+ "net/url"
"os"
"path/filepath"
"strings"
@@ -22,6 +25,7 @@ import (
"testing"
"time"
+ "github.com/gofrs/uuid/v5"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
@@ -32,6 +36,7 @@ import (
"github.com/elastic/beats/v7/libbeat/version"
"github.com/elastic/beats/v7/testing/certutil"
"github.com/elastic/beats/v7/x-pack/libbeat/management"
+ "github.com/elastic/beats/v7/x-pack/libbeat/management/tests"
"github.com/elastic/elastic-agent-client/v7/pkg/client/mock"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
)
@@ -234,50 +239,15 @@ func TestInputReloadUnderElasticAgent(t *testing.T) {
"-E", "management.enabled=true",
)
- // waitDeadlineOr5Mins looks at the test deadline
- // and returns a reasonable value of waiting for a
- // condition to be met. The possible values are:
- // - if no test deadline is set, return 5 minutes
- // - if a deadline is set and there is less than
- // 0.5 second left, return the time left
- // - otherwise return the time left minus 0.5 second.
- waitDeadlineOr5Min := func() time.Duration {
- deadline, deadlineSet := t.Deadline()
- if deadlineSet {
- left := time.Until(deadline)
- final := left - 500*time.Millisecond
- if final <= 0 {
- return left
- }
- return final
- }
- return 5 * time.Minute
+ for _, contains := range []string{
+ "Can only start an input when all related states are finished",
+ "file 'flog.log' is not finished, will retry starting the input soon",
+ "ForceReload set to TRUE",
+ "Reloading Beats inputs because forceReload is true",
+ "ForceReload set to FALSE",
+ } {
+ checkFilebeatLogs(t, filebeat, contains)
}
-
- require.Eventually(t, func() bool {
- return filebeat.LogContains("Can only start an input when all related states are finished")
- }, waitDeadlineOr5Min(), 100*time.Millisecond,
- "String 'Can only start an input when all related states are finished' not found on Filebeat logs")
-
- require.Eventually(t, func() bool {
- return filebeat.LogContains("file 'flog.log' is not finished, will retry starting the input soon")
- }, waitDeadlineOr5Min(), 100*time.Millisecond,
- "String 'file 'flog.log' is not finished, will retry starting the input soon' not found on Filebeat logs")
-
- require.Eventually(t, func() bool {
- return filebeat.LogContains("ForceReload set to TRUE")
- }, waitDeadlineOr5Min(), 100*time.Millisecond,
- "String 'ForceReload set to TRUE' not found on Filebeat logs")
-
- require.Eventually(t, func() bool {
- return filebeat.LogContains("Reloading Beats inputs because forceReload is true")
- }, waitDeadlineOr5Min(), 100*time.Millisecond,
- "String 'Reloading Beats inputs because forceReload is true' not found on Filebeat logs")
-
- require.Eventually(t, func() bool {
- return filebeat.LogContains("ForceReload set to FALSE")
- }, waitDeadlineOr5Min(), 100*time.Millisecond,
- "String 'ForceReload set to FALSE' not found on Filebeat logs")
}
// TestFailedOutputReportsUnhealthy ensures that if an output
@@ -832,3 +802,273 @@ func writeStartUpInfo(t *testing.T, w io.Writer, info *proto.StartUpInfo) {
_, err = w.Write(infoBytes)
require.NoError(t, err, "failed to write connection information")
}
+
+// Response structure for JSON
+type response struct {
+ Message string `json:"message"`
+ Published string `json:"published"`
+}
+
+// TestHTTPJSONInputReloadUnderElasticAgentWithElasticStateStore runs
+// Filebeat under a stub agent with the ES state store enabled (via the
+// AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES env var), drives an
+// httpjson input against a local test server, forces an output change
+// (which restarts the beat), and verifies the cursor state is persisted
+// in and restored from Elasticsearch across the restart.
+func TestHTTPJSONInputReloadUnderElasticAgentWithElasticStateStore(t *testing.T) {
+	// First things first, ensure ES is running and we can connect to it.
+	// If ES is not running, the test will timeout and the only way to know
+	// what caused it is going through Filebeat's logs.
+	integration.EnsureESIsRunning(t)
+
+	// Create a test httpjson server for httpjson input
+	h := serverHelper{t: t}
+	defer func() {
+		// Two calls means the input polled both before and after restart.
+		assert.GreaterOrEqual(t, h.called, 2, "HTTP server should be called at least twice")
+	}()
+	testServer := httptest.NewServer(http.HandlerFunc(h.handler))
+	defer testServer.Close()
+
+	inputID := "httpjson-generic-" + uuid.Must(uuid.NewV4()).String()
+	inputUnit := &proto.UnitExpected{
+		Id:             inputID,
+		Type:           proto.UnitType_INPUT,
+		ConfigStateIdx: 1,
+		State:          proto.State_HEALTHY,
+		LogLevel:       proto.UnitLogLevel_DEBUG,
+		Config: &proto.UnitExpectedConfig{
+			Id: inputID,
+			Source: tests.RequireNewStruct(map[string]any{
+				"id":      inputID,
+				"type":    "httpjson",
+				"name":    "httpjson-1",
+				"enabled": true,
+			}),
+			Type: "httpjson",
+			Name: "httpjson-1",
+			Streams: []*proto.Stream{
+				{
+					Id: inputID,
+					Source: integration.RequireNewStruct(t, map[string]any{
+						"id":             inputID,
+						"enabled":        true,
+						"type":           "httpjson",
+						"interval":       "5s",
+						"request.url":    testServer.URL,
+						"request.method": "GET",
+						// "since" comes from the persisted cursor; on first run it
+						// defaults to 24h ago.
+						"request.transforms": []any{
+							map[string]any{
+								"set": map[string]any{
+									"target":  "url.params.since",
+									"value":   "[[.cursor.published]]",
+									"default": `[[formatDate (now (parseDuration "-24h")) "RFC3339"]]`,
+								},
+							},
+						},
+						"cursor": map[string]any{
+							"published": map[string]any{
+								"value": "[[.last_event.published]]",
+							},
+						},
+					}),
+				},
+			},
+		},
+	}
+	// Same input under two output revisions; moving from the first to the
+	// second forces a beat restart (management.restart_on_output_change).
+	units := [][]*proto.UnitExpected{
+		{outputUnitES(t, 1), inputUnit},
+		{outputUnitES(t, 2), inputUnit},
+	}
+
+	idx := 0
+	waiting := false
+	when := time.Now()
+
+	final := atomic.Bool{}
+	// nextState advances to the next unit set after a 10s settling delay.
+	nextState := func() {
+		if waiting {
+			if time.Now().After(when) {
+				t.Log("Next state")
+				idx = (idx + 1) % len(units)
+				waiting = false
+				h.notifyChange()
+				return
+			}
+			return
+		}
+		waiting = true
+		when = time.Now().Add(10 * time.Second)
+	}
+
+	server := &mock.StubServerV2{
+		CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected {
+			if management.DoesStateMatch(observed, units[idx], 0) {
+				if idx < len(units)-1 {
+					nextState()
+				} else {
+					final.Store(true)
+				}
+			}
+			for _, unit := range observed.GetUnits() {
+				expected := []proto.State{proto.State_HEALTHY, proto.State_CONFIGURING, proto.State_STARTING}
+				if !waiting {
+					expected = append(expected, proto.State_STOPPING)
+				}
+				require.Containsf(t, expected, unit.GetState(), "Unit '%s' is not healthy, state: %s", unit.GetId(), unit.GetState().String())
+			}
+			return &proto.CheckinExpected{
+				Units: units[idx],
+			}
+		},
+		ActionImpl: func(response *proto.ActionResponse) error { return nil },
+	}
+
+	require.NoError(t, server.Start())
+	t.Cleanup(server.Stop)
+
+	// Opt in to the ES state store for httpjson/cel inputs.
+	t.Setenv("AGENTLESS_ELASTICSEARCH_STATE_STORE_INPUT_TYPES", "httpjson,cel")
+	filebeat := NewFilebeat(t)
+	filebeat.RestartOnBeatOnExit = true
+	filebeat.Start(
+		"-E", fmt.Sprintf(`management.insecure_grpc_url_for_testing="localhost:%d"`, server.Port),
+		"-E", "management.enabled=true",
+		"-E", "management.restart_on_output_change=true",
+	)
+
+	for _, contains := range []string{
+		"Configure ES store",
+		"input-cursor::openStore: prefix: httpjson inputID: " + inputID,
+		"input-cursor store read 0 keys", // first, no previous data exists
+		"input-cursor store read 1 keys", // after the restart, previous key is read
+	} {
+		checkFilebeatLogs(t, filebeat, contains)
+	}
+
+	require.Eventually(t,
+		final.Load,
+		waitDeadlineOr5Min(t),
+		100*time.Millisecond,
+		"Failed to reach the final state",
+	)
+}
+
+// serverHelper backs the test HTTP endpoint: it counts requests and checks
+// that each request's "since" parameter matches the previously published
+// timestamp, proving the cursor survived between polls/restarts.
+type serverHelper struct {
+	t            *testing.T
+	lock         sync.Mutex
+	previous     time.Time // timestamp published in the last response
+	called       int       // number of requests served
+	stateChanged bool      // set when the agent switched unit sets; relaxes the delta
+}
+
+// verifyTime asserts the incoming "since" value: ~24h ago on the very first
+// request (the configured default), otherwise close to the previously
+// published time. It returns the new publish time for this response.
+func (h *serverHelper) verifyTime(since time.Time) time.Time {
+	h.lock.Lock()
+	defer h.lock.Unlock()
+
+	h.called++
+
+	if h.previous.IsZero() {
+		assert.WithinDurationf(h.t, time.Now().Add(-24*time.Hour), since, 15*time.Minute, "since should be ~24h ago")
+	} else {
+		// XXX: `since` field is expected to be equal to the last published time. However, between unit restarts, the last
+		// updated field might not be persisted successfully. As a workaround, we allow a larger delta between restarts.
+		// However, we are still checking that the `since` field is not too far in the past, like 24h ago which is the
+		// initial value.
+		assert.WithinDurationf(h.t, h.previous, since, h.getDelta(since), "since should re-use last value")
+	}
+	h.previous = time.Now()
+	return h.previous
+}
+
+// getDelta returns the tolerance for comparing "since" with the previous
+// publish time: normally 1s, but widened to a minute for the first request
+// after a unit-state change (restart), where persistence may lag. The
+// one-shot stateChanged flag is cleared once the wide tolerance is used.
+func (h *serverHelper) getDelta(actual time.Time) time.Duration {
+	const delta = 1 * time.Second
+	if !h.stateChanged {
+		return delta
+	}
+
+	dt := h.previous.Sub(actual)
+	if dt < -delta || dt > delta {
+		h.stateChanged = false
+		return time.Minute
+	}
+	return delta
+}
+
+// handler serves the httpjson poll endpoint: it validates the request's
+// "since" parameter and replies with a JSON body whose published timestamp
+// becomes the next expected cursor value.
+func (h *serverHelper) handler(w http.ResponseWriter, r *http.Request) {
+	since := parseParams(h.t, r.RequestURI)
+	published := h.verifyTime(since)
+
+	w.Header().Set("Content-Type", "application/json")
+	body := response{
+		Message:   "Hello",
+		Published: published.Format(time.RFC3339),
+	}
+	require.NoError(h.t, json.NewEncoder(w).Encode(body))
+}
+
+// notifyChange records that the agent advanced to a new unit set, so the
+// next request may use a relaxed timestamp tolerance (see getDelta).
+func (h *serverHelper) notifyChange() {
+	h.lock.Lock()
+	defer h.lock.Unlock()
+	h.stateChanged = true
+}
+
+// parseParams extracts the RFC3339 "since" query parameter from a request
+// URI, failing the test if it is missing or malformed.
+func parseParams(t *testing.T, uri string) time.Time {
+	u, err := url.Parse(uri)
+	require.NoError(t, err)
+
+	q, err := url.ParseQuery(u.RawQuery)
+	require.NoError(t, err)
+
+	since := q["since"]
+	require.NotEmpty(t, since)
+
+	ts, err := time.Parse(time.RFC3339, since[0])
+	require.NoError(t, err)
+	return ts
+}
+
+// checkFilebeatLogs waits (bounded by the test deadline) until the given
+// string appears in Filebeat's logs, polling every 100ms.
+func checkFilebeatLogs(t *testing.T, filebeat *integration.BeatProc, contains string) {
+	t.Helper()
+	const tick = 100 * time.Millisecond
+
+	require.Eventually(t,
+		func() bool { return filebeat.LogContains(contains) },
+		waitDeadlineOr5Min(t),
+		tick,
+		fmt.Sprintf("String '%s' not found on Filebeat logs", contains),
+	)
+}
+
+// waitDeadlineOr5Min returns a sensible wait budget for Eventually-style
+// conditions: 5 minutes when the test has no deadline, otherwise the time
+// remaining minus a 0.5s safety margin (or the raw remainder when less
+// than 0.5s is left).
+func waitDeadlineOr5Min(t *testing.T) time.Duration {
+	deadline, ok := t.Deadline()
+	if !ok {
+		return 5 * time.Minute
+	}
+	left := time.Until(deadline)
+	if final := left - 500*time.Millisecond; final > 0 {
+		return final
+	}
+	return left
+}
+
+// outputUnitES builds an expected Elasticsearch output unit pointing at the
+// local test ES (http://localhost:9200). The unit Id and Name vary with id
+// so switching between two outputs registers as an output change and
+// triggers a beat restart.
+func outputUnitES(t *testing.T, id int) *proto.UnitExpected {
+	return &proto.UnitExpected{
+		Id:             fmt.Sprintf("output-unit-%d", id),
+		Type:           proto.UnitType_OUTPUT,
+		ConfigStateIdx: 1,
+		State:          proto.State_HEALTHY,
+		LogLevel:       proto.UnitLogLevel_DEBUG,
+		Config: &proto.UnitExpectedConfig{
+			Id:   "default",
+			Type: "elasticsearch",
+			Name: fmt.Sprintf("elasticsearch-%d", id),
+			Source: integration.RequireNewStruct(t,
+				map[string]interface{}{
+					"type":                 "elasticsearch",
+					"hosts":                []interface{}{"http://localhost:9200"},
+					"username":             "admin",
+					"password":             "testing",
+					"protocol":             "http",
+					"enabled":              true,
+					"allow_older_versions": true,
+				}),
+		},
+	}
+}