diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 7c9229cd09b5..25040008168f 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -24,7 +24,6 @@ import ( "path/filepath" "strings" "sync" - "time" "github.com/elastic/beats/v7/filebeat/backup" "github.com/elastic/beats/v7/filebeat/channel" @@ -79,15 +78,7 @@ type Filebeat struct { pipeline beat.PipelineConnector } -type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin - -type StateStore interface { - // Access returns the storage registry depending on the type. - // The value of typ is expected to have been obtained from - // cursor.InputManager.Type and represents the input type. - Access(typ string) (*statestore.Store, error) - CleanupInterval() time.Duration -} +type PluginFactory func(beat.Info, *logp.Logger, statestore.States) []v2.Plugin // New creates a new Filebeat pointer instance. func New(plugins PluginFactory) beat.Creator { @@ -569,7 +560,7 @@ func newPipelineLoaderFactory(ctx context.Context, esConfig *conf.C) fileset.Pip // some of the filestreams might want to take over the loginput state // if their `take_over` flag is set to `true`. -func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error { +func processLogInputTakeOver(stateStore statestore.States, config *cfg.Config) error { inputs, err := fetchInputConfiguration(config) if err != nil { return fmt.Errorf("Failed to fetch input configuration when attempting take over: %w", err) @@ -578,7 +569,7 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error { return nil } - store, err := stateStore.Access("") + store, err := stateStore.StoreFor("") if err != nil { return fmt.Errorf("Failed to access state when attempting take over: %w", err) } diff --git a/filebeat/beater/store.go b/filebeat/beater/store.go index a32e248aba85..82f4427d4e75 100644 --- a/filebeat/beater/store.go +++ b/filebeat/beater/store.go @@ -32,6 +32,8 @@ import ( "github.com/elastic/elastic-agent-libs/paths" ) +var _ statestore.States = (*filebeatStore)(nil) + type filebeatStore struct { registry *statestore.Registry esRegistry *statestore.Registry @@ -84,8 +86,8 @@ func (s *filebeatStore) Close() { s.registry.Close() } -// Access returns the storage registry depending on the type. Default is the file store. -func (s *filebeatStore) Access(typ string) (*statestore.Store, error) { +// AccessType returns the storage registry depending on the type. Default is the file store. +func (s *filebeatStore) StoreFor(typ string) (*statestore.Store, error) { if features.IsElasticsearchStateStoreEnabledForInput(typ) && s.esRegistry != nil { return s.esRegistry.Get(s.storeName) } diff --git a/filebeat/input/default-inputs/inputs.go b/filebeat/input/default-inputs/inputs.go index 4b0c86f6a0d4..18b2f6f1ad5a 100644 --- a/filebeat/input/default-inputs/inputs.go +++ b/filebeat/input/default-inputs/inputs.go @@ -18,7 +18,6 @@ package inputs import ( - "github.com/elastic/beats/v7/filebeat/beater" "github.com/elastic/beats/v7/filebeat/input/filestream" "github.com/elastic/beats/v7/filebeat/input/kafka" "github.com/elastic/beats/v7/filebeat/input/tcp" @@ -26,17 +25,18 @@ import ( "github.com/elastic/beats/v7/filebeat/input/unix" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/elastic-agent-libs/logp" ) -func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.Plugin { +func Init(info beat.Info, log *logp.Logger, components statestore.States) []v2.Plugin { return append( genericInputs(log, components), osInputs(info, log, components)..., ) } -func genericInputs(log *logp.Logger, components beater.StateStore) []v2.Plugin { +func genericInputs(log *logp.Logger, components statestore.States) []v2.Plugin { return []v2.Plugin{ filestream.Plugin(log, components), kafka.Plugin(), diff --git a/filebeat/input/default-inputs/inputs_linux.go b/filebeat/input/default-inputs/inputs_linux.go index 8eed9a3ea4f5..1d67966bf4e7 100644 --- a/filebeat/input/default-inputs/inputs_linux.go +++ b/filebeat/input/default-inputs/inputs_linux.go @@ -20,18 +20,14 @@ package inputs import ( "github.com/elastic/beats/v7/filebeat/input/journald" v2 "github.com/elastic/beats/v7/filebeat/input/v2" - cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/elastic-agent-libs/logp" ) // inputs that are only supported on linux -type osComponents interface { - cursor.StateStore -} - -func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin { +func osInputs(info beat.Info, log *logp.Logger, components statestore.States) []v2.Plugin { var plugins []v2.Plugin zeroPlugin := v2.Plugin{} diff --git a/filebeat/input/default-inputs/inputs_windows.go b/filebeat/input/default-inputs/inputs_windows.go index 21e7fbd81198..11bd71d89042 100644 --- a/filebeat/input/default-inputs/inputs_windows.go +++ b/filebeat/input/default-inputs/inputs_windows.go @@ -19,17 +19,13 @@ package inputs import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" - cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/filebeat/input/winlog" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/elastic-agent-libs/logp" ) -type osComponents interface { - cursor.StateStore -} - -func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin { +func osInputs(info beat.Info, log *logp.Logger, components statestore.States) []v2.Plugin { return []v2.Plugin{ winlog.Plugin(log, components), } diff --git a/filebeat/input/filestream/environment_test.go b/filebeat/input/filestream/environment_test.go index 0254420a0bf0..7db5cb92009b 100644 --- a/filebeat/input/filestream/environment_test.go +++ b/filebeat/input/filestream/environment_test.go @@ -48,7 +48,7 @@ import ( type inputTestingEnvironment struct { t *testing.T workingDir string - stateStore loginp.StateStore + stateStore statestore.States pipeline *mockPipelineConnector pluginInitOnce sync.Once @@ -194,7 +194,7 @@ func (e *inputTestingEnvironment) abspath(filename string) string { } func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) { - inputStore, _ := e.stateStore.Access("") + inputStore, _ := e.stateStore.StoreFor("") actual := 0 err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) { @@ -331,7 +331,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str e.t.Fatalf("cannot stat file when cheking for offset: %+v", err) } - inputStore, _ := e.stateStore.Access("") + inputStore, _ := e.stateStore.StoreFor("") id := getIDFromPath(filepath, inputID, fi) var entry registryEntry @@ -352,7 +352,7 @@ func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expect } func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, error) { - inputStore, _ := e.stateStore.Access("") + inputStore, _ := e.stateStore.StoreFor("") var entry registryEntry err := inputStore.Get(key, &entry) @@ -539,11 +539,13 @@ func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) { require.True(e.t, selectedEvent.Timestamp.Equal(tm), "got: %s, expected: %s", selectedEvent.Timestamp.String(), tm.String()) } +var _ statestore.States = (*testInputStore)(nil) + type testInputStore struct { registry *statestore.Registry } -func openTestStatestore() loginp.StateStore { +func openTestStatestore() statestore.States { return &testInputStore{ registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), } @@ -553,7 +555,7 @@ func (s *testInputStore) Close() { s.registry.Close() } -func (s *testInputStore) Access(_ string) (*statestore.Store, error) { +func (s *testInputStore) StoreFor(string) (*statestore.Store, error) { return s.registry.Get("filebeat") } diff --git a/filebeat/input/filestream/input.go b/filebeat/input/filestream/input.go index 620db9e5426b..995f0ca1e392 100644 --- a/filebeat/input/filestream/input.go +++ b/filebeat/input/filestream/input.go @@ -40,6 +40,7 @@ import ( "github.com/elastic/beats/v7/libbeat/reader/parser" "github.com/elastic/beats/v7/libbeat/reader/readfile" "github.com/elastic/beats/v7/libbeat/reader/readfile/encoding" + "github.com/elastic/beats/v7/libbeat/statestore" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" @@ -67,7 +68,7 @@ type filestream struct { } // Plugin creates a new filestream input plugin for creating a stateful input. -func Plugin(log *logp.Logger, store loginp.StateStore) input.Plugin { +func Plugin(log *logp.Logger, store statestore.States) input.Plugin { return input.Plugin{ Name: pluginName, Stability: feature.Stable, diff --git a/filebeat/input/filestream/input_test.go b/filebeat/input/filestream/input_test.go index 970c6c7e7a56..f2a36041edfd 100644 --- a/filebeat/input/filestream/input_test.go +++ b/filebeat/input/filestream/input_test.go @@ -28,7 +28,6 @@ import ( "github.com/stretchr/testify/require" - loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/statestore" @@ -235,10 +234,12 @@ func generateFile(t testing.TB, dir string, lineCount int) string { return filename } -func createTestStore(t testing.TB) loginp.StateStore { +func createTestStore(t testing.TB) statestore.States { return &testStore{registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend())} } +var _ statestore.States = (*testStore)(nil) + type testStore struct { registry *statestore.Registry } @@ -247,7 +248,7 @@ func (s *testStore) Close() { s.registry.Close() } -func (s *testStore) Access(_ string) (*statestore.Store, error) { +func (s *testStore) StoreFor(string) (*statestore.Store, error) { return s.registry.Get("filestream-benchmark") } diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index 36e02d2007b3..c657cefea569 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -51,7 +51,7 @@ type InputManager struct { Logger *logp.Logger // StateStore gives the InputManager access to the persistent key value store. - StateStore StateStore + StateStore statestore.States // Type must contain the name of the input type. It is used to create the key name // for all sources the inputs collect from. @@ -87,12 +87,6 @@ var errNoInputRunner = errors.New("no input runner available") // Deprecated: Inputs without an ID are not supported anymore. const globalInputID = ".global" -// StateStore interface and configurations used to give the Manager access to the persistent store. -type StateStore interface { - Access(typ string) (*statestore.Store, error) - CleanupInterval() time.Duration -} - func (cim *InputManager) init() error { cim.initOnce.Do(func() { diff --git a/filebeat/input/filestream/internal/input-logfile/store.go b/filebeat/input/filestream/internal/input-logfile/store.go index 9a65f0cd011d..ac0c5f9d3142 100644 --- a/filebeat/input/filestream/internal/input-logfile/store.go +++ b/filebeat/input/filestream/internal/input-logfile/store.go @@ -141,10 +141,10 @@ type ( // hook into store close for testing purposes var closeStore = (*store).close -func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) { +func openStore(log *logp.Logger, statestore statestore.States, prefix string) (*store, error) { ok := false - persistentStore, err := statestore.Access("") + persistentStore, err := statestore.StoreFor("") if err != nil { return nil, err } diff --git a/filebeat/input/filestream/internal/input-logfile/store_test.go b/filebeat/input/filestream/internal/input-logfile/store_test.go index ac77fc2c2942..f73e78c86d7a 100644 --- a/filebeat/input/filestream/internal/input-logfile/store_test.go +++ b/filebeat/input/filestream/internal/input-logfile/store_test.go @@ -35,11 +35,6 @@ import ( "github.com/elastic/go-concert/unison" ) -type testStateStore struct { - Store *statestore.Store - GCPeriod time.Duration -} - func TestResource_CopyInto(t *testing.T) { src := resource{lock: unison.MakeMutex()} dst := resource{lock: unison.MakeMutex()} @@ -476,7 +471,7 @@ func closeStoreWith(fn func(s *store)) func() { } //nolint:unparam // It's a test helper -func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *store { +func testOpenStore(t *testing.T, prefix string, persistentStore statestore.States) *store { if persistentStore == nil { persistentStore = createSampleStore(t, nil) } @@ -506,9 +501,16 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore { } } +var _ statestore.States = testStateStore{} + +type testStateStore struct { + Store *statestore.Store + GCPeriod time.Duration +} + func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts } func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod } -func (ts testStateStore) Access(string) (*statestore.Store, error) { +func (ts testStateStore) StoreFor(string) (*statestore.Store, error) { if ts.Store == nil { return nil, errors.New("no store configured") } diff --git a/filebeat/input/journald/environment_test.go b/filebeat/input/journald/environment_test.go index 5b6a8fcf35c2..2e698abdeade 100644 --- a/filebeat/input/journald/environment_test.go +++ b/filebeat/input/journald/environment_test.go @@ -147,6 +147,8 @@ func (e *inputTestingEnvironment) RequireStatuses(expected []statusUpdate) { } } +var _ statestore.States = (*testInputStore)(nil) + type testInputStore struct { registry *statestore.Registry } @@ -161,7 +163,7 @@ func (s *testInputStore) Close() { s.registry.Close() } -func (s *testInputStore) Access(_ string) (*statestore.Store, error) { +func (s *testInputStore) StoreFor(string) (*statestore.Store, error) { return s.registry.Get("filebeat") } diff --git a/filebeat/input/journald/input.go b/filebeat/input/journald/input.go index 65864b6539f0..aa9a3077cfd7 100644 --- a/filebeat/input/journald/input.go +++ b/filebeat/input/journald/input.go @@ -33,6 +33,7 @@ import ( "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/libbeat/reader" "github.com/elastic/beats/v7/libbeat/reader/parser" + "github.com/elastic/beats/v7/libbeat/statestore" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -72,7 +73,7 @@ const localSystemJournalID = "LOCAL_SYSTEM_JOURNAL" const pluginName = "journald" // Plugin creates a new journald input plugin for creating a stateful input. -func Plugin(log *logp.Logger, store cursor.StateStore) input.Plugin { +func Plugin(log *logp.Logger, store statestore.States) input.Plugin { return input.Plugin{ Name: pluginName, Stability: feature.Stable, diff --git a/filebeat/input/journald/input_filtering_test.go b/filebeat/input/journald/input_filtering_test.go index 34d0755393bd..98f31422fb18 100644 --- a/filebeat/input/journald/input_filtering_test.go +++ b/filebeat/input/journald/input_filtering_test.go @@ -256,7 +256,7 @@ func TestInputSeek(t *testing.T) { env := newInputTestingEnvironment(t) if testCase.cursor != "" { - store, _ := env.stateStore.Access("") + store, _ := env.stateStore.StoreFor("") tmp := map[string]any{} if err := json.Unmarshal([]byte(testCase.cursor), &tmp); err != nil { t.Fatal(err) diff --git a/filebeat/input/journald/input_stub.go b/filebeat/input/journald/input_stub.go index 5c42239e6c9a..c635c259b2e3 100644 --- a/filebeat/input/journald/input_stub.go +++ b/filebeat/input/journald/input_stub.go @@ -21,10 +21,10 @@ package journald import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" - cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/elastic-agent-libs/logp" ) -func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin { +func Plugin(log *logp.Logger, store statestore.States) v2.Plugin { return v2.Plugin{} } diff --git a/filebeat/input/v2/input-cursor/manager.go b/filebeat/input/v2/input-cursor/manager.go index f8d86054054e..276009f8c57c 100644 --- a/filebeat/input/v2/input-cursor/manager.go +++ b/filebeat/input/v2/input-cursor/manager.go @@ -48,8 +48,8 @@ import ( type InputManager struct { Logger *logp.Logger - // StateStore gives the InputManager access to the persitent key value store. - StateStore StateStore + // StateStore gives the InputManager access to the persistent key value store. + StateStore statestore.States // Type must contain the name of the input type. It is used to create the key name // for all sources the inputs collect from. @@ -80,12 +80,6 @@ var ( errNoInputRunner = errors.New("no input runner available") ) -// StateStore interface and configurations used to give the Manager access to the persistent store. -type StateStore interface { - Access(typ string) (*statestore.Store, error) - CleanupInterval() time.Duration -} - // init initializes the state store // This function is called from: // 1. InputManager::Init on beat start diff --git a/filebeat/input/v2/input-cursor/store.go b/filebeat/input/v2/input-cursor/store.go index 936735946b0b..2371b0addcc6 100644 --- a/filebeat/input/v2/input-cursor/store.go +++ b/filebeat/input/v2/input-cursor/store.go @@ -127,11 +127,11 @@ type ( // hook into store close for testing purposes var closeStore = (*store).close -func openStore(log *logp.Logger, statestore StateStore, prefix string, inputID string, fullInit bool) (*store, error) { +func openStore(log *logp.Logger, statestore statestore.States, prefix string, inputID string, fullInit bool) (*store, error) { ok := false log.Debugf("input-cursor::openStore: prefix: %v inputID: %s", prefix, inputID) - persistentStore, err := statestore.Access(prefix) + persistentStore, err := statestore.StoreFor(prefix) if err != nil { return nil, err } diff --git a/filebeat/input/v2/input-cursor/store_test.go b/filebeat/input/v2/input-cursor/store_test.go index b7fbba9c8ad6..0a31f66484f8 100644 --- a/filebeat/input/v2/input-cursor/store_test.go +++ b/filebeat/input/v2/input-cursor/store_test.go @@ -31,11 +31,6 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -type testStateStore struct { - Store *statestore.Store - GCPeriod time.Duration -} - func TestStore_OpenClose(t *testing.T) { t.Run("releasing store closes", func(t *testing.T) { var closed bool @@ -235,7 +230,7 @@ func closeStoreWith(fn func(s *store)) func() { } } -func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *store { +func testOpenStore(t *testing.T, prefix string, persistentStore statestore.States) *store { if persistentStore == nil { persistentStore = createSampleStore(t, nil) } @@ -265,9 +260,16 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore { } } +var _ statestore.States = testStateStore{} + +type testStateStore struct { + Store *statestore.Store + GCPeriod time.Duration +} + func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts } func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod } -func (ts testStateStore) Access(_ string) (*statestore.Store, error) { +func (ts testStateStore) StoreFor(string) (*statestore.Store, error) { if ts.Store == nil { return nil, errors.New("no store configured") } diff --git a/filebeat/input/winlog/input.go b/filebeat/input/winlog/input.go index 882d93f30a0c..fe06fb598700 100644 --- a/filebeat/input/winlog/input.go +++ b/filebeat/input/winlog/input.go @@ -23,6 +23,7 @@ import ( input "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/ctxtool" @@ -52,7 +53,7 @@ func (pub *publisher) Publish(records []eventlog.Record) error { type winlogInput struct{} // Plugin create a stateful input Plugin collecting logs from Windows Event Logs. -func Plugin(log *logp.Logger, store cursor.StateStore) input.Plugin { +func Plugin(log *logp.Logger, store statestore.States) input.Plugin { return input.Plugin{ Name: pluginName, Stability: feature.Beta, diff --git a/filebeat/registrar/registrar.go b/filebeat/registrar/registrar.go index 8982c6d58609..867a9d1cde78 100644 --- a/filebeat/registrar/registrar.go +++ b/filebeat/registrar/registrar.go @@ -54,10 +54,6 @@ type successLogger interface { Published(n int) bool } -type StateStore interface { - Access(typ string) (*statestore.Store, error) -} - var ( statesUpdate = monitoring.NewInt(nil, "registrar.states.update") statesCleanup = monitoring.NewInt(nil, "registrar.states.cleanup") @@ -71,8 +67,8 @@ const fileStatePrefix = "filebeat::logs::" // New creates a new Registrar instance, updating the registry file on // `file.State` updates. New fails if the file can not be opened or created. -func New(stateStore StateStore, out successLogger, flushTimeout time.Duration) (*Registrar, error) { - store, err := stateStore.Access("") +func New(stateStore statestore.States, out successLogger, flushTimeout time.Duration) (*Registrar, error) { + store, err := stateStore.StoreFor("") if err != nil { return nil, err } diff --git a/libbeat/statestore/store.go b/libbeat/statestore/store.go index 875ba43e870c..d725139c4fc2 100644 --- a/libbeat/statestore/store.go +++ b/libbeat/statestore/store.go @@ -19,11 +19,27 @@ package statestore import ( "sync/atomic" + "time" "github.com/elastic/beats/v7/libbeat/statestore/backend" "github.com/elastic/go-concert/unison" ) +// States is a collection of states backed by one or more persistent stores +// that may be differentiated by input type. +type States interface { + // StoreFor returns the storage registry for the given type. + // The value of typ is expected to have been obtained from + // cursor.InputManager.Type and represents the input type. + // Whether the receiver considers the value of typ is + // implementation dependent. + StoreFor(typ string) (*Store, error) + + // CleanupInterval returns the time between garbage collection + // runs for the stores owned by the States. + CleanupInterval() time.Duration +} + type sharedStore struct { reg *Registry refCount atomic.Int64 diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 32f9f24be464..c14ceea75ee7 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -7,9 +7,9 @@ package awss3 import ( "fmt" - "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/statestore" awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/go-concert/unison" @@ -17,7 +17,7 @@ import ( const inputName = "aws-s3" -func Plugin(store beater.StateStore) v2.Plugin { +func Plugin(store statestore.States) v2.Plugin { return v2.Plugin{ Name: inputName, Stability: feature.Stable, @@ -28,7 +28,7 @@ func Plugin(store beater.StateStore) v2.Plugin { } type s3InputManager struct { - store beater.StateStore + store statestore.States } func (im *s3InputManager) Init(grp unison.Group) error { diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go index ad20579aae49..e727b52f9529 100644 --- a/x-pack/filebeat/input/awss3/s3_input.go +++ b/x-pack/filebeat/input/awss3/s3_input.go @@ -13,10 +13,10 @@ import ( awssdk "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/aws/ratelimit" - "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common/backoff" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/timed" ) @@ -30,7 +30,7 @@ type s3PollerInput struct { pipeline beat.Pipeline config config awsConfig awssdk.Config - store beater.StateStore + store statestore.States provider string s3 s3API metrics *inputMetrics @@ -42,7 +42,7 @@ type s3PollerInput struct { func newS3PollerInput( config config, awsConfig awssdk.Config, - store beater.StateStore, + store statestore.States, ) (v2.Input, error) { return &s3PollerInput{ config: config, diff --git a/x-pack/filebeat/input/awss3/states.go b/x-pack/filebeat/input/awss3/states.go index fe5d36016eb8..63948c0b0686 100644 --- a/x-pack/filebeat/input/awss3/states.go +++ b/x-pack/filebeat/input/awss3/states.go @@ -9,7 +9,6 @@ import ( "strings" "sync" - "github.com/elastic/beats/v7/filebeat/beater" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/elastic-agent-libs/logp" ) @@ -34,8 +33,8 @@ type states struct { } // newStates generates a new states registry. -func newStates(log *logp.Logger, stateStore beater.StateStore, listPrefix string) (*states, error) { - store, err := stateStore.Access("") +func newStates(log *logp.Logger, stateStore statestore.States, listPrefix string) (*states, error) { + store, err := stateStore.StoreFor("") if err != nil { return nil, fmt.Errorf("can't access persistent store: %w", err) } diff --git a/x-pack/filebeat/input/awss3/states_test.go b/x-pack/filebeat/input/awss3/states_test.go index d15590493ee5..b087b86b47c4 100644 --- a/x-pack/filebeat/input/awss3/states_test.go +++ b/x-pack/filebeat/input/awss3/states_test.go @@ -9,7 +9,6 @@ import ( "testing" "time" - "github.com/elastic/beats/v7/filebeat/beater" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" "github.com/elastic/elastic-agent-libs/logp" @@ -18,28 +17,6 @@ import ( "github.com/stretchr/testify/require" ) -type testInputStore struct { - registry *statestore.Registry -} - -func openTestStatestore() beater.StateStore { - return &testInputStore{ - registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), - } -} - -func (s *testInputStore) Close() { - _ = s.registry.Close() -} - -func (s *testInputStore) Access(_ string) (*statestore.Store, error) { - return s.registry.Get("filebeat") -} - -func (s *testInputStore) CleanupInterval() time.Duration { - return 24 * time.Hour -} - func TestStatesAddStateAndIsProcessed(t *testing.T) { type stateTestCase struct { // An initialization callback to invoke on the (initially empty) states. @@ -270,3 +247,27 @@ func TestStatesPrefixHandling(t *testing.T) { }) } + +var _ statestore.States = (*testInputStore)(nil) + +type testInputStore struct { + registry *statestore.Registry +} + +func openTestStatestore() statestore.States { + return &testInputStore{ + registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()), + } +} + +func (s *testInputStore) Close() { + _ = s.registry.Close() +} + +func (s *testInputStore) StoreFor(string) (*statestore.Store, error) { + return s.registry.Get("filebeat") +} + +func (s *testInputStore) CleanupInterval() time.Duration { + return 24 * time.Hour +} diff --git a/x-pack/filebeat/input/azureblobstorage/input.go b/x-pack/filebeat/input/azureblobstorage/input.go index 7c7989ccbce3..604198410eff 100644 --- a/x-pack/filebeat/input/azureblobstorage/input.go +++ b/x-pack/filebeat/input/azureblobstorage/input.go @@ -14,6 +14,7 @@ import ( cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/statestore" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -33,7 +34,7 @@ const ( inputName string = "azure-blob-storage" ) -func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin { +func Plugin(log *logp.Logger, store statestore.States) v2.Plugin { return v2.Plugin{ Name: inputName, Stability: feature.Stable, diff --git a/x-pack/filebeat/input/cel/input.go b/x-pack/filebeat/input/cel/input.go index 92d29f001630..c15176100a14 100644 --- a/x-pack/filebeat/input/cel/input.go +++ b/x-pack/filebeat/input/cel/input.go @@ -45,6 +45,7 @@ import ( "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/management/status" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog" "github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httpmon" @@ -73,7 +74,7 @@ const ( // is not given a user-agent string, this user agent is added to the request. var userAgent = useragent.UserAgent("Filebeat", version.GetDefaultVersion(), version.Commit(), version.BuildTime().String()) -func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin { +func Plugin(log *logp.Logger, store statestore.States) v2.Plugin { return v2.Plugin{ Name: inputName, Stability: feature.Stable, diff --git a/x-pack/filebeat/input/cel/input_manager.go b/x-pack/filebeat/input/cel/input_manager.go index 1c26b56b305e..edc09f4f600c 100644 --- a/x-pack/filebeat/input/cel/input_manager.go +++ b/x-pack/filebeat/input/cel/input_manager.go @@ -9,6 +9,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/statestore" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -22,7 +23,7 @@ type InputManager struct { var _ v2.InputManager = InputManager{} -func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManager { +func NewInputManager(log *logp.Logger, store statestore.States) InputManager { return InputManager{ cursor: &inputcursor.InputManager{ Logger: log, diff --git a/x-pack/filebeat/input/default-inputs/inputs.go b/x-pack/filebeat/input/default-inputs/inputs.go index a5ab6aad3c16..6174066a32c0 100644 --- a/x-pack/filebeat/input/default-inputs/inputs.go +++ b/x-pack/filebeat/input/default-inputs/inputs.go @@ -5,14 +5,14 @@ package inputs import ( - "github.com/elastic/beats/v7/filebeat/beater" ossinputs "github.com/elastic/beats/v7/filebeat/input/default-inputs" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/elastic-agent-libs/logp" ) -func Init(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { +func Init(info beat.Info, log *logp.Logger, store statestore.States) []v2.Plugin { return append( xpackInputs(info, log, store), ossinputs.Init(info, log, store)..., diff --git a/x-pack/filebeat/input/default-inputs/inputs_aix.go b/x-pack/filebeat/input/default-inputs/inputs_aix.go index d3fa76b10890..6e9adca0ea7b 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_aix.go +++ b/x-pack/filebeat/input/default-inputs/inputs_aix.go @@ -5,9 +5,9 @@ package inputs import ( - "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics" "github.com/elastic/beats/v7/x-pack/filebeat/input/http_endpoint" @@ -18,7 +18,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { +func xpackInputs(info beat.Info, log *logp.Logger, store statestore.States) []v2.Plugin { return []v2.Plugin{ entityanalytics.Plugin(log), http_endpoint.Plugin(), diff --git a/x-pack/filebeat/input/default-inputs/inputs_darwin.go b/x-pack/filebeat/input/default-inputs/inputs_darwin.go index b43d75258b35..f299f1c982dd 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_darwin.go +++ b/x-pack/filebeat/input/default-inputs/inputs_darwin.go @@ -7,9 +7,9 @@ package inputs import ( - "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage" @@ -30,7 +30,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { +func xpackInputs(info beat.Info, log *logp.Logger, store statestore.States) []v2.Plugin { return []v2.Plugin{ azureblobstorage.Plugin(log, store), azureeventhub.Plugin(log), diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index a14145a5265f..4ed63215b482 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -7,9 +7,9 @@ package inputs import ( - "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage" @@ -29,7 +29,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { +func xpackInputs(info beat.Info, log *logp.Logger, store statestore.States) []v2.Plugin { return []v2.Plugin{ azureblobstorage.Plugin(log, store), azureeventhub.Plugin(log), diff --git a/x-pack/filebeat/input/default-inputs/inputs_windows.go b/x-pack/filebeat/input/default-inputs/inputs_windows.go index 4b27c3b35c2f..19805da61211 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_windows.go +++ b/x-pack/filebeat/input/default-inputs/inputs_windows.go @@ -7,9 +7,9 @@ package inputs import ( - "github.com/elastic/beats/v7/filebeat/beater" v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage" @@ -29,7 +29,7 @@ import ( "github.com/elastic/elastic-agent-libs/logp" ) -func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2.Plugin { +func xpackInputs(info beat.Info, log *logp.Logger, store statestore.States) []v2.Plugin { return []v2.Plugin{ azureblobstorage.Plugin(log, store), azureeventhub.Plugin(log), diff --git a/x-pack/filebeat/input/gcs/input.go b/x-pack/filebeat/input/gcs/input.go index 77580f70e401..67151ac074a4 100644 --- a/x-pack/filebeat/input/gcs/input.go +++ b/x-pack/filebeat/input/gcs/input.go @@ -15,6 +15,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/statestore" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -33,7 +34,7 @@ const ( inputName = "gcs" ) -func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin { +func Plugin(log *logp.Logger, store statestore.States) v2.Plugin { return v2.Plugin{ Name: inputName, Stability: feature.Stable, diff --git a/x-pack/filebeat/input/httpjson/input.go b/x-pack/filebeat/input/httpjson/input.go index 90f9b124b781..72b73858c564 100644 --- a/x-pack/filebeat/input/httpjson/input.go +++ b/x-pack/filebeat/input/httpjson/input.go @@ -31,6 +31,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog" "github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httpmon" @@ -84,7 +85,7 @@ func (log *retryLogger) Warn(msg string, keysAndValues ...interface{}) { log.log.Warnw(msg, keysAndValues...) } -func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin { +func Plugin(log *logp.Logger, store statestore.States) v2.Plugin { return v2.Plugin{ Name: inputName, Stability: feature.Stable, diff --git a/x-pack/filebeat/input/httpjson/input_manager.go b/x-pack/filebeat/input/httpjson/input_manager.go index d0fea886a355..459012ac8ef0 100644 --- a/x-pack/filebeat/input/httpjson/input_manager.go +++ b/x-pack/filebeat/input/httpjson/input_manager.go @@ -13,6 +13,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/elastic-agent-libs/logp" ) @@ -26,7 +27,7 @@ type InputManager struct { var _ v2.InputManager = InputManager{} -func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManager { +func NewInputManager(log *logp.Logger, store statestore.States) InputManager { sim := stateless.NewInputManager(statelessConfigure) return InputManager{ stateless: &sim, diff --git a/x-pack/filebeat/input/o365audit/input.go b/x-pack/filebeat/input/o365audit/input.go index b01ba98ac907..a5b1e7d9c0b9 100644 --- a/x-pack/filebeat/input/o365audit/input.go +++ b/x-pack/filebeat/input/o365audit/input.go @@ -17,6 +17,7 @@ import ( cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/version" "github.com/elastic/beats/v7/x-pack/filebeat/input/o365audit/poll" conf "github.com/elastic/elastic-agent-libs/config" @@ -51,7 +52,7 @@ type apiEnvironment struct { Clock func() time.Time } -func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin { +func Plugin(log *logp.Logger, store statestore.States) v2.Plugin { return v2.Plugin{ Name: pluginName, Stability: feature.Experimental, diff --git a/x-pack/filebeat/input/salesforce/input.go b/x-pack/filebeat/input/salesforce/input.go index 081b45dfd134..657f0d315f4f 100644 --- a/x-pack/filebeat/input/salesforce/input.go +++ b/x-pack/filebeat/input/salesforce/input.go @@ -30,6 +30,7 @@ import ( inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/go-concert/ctxtool" @@ -57,7 +58,7 @@ type salesforceInput struct { // var userAgent = useragent.UserAgent("Filebeat", version.GetDefaultVersion(), version.Commit(), version.BuildTime().String()) // Plugin returns the input plugin. -func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin { +func Plugin(log *logp.Logger, store statestore.States) v2.Plugin { return v2.Plugin{ Name: inputName, Stability: feature.Stable, diff --git a/x-pack/filebeat/input/salesforce/input_manager.go b/x-pack/filebeat/input/salesforce/input_manager.go index 49ef0513cdf4..9e2f78c1a978 100644 --- a/x-pack/filebeat/input/salesforce/input_manager.go +++ b/x-pack/filebeat/input/salesforce/input_manager.go @@ -12,6 +12,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/statestore" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/transport/httpcommon" @@ -28,7 +29,7 @@ type InputManager struct { } // NewInputManager creates a new input manager. -func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManager { +func NewInputManager(log *logp.Logger, store statestore.States) InputManager { return InputManager{ cursor: &inputcursor.InputManager{ Logger: log, diff --git a/x-pack/filebeat/input/salesforce/input_manager_test.go b/x-pack/filebeat/input/salesforce/input_manager_test.go index fc69f9180401..c1739f0723ac 100644 --- a/x-pack/filebeat/input/salesforce/input_manager_test.go +++ b/x-pack/filebeat/input/salesforce/input_manager_test.go @@ -10,7 +10,6 @@ import ( "github.com/stretchr/testify/assert" - cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" conf "github.com/elastic/elastic-agent-libs/config" @@ -32,16 +31,16 @@ func makeTestStore(data map[string]interface{}) *statestore.Store { return store } +// compile-time check if stateStore implements statestore.States +var _ statestore.States = stateStore{} + type stateStore struct{} -func (stateStore) Access(_ string) (*statestore.Store, error) { +func (stateStore) StoreFor(string) (*statestore.Store, error) { return makeTestStore(map[string]interface{}{"hello": "world"}), nil } func (stateStore) CleanupInterval() time.Duration { return time.Duration(0) } -// compile-time check if stateStore implements cursor.StateStore -var _ cursor.StateStore = stateStore{} - func TestInputManager(t *testing.T) { inputManager := NewInputManager(logp.NewLogger("salesforce_test"), stateStore{}) diff --git a/x-pack/filebeat/input/streaming/input.go b/x-pack/filebeat/input/streaming/input.go index 3df6b3845548..9e53f9c6956a 100644 --- a/x-pack/filebeat/input/streaming/input.go +++ b/x-pack/filebeat/input/streaming/input.go @@ -20,6 +20,7 @@ import ( inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/go-concert/ctxtool" "github.com/elastic/mito/lib" @@ -42,7 +43,7 @@ const ( root string = "state" ) -func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin { +func Plugin(log *logp.Logger, store statestore.States) v2.Plugin { return v2.Plugin{ Name: inputName, Stability: feature.Experimental, @@ -53,7 +54,7 @@ func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin { } } -func PluginWebsocketAlias(log *logp.Logger, store inputcursor.StateStore) v2.Plugin { +func PluginWebsocketAlias(log *logp.Logger, store statestore.States) v2.Plugin { return v2.Plugin{ Name: "websocket", Stability: feature.Experimental, diff --git a/x-pack/filebeat/input/streaming/input_manager.go b/x-pack/filebeat/input/streaming/input_manager.go index 6a1bd8bc5a46..7d2d60a7834a 100644 --- a/x-pack/filebeat/input/streaming/input_manager.go +++ b/x-pack/filebeat/input/streaming/input_manager.go @@ -9,6 +9,7 @@ import ( v2 "github.com/elastic/beats/v7/filebeat/input/v2" inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" + "github.com/elastic/beats/v7/libbeat/statestore" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" ) @@ -22,7 +23,7 @@ type InputManager struct { var _ v2.InputManager = InputManager{} -func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManager { +func NewInputManager(log *logp.Logger, store statestore.States) InputManager { return InputManager{ cursor: &inputcursor.InputManager{ Logger: log, diff --git a/x-pack/filebeat/input/unifiedlogs/input.go b/x-pack/filebeat/input/unifiedlogs/input.go index c67f48b850fe..11fbeb0f5f20 100644 --- a/x-pack/filebeat/input/unifiedlogs/input.go +++ b/x-pack/filebeat/input/unifiedlogs/input.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/feature" "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + "github.com/elastic/beats/v7/libbeat/statestore" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" @@ -43,7 +44,7 @@ var ( timeNow = time.Now ) -func Plugin(log *logp.Logger, store inputcursor.StateStore) v2.Plugin { +func Plugin(log *logp.Logger, store statestore.States) v2.Plugin { return v2.Plugin{ Name: inputName, Stability: feature.Stable,