Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filebeat,libbeat,x-pack/filebeat: clean up state store types #43063

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"path/filepath"
"strings"
"sync"
"time"

"github.com/elastic/beats/v7/filebeat/backup"
"github.com/elastic/beats/v7/filebeat/channel"
Expand Down Expand Up @@ -79,15 +78,7 @@ type Filebeat struct {
pipeline beat.PipelineConnector
}

type PluginFactory func(beat.Info, *logp.Logger, StateStore) []v2.Plugin

type StateStore interface {
// Access returns the storage registry depending on the type.
// The value of typ is expected to have been obtained from
// cursor.InputManager.Type and represents the input type.
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}
type PluginFactory func(beat.Info, *logp.Logger, statestore.States) []v2.Plugin

// New creates a new Filebeat pointer instance.
func New(plugins PluginFactory) beat.Creator {
Expand Down Expand Up @@ -569,7 +560,7 @@ func newPipelineLoaderFactory(ctx context.Context, esConfig *conf.C) fileset.Pip

// some of the filestreams might want to take over the loginput state
// if their `take_over` flag is set to `true`.
func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {
func processLogInputTakeOver(stateStore statestore.States, config *cfg.Config) error {
inputs, err := fetchInputConfiguration(config)
if err != nil {
return fmt.Errorf("Failed to fetch input configuration when attempting take over: %w", err)
Expand All @@ -578,7 +569,7 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error {
return nil
}

store, err := stateStore.Access("")
store, err := stateStore.StoreFor("")
if err != nil {
return fmt.Errorf("Failed to access state when attempting take over: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions filebeat/beater/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/elastic/elastic-agent-libs/paths"
)

var _ statestore.States = (*filebeatStore)(nil)

type filebeatStore struct {
registry *statestore.Registry
esRegistry *statestore.Registry
Expand Down Expand Up @@ -84,8 +86,8 @@ func (s *filebeatStore) Close() {
s.registry.Close()
}

// Access returns the storage registry depending on the type. Default is the file store.
func (s *filebeatStore) Access(typ string) (*statestore.Store, error) {
// AccessType returns the storage registry depending on the type. Default is the file store.
func (s *filebeatStore) StoreFor(typ string) (*statestore.Store, error) {
if features.IsElasticsearchStateStoreEnabledForInput(typ) && s.esRegistry != nil {
return s.esRegistry.Get(s.storeName)
}
Expand Down
6 changes: 3 additions & 3 deletions filebeat/input/default-inputs/inputs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@
package inputs

import (
"github.com/elastic/beats/v7/filebeat/beater"
"github.com/elastic/beats/v7/filebeat/input/filestream"
"github.com/elastic/beats/v7/filebeat/input/kafka"
"github.com/elastic/beats/v7/filebeat/input/tcp"
"github.com/elastic/beats/v7/filebeat/input/udp"
"github.com/elastic/beats/v7/filebeat/input/unix"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/elastic-agent-libs/logp"
)

func Init(info beat.Info, log *logp.Logger, components beater.StateStore) []v2.Plugin {
func Init(info beat.Info, log *logp.Logger, components statestore.States) []v2.Plugin {
return append(
genericInputs(log, components),
osInputs(info, log, components)...,
)
}

func genericInputs(log *logp.Logger, components beater.StateStore) []v2.Plugin {
func genericInputs(log *logp.Logger, components statestore.States) []v2.Plugin {
return []v2.Plugin{
filestream.Plugin(log, components),
kafka.Plugin(),
Expand Down
8 changes: 2 additions & 6 deletions filebeat/input/default-inputs/inputs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,14 @@ package inputs
import (
"github.com/elastic/beats/v7/filebeat/input/journald"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/elastic-agent-libs/logp"
)

// inputs that are only supported on linux

type osComponents interface {
cursor.StateStore
}

func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin {
func osInputs(info beat.Info, log *logp.Logger, components statestore.States) []v2.Plugin {
var plugins []v2.Plugin

zeroPlugin := v2.Plugin{}
Expand Down
8 changes: 2 additions & 6 deletions filebeat/input/default-inputs/inputs_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,13 @@ package inputs

import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/filebeat/input/winlog"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/elastic-agent-libs/logp"
)

type osComponents interface {
cursor.StateStore
}

func osInputs(info beat.Info, log *logp.Logger, components osComponents) []v2.Plugin {
func osInputs(info beat.Info, log *logp.Logger, components statestore.States) []v2.Plugin {
return []v2.Plugin{
winlog.Plugin(log, components),
}
Expand Down
14 changes: 8 additions & 6 deletions filebeat/input/filestream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import (
type inputTestingEnvironment struct {
t *testing.T
workingDir string
stateStore loginp.StateStore
stateStore statestore.States
pipeline *mockPipelineConnector

pluginInitOnce sync.Once
Expand Down Expand Up @@ -194,7 +194,7 @@ func (e *inputTestingEnvironment) abspath(filename string) string {
}

func (e *inputTestingEnvironment) requireRegistryEntryCount(expectedCount int) {
inputStore, _ := e.stateStore.Access("")
inputStore, _ := e.stateStore.StoreFor("")

actual := 0
err := inputStore.Each(func(_ string, _ statestore.ValueDecoder) (bool, error) {
Expand Down Expand Up @@ -331,7 +331,7 @@ func (e *inputTestingEnvironment) requireNoEntryInRegistry(filename, inputID str
e.t.Fatalf("cannot stat file when cheking for offset: %+v", err)
}

inputStore, _ := e.stateStore.Access("")
inputStore, _ := e.stateStore.StoreFor("")
id := getIDFromPath(filepath, inputID, fi)

var entry registryEntry
Expand All @@ -352,7 +352,7 @@ func (e *inputTestingEnvironment) requireOffsetInRegistryByID(key string, expect
}

func (e *inputTestingEnvironment) getRegistryState(key string) (registryEntry, error) {
inputStore, _ := e.stateStore.Access("")
inputStore, _ := e.stateStore.StoreFor("")

var entry registryEntry
err := inputStore.Get(key, &entry)
Expand Down Expand Up @@ -539,11 +539,13 @@ func (e *inputTestingEnvironment) requireEventTimestamp(nr int, ts string) {
require.True(e.t, selectedEvent.Timestamp.Equal(tm), "got: %s, expected: %s", selectedEvent.Timestamp.String(), tm.String())
}

var _ statestore.States = (*testInputStore)(nil)

type testInputStore struct {
registry *statestore.Registry
}

func openTestStatestore() loginp.StateStore {
func openTestStatestore() statestore.States {
return &testInputStore{
registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend()),
}
Expand All @@ -553,7 +555,7 @@ func (s *testInputStore) Close() {
s.registry.Close()
}

func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
func (s *testInputStore) StoreFor(string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

Expand Down
3 changes: 2 additions & 1 deletion filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import (
"github.com/elastic/beats/v7/libbeat/reader/parser"
"github.com/elastic/beats/v7/libbeat/reader/readfile"
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
"github.com/elastic/beats/v7/libbeat/statestore"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -67,7 +68,7 @@ type filestream struct {
}

// Plugin creates a new filestream input plugin for creating a stateful input.
func Plugin(log *logp.Logger, store loginp.StateStore) input.Plugin {
func Plugin(log *logp.Logger, store statestore.States) input.Plugin {
return input.Plugin{
Name: pluginName,
Stability: feature.Stable,
Expand Down
7 changes: 4 additions & 3 deletions filebeat/input/filestream/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"github.com/stretchr/testify/require"

loginp "github.com/elastic/beats/v7/filebeat/input/filestream/internal/input-logfile"
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/statestore"
Expand Down Expand Up @@ -235,10 +234,12 @@ func generateFile(t testing.TB, dir string, lineCount int) string {
return filename
}

func createTestStore(t testing.TB) loginp.StateStore {
func createTestStore(t testing.TB) statestore.States {
return &testStore{registry: statestore.NewRegistry(storetest.NewMemoryStoreBackend())}
}

var _ statestore.States = (*testStore)(nil)

type testStore struct {
registry *statestore.Registry
}
Expand All @@ -247,7 +248,7 @@ func (s *testStore) Close() {
s.registry.Close()
}

func (s *testStore) Access(_ string) (*statestore.Store, error) {
func (s *testStore) StoreFor(string) (*statestore.Store, error) {
return s.registry.Get("filestream-benchmark")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type InputManager struct {
Logger *logp.Logger

// StateStore gives the InputManager access to the persistent key value store.
StateStore StateStore
StateStore statestore.States

// Type must contain the name of the input type. It is used to create the key name
// for all sources the inputs collect from.
Expand Down Expand Up @@ -87,12 +87,6 @@ var errNoInputRunner = errors.New("no input runner available")
// Deprecated: Inputs without an ID are not supported anymore.
const globalInputID = ".global"

// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}

func (cim *InputManager) init() error {
cim.initOnce.Do(func() {

Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/filestream/internal/input-logfile/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,10 @@ type (
// hook into store close for testing purposes
var closeStore = (*store).close

func openStore(log *logp.Logger, statestore StateStore, prefix string) (*store, error) {
func openStore(log *logp.Logger, statestore statestore.States, prefix string) (*store, error) {
ok := false

persistentStore, err := statestore.Access("")
persistentStore, err := statestore.StoreFor("")
if err != nil {
return nil, err
}
Expand Down
16 changes: 9 additions & 7 deletions filebeat/input/filestream/internal/input-logfile/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,6 @@ import (
"github.com/elastic/go-concert/unison"
)

type testStateStore struct {
Store *statestore.Store
GCPeriod time.Duration
}

func TestResource_CopyInto(t *testing.T) {
src := resource{lock: unison.MakeMutex()}
dst := resource{lock: unison.MakeMutex()}
Expand Down Expand Up @@ -476,7 +471,7 @@ func closeStoreWith(fn func(s *store)) func() {
}

//nolint:unparam // It's a test helper
func testOpenStore(t *testing.T, prefix string, persistentStore StateStore) *store {
func testOpenStore(t *testing.T, prefix string, persistentStore statestore.States) *store {
if persistentStore == nil {
persistentStore = createSampleStore(t, nil)
}
Expand Down Expand Up @@ -506,9 +501,16 @@ func createSampleStore(t *testing.T, data map[string]state) testStateStore {
}
}

var _ statestore.States = testStateStore{}

type testStateStore struct {
Store *statestore.Store
GCPeriod time.Duration
}

func (ts testStateStore) WithGCPeriod(d time.Duration) testStateStore { ts.GCPeriod = d; return ts }
func (ts testStateStore) CleanupInterval() time.Duration { return ts.GCPeriod }
func (ts testStateStore) Access(string) (*statestore.Store, error) {
func (ts testStateStore) StoreFor(string) (*statestore.Store, error) {
if ts.Store == nil {
return nil, errors.New("no store configured")
}
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/journald/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func (e *inputTestingEnvironment) RequireStatuses(expected []statusUpdate) {
}
}

var _ statestore.States = (*testInputStore)(nil)

type testInputStore struct {
registry *statestore.Registry
}
Expand All @@ -161,7 +163,7 @@ func (s *testInputStore) Close() {
s.registry.Close()
}

func (s *testInputStore) Access(_ string) (*statestore.Store, error) {
func (s *testInputStore) StoreFor(string) (*statestore.Store, error) {
return s.registry.Get("filebeat")
}

Expand Down
3 changes: 2 additions & 1 deletion filebeat/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/elastic/beats/v7/libbeat/management/status"
"github.com/elastic/beats/v7/libbeat/reader"
"github.com/elastic/beats/v7/libbeat/reader/parser"
"github.com/elastic/beats/v7/libbeat/statestore"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
)
Expand Down Expand Up @@ -72,7 +73,7 @@ const localSystemJournalID = "LOCAL_SYSTEM_JOURNAL"
const pluginName = "journald"

// Plugin creates a new journald input plugin for creating a stateful input.
func Plugin(log *logp.Logger, store cursor.StateStore) input.Plugin {
func Plugin(log *logp.Logger, store statestore.States) input.Plugin {
return input.Plugin{
Name: pluginName,
Stability: feature.Stable,
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/journald/input_filtering_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ func TestInputSeek(t *testing.T) {
env := newInputTestingEnvironment(t)

if testCase.cursor != "" {
store, _ := env.stateStore.Access("")
store, _ := env.stateStore.StoreFor("")
tmp := map[string]any{}
if err := json.Unmarshal([]byte(testCase.cursor), &tmp); err != nil {
t.Fatal(err)
Expand Down
4 changes: 2 additions & 2 deletions filebeat/input/journald/input_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ package journald

import (
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
cursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/elastic-agent-libs/logp"
)

func Plugin(log *logp.Logger, store cursor.StateStore) v2.Plugin {
func Plugin(log *logp.Logger, store statestore.States) v2.Plugin {
return v2.Plugin{}
}
10 changes: 2 additions & 8 deletions filebeat/input/v2/input-cursor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ import (
type InputManager struct {
Logger *logp.Logger

// StateStore gives the InputManager access to the persitent key value store.
StateStore StateStore
// StateStore gives the InputManager access to the persistent key value store.
StateStore statestore.States

// Type must contain the name of the input type. It is used to create the key name
// for all sources the inputs collect from.
Expand Down Expand Up @@ -80,12 +80,6 @@ var (
errNoInputRunner = errors.New("no input runner available")
)

// StateStore interface and configurations used to give the Manager access to the persistent store.
type StateStore interface {
Access(typ string) (*statestore.Store, error)
CleanupInterval() time.Duration
}

// init initializes the state store
// This function is called from:
// 1. InputManager::Init on beat start
Expand Down
Loading
Loading