From ac5740945191799367d7a9c0ed976f104040219b Mon Sep 17 00:00:00 2001 From: ShourieG Date: Tue, 7 Jan 2025 12:55:49 +0530 Subject: [PATCH 1/6] [filebeat][streaming] - Added more retry codes to websocket retry logic (#42218) --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/streaming/websocket.go | 3 +++ 2 files changed, 4 insertions(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 3e825c8c0984..51fed7692a11 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -203,6 +203,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019] - The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] - Fix Netflow Template Sharing configuration handling. {pull}42080[42080] +- Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. 
{pull}42218[42218] *Heartbeat* diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 225bc76e8d9f..d21eb9c21b47 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -176,6 +176,9 @@ func isRetryableError(err error) bool { websocket.CloseInternalServerErr, websocket.CloseTryAgainLater, websocket.CloseServiceRestart, + websocket.CloseAbnormalClosure, + websocket.CloseMessageTooBig, + websocket.CloseNoStatusReceived, websocket.CloseTLSHandshake: return true } From 7e25c4d1b7ac561a0b1c2998c50407bbb83b9431 Mon Sep 17 00:00:00 2001 From: Andrew Gizas Date: Tue, 7 Jan 2025 09:49:59 +0200 Subject: [PATCH 2/6] [Kubernetes Integration] Fix for apiserver token expiration (#42016) * initial fix for apiserver * adding fix for controller and schedule --- CHANGELOG.next.asciidoc | 1 + metricbeat/helper/prometheus/prometheus.go | 12 ++++ .../module/kubernetes/apiserver/metricset.go | 59 ++++++++++++++---- .../controllermanager/controllermanager.go | 59 ++++++++++++++---- .../module/kubernetes/scheduler/scheduler.go | 60 +++++++++++++++---- 5 files changed, 156 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 51fed7692a11..33feb91cff20 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -237,6 +237,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Do not report non-existant 0 values for RSS metrics in docker/memory {pull}41449[41449] - Log Cisco Meraki `getDevicePerformanceScores` errors without stopping metrics collection. 
{pull}41622[41622] - Don't skip first bucket value in GCP metrics metricset for distribution type metrics {pull}41822[41822] +- [K8s Integration] Enhance HTTP authentication in case of token updates for Apiserver, Controllermanager and Scheduler metricsets {issue}41910[41910] {pull}42016[42016] - Fixed `creation_date` scientific notation output in the `elasticsearch.index` metricset. {pull}42053[42053] - Fix bug where metricbeat unintentionally triggers Windows ASR. {pull}42177[42177] diff --git a/metricbeat/helper/prometheus/prometheus.go b/metricbeat/helper/prometheus/prometheus.go index f4e06df7e1f1..50929c128c37 100644 --- a/metricbeat/helper/prometheus/prometheus.go +++ b/metricbeat/helper/prometheus/prometheus.go @@ -35,6 +35,9 @@ const acceptHeader = `text/plain;version=0.0.4;q=0.5,*/*;q=0.1` // Prometheus helper retrieves prometheus formatted metrics type Prometheus interface { + // GetHttp returns the HTTP Client that handles the connection towards remote endpoint + GetHttp() (*helper.HTTP, error) + // GetFamilies requests metric families from prometheus endpoint and returns them GetFamilies() ([]*MetricFamily, error) @@ -66,6 +69,15 @@ func NewPrometheusClient(base mb.BaseMetricSet) (Prometheus, error) { return &prometheus{http, base.Logger()}, nil } +// GetHttp returns HTTP Client +func (p *prometheus) GetHttp() (*helper.HTTP, error) { + httpClient, ok := p.httpfetcher.(*helper.HTTP) + if !ok { + return nil, fmt.Errorf("httpfetcher is not of type *helper.HTTP") + } + return httpClient, nil +} + // GetFamilies requests metric families from prometheus endpoint and returns them func (p *prometheus) GetFamilies() ([]*MetricFamily, error) { var reader io.Reader diff --git a/metricbeat/module/kubernetes/apiserver/metricset.go b/metricbeat/module/kubernetes/apiserver/metricset.go index 9dd9a81976d2..5457093e5536 100644 --- a/metricbeat/module/kubernetes/apiserver/metricset.go +++ b/metricbeat/module/kubernetes/apiserver/metricset.go @@ -19,9 +19,14 @@ package 
apiserver import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -29,9 +34,11 @@ import ( // Metricset for apiserver is a prometheus based metricset type Metricset struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } var _ mb.ReportingMetricSetV2Error = (*Metricset)(nil) @@ -41,11 +48,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &Metricset{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil @@ -54,20 +73,36 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. 
func (m *Metricset) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) - } - isOpen := reporter.Event(event) - if !isOpen { - return nil + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if !isOpen { + return nil + } } + return nil } - - return nil } diff --git a/metricbeat/module/kubernetes/controllermanager/controllermanager.go b/metricbeat/module/kubernetes/controllermanager/controllermanager.go index dbfcddc2b6be..6c7b1c8ae528 100644 --- a/metricbeat/module/kubernetes/controllermanager/controllermanager.go +++ b/metricbeat/module/kubernetes/controllermanager/controllermanager.go @@ -19,9 +19,14 @@ package controllermanager import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod 
"github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -74,9 +79,11 @@ func init() { // MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling. type MetricSet struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } // New create a new instance of the MetricSet @@ -87,11 +94,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &MetricSet{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil } @@ -99,19 +118,37 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. 
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) - } - isOpen := reporter.Event(event) - if !isOpen { - return nil + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if !isOpen { + return nil + } } - } - return nil + return nil + } } diff --git a/metricbeat/module/kubernetes/scheduler/scheduler.go b/metricbeat/module/kubernetes/scheduler/scheduler.go index f512c96b7f2f..1b563ad000af 100644 --- a/metricbeat/module/kubernetes/scheduler/scheduler.go +++ b/metricbeat/module/kubernetes/scheduler/scheduler.go @@ -19,9 +19,14 @@ package scheduler import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" 
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -78,9 +83,11 @@ func init() { // MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling. type MetricSet struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } // New create a new instance of the MetricSet @@ -91,11 +98,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &MetricSet{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil } @@ -103,20 +122,37 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. 
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if !isOpen { + return nil + } } - isOpen := reporter.Event(event) - if !isOpen { - return nil - } - } - return nil + return nil + } } From 177a47a01599c754c190e1e27da11aafafdd715c Mon Sep 17 00:00:00 2001 From: ShourieG Date: Tue, 7 Jan 2025 17:38:56 +0530 Subject: [PATCH 3/6] [filebeat][websocket] - Added infinite & blanket retry options to websockets and improved logging and retry logic (#42225) * added blanket & infinite retry options and improved logging --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-streaming.asciidoc | 14 +++++- x-pack/filebeat/input/streaming/config.go | 10 +++-- .../filebeat/input/streaming/config_test.go | 12 ++++++ x-pack/filebeat/input/streaming/input_test.go | 2 +- 
x-pack/filebeat/input/streaming/websocket.go | 43 +++++++++++++------ 6 files changed, 63 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 33feb91cff20..c3a20e925c26 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -383,6 +383,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] - Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012] - Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42094[42094] +- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index 7f07fb4954f6..85a7c02467af 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -316,7 +316,7 @@ This specifies whether fields should be replaced with a `*` or deleted entirely [float] ==== `retry` -The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. +The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. 
It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. It also supports blanket retries and infinite retries via the `blanket_retries` and `infinite_retries` configuration options. These are set to `false` by default. ["source","yaml",subs="attributes"] ---- @@ -333,6 +333,8 @@ filebeat.inputs: max_attempts: 5 wait_min: 1s wait_max: 10s + blanket_retries: false + infinite_retries: false ---- [float] ==== `retry.max_attempts` @@ -349,6 +351,16 @@ The minimum time to wait between retries. This ensures that retries are spaced o The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. The default value is `30` seconds. +[float] +==== `retry.blanket_retries` + +Normally the input will only retry when a connection error is found to be retryable based on the error type and the RFC 6455 error codes defined by the websocket protocol. If `blanket_retries` is set to `true` (`false` by default) the input will retry on any error. This is not recommended unless the user is certain that all errors are transient and can be resolved by retrying. + +[float] +==== `retry.infinite_retries` + +Normally the input will only retry a maximum of `max_attempts` times. If `infinite_retries` is set to `true` (`false` by default) the input will retry indefinitely. This is not recommended unless the user is certain that the connection will eventually succeed. + [float] === `timeout` Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds. 
diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index eea8c2afc704..df557d553de2 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -59,9 +59,11 @@ type redact struct { } type retry struct { - MaxAttempts int `config:"max_attempts"` - WaitMin time.Duration `config:"wait_min"` - WaitMax time.Duration `config:"wait_max"` + MaxAttempts int `config:"max_attempts"` + WaitMin time.Duration `config:"wait_min"` + WaitMax time.Duration `config:"wait_max"` + BlanketRetries bool `config:"blanket_retries"` + InfiniteRetries bool `config:"infinite_retries"` } type authConfig struct { @@ -136,7 +138,7 @@ func (c config) Validate() error { if c.Retry != nil { switch { - case c.Retry.MaxAttempts <= 0: + case c.Retry.MaxAttempts <= 0 && !c.Retry.InfiniteRetries: return errors.New("max_attempts must be greater than zero") case c.Retry.WaitMin > c.Retry.WaitMax: return errors.New("wait_min must be less than or equal to wait_max") diff --git a/x-pack/filebeat/input/streaming/config_test.go b/x-pack/filebeat/input/streaming/config_test.go index 840c35d400fa..437267bc7b71 100644 --- a/x-pack/filebeat/input/streaming/config_test.go +++ b/x-pack/filebeat/input/streaming/config_test.go @@ -130,6 +130,18 @@ var configTests = []struct { "url": "wss://localhost:443/v1/stream", }, }, + { + name: "valid_retry_with_infinite", + config: map[string]interface{}{ + "retry": map[string]interface{}{ + "infinite_retries": true, + "max_attempts": 0, + "wait_min": "1s", + "wait_max": "2s", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, } func TestConfig(t *testing.T) { diff --git a/x-pack/filebeat/input/streaming/input_test.go b/x-pack/filebeat/input/streaming/input_test.go index c11784ea3dbf..e4a8eac1d417 100644 --- a/x-pack/filebeat/input/streaming/input_test.go +++ b/x-pack/filebeat/input/streaming/input_test.go @@ -450,7 +450,7 @@ var inputTests = []struct { "wait_max": "2s", }, }, - 
wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"), + wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake and (status 403)"), }, { name: "single_event_tls", diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index d21eb9c21b47..584852aabcce 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -118,7 +118,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { _, message, err := c.ReadMessage() if err != nil { s.metrics.errorsTotal.Inc() - if !isRetryableError(err) { + if !s.cfg.Retry.BlanketRetries && !isRetryableError(err) { s.log.Errorw("failed to read websocket data", "error", err) return err } @@ -233,21 +233,38 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log } if cfg.Retry != nil { retryConfig := cfg.Retry - for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { - conn, response, err = dialer.DialContext(ctx, url, headers) - if err == nil { - return conn, response, nil + if !retryConfig.InfiniteRetries { + for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { + conn, response, err = dialer.DialContext(ctx, url, headers) + if err == nil { + return conn, response, nil + } + //nolint:errorlint // it will never be a wrapped error at this point + if err == websocket.ErrBadHandshake { + log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) + } else { + log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode) + } + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) + time.Sleep(waitTime) } - //nolint:errorlint // it will never be a wrapped error at this point - if err == 
websocket.ErrBadHandshake { - log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) - continue + return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w and (status %d)", retryConfig.MaxAttempts, err, response.StatusCode) + } else { + for attempt := 1; ; attempt++ { + conn, response, err = dialer.DialContext(ctx, url, headers) + if err == nil { + return conn, response, nil + } + //nolint:errorlint // it will never be a wrapped error at this point + if err == websocket.ErrBadHandshake { + log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) + } else { + log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode) + } + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) + time.Sleep(waitTime) } - log.Debugf("attempt %d: webSocket connection failed. 
retrying...\n", attempt) - waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) - time.Sleep(waitTime) } - return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err) } return dialer.DialContext(ctx, url, headers) From 6474305f130e18754b767ecceb5b6f476371b4ba Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Tue, 7 Jan 2025 14:02:01 +0100 Subject: [PATCH 4/6] github-actions: python 3.12 is the default when using ubuntu-latest (#42233) github-actions: python 3.12 is the default when using ubuntu-latest * check-docs uses python too * single quote --- .github/workflows/check-default.yml | 5 +++++ .github/workflows/check-docs.yml | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/.github/workflows/check-default.yml b/.github/workflows/check-default.yml index f394815356c4..9edcf7c2654f 100644 --- a/.github/workflows/check-default.yml +++ b/.github/workflows/check-default.yml @@ -20,6 +20,11 @@ jobs: - uses: actions/setup-go@v5 with: go-version-file: .go-version + #  when using ubuntu-latest, python 3.10 is not the default version. + - name: Fix Code is not compatible with Python 3.12 + uses: actions/setup-python@v4 + with: + python-version: '3.10' - name: Run check-default run: | go install github.com/magefile/mage diff --git a/.github/workflows/check-docs.yml b/.github/workflows/check-docs.yml index a2f26979ec40..50ef425ae33a 100644 --- a/.github/workflows/check-docs.yml +++ b/.github/workflows/check-docs.yml @@ -28,6 +28,11 @@ jobs: run: sudo apt-get install -y libsystemd-dev - name: Install librpm-dev run: sudo apt-get install -y librpm-dev + #  when using ubuntu-latest, python 3.10 is not the default version. 
+ - name: Fix Code is not compatible with Python 3.12 + uses: actions/setup-python@v4 + with: + python-version: '3.10' - name: Run check run: | make check From 4244fa247e08e266f58c64938a8d27593a602c99 Mon Sep 17 00:00:00 2001 From: Shourie Ganguly Date: Tue, 7 Jan 2025 22:07:09 +0530 Subject: [PATCH 5/6] [filebeat][streaming] - Added OAuth2 support with auto token refresh for websockets (#42212) * Added OAuth2 support with auto token refresh for websockets. A manual token refresh logic had to be implemented since the OAuth2 client and the websocket client are separate entities and cannot be clubbed together. --- CHANGELOG.next.asciidoc | 1 + .../docs/inputs/input-streaming.asciidoc | 45 +++- x-pack/filebeat/input/streaming/config.go | 47 +++- .../filebeat/input/streaming/config_test.go | 73 ++++++ x-pack/filebeat/input/streaming/input.go | 6 +- .../filebeat/input/streaming/input_manager.go | 1 - x-pack/filebeat/input/streaming/input_test.go | 217 +++++++++++++++++- x-pack/filebeat/input/streaming/websocket.go | 85 ++++++- 8 files changed, 454 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c3a20e925c26..f21fd1c56402 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -383,6 +383,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Rate limiting operability improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] - Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012] - Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42094[42094] +- Added OAuth2 support with auto token refresh for websocket streaming input. 
{issue}41989[41989] {pull}42212[42212] - Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225] *Auditbeat* diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index 85a7c02467af..1ee343e4a9b7 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -20,6 +20,7 @@ The websocket streaming input supports: ** Basic ** Bearer ** Custom +** OAuth2.0 NOTE: The `streaming` input websocket handler does not currently support XML messages. Auto-reconnects are also not supported at the moment so reconnection will occur on input restart. @@ -113,7 +114,7 @@ This will include any sensitive or secret information kept in the `state` object ==== Authentication -The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication and authentication via a custom auth config. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. These token/key values are are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively. +The websocket streaming input supports authentication via Basic token authentication, Bearer token authentication, authentication via a custom auth config and OAuth2 based authentication. Unlike REST inputs Basic Authentication contains a basic auth token, Bearer Authentication contains a bearer token and custom auth contains any combination of custom header and value. 
These token/key values are added to the request headers and are not exposed to the `state` object. The custom auth configuration is useful for constructing requests that require custom headers and values for authentication. The basic and bearer token configurations will always use the `Authorization` header and prepend the token with `Basic` or `Bearer` respectively. Example configurations with authentication: @@ -166,6 +167,48 @@ filebeat.inputs: token_url: https://api.crowdstrike.com/oauth2/token ---- +==== Websocket OAuth2.0 + +The `websocket` streaming input supports OAuth2.0 authentication. The `auth` configuration field is used to specify the OAuth2.0 configuration. These values are not exposed to the `state` object. + +The `auth` configuration field has the following subfields: + + - `client_id`: The client ID to use for OAuth2.0 authentication. + - `client_secret`: The client secret to use for OAuth2.0 authentication. + - `token_url`: The token URL to use for OAuth2.0 authentication. + - `scopes`: The scopes to use for OAuth2.0 authentication. + - `endpoint_params`: The endpoint parameters to use for OAuth2.0 authentication. + - `auth_style`: The authentication style to use for OAuth2.0 authentication. If left unset, the style will be automatically detected. + - `token_expiry_buffer`: Minimum valid time remaining before attempting an OAuth2 token renewal. The default value is `2m`. + +**Explanations for `auth_style` and `token_expiry_buffer`:** + +- `auth_style`: The authentication style to use for OAuth2.0 authentication which determines how the values of sensitive information like `client_id` and `client_secret` are sent in the token request. The default style value is automatically inferred and used appropriately if no value is provided. The `auth_style` configuration field is optional and can be used to specify the authentication style to use for OAuth2.0 authentication. 
The `auth_style` configuration field supports the following configurable values: + + * `in_header`: The `client_id` and `client_secret` is sent in the header as a base64 encoded `Authorization` header. + * `in_params`: The `client_id` and `client_secret` is sent in the request body along with the other OAuth2 parameters. + +- `token_expiry_buffer`: The token expiry buffer to use for OAuth2.0 authentication. The `token_expiry_buffer` is used as a safety net to ensure that the token does not expire before the input can refresh it. The `token_expiry_buffer` configuration field is optional. If the `token_expiry_buffer` configuration field is not set, the default value of `2m` is used. + +NOTE: We recommend leaving the `auth_style` configuration field unset (automatically inferred internally) for most scenarios, except where manual intervention is required. + +["source","yaml",subs="attributes"] +---- +filebeat.inputs: +- type: streaming + auth: + client_id: a23fcea2643868ef1a41565a1a8a1c7c + client_secret: c3VwZXJzZWNyZXRfY2xpZW50X3NlY3JldF9zaGhoaGgK + token_url: https://api.sample-url.com/oauth2/token + scopes: ["read", "write"] + endpoint_params: + param1: value1 + param2: value2 + auth_style: in_params + token_expiry_buffer: 5m + url: wss://localhost:443/_stream +---- + [[input-state-streaming]] ==== Input state diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index df557d553de2..753c36febf3a 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -12,10 +12,17 @@ import ( "regexp" "time" + "golang.org/x/oauth2" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) +const ( + authStyleInHeader = "in_header" + authStyleInParams = "in_params" +) + type config struct { // Type is the type of the stream being followed. The // zero value indicates websocket. 
@@ -85,11 +92,30 @@ type customAuthConfig struct { type oAuth2Config struct { // common oauth fields - ClientID string `config:"client_id"` - ClientSecret string `config:"client_secret"` - EndpointParams map[string][]string `config:"endpoint_params"` - Scopes []string `config:"scopes"` - TokenURL string `config:"token_url"` + AuthStyle string `config:"auth_style"` + ClientID string `config:"client_id"` + ClientSecret string `config:"client_secret"` + EndpointParams url.Values `config:"endpoint_params"` + Scopes []string `config:"scopes"` + TokenExpiryBuffer time.Duration `config:"token_expiry_buffer" validate:"min=0"` + TokenURL string `config:"token_url"` + // accessToken is only used internally to set the initial headers via formHeader() if oauth2 is enabled + accessToken string +} + +func (o oAuth2Config) isEnabled() bool { + return o.ClientID != "" && o.ClientSecret != "" && o.TokenURL != "" +} + +func (o oAuth2Config) getAuthStyle() oauth2.AuthStyle { + switch o.AuthStyle { + case authStyleInHeader: + return oauth2.AuthStyleInHeader + case authStyleInParams: + return oauth2.AuthStyleInParams + default: + return oauth2.AuthStyleAutoDetect + } } type urlConfig struct { @@ -144,6 +170,12 @@ func (c config) Validate() error { return errors.New("wait_min must be less than or equal to wait_max") } } + + if c.Auth.OAuth2.isEnabled() { + if c.Auth.OAuth2.AuthStyle != authStyleInHeader && c.Auth.OAuth2.AuthStyle != authStyleInParams && c.Auth.OAuth2.AuthStyle != "" { + return fmt.Errorf("unsupported auth style: %s", c.Auth.OAuth2.AuthStyle) + } + } return nil } @@ -173,6 +205,11 @@ func defaultConfig() config { Transport: httpcommon.HTTPTransportSettings{ Timeout: 180 * time.Second, }, + Auth: authConfig{ + OAuth2: oAuth2Config{ + TokenExpiryBuffer: 2 * time.Minute, + }, + }, Retry: &retry{ MaxAttempts: 5, WaitMin: 1 * time.Second, diff --git a/x-pack/filebeat/input/streaming/config_test.go b/x-pack/filebeat/input/streaming/config_test.go index 
437267bc7b71..99b3cc805597 100644 --- a/x-pack/filebeat/input/streaming/config_test.go +++ b/x-pack/filebeat/input/streaming/config_test.go @@ -142,6 +142,79 @@ var configTests = []struct { "url": "wss://localhost:443/v1/stream", }, }, + { + name: "valid_authStyle_default", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, + { + name: "valid_authStyle_in_params", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "auth_style": "in_params", + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, + { + name: "valid_authStyle_in_header", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "auth_style": "in_header", + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, + { + name: "invalid_authStyle", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "auth_style": "in_query", + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + }, + "url": "wss://localhost:443/v1/stream", + }, + wantErr: fmt.Errorf("unsupported auth style: in_query accessing config"), + }, + { + name: "valid_tokenExpiryBuffer", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "token_url": "https://localhost:443/token", + "token_expiry_buffer": "5m", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, + { + name: "invalid_tokenExpiryBuffer", + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "client_id": "a_client_id", + "client_secret": "a_client_secret", + 
"token_url": "https://localhost:443/token", + "token_expiry_buffer": "-1s", + }, + "url": "wss://localhost:443/v1/stream", + }, + wantErr: fmt.Errorf("requires duration >= 0 accessing 'auth.token_expiry_buffer'"), + }, } func TestConfig(t *testing.T) { diff --git a/x-pack/filebeat/input/streaming/input.go b/x-pack/filebeat/input/streaming/input.go index 12a362625bff..3df6b3845548 100644 --- a/x-pack/filebeat/input/streaming/input.go +++ b/x-pack/filebeat/input/streaming/input.go @@ -378,12 +378,14 @@ func errorMessage(msg string) map[string]interface{} { func formHeader(cfg config) map[string][]string { header := make(map[string][]string) switch { - case cfg.Auth.CustomAuth != nil: - header[cfg.Auth.CustomAuth.Header] = []string{cfg.Auth.CustomAuth.Value} + case cfg.Auth.OAuth2.accessToken != "": + header["Authorization"] = []string{"Bearer " + cfg.Auth.OAuth2.accessToken} case cfg.Auth.BearerToken != "": header["Authorization"] = []string{"Bearer " + cfg.Auth.BearerToken} case cfg.Auth.BasicToken != "": header["Authorization"] = []string{"Basic " + cfg.Auth.BasicToken} + case cfg.Auth.CustomAuth != nil: + header[cfg.Auth.CustomAuth.Header] = []string{cfg.Auth.CustomAuth.Value} } return header } diff --git a/x-pack/filebeat/input/streaming/input_manager.go b/x-pack/filebeat/input/streaming/input_manager.go index c685452c34f1..6a1bd8bc5a46 100644 --- a/x-pack/filebeat/input/streaming/input_manager.go +++ b/x-pack/filebeat/input/streaming/input_manager.go @@ -38,7 +38,6 @@ func cursorConfigure(cfg *conf.C) ([]inputcursor.Source, inputcursor.Input, erro if err := cfg.Unpack(&src.cfg); err != nil { return nil, nil, err } - if src.cfg.Program == "" { // set default program src.cfg.Program = ` diff --git a/x-pack/filebeat/input/streaming/input_test.go b/x-pack/filebeat/input/streaming/input_test.go index e4a8eac1d417..df9b406e17cf 100644 --- a/x-pack/filebeat/input/streaming/input_test.go +++ b/x-pack/filebeat/input/streaming/input_test.go @@ -43,7 +43,9 @@ var 
inputTests = []struct { name string server func(*testing.T, WebSocketHandler, map[string]interface{}, []string) proxyServer func(*testing.T, WebSocketHandler, map[string]interface{}, []string) *httptest.Server + oauth2Server func(*testing.T, http.HandlerFunc, map[string]interface{}) handler WebSocketHandler + oauth2Handler http.HandlerFunc config map[string]interface{} response []string time func() time.Time @@ -417,13 +419,13 @@ var inputTests = []struct { }, }, response: []string{` - { - "pps": { - "agent": "example.proofpoint.com", - "cid": "mmeng_uivm071" - }, - "ts": 1502908200 - }`, + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": 1502908200 + }`, }, want: []map[string]interface{}{ { @@ -586,6 +588,171 @@ var inputTests = []struct { }, }, }, + { + name: "oauth2_blank_auth_style", + oauth2Server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + s := httptest.NewServer(h) + config["auth.token_url"] = s.URL + "/token" + config["url"] = "ws://placeholder" + t.Cleanup(s.Close) + }, + oauth2Handler: oauth2TokenHandler, + server: webSocketTestServerWithAuth(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "scopes": []string{ + "scope1", + "scope2", + }, + "endpoint_params": map[string]string{ + "param1": "v1", + }, + }, + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": { + "tls": { + "verify": "NONE" + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", 
+ "dsn": "2.0.0", + "mailer": "*file*", + "to": [ + "/dev/null" + ], + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": 35342 + }, + "id": "ZeYGULpZmL5N0151HN1OyA" + }`}, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": map[string]interface{}{ + "tls": map[string]interface{}{ + "verify": "NONE", + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": []interface{}{ + "/dev/null", + }, + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": float64(35342), + }, + "id": "ZeYGULpZmL5N0151HN1OyA", + }, + }, + }, + { + name: "oauth2_in_params_auth_style", + oauth2Server: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) { + s := httptest.NewServer(h) + config["auth.token_url"] = s.URL + "/token" + config["url"] = "ws://placeholder" + t.Cleanup(s.Close) + }, + oauth2Handler: oauth2TokenHandler, + server: webSocketTestServerWithAuth(httptest.NewServer), + handler: defaultHandler, + config: map[string]interface{}{ + "auth": map[string]interface{}{ + "auth_style": "in_params", + "client_id": "a_client_id", + "client_secret": "a_client_secret", + "scopes": []string{ + "scope1", + "scope2", + }, + "endpoint_params": map[string]string{ + "param1": "v1", + }, + }, + "program": ` + bytes(state.response).decode_json().as(inner_body,{ + "events": [inner_body], + })`, + }, + response: []string{` + { + "pps": { + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071" + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= 
(8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": { + "tls": { + "verify": "NONE" + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": [ + "/dev/null" + ], + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": 35342 + }, + "id": "ZeYGULpZmL5N0151HN1OyA" + }`}, + want: []map[string]interface{}{ + { + "pps": map[string]interface{}{ + "agent": "example.proofpoint.com", + "cid": "mmeng_uivm071", + }, + "ts": "2017-08-17T14:54:12.949180-07:00", + "data": "2017-08-17T14:54:12.949180-07:00 example sendmail[30641]:v7HLqYbx029423: to=/dev/null, ctladdr= (8/0),delay=00:00:00, xdelay=00:00:00, mailer=*file*, tls_verify=NONE, pri=35342,dsn=2.0.0, stat=Sent", + "sm": map[string]interface{}{ + "tls": map[string]interface{}{ + "verify": "NONE", + }, + "stat": "Sent", + "qid": "v7HLqYbx029423", + "dsn": "2.0.0", + "mailer": "*file*", + "to": []interface{}{ + "/dev/null", + }, + "ctladdr": " (8/0)", + "delay": "00:00:00", + "xdelay": "00:00:00", + "pri": float64(35342), + }, + "id": "ZeYGULpZmL5N0151HN1OyA", + }, + }, + }, } var urlEvalTests = []struct { @@ -693,6 +860,9 @@ func TestInput(t *testing.T) { logp.TestingSetup() for _, test := range inputTests { t.Run(test.name, func(t *testing.T) { + if test.oauth2Server != nil { + test.oauth2Server(t, test.oauth2Handler, test.config) + } if test.server != nil { test.server(t, test.handler, test.config, test.response) } @@ -870,7 +1040,7 @@ func webSocketTestServerWithAuth(serve func(http.Handler) *httptest.Server) func handler(t, conn, response) })) // only set the resource URL if it is not already set - if config["url"] == nil { + if config["url"] == nil || config["url"] == "ws://placeholder" { config["url"] = "ws" + server.URL[4:] } t.Cleanup(server.Close) @@ -1029,3 +1199,34 @@ func newWebSocketProxyTestServer(t *testing.T, handler WebSocketHandler, config config["proxy_url"] = "ws" + 
backendServer.URL[4:] return httptest.NewServer(webSocketProxyHandler(config["url"].(string))) } + +//nolint:errcheck // no point checking errors in test server. +func oauth2TokenHandler(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/token" { + return + } + w.Header().Set("content-type", "application/json") + r.ParseForm() + switch { + case r.Method != http.MethodPost: + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong method"}`)) + case r.FormValue("grant_type") != "client_credentials": + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong grant_type"}`)) + case r.FormValue("client_id") != "a_client_id": + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong client_id"}`)) + case r.FormValue("client_secret") != "a_client_secret": + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong client_secret"}`)) + case r.FormValue("scope") != "scope1 scope2": + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong scope"}`)) + case r.FormValue("param1") != "v1": + w.WriteHeader(http.StatusBadRequest) + w.Write([]byte(`{"error":"wrong param1"}`)) + default: + w.Write([]byte(`{"token_type": "Bearer", "expires_in": "3600", "access_token": "` + bearerToken + `"}`)) + } +} diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 584852aabcce..eeb89ad5c9b8 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -21,6 +21,8 @@ import ( "github.com/gorilla/websocket" "go.uber.org/zap/zapcore" + "golang.org/x/oauth2" + "golang.org/x/oauth2/clientcredentials" inputcursor "github.com/elastic/beats/v7/filebeat/input/v2/input-cursor" "github.com/elastic/elastic-agent-libs/logp" @@ -31,11 +33,29 @@ import ( type websocketStream struct { processor - id string - cfg config - cursor map[string]any + id string + cfg config + cursor map[string]any + tokenSource 
oauth2.TokenSource + tokenExpiry <-chan time.Time + time func() time.Time +} + +type loggingRoundTripper struct { + rt http.RoundTripper + log *logp.Logger +} - time func() time.Time +func (l *loggingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + resp, err := l.rt.RoundTrip(req) + // avoided logging request and response body as it may contain sensitive information and can be huge + if l.log.Core().Enabled(zapcore.DebugLevel) { + l.log.Debugf("request: %v %v\nHeaders: %v\n", req.Method, req.URL, req.Header) + if err == nil { + l.log.Debugf("response: %v\nHeaders: %v\n", resp.Status, resp.Header) + } + } + return resp, err } // NewWebsocketFollower performs environment construction including CEL @@ -53,9 +73,40 @@ func NewWebsocketFollower(ctx context.Context, id string, cfg config, cursor map redact: cfg.Redact, metrics: newInputMetrics(id), }, + // the token expiry handler will never trigger unless a valid expiry time is assigned + tokenExpiry: nil, } s.metrics.url.Set(cfg.URL.String()) s.metrics.errorsTotal.Set(0) + // initialize the oauth2 token source if oauth2 is enabled and set access token in the config + if cfg.Auth.OAuth2.isEnabled() { + config := &clientcredentials.Config{ + AuthStyle: cfg.Auth.OAuth2.getAuthStyle(), + ClientID: cfg.Auth.OAuth2.ClientID, + ClientSecret: cfg.Auth.OAuth2.ClientSecret, + TokenURL: cfg.Auth.OAuth2.TokenURL, + Scopes: cfg.Auth.OAuth2.Scopes, + EndpointParams: cfg.Auth.OAuth2.EndpointParams, + } + // injecting a custom http client with loggingRoundTripper to debug-log request and response attributes for oauth2 token + client := &http.Client{ + Transport: &loggingRoundTripper{http.DefaultTransport, log}, + } + oauth2Ctx := context.WithValue(ctx, oauth2.HTTPClient, client) + s.tokenSource = config.TokenSource(oauth2Ctx) + // get the initial token + token, err := s.tokenSource.Token() + if err != nil { + s.metrics.errorsTotal.Inc() + s.Close() + return nil, fmt.Errorf("failed to obtain oauth2 token: %w", 
err) + } + // set the initial token in the config if oauth2 is enabled + // this allows seamless header creation in formHeader() for the initial connection + s.cfg.Auth.OAuth2.accessToken = token.AccessToken + // set the initial token expiry channel with buffer of 2 mins + s.tokenExpiry = time.After(time.Until(token.Expiry) - cfg.Auth.OAuth2.TokenExpiryBuffer) + } patterns, err := regexpsFromConfig(cfg) if err != nil { @@ -114,6 +165,32 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { case <-ctx.Done(): s.log.Debugw("context cancelled, closing websocket connection") return ctx.Err() + // s.tokenExpiry channel will only trigger if oauth2 is enabled and the token is about to expire + case <-s.tokenExpiry: + // get the new token + token, err := s.tokenSource.Token() + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to obtain oauth2 token during token refresh", "error", err) + return err + } + // gracefully close current connection + if err := c.Close(); err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("encountered an error while closing the existing websocket connection during token refresh", "error", err) + } + // set the new token in the config + s.cfg.Auth.OAuth2.accessToken = token.AccessToken + // set the new token expiry channel with 2 mins buffer + s.tokenExpiry = time.After(time.Until(token.Expiry) - s.cfg.Auth.OAuth2.TokenExpiryBuffer) + // establish a new connection with the new token + c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) + handleConnectionResponse(resp, s.metrics, s.log) + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to establish a new websocket connection on token refresh", "error", err) + return err + } default: _, message, err := c.ReadMessage() if err != nil { From 4ba7d1c9a876b26267691007606f770acdb48c0c Mon Sep 17 00:00:00 2001 From: Kavindu Dodanduwa Date: Tue, 7 Jan 2025 10:46:55 -0800 Subject: [PATCH 6/6] [aws] [s3] Introduce ignore_older & 
start_timestamp for S3 input allowing better registry cleanups (#41817) * add changelog entry Signed-off-by: Kavindu Dodanduwa * sort config entries Signed-off-by: Kavindu Dodanduwa * introduce ignore old and start timestamp configurations and document them Signed-off-by: Kavindu Dodanduwa * add filtering logic Signed-off-by: Kavindu Dodanduwa * filter tests Signed-off-by: Kavindu Dodanduwa * add component test for filtering and fix lint issues Signed-off-by: Kavindu Dodanduwa # Conflicts: # x-pack/filebeat/input/awss3/s3_test.go * add changelog entry Signed-off-by: Kavindu Dodanduwa * improve documentation Signed-off-by: Kavindu Dodanduwa * review changes - improve naming, change signature and improve documentation Signed-off-by: Kavindu Dodanduwa --------- Signed-off-by: Kavindu Dodanduwa --- CHANGELOG.next.asciidoc | 1 + .../filebeat.inputs.reference.xpack.yml.tmpl | 10 + .../docs/inputs/input-aws-s3.asciidoc | 28 ++ x-pack/filebeat/filebeat.reference.yml | 10 + x-pack/filebeat/input/awss3/config.go | 34 ++- x-pack/filebeat/input/awss3/config_test.go | 33 ++ .../input/awss3/input_benchmark_test.go | 1 + x-pack/filebeat/input/awss3/s3_filters.go | 123 ++++++++ .../filebeat/input/awss3/s3_filters_test.go | 179 +++++++++++ x-pack/filebeat/input/awss3/s3_input.go | 20 +- x-pack/filebeat/input/awss3/s3_test.go | 283 +++++++++++++++++- 11 files changed, 687 insertions(+), 35 deletions(-) create mode 100644 x-pack/filebeat/input/awss3/s3_filters.go create mode 100644 x-pack/filebeat/input/awss3/s3_filters_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f21fd1c56402..b61eb2a9ee2f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -385,6 +385,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. 
{issue}40106[40106] {pull}42094[42094] - Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212] - Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225] +- Introduce ignore older and start timestamp filters for AWS S3 input. {pull}41804[41804] *Auditbeat* diff --git a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl index 4e966d594c57..84ae2d963176 100644 --- a/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl +++ b/x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl @@ -124,6 +124,16 @@ # Controls deletion of objects after backing them up #delete_after_backup: false + # Ignore bucket entries older than the given timespan. + # Timespan is calculated from current time to processing object's last modified timestamp. + # This is disabled by default(value 0) and can be configured to a time duration like "48h" or "2h30m". + #ignore_older: 0 + + # Accept bucket entries with last modified timestamp newer than the given timestamp. + # Accepts a timestamp in YYYY-MM-DDTHH:MM:SSZ format and default is empty. + # For example, "2024-11-20T20:00:00Z" (UTC) or "2024-11-20T22:30:00+02:30" (with zone offset). + #start_timestamp: + #------------------------------ AWS CloudWatch input -------------------------------- # Beta: Config options for AWS CloudWatch input #- type: aws-cloudwatch diff --git a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc index 9bfb312768b3..1935e2478b92 100644 --- a/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc @@ -178,6 +178,8 @@ Node pipeline. The `aws-s3` input supports the following configuration options plus the <<{beatname_lc}-input-{type}-common-options>> described later. 
+NOTE: For time durations, valid time units are - "ns", "us" (or "µs"), "ms", "s", "m", "h". For example, "2h" + [float] ==== `api_timeout` @@ -690,6 +692,32 @@ This option can only be used together with the backup functionality. [id="{beatname_lc}-input-{type}-common-options"] include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] +[float] +==== `ignore_older` + +The parameter specifies the time duration (ex:- 30m, 2h or 48h) during which bucket entries are accepted for processing. +By default, this feature is disabled, allowing any entry in the bucket to be processed. +It is recommended to set a suitable duration to prevent older bucket entries from being tracked, which helps to reduce the memory usage. + +When defined, bucket entries are processed only if their last modified timestamp falls within the specified time duration, relative to the current time. +However, when the start_timestamp is set, the initial processing will include all bucket entries up to that timestamp. + +NOTE: Bucket entries that are older than the defined duration and have failed processing will not be re-processed. +It is recommended to configure a sufficiently long duration based on your use case and current settings to avoid conflicts with the bucket_list_interval property. +Additionally, this ensures that subsequent runs can include and re-process objects that failed due to unavoidable errors. + +[float] +==== `start_timestamp` + +Accepts a timestamp in the YYYY-MM-DDTHH:MM:SSZ format, which defines the point from which bucket entries are accepted for processing. +By default, this is disabled, allowing all entries in the bucket to be processed. + +This parameter is useful when configuring input for the first time, especially if you want to ingest logs starting from a specific time. +The timestamp can also be set to a future time, offering greater flexibility. 
+You can combine this property with ignore_older duration to improve memory usage by reducing tracked bucket entries. + +NOTE: It is recommended to update this value when updating or restarting filebeat + [float] === AWS Permissions diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 5da6e5d838b9..eeb3770e8fef 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3051,6 +3051,16 @@ filebeat.inputs: # Controls deletion of objects after backing them up #delete_after_backup: false + # Ignore bucket entries older than the given timespan. + # Timespan is calculated from current time to processing object's last modified timestamp. + # This is disabled by default(value 0) and can be configured to a time duration like "48h" or "2h30m". + #ignore_older: 0 + + # Accept bucket entries with last modified timestamp newer than the given timestamp. + # Accepts a timestamp in YYYY-MM-DDTHH:MM:SSZ format and default is empty. + # For example, "2024-11-20T20:00:00Z" (UTC) or "2024-11-20T22:30:00+02:30" (with zone offset). + #start_timestamp: + #------------------------------ AWS CloudWatch input -------------------------------- # Beta: Config options for AWS CloudWatch input #- type: aws-cloudwatch diff --git a/x-pack/filebeat/input/awss3/config.go b/x-pack/filebeat/input/awss3/config.go index 843061ae3c3e..bc62ed9f8717 100644 --- a/x-pack/filebeat/input/awss3/config.go +++ b/x-pack/filebeat/input/awss3/config.go @@ -27,24 +27,26 @@ import ( type config struct { APITimeout time.Duration `config:"api_timeout"` - VisibilityTimeout time.Duration `config:"visibility_timeout"` - SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning. - SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it. 
- SQSScript *scriptConfig `config:"sqs.notification_parsing_script"` - QueueURL string `config:"queue_url"` - RegionName string `config:"region"` - BucketARN string `config:"bucket_arn"` + AWSConfig awscommon.ConfigAWS `config:",inline"` AccessPointARN string `config:"access_point_arn"` - NonAWSBucketName string `config:"non_aws_bucket_name"` + BackupConfig backupConfig `config:",inline"` + BucketARN string `config:"bucket_arn"` BucketListInterval time.Duration `config:"bucket_list_interval"` BucketListPrefix string `config:"bucket_list_prefix"` - NumberOfWorkers int `config:"number_of_workers"` - AWSConfig awscommon.ConfigAWS `config:",inline"` FileSelectors []fileSelectorConfig `config:"file_selectors"` - ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used. + IgnoreOlder time.Duration `config:"ignore_older"` + NonAWSBucketName string `config:"non_aws_bucket_name"` + NumberOfWorkers int `config:"number_of_workers"` PathStyle bool `config:"path_style"` ProviderOverride string `config:"provider"` - BackupConfig backupConfig `config:",inline"` + QueueURL string `config:"queue_url"` + ReaderConfig readerConfig `config:",inline"` // Reader options to apply when no file_selectors are used. + RegionName string `config:"region"` + SQSMaxReceiveCount int `config:"sqs.max_receive_count"` // The max number of times a message should be received (retried) before deleting it. + SQSScript *scriptConfig `config:"sqs.notification_parsing_script"` + SQSWaitTime time.Duration `config:"sqs.wait_time"` // The max duration for which the SQS ReceiveMessage call waits for a message to arrive in the queue before returning. 
+ StartTimestamp string `config:"start_timestamp"` + VisibilityTimeout time.Duration `config:"visibility_timeout"` } func defaultConfig() config { @@ -142,6 +144,13 @@ func (c *config) Validate() error { } } + if c.StartTimestamp != "" { + _, err := time.Parse(time.RFC3339, c.StartTimestamp) + if err != nil { + return fmt.Errorf("invalid input for start_timestamp: %w", err) + } + } + return nil } @@ -295,6 +304,7 @@ func (c config) sqsConfigModifier(o *sqs.Options) { o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled } if c.AWSConfig.Endpoint != "" { + //nolint:staticcheck // not changing through this PR o.EndpointResolver = sqs.EndpointResolverFromURL(c.AWSConfig.Endpoint) } } diff --git a/x-pack/filebeat/input/awss3/config_test.go b/x-pack/filebeat/input/awss3/config_test.go index d791271ba6ef..50ccf47c1e80 100644 --- a/x-pack/filebeat/input/awss3/config_test.go +++ b/x-pack/filebeat/input/awss3/config_test.go @@ -596,6 +596,39 @@ func TestConfig(t *testing.T) { expectedErr: "backup_to_bucket_prefix cannot be the same as bucket_list_prefix, this will create an infinite loop", expectedCfg: nil, }, + { + name: "validate ignore_older and start_timestamp configurations", + s3Bucket: s3Bucket, + config: mapstr.M{ + "bucket_arn": s3Bucket, + "ignore_older": "24h", + "start_timestamp": "2024-11-20T20:00:00Z", + }, + expectedCfg: func(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket string) config { + c := makeConfig(queueURL, s3Bucket, s3AccessPoint, nonAWSS3Bucket) + c.IgnoreOlder = 24 * time.Hour + c.StartTimestamp = "2024-11-20T20:00:00Z" + return c + }, + }, + { + name: "ignore_older only accepts valid duration - unit valid with ParseDuration", + s3Bucket: s3Bucket, + config: mapstr.M{ + "bucket_arn": s3Bucket, + "ignore_older": "24D", + }, + expectedErr: "time: unknown unit \"D\" in duration \"24D\" accessing 'ignore_older'", + }, + { + name: "start_timestamp accepts a valid timestamp of format - YYYY-MM-DDTHH:MM:SSZ", + s3Bucket: s3Bucket, + 
config: mapstr.M{ + "bucket_arn": s3Bucket, + "start_timestamp": "2024-11-20 20:20:00", + }, + expectedErr: "invalid input for start_timestamp: parsing time \"2024-11-20 20:20:00\" as \"2006-01-02T15:04:05Z07:00\": cannot parse \" 20:20:00\" as \"T\" accessing config", + }, } for _, tc := range testCases { diff --git a/x-pack/filebeat/input/awss3/input_benchmark_test.go b/x-pack/filebeat/input/awss3/input_benchmark_test.go index 319d8c3b9aa3..959148ffd27f 100644 --- a/x-pack/filebeat/input/awss3/input_benchmark_test.go +++ b/x-pack/filebeat/input/awss3/input_benchmark_test.go @@ -352,6 +352,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult s3ObjectHandler: s3EventHandlerFactory, states: states, provider: "provider", + filterProvider: newFilterProvider(&config), } s3Poller.run(ctx) diff --git a/x-pack/filebeat/input/awss3/s3_filters.go b/x-pack/filebeat/input/awss3/s3_filters.go new file mode 100644 index 000000000000..a34716586038 --- /dev/null +++ b/x-pack/filebeat/input/awss3/s3_filters.go @@ -0,0 +1,123 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package awss3 + +import ( + "sync" + "time" + + "github.com/elastic/elastic-agent-libs/logp" +) + +const ( + filterOldestTime = "oldestTimeFilter" + filterStartTime = "startTimeFilter" +) + +// filterProvider exposes filters that needs to be applied on derived state. +// Once configured, retrieve filter applier using getApplierFunc. 
+type filterProvider struct { + cfg *config + + staticFilters []filter + once sync.Once +} + +func newFilterProvider(cfg *config) *filterProvider { + fp := &filterProvider{ + cfg: cfg, + } + + // derive static filters + if cfg.StartTimestamp != "" { + // note - errors should not occur as this has been validated prior to reaching here + parse, _ := time.Parse(time.RFC3339, cfg.StartTimestamp) + fp.staticFilters = append(fp.staticFilters, newStartTimestampFilter(parse)) + } + + return fp +} + +// getApplierFunc returns aggregated filters valid at the time of retrieval. +// Applier returns true if state is valid for processing according to the underlying filter collection. +func (f *filterProvider) getApplierFunc() func(log *logp.Logger, s state) bool { + filters := map[string]filter{} + + if f.cfg.IgnoreOlder != 0 { + timeFilter := newOldestTimeFilter(f.cfg.IgnoreOlder, time.Now()) + filters[timeFilter.getID()] = timeFilter + } + + for _, f := range f.staticFilters { + filters[f.getID()] = f + } + + f.once.Do(func() { + // Ignore the oldest time filter once for initial startup only if start timestamp filter is defined + // This allows users to ingest desired data from start time onwards. + if filters[filterStartTime] != nil { + delete(filters, filterOldestTime) + } + }) + + return func(log *logp.Logger, s state) bool { + for _, f := range filters { + if !f.isValid(s) { + log.Debugf("skipping processing of object '%s' by filter '%s'", s.Key, f.getID()) + return false + } + } + + return true + } +} + +// filter defines the filter implementation contract +type filter interface { + getID() string + isValid(objState state) (valid bool) +} + +// startTimestampFilter - filter out entries based on provided start timestamp, compared to filtering object's last modified timestamp. 
+type startTimestampFilter struct { + id string + timeStart time.Time +} + +func newStartTimestampFilter(start time.Time) *startTimestampFilter { + return &startTimestampFilter{ + id: filterStartTime, + timeStart: start, + } +} + +func (s startTimestampFilter) isValid(objState state) bool { + return s.timeStart.Before(objState.LastModified) +} + +func (s startTimestampFilter) getID() string { + return s.id +} + +// oldestTimeFilter - filter out entries based on calculated oldest modified time, compared to filtering object's last modified times stamp. +type oldestTimeFilter struct { + id string + timeOldest time.Time +} + +func newOldestTimeFilter(timespan time.Duration, now time.Time) *oldestTimeFilter { + return &oldestTimeFilter{ + id: filterOldestTime, + timeOldest: now.Add(-1 * timespan), + } +} + +func (s oldestTimeFilter) isValid(objState state) bool { + return s.timeOldest.Before(objState.LastModified) +} + +func (s oldestTimeFilter) getID() string { + return s.id +} diff --git a/x-pack/filebeat/input/awss3/s3_filters_test.go b/x-pack/filebeat/input/awss3/s3_filters_test.go new file mode 100644 index 000000000000..0350b4952cdd --- /dev/null +++ b/x-pack/filebeat/input/awss3/s3_filters_test.go @@ -0,0 +1,179 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. 
+ +package awss3 + +import ( + "testing" + "time" + + "github.com/elastic/elastic-agent-libs/logp" + + "github.com/stretchr/testify/assert" +) + +func Test_filterProvider(t *testing.T) { + t.Run("Configuration check", func(t *testing.T) { + cfg := config{ + StartTimestamp: "2024-11-26T21:00:00Z", + IgnoreOlder: 10 * time.Minute, + } + + fProvider := newFilterProvider(&cfg) + + assert.Equal(t, 1, len(fProvider.staticFilters)) + assert.Equal(t, filterStartTime, fProvider.staticFilters[0].getID()) + }) + + logger := logp.NewLogger("test-logger") + + tests := []struct { + name string + cfg *config + inputState state + runFilterForCount int + expectFilterResults []bool + }{ + { + name: "Simple run - all valid result", + cfg: &config{ + StartTimestamp: "2024-11-26T21:00:00Z", + IgnoreOlder: 10 * time.Minute, + }, + inputState: newState("bucket", "key", "eTag", time.Now()), + runFilterForCount: 1, + expectFilterResults: []bool{true}, + }, + { + name: "Simple run - all invalid result", + cfg: &config{ + StartTimestamp: "2024-11-26T21:00:00Z", + }, + inputState: newState("bucket", "key", "eTag", time.Unix(0, 0)), + runFilterForCount: 1, + expectFilterResults: []bool{false}, + }, + { + name: "Simple run - no filters hence valid result", + cfg: &config{}, + inputState: newState("bucket", "key", "eTag", time.Now()), + runFilterForCount: 1, + expectFilterResults: []bool{true}, + }, + { + name: "Single filter - ignore old invalid result", + cfg: &config{ + IgnoreOlder: 1 * time.Minute, + }, + inputState: newState("bucket", "key", "eTag", time.Unix(time.Now().Add(-2*time.Minute).Unix(), 0)), + runFilterForCount: 1, + expectFilterResults: []bool{false}, + }, + { + name: "Combined filters - ignore old won't affect first run if timestamp is given but will affect thereafter", + cfg: &config{ + StartTimestamp: "2024-11-26T21:00:00Z", + IgnoreOlder: 10 * time.Minute, + }, + inputState: newState("bucket", "key", "eTag", time.Unix(1732654860, 0)), // 2024-11-26T21:01:00Z in epoch + 
runFilterForCount: 3, + expectFilterResults: []bool{true, false, false}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + provider := newFilterProvider(test.cfg) + results := make([]bool, 0, test.runFilterForCount) + + for i := 0; i < test.runFilterForCount; i++ { + applierFunc := provider.getApplierFunc() + results = append(results, applierFunc(logger, test.inputState)) + } + + assert.Equal(t, test.expectFilterResults, results) + }) + } +} + +func Test_startTimestampFilter(t *testing.T) { + t.Run("Configuration check", func(t *testing.T) { + entry := newState("bucket", "key", "eTag", time.Now()) + + oldTimeFilter := newStartTimestampFilter(time.Now().Add(-2 * time.Minute)) + + assert.Equal(t, filterStartTime, oldTimeFilter.getID()) + assert.True(t, oldTimeFilter.isValid(entry)) + }) + + tests := []struct { + name string + timeStamp time.Time + input state + result bool + }{ + { + name: "State valid", + timeStamp: time.Now().Add(-10 * time.Minute), + input: newState("bucket", "key", "eTag", time.Now()), + result: true, + }, + + { + name: "State invalid", + timeStamp: time.Now(), + input: newState("bucket", "key", "eTag", time.Now().Add(-10*time.Minute)), + result: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + timeFilter := newStartTimestampFilter(test.timeStamp) + assert.Equal(t, test.result, timeFilter.isValid(test.input)) + }) + } + +} + +func Test_oldestTimeFilter(t *testing.T) { + t.Run("configuration check", func(t *testing.T) { + duration := time.Duration(1) * time.Second + entry := newState("bucket", "key", "eTag", time.Now()) + + oldTimeFilter := newOldestTimeFilter(duration, time.Now()) + + assert.Equal(t, filterOldestTime, oldTimeFilter.getID()) + assert.True(t, oldTimeFilter.isValid(entry)) + }) + + tests := []struct { + name string + duration time.Duration + input state + result bool + }{ + { + name: "State valid", + duration: time.Duration(1) * time.Minute, + input: 
newState("bucket", "key", "eTag", time.Now()), + result: true, + }, + + { + name: "State invalid", + duration: time.Duration(1) * time.Minute, + input: newState("bucket", "key", "eTag", time.Now().Add(-10*time.Minute)), + result: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + timeFilter := newOldestTimeFilter(test.duration, time.Now()) + assert.Equal(t, test.result, timeFilter.isValid(test.input)) + }) + } + +} diff --git a/x-pack/filebeat/input/awss3/s3_input.go b/x-pack/filebeat/input/awss3/s3_input.go index c2d3b8446cc4..ad20579aae49 100644 --- a/x-pack/filebeat/input/awss3/s3_input.go +++ b/x-pack/filebeat/input/awss3/s3_input.go @@ -36,6 +36,7 @@ type s3PollerInput struct { metrics *inputMetrics s3ObjectHandler s3ObjectHandlerFactory states *states + filterProvider *filterProvider } func newS3PollerInput( @@ -43,11 +44,11 @@ func newS3PollerInput( awsConfig awssdk.Config, store beater.StateStore, ) (v2.Input, error) { - return &s3PollerInput{ - config: config, - awsConfig: awsConfig, - store: store, + config: config, + awsConfig: awsConfig, + store: store, + filterProvider: newFilterProvider(&config), }, nil } @@ -199,9 +200,10 @@ func (in *s3PollerInput) workerLoop(ctx context.Context, workChan <-chan state) // These IDs are intended to be used for state clean-up. 
func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) (knownStateIDSlice []string, ok bool) { defer close(workChan) - bucketName := getBucketNameFromARN(in.config.getBucketARN()) + isStateValid := in.filterProvider.getApplierFunc() + errorBackoff := backoff.NewEqualJitterBackoff(ctx.Done(), 1, 120) circuitBreaker := 0 paginator := in.s3.ListObjectsPaginator(bucketName, in.config.BucketListPrefix) @@ -233,10 +235,14 @@ func (in *s3PollerInput) readerLoop(ctx context.Context, workChan chan<- state) in.metrics.s3ObjectsListedTotal.Add(uint64(totListedObjects)) for _, object := range page.Contents { state := newState(bucketName, *object.Key, *object.ETag, *object.LastModified) - knownStateIDSlice = append(knownStateIDSlice, state.ID()) + if !isStateValid(in.log, state) { + continue + } + // add to known states only if valid for processing + knownStateIDSlice = append(knownStateIDSlice, state.ID()) if in.states.IsProcessed(state) { - in.log.Debugw("skipping state.", "state", state) + in.log.Debugw("skipping state processing as already processed.", "state", state) continue } diff --git a/x-pack/filebeat/input/awss3/s3_test.go b/x-pack/filebeat/input/awss3/s3_test.go index 2f79cb44a48c..5518a1808e1d 100644 --- a/x-pack/filebeat/input/awss3/s3_test.go +++ b/x-pack/filebeat/input/awss3/s3_test.go @@ -9,13 +9,14 @@ import ( "testing" "time" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" - - "github.com/elastic/elastic-agent-libs/logp" ) func TestS3Poller(t *testing.T) { @@ -130,21 +131,24 @@ func TestS3Poller(t *testing.T) { s3ObjProc := newS3ObjectProcessorFactory(nil, mockAPI, nil, backupConfig{}) states, err := newStates(nil, store, listPrefix) require.NoError(t, err, "states creation must succeed") + + cfg := 
config{ + NumberOfWorkers: numberOfWorkers, + BucketListInterval: pollInterval, + BucketARN: bucket, + BucketListPrefix: listPrefix, + RegionName: "region", + } poller := &s3PollerInput{ - log: logp.NewLogger(inputName), - config: config{ - NumberOfWorkers: numberOfWorkers, - BucketListInterval: pollInterval, - BucketARN: bucket, - BucketListPrefix: listPrefix, - RegionName: "region", - }, + log: logp.NewLogger(inputName), + config: cfg, s3: mockAPI, pipeline: pipeline, s3ObjectHandler: s3ObjProc, states: states, provider: "provider", metrics: newInputMetrics("", nil, 0), + filterProvider: newFilterProvider(&cfg), } poller.runPoll(ctx) }) @@ -268,6 +272,14 @@ func TestS3Poller(t *testing.T) { s3ObjProc := newS3ObjectProcessorFactory(nil, mockS3, nil, backupConfig{}) states, err := newStates(nil, store, listPrefix) require.NoError(t, err, "states creation must succeed") + + cfg := config{ + NumberOfWorkers: numberOfWorkers, + BucketListInterval: pollInterval, + BucketARN: bucket, + BucketListPrefix: "key", + RegionName: "region", + } poller := &s3PollerInput{ log: logp.NewLogger(inputName), config: config{ @@ -283,15 +295,254 @@ func TestS3Poller(t *testing.T) { states: states, provider: "provider", metrics: newInputMetrics("", nil, 0), + filterProvider: newFilterProvider(&cfg), } poller.run(ctx) }) } -func TestS3ReaderLoop(t *testing.T) { - -} - -func TestS3WorkerLoop(t *testing.T) { - +func Test_S3StateHandling(t *testing.T) { + bucket := "bucket" + logger := logp.NewLogger(inputName) + fixedTimeNow := time.Now() + + tests := []struct { + name string + s3Objects []types.Object + config *config + initStates []state + runPollFor int + expectStateIDs []string + }{ + { + name: "State unchanged - registry backed state", + s3Objects: []types.Object{ + { + Key: aws.String("obj-A"), + ETag: aws.String("etag-A"), + LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z + }, + }, + config: &config{ + NumberOfWorkers: 1, + BucketListInterval: 1 * 
time.Second, + BucketARN: bucket, + }, + initStates: []state{newState(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))}, // 2024-11-26T12:00:00Z + runPollFor: 1, + expectStateIDs: []string{stateID(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))}, // 2024-11-26T12:00:00Z + }, + { + name: "State cleanup - remove existing registry entry based on ignore older filter", + s3Objects: []types.Object{ + { + Key: aws.String("obj-A"), + ETag: aws.String("etag-A"), + LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z + }, + }, + config: &config{ + NumberOfWorkers: 1, + BucketListInterval: 1 * time.Second, + BucketARN: bucket, + IgnoreOlder: 1 * time.Second, + }, + initStates: []state{newState(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))}, // 2024-11-26T12:00:00Z + runPollFor: 1, + expectStateIDs: []string{}, + }, + { + name: "State cleanup - remove existing registry entry based on timestamp filter", + s3Objects: []types.Object{ + { + Key: aws.String("obj-A"), + ETag: aws.String("etag-A"), + LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z + }, + }, + config: &config{ + NumberOfWorkers: 1, + BucketListInterval: 1 * time.Second, + BucketARN: bucket, + StartTimestamp: "2024-11-27T12:00:00Z", + }, + initStates: []state{newState(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))}, // 2024-11-26T12:00:00Z + runPollFor: 1, + expectStateIDs: []string{}, + }, + { + name: "State updated - no filters", + s3Objects: []types.Object{ + { + Key: aws.String("obj-A"), + ETag: aws.String("etag-A"), + LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z + }, + }, + config: &config{ + NumberOfWorkers: 1, + BucketListInterval: 1 * time.Second, + BucketARN: bucket, + }, + runPollFor: 1, + expectStateIDs: []string{stateID(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))}, // 2024-11-26T12:00:00Z + }, + { + name: "State updated - ignore old filter", + s3Objects: []types.Object{ + { + Key: aws.String("obj-A"), 
+ ETag: aws.String("etag-A"), + LastModified: aws.Time(fixedTimeNow), + }, + }, + config: &config{ + NumberOfWorkers: 1, + BucketListInterval: 1 * time.Second, + BucketARN: bucket, + IgnoreOlder: 1 * time.Hour, + }, + runPollFor: 1, + expectStateIDs: []string{stateID(bucket, "obj-A", "etag-A", fixedTimeNow)}, + }, + { + name: "State updated - timestamp filter", + s3Objects: []types.Object{ + { + Key: aws.String("obj-A"), + ETag: aws.String("etag-A"), + LastModified: aws.Time(fixedTimeNow), + }, + }, + config: &config{ + NumberOfWorkers: 1, + BucketListInterval: 1 * time.Second, + BucketARN: bucket, + StartTimestamp: "2024-11-26T12:00:00Z", + }, + runPollFor: 1, + expectStateIDs: []string{stateID(bucket, "obj-A", "etag-A", fixedTimeNow)}, + }, + { + name: "State updated - combined filters of ignore old and timestamp entry exist after first run", + s3Objects: []types.Object{ + { + Key: aws.String("obj-A"), + ETag: aws.String("etag-A"), + LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z + }, + }, + config: &config{ + NumberOfWorkers: 1, + BucketListInterval: 1 * time.Second, + BucketARN: bucket, + IgnoreOlder: 1 * time.Hour, + StartTimestamp: "2024-11-20T12:00:00Z", + }, + // run once to validate initial coverage of entries till start timestamp + runPollFor: 1, + expectStateIDs: []string{stateID(bucket, "obj-A", "etag-A", time.Unix(1732622400, 0))}, // 2024-11-26T12:00:00Z + }, + { + name: "State updated - combined filters of ignore old and timestamp remove entry after second run", + s3Objects: []types.Object{ + { + Key: aws.String("obj-A"), + ETag: aws.String("etag-A"), + LastModified: aws.Time(time.Unix(1732622400, 0)), // 2024-11-26T12:00:00Z + }, + }, + config: &config{ + NumberOfWorkers: 1, + BucketListInterval: 1 * time.Second, + BucketARN: bucket, + IgnoreOlder: 1 * time.Hour, + StartTimestamp: "2024-11-20T12:00:00Z", + }, + // run twice to validate removal by ignore old filter + runPollFor: 2, + expectStateIDs: []string{}, + }, + } + + 
for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + // given - setup and mocks + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctrl, ctx := gomock.WithContext(ctx, t) + defer ctrl.Finish() + + mockS3API := NewMockS3API(ctrl) + mockS3Pager := NewMockS3Pager(ctrl) + mockObjHandler := NewMockS3ObjectHandlerFactory(ctrl) + mockS3ObjectHandler := NewMockS3ObjectHandler(ctrl) + + gomock.InOrder( + mockS3API.EXPECT(). + ListObjectsPaginator(gomock.Eq(bucket), ""). + AnyTimes(). + DoAndReturn(func(_, _ string) s3Pager { + return mockS3Pager + }), + ) + + for i := 0; i < test.runPollFor; i++ { + mockS3Pager.EXPECT().HasMorePages().Times(1).DoAndReturn(func() bool { return true }) + mockS3Pager.EXPECT().HasMorePages().Times(1).DoAndReturn(func() bool { return false }) + } + + mockS3Pager.EXPECT(). + NextPage(gomock.Any()). + Times(test.runPollFor). + DoAndReturn(func(_ context.Context, optFns ...func(*s3.Options)) (*s3.ListObjectsV2Output, error) { + return &s3.ListObjectsV2Output{Contents: test.s3Objects}, nil + }) + + mockObjHandler.EXPECT().Create(gomock.Any(), gomock.Any()).AnyTimes().Return(mockS3ObjectHandler) + mockS3ObjectHandler.EXPECT().ProcessS3Object(gomock.Any(), gomock.Any()).AnyTimes(). 
+ DoAndReturn(func(log *logp.Logger, eventCallback func(e beat.Event)) error { + eventCallback(beat.Event{}) + return nil + }) + + store := openTestStatestore() + s3States, err := newStates(logger, store, "") + require.NoError(t, err, "States creation must succeed") + + // Note - add init states as if we are deriving them from registry + for _, st := range test.initStates { + err := s3States.AddState(st) + require.NoError(t, err, "State add should not error") + } + + poller := &s3PollerInput{ + log: logger, + config: *test.config, + s3: mockS3API, + pipeline: newFakePipeline(), + s3ObjectHandler: mockObjHandler, + states: s3States, + metrics: newInputMetrics("state-test: "+test.name, nil, 0), + filterProvider: newFilterProvider(test.config), + } + + // when - run polling for desired time + for i := 0; i < test.runPollFor; i++ { + poller.runPoll(ctx) + <-time.After(500 * time.Millisecond) + } + + // then - desired state entries + + // state must only contain expected state IDs + require.Equal(t, len(test.expectStateIDs), len(s3States.states)) + for _, id := range test.expectStateIDs { + if s3States.states[id] == nil { + t.Errorf("state with ID %s should exist", id) + } + } + }) + } }