From 0baabb300245d2757f1711b4d13d4c9cec21126e Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Mon, 24 Feb 2025 23:28:29 +0100 Subject: [PATCH 1/9] feat(fips): disallow non-compliant crypto in fingerprint processor (#42598) * feat(fips): disallow non-compliant crypto in fingerprint processor do not allow md5 and sha1 config values in fingerprint processor * refactor: avoid duplicate maps and add tests * lint: fix linter issues --- .../fingerprint/fingerprint_test.go | 26 +++++++------- libbeat/processors/fingerprint/hash.go | 9 ++--- .../processors/fingerprint/hash_fips_test.go | 34 ++++++++++++++++++ libbeat/processors/fingerprint/hash_nofips.go | 35 ++++++++++++++++++ .../fingerprint/hash_nofips_test.go | 36 +++++++++++++++++++ 5 files changed, 121 insertions(+), 19 deletions(-) create mode 100644 libbeat/processors/fingerprint/hash_fips_test.go create mode 100644 libbeat/processors/fingerprint/hash_nofips.go create mode 100644 libbeat/processors/fingerprint/hash_nofips_test.go diff --git a/libbeat/processors/fingerprint/fingerprint_test.go b/libbeat/processors/fingerprint/fingerprint_test.go index 8d66762e479c..6e36ebfc41c5 100644 --- a/libbeat/processors/fingerprint/fingerprint_test.go +++ b/libbeat/processors/fingerprint/fingerprint_test.go @@ -19,7 +19,7 @@ package fingerprint import ( "fmt" - "math/rand" + "math/rand/v2" "strconv" "testing" "time" @@ -129,11 +129,11 @@ func TestHashMethods(t *testing.T) { "xxhash": {"37bc50682fba6686"}, } - for method, test := range tests { - t.Run(method, func(t *testing.T) { + for _, method := range hashes { + t.Run(method.Name, func(t *testing.T) { testConfig, err := config.NewConfigFrom(mapstr.M{ "fields": []string{"field1", "field2"}, - "method": method, + "method": method.Name, }) assert.NoError(t, err) @@ -150,7 +150,7 @@ func TestHashMethods(t *testing.T) { v, err := newEvent.GetValue("fingerprint") assert.NoError(t, err) - assert.Equal(t, test.expected, v) + assert.Equal(t, tests[method.Name].expected, v) }) } } @@ -212,16 +212,16 @@ func TestEncoding(t *testing.T) { tests := map[string]struct { expectedFingerprint string }{ - "hex": {"8934ca639027aab1ee9f3944d4d6bd1e"}, - "base32": {"RE2MUY4QE6VLD3U7HFCNJVV5DY======"}, - "base64": {"iTTKY5AnqrHunzlE1Na9Hg=="}, + "hex": {"49f15f7c03c606b4bdf43f60481842954ff7b45a020a22a1d0911d76f170c798"}, + "base32": {"JHYV67ADYYDLJPPUH5QEQGCCSVH7PNC2AIFCFIOQSEOXN4LQY6MA===="}, + "base64": {"SfFffAPGBrS99D9gSBhClU/3tFoCCiKh0JEddvFwx5g="}, } for encoding, test := range tests { t.Run(encoding, func(t *testing.T) { testConfig, err := config.NewConfigFrom(mapstr.M{ "fields": []string{"field2", "nested.field"}, - "method": "md5", + "method": "sha256", "encoding": encoding, }) assert.NoError(t, err) @@ -465,12 +465,12 @@ func TestProcessorStringer(t *testing.T) { testConfig, err := config.NewConfigFrom(mapstr.M{ "fields": []string{"field1"}, "encoding": "hex", - "method": "md5", + "method": "sha256", }) require.NoError(t, err) p, err := New(testConfig) require.NoError(t, err) - require.Equal(t, `fingerprint={"Method":"md5","Encoding":"hex","Fields":["field1"],"TargetField":"fingerprint","IgnoreMissing":false}`, fmt.Sprint(p)) + require.Equal(t, `fingerprint={"Method":"sha256","Encoding":"hex","Fields":["field1"],"TargetField":"fingerprint","IgnoreMissing":false}`, fmt.Sprint(p)) } func BenchmarkHashMethods(b *testing.B) { @@ -497,7 +497,7 @@ func BenchmarkHashMethods(b *testing.B) { } func nRandomEvents(num int) []beat.Event { - prng := rand.New(rand.NewSource(12345)) + prng := rand.New(rand.NewPCG(0, 12345)) const charset = "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + @@ -508,7 +508,7 @@ func nRandomEvents(num int) []beat.Event { events := make([]beat.Event, 0, num) for i := 0; i < num; i++ { for j := range b { - b[j] = charset[prng.Intn(charsetLen)] + b[j] = charset[prng.IntN(charsetLen)] } events = append(events, beat.Event{ Fields: mapstr.M{ diff --git a/libbeat/processors/fingerprint/hash.go b/libbeat/processors/fingerprint/hash.go index 1c8cf146a147..61f151486a3e 100644 --- a/libbeat/processors/fingerprint/hash.go +++ b/libbeat/processors/fingerprint/hash.go @@ -18,8 +18,6 @@ package fingerprint import ( - "crypto/md5" - "crypto/sha1" "crypto/sha256" "crypto/sha512" "hash" @@ -37,14 +35,13 @@ type hashMethod func() hash.Hash var hashes = map[string]namedHashMethod{} func init() { - for _, h := range []namedHashMethod{ - {Name: "md5", Hash: md5.New}, - {Name: "sha1", Hash: sha1.New}, + fipsApprovedHashes := []namedHashMethod{ {Name: "sha256", Hash: sha256.New}, {Name: "sha384", Hash: sha512.New384}, {Name: "sha512", Hash: sha512.New}, {Name: "xxhash", Hash: newXxHash}, - } { + } + for _, h := range fipsApprovedHashes { hashes[h.Name] = h } } diff --git a/libbeat/processors/fingerprint/hash_fips_test.go b/libbeat/processors/fingerprint/hash_fips_test.go new file mode 100644 index 000000000000..8beeb4b3e5f5 --- /dev/null +++ b/libbeat/processors/fingerprint/hash_fips_test.go @@ -0,0 +1,34 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build requirefips + +package fingerprint + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestHashMethod(t *testing.T) { + require.Len(t, hashes, 4) + require.Contains(t, hashes, "sha256") + require.Contains(t, hashes, "sha384") + require.Contains(t, hashes, "sha512") + require.Contains(t, hashes, "xxhash") +} diff --git a/libbeat/processors/fingerprint/hash_nofips.go b/libbeat/processors/fingerprint/hash_nofips.go new file mode 100644 index 000000000000..16a002b84d14 --- /dev/null +++ b/libbeat/processors/fingerprint/hash_nofips.go @@ -0,0 +1,35 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build !requirefips + +package fingerprint + +import ( + "crypto/md5" + "crypto/sha1" +) + +func init() { + nonFipsApprovedHashes := []namedHashMethod{ + {Name: "md5", Hash: md5.New}, + {Name: "sha1", Hash: sha1.New}, + } + for _, h := range nonFipsApprovedHashes { + hashes[h.Name] = h + } +} diff --git a/libbeat/processors/fingerprint/hash_nofips_test.go b/libbeat/processors/fingerprint/hash_nofips_test.go new file mode 100644 index 000000000000..865e6249e640 --- /dev/null +++ b/libbeat/processors/fingerprint/hash_nofips_test.go @@ -0,0 +1,36 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build !requirefips + +package fingerprint + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestHashMethod(t *testing.T) { + require.Len(t, hashes, 6) + require.Contains(t, hashes, "md5") + require.Contains(t, hashes, "sha1") + require.Contains(t, hashes, "sha256") + require.Contains(t, hashes, "sha384") + require.Contains(t, hashes, "sha512") + require.Contains(t, hashes, "xxhash") +} From da802a88cbf970027b80599a86cb9af2e964adc4 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Tue, 25 Feb 2025 19:20:01 +1030 Subject: [PATCH 2/9] mod: update elastic/mito to version v1.17.0 (#42851) This adds: - two parameter tail function and its conjugate (elastic/mito#81) - array sum function (elastic/mito#82) --- CHANGELOG.next.asciidoc | 1 + NOTICE.txt | 4 +-- go.mod | 2 +- go.sum | 4 +-- .../filebeat/docs/inputs/input-cel.asciidoc | 4 ++- x-pack/filebeat/input/cel/input_test.go | 29 +++++++++++++++++++ 6 files changed, 38 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 62b8ef12b66c..0b58e035277b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -424,6 +424,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] - The journald input is now generally available. {pull}42107[42107] - Add metrics for number of events and pages published by HTTPJSON input. {issue}42340[42340] {pull}42442[42442] - Add `etw` input fallback to attach an already existing session. {pull}42847[42847] +- Update CEL mito extensions to v1.17.0. {pull}42851[42851] *Auditbeat* diff --git a/NOTICE.txt b/NOTICE.txt index 4ff9df845df0..afc3813e3c76 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -15516,11 +15516,11 @@ limitations under the License. -------------------------------------------------------------------------------- Dependency : github.com/elastic/mito -Version: v1.16.0 +Version: v1.17.0 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/mito@v1.16.0/LICENSE: +Contents of probable licence file $GOMODCACHE/github.com/elastic/mito@v1.17.0/LICENSE: Apache License diff --git a/go.mod b/go.mod index cb9e95e37532..30f5522581b2 100644 --- a/go.mod +++ b/go.mod @@ -181,7 +181,7 @@ require ( github.com/elastic/go-elasticsearch/v8 v8.17.0 github.com/elastic/go-quark v0.3.0 github.com/elastic/go-sfdc v0.0.0-20241010131323-8e176480d727 - github.com/elastic/mito v1.16.0 + github.com/elastic/mito v1.17.0 github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015 github.com/elastic/sarama v1.19.1-0.20241120141909-c7eabfcee7e5 github.com/elastic/tk-btf v0.1.0 diff --git a/go.sum b/go.sum index 19de7bd540aa..5d1702535981 100644 --- a/go.sum +++ b/go.sum @@ -390,8 +390,8 @@ github.com/elastic/gopacket v1.1.20-0.20241002174017-e8c5fda595e6 h1:VgOx6omXIMK github.com/elastic/gopacket v1.1.20-0.20241002174017-e8c5fda595e6/go.mod h1:riddUzxTSBpJXk3qBHtYr4qOhFhT6k/1c0E3qkQjQpA= github.com/elastic/gosigar v0.14.3 h1:xwkKwPia+hSfg9GqrCUKYdId102m9qTJIIr7egmK/uo= github.com/elastic/gosigar v0.14.3/go.mod h1:iXRIGg2tLnu7LBdpqzyQfGDEidKCfWcCMS0WKyPWoMs= -github.com/elastic/mito v1.16.0 h1:7UYy1OpJ8rlr4nzy/HDYQHuHjUIDMCofk5ICalYC2LA= -github.com/elastic/mito v1.16.0/go.mod h1:J+wCf4HccW2YoSFmZMGu+d06gN+WmnIlj5ehBqine74= +github.com/elastic/mito v1.17.0 h1:UEEFfQy5WhS6vVvMPMwHvdn5rH24eBJMb2ZOlGBkI5s= +github.com/elastic/mito v1.17.0/go.mod h1:nG5MoOsgJwVlglhlANiBFmHWqoIjrpbR5vy612wE8yE= github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015 h1:z8cC8GASpPo8yKlbnXI36HQ/BM9wYjhBPNbDjAWm0VU= github.com/elastic/mock-es v0.0.0-20240712014503-e5b47ece0015/go.mod h1:qH9DX/Dmflz6EAtaks/+2SsdQzecVAKE174Zl66hk7E= github.com/elastic/pkcs8 v1.0.0 h1:HhitlUKxhN288kcNcYkjW6/ouvuwJWd9ioxpjnD9jVA= diff --git a/x-pack/filebeat/docs/inputs/input-cel.asciidoc b/x-pack/filebeat/docs/inputs/input-cel.asciidoc index 42ca34dd57bb..b238545350c0 100644 --- a/x-pack/filebeat/docs/inputs/input-cel.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-cel.asciidoc @@ -1,7 +1,7 @@ [role="xpack"] :type: cel -:mito_version: v1.16.0 +:mito_version: v1.17.0 :mito_docs: https://pkg.go.dev/github.com/elastic/mito@{mito_version} [id="{beatname_lc}-input-{type}"] @@ -160,9 +160,11 @@ As noted above the `cel` input provides functions, macros, and global variables ** {mito_docs}/lib#hdr-Drop[Drop] ** {mito_docs}/lib#hdr-Drop_Empty[Drop Empty] ** {mito_docs}/lib#hdr-Flatten[Flatten] +** {mito_docs}/lib#hdr-Front[Front] ** {mito_docs}/lib#hdr-Keys[Keys] ** {mito_docs}/lib#hdr-Max[Max] ** {mito_docs}/lib#hdr-Min[Min] +** {mito_docs}/lib#hdr-Sum[Sum] ** {mito_docs}/lib#hdr-Tail[Tail] ** {mito_docs}/lib#hdr-Values[Values] ** {mito_docs}/lib#hdr-With[With] diff --git a/x-pack/filebeat/input/cel/input_test.go b/x-pack/filebeat/input/cel/input_test.go index 7ccfb9ccbee2..6c1f90a60dda 100644 --- a/x-pack/filebeat/input/cel/input_test.go +++ b/x-pack/filebeat/input/cel/input_test.go @@ -95,6 +95,35 @@ var inputTests = []struct { }, }}, }, + { + name: "hello_world_sum", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":string(sum([1,2,3,4]))}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, + }, + want: []map[string]interface{}{{ + "message": "10", + }}, + }, + { + name: "hello_world_front_and_tail_2", + config: map[string]interface{}{ + "interval": 1, + "program": `{"events":[{"message":front([1,2,3,4,5],2)}, {"message":tail([1,2,3,4,5],2)}]}`, + "state": nil, + "resource": map[string]interface{}{ + "url": "", + }, + }, + want: []map[string]interface{}{ + {"message": []any{1.0, 2.0}}, + {"message": []any{3.0, 4.0, 5.0}}, + }, + }, { name: "bad_events_type", config: map[string]interface{}{ From bbfa29021664ae4d5f4a67465593ec441dad6c13 Mon Sep 17 00:00:00 2001 From: kruskall <99559985+kruskall@users.noreply.github.com> Date: Tue, 25 Feb 2025 12:51:10 +0100 Subject: [PATCH 3/9] feat(fips): return an error when creating a kerberos client (#42597) * feat(fips): return an error when creating a kerberos client kerberos lib is implementing a lot of crypto :( * test: add kerberos client tests --------- Co-authored-by: Pierre HILBERT --- libbeat/common/transport/kerberos/client.go | 43 ++------------- .../common/transport/kerberos/client_fips.go | 29 ++++++++++ .../transport/kerberos/client_fips_test.go | 39 ++++++++++++++ .../transport/kerberos/client_nofips.go | 53 +++++++++++++++++++ .../transport/kerberos/client_nofips_test.go | 39 ++++++++++++++ 5 files changed, 163 insertions(+), 40 deletions(-) create mode 100644 libbeat/common/transport/kerberos/client_fips.go create mode 100644 libbeat/common/transport/kerberos/client_fips_test.go create mode 100644 libbeat/common/transport/kerberos/client_nofips.go create mode 100644 libbeat/common/transport/kerberos/client_nofips_test.go diff --git a/libbeat/common/transport/kerberos/client.go b/libbeat/common/transport/kerberos/client.go index 3561f235bd06..3c05d9c69930 100644 --- a/libbeat/common/transport/kerberos/client.go +++ b/libbeat/common/transport/kerberos/client.go @@ -18,48 +18,11 @@ package kerberos import ( - "fmt" "net/http" - - krbclient "github.com/jcmturner/gokrb5/v8/client" - krbconfig "github.com/jcmturner/gokrb5/v8/config" - "github.com/jcmturner/gokrb5/v8/keytab" - "github.com/jcmturner/gokrb5/v8/spnego" ) -type Client struct { - spClient *spnego.Client -} - -func NewClient(config *Config, httpClient *http.Client, esurl string) (*Client, error) { - var krbClient *krbclient.Client - krbConf, err := krbconfig.Load(config.ConfigPath) - if err != nil { - return nil, fmt.Errorf("error creating Kerberos client: %w", err) - } - - switch config.AuthType { - case authKeytab: - kTab, err := keytab.Load(config.KeyTabPath) - if err != nil { - return nil, fmt.Errorf("cannot load keytab file %s: %w", config.KeyTabPath, err) - } - krbClient = krbclient.NewWithKeytab(config.Username, config.Realm, kTab, krbConf) - case authPassword: - krbClient = krbclient.NewWithPassword(config.Username, config.Realm, config.Password, krbConf) - default: - return nil, InvalidAuthType - } - - return &Client{ - spClient: spnego.NewClient(krbClient, httpClient, ""), - }, nil -} - -func (c *Client) Do(req *http.Request) (*http.Response, error) { - return c.spClient.Do(req) -} +type Client interface { + Do(req *http.Request) (*http.Response, error) -func (c *Client) CloseIdleConnections() { - c.spClient.CloseIdleConnections() + CloseIdleConnections() } diff --git a/libbeat/common/transport/kerberos/client_fips.go b/libbeat/common/transport/kerberos/client_fips.go new file mode 100644 index 000000000000..13d0b8f74d1a --- /dev/null +++ b/libbeat/common/transport/kerberos/client_fips.go @@ -0,0 +1,29 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build requirefips + +package kerberos + +import ( + "errors" + "net/http" +) + +func NewClient(config *Config, httpClient *http.Client, esurl string) (Client, error) { + return nil, errors.New("kerberos is not supported in fips mode") +} diff --git a/libbeat/common/transport/kerberos/client_fips_test.go b/libbeat/common/transport/kerberos/client_fips_test.go new file mode 100644 index 000000000000..dc45a590786e --- /dev/null +++ b/libbeat/common/transport/kerberos/client_fips_test.go @@ -0,0 +1,39 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build requirefips + +package kerberos + +import ( + "net/http" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewClient(t *testing.T) { + cfg, err := os.CreateTemp(t.TempDir(), "config") + require.NoError(t, err) + c, err := NewClient(&Config{ + AuthType: authPassword, + ConfigPath: cfg.Name(), + }, http.DefaultClient, "") + require.Nil(t, c) + require.EqualError(t, err, "kerberos is not supported in fips mode") +} diff --git a/libbeat/common/transport/kerberos/client_nofips.go b/libbeat/common/transport/kerberos/client_nofips.go new file mode 100644 index 000000000000..f734cb750164 --- /dev/null +++ b/libbeat/common/transport/kerberos/client_nofips.go @@ -0,0 +1,53 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build !requirefips + +package kerberos + +import ( + "fmt" + "net/http" + + krbclient "github.com/jcmturner/gokrb5/v8/client" + krbconfig "github.com/jcmturner/gokrb5/v8/config" + "github.com/jcmturner/gokrb5/v8/keytab" + "github.com/jcmturner/gokrb5/v8/spnego" +) + +func NewClient(config *Config, httpClient *http.Client, esurl string) (Client, error) { + var krbClient *krbclient.Client + krbConf, err := krbconfig.Load(config.ConfigPath) + if err != nil { + return nil, fmt.Errorf("error creating Kerberos client: %w", err) + } + + switch config.AuthType { + case authKeytab: + kTab, err := keytab.Load(config.KeyTabPath) + if err != nil { + return nil, fmt.Errorf("cannot load keytab file %s: %w", config.KeyTabPath, err) + } + krbClient = krbclient.NewWithKeytab(config.Username, config.Realm, kTab, krbConf) + case authPassword: + krbClient = krbclient.NewWithPassword(config.Username, config.Realm, config.Password, krbConf) + default: + return nil, InvalidAuthType + } + + return spnego.NewClient(krbClient, httpClient, ""), nil +} diff --git a/libbeat/common/transport/kerberos/client_nofips_test.go b/libbeat/common/transport/kerberos/client_nofips_test.go new file mode 100644 index 000000000000..d64f9cc87f04 --- /dev/null +++ b/libbeat/common/transport/kerberos/client_nofips_test.go @@ -0,0 +1,39 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build !requirefips + +package kerberos + +import ( + "net/http" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewClient(t *testing.T) { + cfg, err := os.CreateTemp(t.TempDir(), "config") + require.NoError(t, err) + c, err := NewClient(&Config{ + AuthType: authPassword, + ConfigPath: cfg.Name(), + }, http.DefaultClient, "") + require.Nil(t, err) + require.NotNil(t, c) +} From 9dad72a7aa1f3f2393adbe5fb89544165b8c4b7a Mon Sep 17 00:00:00 2001 From: Victor Martinez Date: Tue, 25 Feb 2025 16:28:12 +0100 Subject: [PATCH 4/9] github-actions: dependabot configuration (#42437) --- .github/dependabot.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 2b2bc132b1c3..6427c5ff284a 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -66,7 +66,8 @@ updates: # GitHub actions - package-ecosystem: "github-actions" - directory: "/" + directories: + - "/" schedule: interval: "weekly" day: "sunday" From 42c4aa0af9f1d6f28e98a675dadc824fe2c60c11 Mon Sep 17 00:00:00 2001 From: Kaan Yalti Date: Tue, 25 Feb 2025 12:05:36 -0500 Subject: [PATCH 5/9] enhancement(4534): removed encryption and encrytion related logic from segments (#42848) * enhancement(4534): removed encryption and encrytion related logic from segments * enhancement(4534): removed encryption related tests and config * enhancement(4534): updated documentation * enhancement(4534): ran make update * enhancement(4534): updated changelog * enhancement(4534): replaces rand with rand/v2 * enhancement(4534): remove commented config var --- CHANGELOG.next.asciidoc | 1 + .../queue/diskqueue/benchmark_test.go | 88 ++++------ .../queue/diskqueue/compression_test.go | 21 ++- libbeat/publisher/queue/diskqueue/config.go | 3 - .../diskqueue/docs/on-disk-structures.md | 12 +- .../queue/diskqueue/docs/schemaV2.pic | 2 - .../queue/diskqueue/docs/schemaV2.svg | 18 +- .../queue/diskqueue/enc_compress_test.go | 76 -------- .../publisher/queue/diskqueue/encryption.go | 166 ------------------ .../queue/diskqueue/encryption_test.go | 75 -------- libbeat/publisher/queue/diskqueue/segments.go | 79 +-------- .../queue/diskqueue/segments_test.go | 72 +------- 12 files changed, 70 insertions(+), 543 deletions(-) delete mode 100644 libbeat/publisher/queue/diskqueue/enc_compress_test.go delete mode 100644 libbeat/publisher/queue/diskqueue/encryption.go delete mode 100644 libbeat/publisher/queue/diskqueue/encryption_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0b58e035277b..6da9b2653ee7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -20,6 +20,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Replace default Ubuntu-based images with UBI-minimal-based ones {pull}42150[42150] - Fix templates and docs to use correct `--` version of command line arguments. {issue}42038[42038] {pull}42060[42060] - removed support for a single `-` to precede multi-letter command line arguments. Use `--` instead. {issue}42117[42117] {pull}42209[42209] +- Removed encryption from diskqueue V2 for fips compliance {issue}4534[4534]{pull}42848[42848] *Auditbeat* diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go index 7665c4fd780b..c84868732a8f 100644 --- a/libbeat/publisher/queue/diskqueue/benchmark_test.go +++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go @@ -30,7 +30,7 @@ package diskqueue import ( - "math/rand" + "math/rand/v2" "testing" "time" @@ -45,7 +45,7 @@ var ( // constant event time eventTime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) - //sample event messages, so size of every frame isn't identical + // sample event messages, so size of every frame isn't identical msgs = []string{ "192.168.33.1 - - [26/Dec/2016:16:22:00 +0000] \"GET / HTTP/1.1\" 200 484 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36\"", "{\"eventVersion\":\"1.05\",\"userIdentity\":{\"type\":\"IAMUser\",\"principalId\":\"EXAMPLE_ID\",\"arn\":\"arn:aws:iam::0123456789012:user/Alice\",\"accountId\":\"0123456789012\",\"accessKeyId\":\"EXAMPLE_KEY\",\"userName\":\"Alice\",\"sessionContext\":{\"sessionIssuer\":{},\"webIdFederationData\":{},\"attributes\":{\"mfaAuthenticated\":\"true\",\"creationDate\":\"2020-01-08T15:12:16Z\"}},\"invokedBy\":\"signin.amazonaws.com\"},\"eventTime\":\"2020-01-08T20:58:45Z\",\"eventSource\":\"cloudtrail.amazonaws.com\",\"eventName\":\"UpdateTrail\",\"awsRegion\":\"us-west-2\",\"sourceIPAddress\":\"127.0.0.1\",\"userAgent\":\"signin.amazonaws.com\",\"requestParameters\":{\"name\":\"arn:aws:cloudtrail:us-west-2:0123456789012:trail/TEST-trail\",\"s3BucketName\":\"test-cloudtrail-bucket\",\"snsTopicName\":\"\",\"isMultiRegionTrail\":true,\"enableLogFileValidation\":false,\"kmsKeyId\":\"\"},\"responseElements\":{\"name\":\"TEST-trail\",\"s3BucketName\":\"test-cloudtrail-bucket\",\"snsTopicName\":\"\",\"snsTopicARN\":\"\",\"includeGlobalServiceEvents\":true,\"isMultiRegionTrail\":true,\"trailARN\":\"arn:aws:cloudtrail:us-west-2:0123456789012:trail/TEST-trail\",\"logFileValidationEnabled\":false,\"isOrganizationTrail\":false},\"requestID\":\"EXAMPLE-f3da-42d1-84f5-EXAMPLE\",\"eventID\":\"EXAMPLE-b5e9-4846-8407-EXAMPLE\",\"readOnly\":false,\"eventType\":\"AwsApiCall\",\"recipientAccountId\":\"0123456789012\"}", @@ -63,7 +63,7 @@ func makePublisherEvent(r *rand.Rand) publisher.Event { Content: beat.Event{ Timestamp: eventTime, Fields: mapstr.M{ - "message": msgs[r.Intn(len(msgs))], + "message": msgs[r.IntN(len(msgs))], }, }, } @@ -73,12 +73,10 @@ func makePublisherEvent(r *rand.Rand) publisher.Event { // hold the queue. Location of the temporary directory is stored in // the queue settings. Call `cleanup` when done with the queue to // close the queue and remove the temp dir. -func setup(b *testing.B, encrypt bool, compress bool, protobuf bool) (*diskQueue, queue.Producer) { +func setup(b *testing.B, compress bool, protobuf bool) (*diskQueue, queue.Producer) { s := DefaultSettings() s.Path = b.TempDir() - if encrypt { - s.EncryptionKey = []byte("testtesttesttest") - } + s.UseCompression = compress q, err := NewQueue(logp.L(), nil, s, nil) if err != nil { @@ -138,14 +136,14 @@ func produceThenConsume(r *rand.Rand, p queue.Producer, q *diskQueue, num_events // benchmarkQueue is a wrapper for produceAndConsume, it tries to limit // timers to just produceAndConsume -func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool, async bool, protobuf bool, b *testing.B) { +func benchmarkQueue(num_events int, batch_size int, compress bool, async bool, protobuf bool, b *testing.B) { b.ResetTimer() var err error for n := 0; n < b.N; n++ { b.StopTimer() - r := rand.New(rand.NewSource(1)) - q, p := setup(b, encrypt, compress, protobuf) + r := rand.New(rand.NewPCG(1, 2)) + q, p := setup(b, compress, protobuf) b.StartTimer() if async { if err = produceAndConsume(r, p, q, num_events, batch_size); err != nil { @@ -164,76 +162,50 @@ func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool, // Async benchmarks func BenchmarkAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, false, true, false, b) + benchmarkQueue(1000, 10, false, true, false, b) } + func BenchmarkAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, false, true, false, b) -} -func BenchmarkEncryptAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, false, true, false, b) -} -func BenchmarkEncryptAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, false, true, false, b) + benchmarkQueue(100000, 1000, false, true, false, b) } + func BenchmarkCompressAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, true, true, false, b) + benchmarkQueue(1000, 10, true, true, false, b) } + func BenchmarkCompressAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, true, true, false, b) -} -func BenchmarkEncryptCompressAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, true, true, false, b) -} -func BenchmarkEncryptCompressAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, true, true, false, b) + benchmarkQueue(100000, 1000, true, true, false, b) } + func BenchmarkProtoAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, false, true, true, b) + benchmarkQueue(1000, 10, false, true, true, b) } + func BenchmarkProtoAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, false, true, true, b) -} -func BenchmarkEncCompProtoAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, true, true, true, b) -} -func BenchmarkEncCompProtoAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, true, true, true, b) + benchmarkQueue(100000, 1000, false, true, true, b) } // Sync Benchmarks func BenchmarkSync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, false, false, false, b) + benchmarkQueue(1000, 10, false, false, false, b) } + func BenchmarkSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, false, false, false, b) -} -func BenchmarkEncryptSync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, false, false, false, b) -} -func BenchmarkEncryptSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, false, false, false, b) + benchmarkQueue(100000, 1000, false, false, false, b) } + func BenchmarkCompressSync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, true, false, false, b) + benchmarkQueue(1000, 10, true, false, false, b) } + func BenchmarkCompressSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, true, false, false, b) -} -func BenchmarkEncryptCompressSync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, true, false, false, b) -} -func BenchmarkEncryptCompressSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, true, false, false, b) + benchmarkQueue(100000, 1000, true, false, false, b) } + func BenchmarkProtoSync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, false, false, true, b) + benchmarkQueue(1000, 10, false, false, true, b) } + func BenchmarkProtoSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, false, false, true, b) -} -func BenchmarkEncCompProtoSync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, true, false, true, b) -} -func BenchmarkEncCompProtoSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, true, false, true, b) + benchmarkQueue(100000, 1000, false, false, true, b) } diff --git a/libbeat/publisher/queue/diskqueue/compression_test.go b/libbeat/publisher/queue/diskqueue/compression_test.go index 2b684e0f28dd..e4e89cf7f3d6 100644 --- a/libbeat/publisher/queue/diskqueue/compression_test.go +++ b/libbeat/publisher/queue/diskqueue/compression_test.go @@ -47,7 +47,8 @@ func TestCompressionReader(t *testing.T) { 0x00, 0x00, 0x80, 0x61, 0x62, 0x63, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x6c, - 0x3e, 0x7b, 0x08, 0x00}, + 0x3e, 0x7b, 0x08, 0x00, + }, }, "abc compressed with pierrec lz4": { plaintext: []byte("abc"), @@ -57,7 +58,8 @@ func TestCompressionReader(t *testing.T) { 0x00, 0x00, 0x80, 0x61, 0x62, 0x63, 0x00, 0x00, 0x00, 0x00, 0xff, 0x53, - 0xd1, 0x32}, + 0xd1, 0x32, + }, }, } @@ -85,7 +87,8 @@ func TestCompressionWriter(t *testing.T) { 0x00, 0x00, 0x80, 0x61, 0x62, 0x63, 0x00, 0x00, 0x00, 0x00, 0xff, 0x53, - 0xd1, 0x32}, + 0xd1, 0x32, + }, }, } @@ -100,6 +103,16 @@ func TestCompressionWriter(t *testing.T) { } } +func NopWriteCloseSyncer(w io.WriteCloser) WriteCloseSyncer { + return nopWriteCloseSyncer{w} +} + +type nopWriteCloseSyncer struct { + io.WriteCloser +} + +func (nopWriteCloseSyncer) Sync() error { return nil } + func TestCompressionRoundTrip(t *testing.T) { tests := map[string]struct { plaintext []byte @@ -141,7 +154,7 @@ func TestCompressionSync(t *testing.T) { src1 := bytes.NewReader(tc.plaintext) _, err := io.Copy(cw, src1) assert.Nil(t, err, name) - //prior to v4.1.15 of pierrec/lz4 there was a + // prior to v4.1.15 of pierrec/lz4 there was a // bug that prevented writing after a Flush. // The call to Sync here exercises Flush. err = cw.Sync() diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go index 56e9d35e08d2..309292e025c4 100644 --- a/libbeat/publisher/queue/diskqueue/config.go +++ b/libbeat/publisher/queue/diskqueue/config.go @@ -64,9 +64,6 @@ type Settings struct { RetryInterval time.Duration MaxRetryInterval time.Duration - // EncryptionKey is used to encrypt data if SchemaVersion 2 is used. - EncryptionKey []byte - // UseCompression enables or disables LZ4 compression UseCompression bool } diff --git a/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md b/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md index 81d83e922b1a..e51dd1b89fb6 100644 --- a/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md +++ b/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md @@ -58,21 +58,11 @@ a count of the number of frames in the segment, which is an unsigned flags, which signify options. The size of options is 32-bits in little-endian format. -If no fields are set in the options field, then un-encrypted frames -follow the header. - -If the options field has the first bit set, then encryption is -enabled. In which case, the next 128-bits are the initialization -vector and the rest of the file is encrypted frames. +If no fields are set in the options field, then uncompressed frames follow the header. If the options field has the second bit set, then compression is enabled. In which case, LZ4 compressed frames follow the header. -If both the first and second bit of the options field are set, then -both compression and encryption are enabled. The next 128-bits are -the initialization vector and the rest of the file is LZ4 compressed -frames. - If the options field has the third bit set, then Google Protobuf is used to serialize the data in the frame instead of CBOR. diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic b/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic index 7cf0a4425c4c..44bbfd6ab504 100644 --- a/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic +++ b/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic @@ -2,5 +2,3 @@ boxht = 0.25 VERSION: box "version (uint32)" wid 4; COUNT: box "count (uint32)" wid 4 with .nw at VERSION.sw; OPTIONS: box "options (uint32)" wid 4 with .nw at COUNT.sw; -IV: box "initialization vector (128 bits)" wid 4 ht 1 with .nw at OPTIONS.sw -FRAME: box "Encrypted Frames" dashed wid 4 ht 2 with .nw at IV.sw; \ No newline at end of file diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg b/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg index 76f5a51feccf..85928aba47e6 100644 --- a/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg +++ b/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg @@ -1,13 +1,9 @@ - - -version (uint32) - -count (uint32) - -options (uint32) - -initialization vector (128 bits) - -Encrypted Frames + + +version (uint32) + +count (uint32) + +options (uint32) diff --git a/libbeat/publisher/queue/diskqueue/enc_compress_test.go b/libbeat/publisher/queue/diskqueue/enc_compress_test.go deleted file mode 100644 index 637765c24cb5..000000000000 --- a/libbeat/publisher/queue/diskqueue/enc_compress_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package diskqueue - -import ( - "bytes" - "io" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestEncryptionCompressionRoundTrip(t *testing.T) { - tests := map[string]struct { - plaintext []byte - }{ - "1 rune": {plaintext: []byte("a")}, - "16 runes": {plaintext: []byte("bbbbbbbbbbbbbbbb")}, - "17 runes": {plaintext: []byte("ccccccccccccccccc")}, - "small json": {plaintext: []byte("{\"message\":\"2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA\"}")}, - "large json": {plaintext: []byte("{\"message\":\"{\\\"CacheCacheStatus\\\":\\\"hit\\\",\\\"CacheResponseBytes\\\":26888,\\\"CacheResponseStatus\\\":200,\\\"CacheTieredFill\\\":true,\\\"ClientASN\\\":1136,\\\"ClientCountry\\\":\\\"nl\\\",\\\"ClientDeviceType\\\":\\\"desktop\\\",\\\"ClientIP\\\":\\\"89.160.20.156\\\",\\\"ClientIPClass\\\":\\\"noRecord\\\",\\\"ClientRequestBytes\\\":5324,\\\"ClientRequestHost\\\":\\\"eqlplayground.io\\\",\\\"ClientRequestMethod\\\":\\\"GET\\\",\\\"ClientRequestPath\\\":\\\"/40865/bundles/plugin/securitySolution/8.0.0/securitySolution.chunk.9.js\\\",\\\"ClientRequestProtocol\\\":\\\"HTTP/1.1\\\",\\\"ClientRequestReferer\\\":\\\"https://eqlplayground.io/s/eqldemo/app/security/timelines/default?sourcerer=(default:!(.siem-signals-eqldemo))&timerange=(global:(linkTo:!(),timerange:(from:%272021-03-03T19:55:15.519Z%27,fromStr:now-24h,kind:relative,to:%272021-03-04T19:55:15.519Z%27,toStr:now)),timeline:(linkTo:!(),timerange:(from:%272020-03-04T19:55:28.684Z%27,fromStr:now-1y,kind:relative,to:%272021-03-04T19:55:28.692Z%27,toStr:now)))&timeline=(activeTab:eql,graphEventId:%27%27,id:%2769f93840-7d23-11eb-866c-79a0609409ba%27,isOpen:!t)\\\",\\\"ClientRequestURI\\\":\\\"/40865/bundles/plugin/securitySolution/8.0.0/securitySolution.chunk.9.js\\\",\\\"ClientRequestUserAgent\\\":\\\"Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/91.0.4472.124Safari/537.36\\\",\\\"ClientSSLCipher\\\":\\\"NONE\\\",\\\"ClientSSLProtocol\\\":\\\"none\\\",\\\"ClientSrcPort\\\":0,\\\"ClientXRequestedWith\\\":\\\"\\\",\\\"EdgeColoCode\\\":\\\"33.147.138.217\\\",\\\"EdgeColoID\\\":20,\\\"EdgeEndTimestamp\\\":1625752958875000000,\\\"EdgePathingOp\\\":\\\"wl\\\",\\\"EdgePathingSrc\\\":\\\"macro\\\",\\\"EdgePathingStatus\\\":\\\"nr\\\",\\\"EdgeRateLimitAction\\\":\\\"\\\",\\\"EdgeRateLimitID\\\":0,\\\"EdgeRequestHost\\\":\\\"eqlplayground.io\\\",\\\"EdgeResponseBytes\\\":24743,\\\"EdgeResponseCompressionRatio\\\":0,\\\"EdgeResponseContentType\\\":\\\"application/javascript\\\",\\\"EdgeResponseStatus\\\":200,\\\"EdgeServerIP\\\":\\\"89.160.20.156\\\",\\\"EdgeStartTimestamp\\\":1625752958812000000,\\\"FirewallMatchesActions\\\":[],\\\"FirewallMatchesRuleIDs\\\":[],\\\"FirewallMatchesSources\\\":[],\\\"OriginIP\\\":\\\"\\\",\\\"OriginResponseBytes\\\":0,\\\"OriginResponseHTTPExpires\\\":\\\"\\\",\\\"OriginResponseHTTPLastModified\\\":\\\"\\\",\\\"OriginResponseStatus\\\":0,\\\"OriginResponseTime\\\":0,\\\"OriginSSLProtocol\\\":\\\"unknown\\\",\\\"ParentRayID\\\":\\\"66b9d9f88b5b4c4f\\\",\\\"RayID\\\":\\\"66b9d9f890ae4c4f\\\",\\\"SecurityLevel\\\":\\\"off\\\",\\\"WAFAction\\\":\\\"unknown\\\",\\\"WAFFlags\\\":\\\"0\\\",\\\"WAFMatchedVar\\\":\\\"\\\",\\\"WAFProfile\\\":\\\"unknown\\\",\\\"WAFRuleID\\\":\\\"\\\",\\\"WAFRuleMessage\\\":\\\"\\\",\\\"WorkerCPUTime\\\":0,\\\"WorkerStatus\\\":\\\"unknown\\\",\\\"WorkerSubrequest\\\":true,\\\"WorkerSubrequestCount\\\":0,\\\"ZoneID\\\":393347122}\"}")}, - } - - for name, tc := range tests { - pr, pw := io.Pipe() - key := []byte("keykeykeykeykeyk") - src := bytes.NewReader(tc.plaintext) - var dst bytes.Buffer - var tEncBuf bytes.Buffer - var tCompBuf bytes.Buffer - - go func() { - ew, err := NewEncryptionWriter(NopWriteCloseSyncer(pw), key) - assert.Nil(t, err, name) - cw := NewCompressionWriter(ew) - _, err = io.Copy(cw, src) - assert.Nil(t, err, name) - err = cw.Close() - assert.Nil(t, err, name) - }() - - ter := io.TeeReader(pr, &tEncBuf) - er, err := NewEncryptionReader(io.NopCloser(ter), key) - assert.Nil(t, err, name) - - tcr := io.TeeReader(er, &tCompBuf) - - cr := NewCompressionReader(io.NopCloser(tcr)) - - _, err = io.Copy(&dst, cr) - assert.Nil(t, err, name) - // Check round trip worked - assert.Equal(t, tc.plaintext, dst.Bytes(), name) - // Check that cipher text and plaintext don't match - assert.NotEqual(t, tc.plaintext, tEncBuf.Bytes(), name) - // Check that compressed text and plaintext don't match - assert.NotEqual(t, tc.plaintext, tCompBuf.Bytes(), name) - // Check that compressed text and ciphertext don't match - assert.NotEqual(t, tEncBuf.Bytes(), tCompBuf.Bytes(), name) - } -} diff --git a/libbeat/publisher/queue/diskqueue/encryption.go b/libbeat/publisher/queue/diskqueue/encryption.go deleted file mode 100644 index 497442017eb4..000000000000 --- a/libbeat/publisher/queue/diskqueue/encryption.go +++ /dev/null @@ -1,166 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package diskqueue - -import ( - "bytes" - "crypto/aes" - "crypto/cipher" - "crypto/rand" - "fmt" - "io" -) - -const ( - // KeySize is 128-bit - KeySize = 16 -) - -// EncryptionReader allows reading from a AES-128-CTR stream -type EncryptionReader struct { - src io.ReadCloser - stream cipher.Stream - block cipher.Block - iv []byte - ciphertext []byte -} - -// NewEncryptionReader returns a new AES-128-CTR decrypter -func NewEncryptionReader(r io.ReadCloser, key []byte) (*EncryptionReader, error) { - if len(key) != KeySize { - return nil, fmt.Errorf("key must be %d bytes long", KeySize) - } - - er := &EncryptionReader{} - er.src = r - - // turn key into block & save - block, err := aes.NewCipher(key) - if err != nil { - return nil, err - } - er.block = block - - // read IV from the io.ReadCloser - iv := make([]byte, aes.BlockSize) - if _, err := io.ReadFull(er.src, iv); err != nil { - return nil, err - } - er.iv = iv - - // create Stream - er.stream = cipher.NewCTR(block, iv) - - return er, nil -} - -func (er *EncryptionReader) Read(buf []byte) (int, error) { - if cap(er.ciphertext) >= len(buf) { - er.ciphertext = er.ciphertext[:len(buf)] - } else { - er.ciphertext = make([]byte, len(buf)) - } - n, err := er.src.Read(er.ciphertext) - if err != nil { - return n, err - } - er.stream.XORKeyStream(buf, er.ciphertext) - return n, nil -} - -func (er *EncryptionReader) Close() error { - return er.src.Close() -} - -// Reset Sets up stream again, assumes that caller has already set the -// src to the iv -func (er *EncryptionReader) Reset() error { - iv := make([]byte, aes.BlockSize) - if _, err := io.ReadFull(er.src, iv); err != nil { - return err - } - if !bytes.Equal(iv, er.iv) { - return fmt.Errorf("different iv, something is wrong") - } - - // create Stream - er.stream = cipher.NewCTR(er.block, iv) - return nil -} - -// EncryptionWriter allows writing to a AES-128-CTR stream -type EncryptionWriter struct { - dst WriteCloseSyncer - stream cipher.Stream - ciphertext []byte -} - -// NewEncryptionWriter returns a new AES-128-CTR stream encryptor -func NewEncryptionWriter(w WriteCloseSyncer, key []byte) (*EncryptionWriter, error) { - if len(key) != KeySize { - return nil, fmt.Errorf("key must be %d bytes long", KeySize) - } - - ew := &EncryptionWriter{} - - // turn key into block - block, err := aes.NewCipher(key) - if err != nil { - return nil, err - } - - // create random IV - iv := make([]byte, aes.BlockSize) - if _, err := io.ReadFull(rand.Reader, iv); err != nil { - return nil, err - } - - // create stream - stream := cipher.NewCTR(block, iv) - - //write IV - n, err := w.Write(iv) - if err != nil { - return nil, err - } - if n != len(iv) { - return nil, io.ErrShortWrite - } - - ew.dst = w - ew.stream = stream - return ew, nil -} - -func (ew *EncryptionWriter) Write(buf []byte) (int, error) { - if cap(ew.ciphertext) >= len(buf) { - ew.ciphertext = ew.ciphertext[:len(buf)] - } else { - ew.ciphertext = make([]byte, len(buf)) - } - ew.stream.XORKeyStream(ew.ciphertext, buf) - return ew.dst.Write(ew.ciphertext) -} - -func (ew *EncryptionWriter) Close() error { - return ew.dst.Close() -} - -func (ew *EncryptionWriter) Sync() error { - return ew.dst.Sync() -} diff --git a/libbeat/publisher/queue/diskqueue/encryption_test.go b/libbeat/publisher/queue/diskqueue/encryption_test.go deleted file mode 100644 index fb956d699b16..000000000000 --- a/libbeat/publisher/queue/diskqueue/encryption_test.go +++ /dev/null @@ -1,75 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package diskqueue - -import ( - "bytes" - "crypto/aes" - "io" - "testing" - - "github.com/stretchr/testify/assert" -) - -func NopWriteCloseSyncer(w io.WriteCloser) WriteCloseSyncer { - return nopWriteCloseSyncer{w} -} - -type nopWriteCloseSyncer struct { - io.WriteCloser -} - -func (nopWriteCloseSyncer) Sync() error { return nil } - -func TestEncryptionRoundTrip(t *testing.T) { - tests := map[string]struct { - plaintext []byte - }{ - "8 bits": {plaintext: []byte("a")}, - "128 bits": {plaintext: []byte("bbbbbbbbbbbbbbbb")}, - "136 bits": {plaintext: []byte("ccccccccccccccccc")}, - } - for name, tc := range tests { - pr, pw := io.Pipe() - src := bytes.NewReader(tc.plaintext) - var dst bytes.Buffer - var teeBuf bytes.Buffer - key := []byte("kkkkkkkkkkkkkkkk") - - go func() { - //NewEncryptionWriter writes iv, so needs to be in go routine - ew, err := NewEncryptionWriter(NopWriteCloseSyncer(pw), key) - assert.Nil(t, err, name) - _, err = io.Copy(ew, src) - assert.Nil(t, err, name) - ew.Close() - }() - - tr := io.TeeReader(pr, &teeBuf) - er, err := NewEncryptionReader(io.NopCloser(tr), key) - assert.Nil(t, err, name) - _, err = io.Copy(&dst, er) - assert.Nil(t, err, name) - // Check round trip worked - assert.Equal(t, tc.plaintext, dst.Bytes(), name) - // Check that iv & cipher text were written - assert.Equal(t, len(tc.plaintext)+aes.BlockSize, teeBuf.Len(), name) - // Check that cipher text and plaintext don't match - assert.NotEqual(t, tc.plaintext, teeBuf.Bytes()[aes.BlockSize:], name) - } -} diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index 7e3661f6e5b4..11eeb9991c6a 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -33,7 +33,6 @@ import ( // diskQueueSegments encapsulates segment-related queue metadata. type diskQueueSegments struct { - // A list of the segments that have not yet been completely written, sorted // by increasing segment ID. When the first entry has been completely // written, it is removed from this list and appended to reading. @@ -132,7 +131,7 @@ type segmentHeader struct { // Only present in schema version >= 1. frameCount uint32 - // options holds flags to enable features, for example encryption. + // options holds flags to enable features, for example compression. options uint32 } @@ -151,7 +150,7 @@ const currentSegmentVersion = 2 const segmentHeaderSize = 12 const ( - ENABLE_ENCRYPTION uint32 = 1 << iota // 0x1 + _ uint32 = 1 << iota // 0x1 ENABLE_COMPRESSION // 0x2 ENABLE_PROTOBUF // 0x4 ) @@ -256,28 +255,14 @@ func (segment *queueSegment) getReader(queueSettings Settings) (*segmentReader, sr.serializationFormat = SerializationCBOR } - if (header.options & ENABLE_ENCRYPTION) == ENABLE_ENCRYPTION { - sr.er, err = NewEncryptionReader(sr.src, queueSettings.EncryptionKey) - if err != nil { - sr.src.Close() - return nil, fmt.Errorf("couldn't create encryption reader: %w", err) - } - } if (header.options & ENABLE_COMPRESSION) == ENABLE_COMPRESSION { - if sr.er != nil { - sr.cr = NewCompressionReader(sr.er) - } else { - sr.cr = NewCompressionReader(sr.src) - } + sr.cr = NewCompressionReader(sr.src) } return sr, nil } -// getWriter sets up the segmentWriter. The order of encryption and -// compression is important. If both options are enabled we want -// encrypted compressed data not compressed encrypted data. This is -// because encryption will mask the repetions in the data making -// compression much less effective. getWriter should only be called +// getWriter sets up the segmentWriter. +// getWriter should only be called. // from the writer loop. func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter, error) { var options uint32 @@ -287,10 +272,6 @@ func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter, return nil, err } - if len(queueSettings.EncryptionKey) > 0 { - options = options | ENABLE_ENCRYPTION - } - if queueSettings.UseCompression { options = options | ENABLE_COMPRESSION } @@ -302,20 +283,8 @@ func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter, return nil, err } - if (options & ENABLE_ENCRYPTION) == ENABLE_ENCRYPTION { - sw.ew, err = NewEncryptionWriter(sw.dst, queueSettings.EncryptionKey) - if err != nil { - sw.dst.Close() - return nil, fmt.Errorf("couldn't create encryption writer: %w", err) - } - } - if (options & ENABLE_COMPRESSION) == ENABLE_COMPRESSION { - if sw.ew != nil { - sw.cw = NewCompressionWriter(sw.ew) - } else { - sw.cw = NewCompressionWriter(sw.dst) - } + sw.cw = NewCompressionWriter(sw.dst) } return sw, nil @@ -474,7 +443,6 @@ func (segments *diskQueueSegments) sizeOnDisk() uint64 { // less compressable. type segmentReader struct { src io.ReadSeekCloser - er *EncryptionReader cr *CompressionReader serializationFormat SerializationFormat } @@ -483,9 +451,6 @@ func (r *segmentReader) Read(p []byte) (int, error) { if r.cr != nil { return r.cr.Read(p) } - if r.er != nil { - return r.er.Read(p) - } return r.src.Read(p) } @@ -493,9 +458,6 @@ func (r *segmentReader) Close() error { if r.cr != nil { return r.cr.Close() } - if r.er != nil { - return r.er.Close() - } return r.src.Close() } @@ -508,31 +470,12 @@ func (r *segmentReader) Seek(offset int64, whence int) (int64, error) { if _, err := r.src.Seek(segmentHeaderSize, io.SeekStart); err != nil { return 0, fmt.Errorf("could not seek past segment header: %w", err) } - if r.er != nil { - if err := r.er.Reset(); err != nil { - return 0, fmt.Errorf("could not reset encryption: %w", err) - } - } if err := r.cr.Reset(); err != nil { return 0, fmt.Errorf("could not reset compression: %w", err) } written, err := io.CopyN(io.Discard, r.cr, (offset+int64(whence))-segmentHeaderSize) return written + segmentHeaderSize, err } - if r.er != nil { - //can't seek before segment header - if (offset + int64(whence)) < segmentHeaderSize { - return 0, fmt.Errorf("illegal seek offset %d, whence %d", offset, whence) - } - if _, err := r.src.Seek(segmentHeaderSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("could not seek past segment header: %w", err) - } - if err := r.er.Reset(); err != nil { - return 0, fmt.Errorf("could not reset encryption: %w", err) - } - written, err := io.CopyN(io.Discard, r.er, (offset+int64(whence))-segmentHeaderSize) - return written + segmentHeaderSize, err - } return r.src.Seek(offset, whence) } @@ -545,7 +488,6 @@ func (r *segmentReader) Seek(offset int64, whence int) (int64, error) { // data less compressable. type segmentWriter struct { dst *os.File - ew *EncryptionWriter cw *CompressionWriter } @@ -553,9 +495,6 @@ func (w *segmentWriter) Write(p []byte) (int, error) { if w.cw != nil { return w.cw.Write(p) } - if w.ew != nil { - return w.ew.Write(p) - } return w.dst.Write(p) } @@ -563,9 +502,6 @@ func (w *segmentWriter) Close() error { if w.cw != nil { return w.cw.Close() } - if w.ew != nil { - return w.ew.Close() - } return w.dst.Close() } @@ -573,9 +509,6 @@ func (w *segmentWriter) Sync() error { if w.cw != nil { return w.cw.Sync() } - if w.ew != nil { - return w.ew.Sync() - } return w.dst.Sync() } diff --git a/libbeat/publisher/queue/diskqueue/segments_test.go b/libbeat/publisher/queue/diskqueue/segments_test.go index b80bba22e895..44721debeefd 100644 --- a/libbeat/publisher/queue/diskqueue/segments_test.go +++ b/libbeat/publisher/queue/diskqueue/segments_test.go @@ -27,43 +27,25 @@ import ( func TestSegmentsRoundTrip(t *testing.T) { tests := map[string]struct { id segmentID - encrypt bool compress bool plaintext []byte }{ - "No Encryption or Compression": { + "No Compression": { id: 0, - encrypt: false, compress: false, plaintext: []byte("no encryption or compression"), }, - "Encryption Only": { - id: 1, - encrypt: true, - compress: false, - plaintext: []byte("encryption only"), - }, - "Compression Only": { + "With Compression": { id: 2, - encrypt: false, compress: true, plaintext: []byte("compression only"), }, - "Encryption and Compression": { - id: 3, - encrypt: true, - compress: true, - plaintext: []byte("encryption and compression"), - }, } dir := t.TempDir() for name, tc := range tests { dst := make([]byte, len(tc.plaintext)) settings := DefaultSettings() settings.Path = dir - if tc.encrypt { - settings.EncryptionKey = []byte("keykeykeykeykeyk") - } settings.UseCompression = tc.compress qs := &queueSegment{ id: tc.id, @@ -86,7 +68,7 @@ func TestSegmentsRoundTrip(t *testing.T) { assert.Equal(t, len(dst), n, name) - //make sure we read back what we wrote + // make sure we read back what we wrote assert.Equal(t, tc.plaintext, dst, name) _, err = sr.Read(dst) @@ -101,31 +83,16 @@ func TestSegmentsRoundTrip(t *testing.T) { func TestSegmentReaderSeek(t *testing.T) { tests := map[string]struct { id segmentID - encrypt bool compress bool plaintexts [][]byte }{ - "No Encryption or compression": { + "No Compression": { id: 0, - encrypt: false, - compress: false, - plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, - }, - "Encryption Only": { - id: 1, - encrypt: true, compress: false, plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, }, - "Compression Only": { + "With Compression": { id: 2, - encrypt: false, - compress: true, - plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, - }, - "Encryption and Compression": { - id: 3, - encrypt: true, compress: true, plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, }, @@ -134,9 +101,6 @@ func TestSegmentReaderSeek(t *testing.T) { for name, tc := range tests { settings := DefaultSettings() settings.Path = dir - if tc.encrypt { - settings.EncryptionKey = []byte("keykeykeykeykeyk") - } settings.UseCompression = tc.compress qs := &queueSegment{ @@ -154,7 +118,7 @@ func TestSegmentReaderSeek(t *testing.T) { sw.Close() sr, err := qs.getReader(settings) assert.Nil(t, err, name) - //seek to second data piece + // seek to second data piece n, err := sr.Seek(segmentHeaderSize+int64(len(tc.plaintexts[0])), io.SeekStart) assert.Nil(t, err, name) assert.Equal(t, segmentHeaderSize+int64(len(tc.plaintexts[0])), n, name) @@ -171,35 +135,18 @@ func TestSegmentReaderSeek(t *testing.T) { func TestSegmentReaderSeekLocations(t *testing.T) { tests := map[string]struct { id segmentID - encrypt bool compress bool plaintexts [][]byte location int64 }{ - "No Encryption or Compression": { + "No Compression": { id: 0, - encrypt: false, compress: false, plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, location: -1, }, - "Encryption": { - id: 1, - encrypt: true, - compress: false, - plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, - location: 2, - }, "Compression": { id: 1, - encrypt: false, - compress: true, - plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, - location: 2, - }, - "Encryption and Compression": { - id: 1, - encrypt: true, compress: true, plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, location: 2, @@ -209,9 +156,6 @@ func TestSegmentReaderSeekLocations(t *testing.T) { for name, tc := range tests { settings := DefaultSettings() settings.Path = dir - if tc.encrypt { - settings.EncryptionKey = []byte("keykeykeykeykeyk") - } settings.UseCompression = tc.compress qs := &queueSegment{ id: tc.id, @@ -226,7 +170,7 @@ func TestSegmentReaderSeekLocations(t *testing.T) { sw.Close() sr, err := qs.getReader(settings) assert.Nil(t, err, name) - //seek to location + // seek to location _, err = sr.Seek(tc.location, io.SeekStart) assert.NotNil(t, err, name) } From 5197c4320a262ac850fa8bdb9718c2ba5e54027d Mon Sep 17 00:00:00 2001 From: "Alex K." <8418476+fearful-symmetry@users.noreply.github.com> Date: Tue, 25 Feb 2025 10:48:56 -0800 Subject: [PATCH 6/9] Add metrics for kernel_tracing provider, fix mutex issue (#42795) * add metrics for kernel_tracing provider, fix mutex issue * fix metrics setup, add tests * still tinkering with monitoring * add changelog --- CHANGELOG.next.asciidoc | 1 + .../sessionmd/add_session_metadata.go | 46 ++++++++++++++----- .../sessionmd/add_session_metadata_test.go | 22 +++++++++ .../kerneltracingprovider_linux.go | 32 +++++++++---- .../provider/kerneltracingprovider/metrics.go | 31 +++++++++++++ 5 files changed, 112 insertions(+), 20 deletions(-) create mode 100644 x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/metrics.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 6da9b2653ee7..12976ffcd40c 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -149,6 +149,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403] - hasher: Geneneral improvements and fixes. {pull}41863[41863] - hasher: Add a cached hasher for upcoming backend. {pull}41952[41952] - Split common tty definitions. {pull}42004[42004] +- Fix potential data loss in add_session_metadata. {pull}42795[42795] *Filebeat* diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go index ed6701e18064..6a5d115a5463 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata.go @@ -32,6 +32,9 @@ const ( logName = "processor." + processorName procfsType = "procfs" kernelTracingType = "kernel_tracing" + + regNameProcessDB = "processor.add_session_metadata.processdb" + regNameKernelTracing = "processor.add_session_metadata.kernel_tracing" ) // InitializeModule initializes this module. @@ -53,6 +56,31 @@ type addSessionMetadata struct { providerType string } +func genRegistry(reg *monitoring.Registry, base string) *monitoring.Registry { + // if more than one instance of the DB is running, start to increment the metrics keys. + // This is kind of an edge case, but best to handle it so monitoring does not explode. + // This seems like awkward code, but NewRegistry() loves to panic, so we need to be careful. + id := 0 + if reg.GetRegistry(base) != nil { + current := int(instanceID.Load()) + // because we call genRegistry() multiple times, make sure the registry doesn't exist before we iterate the counter + if current > 0 && reg.GetRegistry(fmt.Sprintf("%s.%d", base, current)) == nil { + id = current + } else { + id = int(instanceID.Add(1)) + } + + } + + regName := base + if id > 0 { + regName = fmt.Sprintf("%s.%d", base, id) + } + + metricsReg := reg.NewRegistry(regName) + return metricsReg +} + func New(cfg *cfg.C) (beat.Processor, error) { c := defaultConfig() if err := cfg.Unpack(&c); err != nil { @@ -60,18 +88,10 @@ func New(cfg *cfg.C) (beat.Processor, error) { } logger := logp.NewLogger(logName) - - id := int(instanceID.Add(1)) - regName := "processor.add_session_metadata.processdb" - // if more than one instance of the DB is running, start to increment the metrics keys. - if id > 1 { - regName = fmt.Sprintf("%s.%d", regName, id) - } - metricsReg := monitoring.Default.NewRegistry(regName) - + procDBReg := genRegistry(monitoring.Default, regNameProcessDB) ctx, cancel := context.WithCancel(context.Background()) reader := procfs.NewProcfsReader(*logger) - db, err := processdb.NewDB(ctx, metricsReg, reader, logger, c.DBReaperPeriod, c.ReapProcesses) + db, err := processdb.NewDB(ctx, procDBReg, reader, logger, c.DBReaperPeriod, c.ReapProcesses) if err != nil { cancel() return nil, fmt.Errorf("failed to create DB: %w", err) @@ -82,7 +102,8 @@ func New(cfg *cfg.C) (beat.Processor, error) { switch c.Backend { case "auto": - p, err = kerneltracingprovider.NewProvider(ctx, logger) + procDBReg := genRegistry(monitoring.Default, regNameKernelTracing) + p, err = kerneltracingprovider.NewProvider(ctx, logger, procDBReg) if err != nil { // Most likely cause of error is not supporting ebpf or kprobes on system, try procfs backfilledPIDs := db.ScrapeProcfs() @@ -108,7 +129,8 @@ func New(cfg *cfg.C) (beat.Processor, error) { } pType = procfsType case "kernel_tracing": - p, err = kerneltracingprovider.NewProvider(ctx, logger) + procDBReg := genRegistry(monitoring.Default, regNameKernelTracing) + p, err = kerneltracingprovider.NewProvider(ctx, logger, procDBReg) if err != nil { cancel() return nil, fmt.Errorf("failed to create kernel_tracing provider: %w", err) diff --git a/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go b/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go index 422af4c935c2..d0fa6fad0665 100644 --- a/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go +++ b/x-pack/auditbeat/processors/sessionmd/add_session_metadata_test.go @@ -8,6 +8,7 @@ package sessionmd import ( "context" + "fmt" "testing" "time" @@ -339,6 +340,27 @@ var ( logger = logp.NewLogger("add_session_metadata_test") ) +func TestMetricsSetup(t *testing.T) { + // init a metrics registry multiple times with the same name, ensure we don't panic, and the names are correct + reg := monitoring.NewRegistry() + firstName := "test.metrics" + secondName := "other.stuff" + genRegistry(reg, firstName) + require.NotNil(t, reg.Get(firstName)) + + genRegistry(reg, firstName) + require.NotNil(t, reg.Get(fmt.Sprintf("%s.1", firstName))) + + genRegistry(reg, secondName) + require.NotNil(t, reg.Get(secondName)) + require.Nil(t, reg.Get(fmt.Sprintf("%s.1", secondName))) + + genRegistry(reg, secondName) + require.NotNil(t, reg.Get(secondName)) + require.NotNil(t, reg.Get(fmt.Sprintf("%s.1", secondName))) + require.Nil(t, reg.Get(fmt.Sprintf("%s.2", secondName))) +} + func TestEnrich(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*15) defer cancel() diff --git a/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go index e57c5d693557..c94e6bb1dec9 100644 --- a/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go +++ b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/kerneltracingprovider_linux.go @@ -25,6 +25,7 @@ import ( "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/provider" "github.com/elastic/beats/v7/x-pack/auditbeat/processors/sessionmd/types" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/monitoring" ) type prvdr struct { @@ -82,7 +83,7 @@ func readPIDNsInode() (uint64, error) { } // NewProvider returns a new instance of kerneltracingprovider -func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, error) { +func NewProvider(ctx context.Context, logger *logp.Logger, reg *monitoring.Registry) (provider.Provider, error) { attr := quark.DefaultQueueAttr() attr.Flags = quark.QQ_ALL_BACKENDS | quark.QQ_ENTRY_LEADER | quark.QQ_NO_SNAPSHOT qq, err := quark.OpenQueue(attr, 64) @@ -90,6 +91,8 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e return nil, fmt.Errorf("open queue: %w", err) } + procMetrics := NewStats(reg) + p := &prvdr{ ctx: ctx, logger: logger, @@ -102,7 +105,10 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e backoffSkipped: 0, } - go func(ctx context.Context, qq *quark.Queue, logger *logp.Logger, p *prvdr) { + go func(ctx context.Context, qq *quark.Queue, logger *logp.Logger, p *prvdr, stats *Stats) { + + lastUpdate := time.Now() + defer qq.Close() for ctx.Err() == nil { p.qqMtx.Lock() @@ -112,6 +118,19 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e logger.Errorw("get events from quark, no more process enrichment from this processor will be done", "error", err) break } + if time.Since(lastUpdate) > time.Second*5 { + p.qqMtx.Lock() + metrics := qq.Stats() + p.qqMtx.Unlock() + + stats.Aggregations.Set(metrics.Aggregations) + stats.Insertions.Set(metrics.Insertions) + stats.Lost.Set(metrics.Lost) + stats.NonAggregations.Set(metrics.NonAggregations) + stats.Removals.Set(metrics.Removals) + lastUpdate = time.Now() + } + if len(events) == 0 { err = qq.Block() if err != nil { @@ -120,7 +139,7 @@ func NewProvider(ctx context.Context, logger *logp.Logger) (provider.Provider, e } } } - }(ctx, qq, logger, p) + }(ctx, qq, logger, p, procMetrics) bootID, err = readBootID() if err != nil { @@ -150,11 +169,8 @@ const ( // does not exceed a reasonable threshold that would delay all other events processed by auditbeat. When in the backoff state, enrichment // will proceed without waiting for the process data to exist in the cache, likely resulting in missing enrichment data. func (p *prvdr) Sync(_ *beat.Event, pid uint32) error { - p.qqMtx.Lock() - defer p.qqMtx.Unlock() - // If pid is already in qq, return immediately - if _, found := p.qq.Lookup(int(pid)); found { + if _, found := p.lookupLocked(pid); found { return nil } @@ -169,7 +185,7 @@ func (p *prvdr) Sync(_ *beat.Event, pid uint32) error { nextWait := 5 * time.Millisecond for { waited := time.Since(start) - if _, found := p.qq.Lookup(int(pid)); found { + if _, found := p.lookupLocked(pid); found { p.logger.Debugw("got process that was missing ", "waited", waited) p.combinedWait = p.combinedWait + waited return nil diff --git a/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/metrics.go b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/metrics.go new file mode 100644 index 000000000000..3f7378306c26 --- /dev/null +++ b/x-pack/auditbeat/processors/sessionmd/provider/kerneltracingprovider/metrics.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build linux && (amd64 || arm64) && cgo + +package kerneltracingprovider + +import ( + "github.com/elastic/elastic-agent-libs/monitoring" +) + +// / Stats tracks the quark internal stats, which are integrated into the beats monitoring runtime +type Stats struct { + Insertions *monitoring.Uint + Removals *monitoring.Uint + Aggregations *monitoring.Uint + NonAggregations *monitoring.Uint + Lost *monitoring.Uint +} + +// / NewStats creates a new stats object +func NewStats(reg *monitoring.Registry) *Stats { + return &Stats{ + Insertions: monitoring.NewUint(reg, "insertions"), + Removals: monitoring.NewUint(reg, "removals"), + Aggregations: monitoring.NewUint(reg, "aggregations"), + NonAggregations: monitoring.NewUint(reg, "nonaggregations"), + Lost: monitoring.NewUint(reg, "lost"), + } +} From b14b1e026d94ab6b5bdc2dedc256d09fa53b5f0b Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 26 Feb 2025 06:54:11 -0300 Subject: [PATCH 7/9] fix(tests): ensure TestFilebeatOTelE2E does not fail if timestamps are equal (#42895) --- x-pack/filebeat/tests/integration/otel_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/x-pack/filebeat/tests/integration/otel_test.go b/x-pack/filebeat/tests/integration/otel_test.go index 7ae0bfff7d52..7a147add0dbe 100644 --- a/x-pack/filebeat/tests/integration/otel_test.go +++ b/x-pack/filebeat/tests/integration/otel_test.go @@ -165,15 +165,6 @@ func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) } - // If the ignored field exists and is equal in both maps then it shouldn't be ignored - if hasKeyM1 && hasKeyM2 { - valM1, _ := flatM1.GetValue(f) - valM2, _ := flatM2.GetValue(f) - if valM1 == valM2 { - assert.Failf(t, msg, "ignored field %q is equal in both maps, please remove it from the ignored fields", f) - } - } - flatM1.Delete(f) flatM2.Delete(f) } From a4bc9f09ada841247be459710bdc814f9c69fc6d Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Wed, 26 Feb 2025 08:01:52 -0300 Subject: [PATCH 8/9] fix(tests): relax TestGroup_Go timeouts (#42862) --- .../filestream/internal/task/group_test.go | 27 +++++++++---------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/filebeat/input/filestream/internal/task/group_test.go b/filebeat/input/filestream/internal/task/group_test.go index 6ba0ac2cf1db..159bcfb5b13e 100644 --- a/filebeat/input/filestream/internal/task/group_test.go +++ b/filebeat/input/filestream/internal/task/group_test.go @@ -21,7 +21,7 @@ import ( "context" "errors" "fmt" - "math/rand" + "math/rand/v2" "strings" "sync" "sync/atomic" @@ -95,7 +95,7 @@ func TestGroup_Go(t *testing.T) { assert.Eventually(t, func() bool { return want == runningCount.Load() }, - time.Second, 100*time.Millisecond) + 1*time.Second, 10*time.Millisecond) }) t.Run("workloads wait for available worker", func(t *testing.T) { @@ -158,7 +158,7 @@ func TestGroup_Go(t *testing.T) { // Wait to ensure f1 and f2 are running, thus there is no workers free. assert.Eventually(t, func() bool { return int64(2) == runningCount.Load() }, - 100*time.Millisecond, time.Millisecond) + 1*time.Second, 10*time.Millisecond) err = g.Go(f3) require.NoError(t, err) @@ -170,7 +170,7 @@ func TestGroup_Go(t *testing.T) { func() bool { return f3Started.Load() }, - 100*time.Millisecond, time.Millisecond) + 1*time.Second, 10*time.Millisecond) // If f3 started, f2 must have finished assert.True(t, f2Finished.Load()) @@ -186,8 +186,8 @@ func TestGroup_Go(t *testing.T) { assert.Eventually(t, func() bool { return doneCount.Load() == 3 }, - 50*time.Millisecond, - time.Millisecond, + 1*time.Second, + 10*time.Millisecond, "not all goroutines finished") }) @@ -202,14 +202,13 @@ func TestGroup_Go(t *testing.T) { t.Run("without limit, all goroutines run", func(t *testing.T) { // 100 <= limit <= 10000 - limit := rand.Int63n(10000-100) + 100 + limit := rand.IntN(10000-100) + 100 t.Logf("running %d goroutines", limit) g := NewGroup(uint64(limit), time.Second, noopLogger{}, "") done := make(chan struct{}) var runningCounter atomic.Int64 - var i int64 - for i = 0; i < limit; i++ { + for i := 0; i < limit; i++ { err := g.Go(func(context.Context) error { runningCounter.Add(1) defer runningCounter.Add(-1) @@ -221,9 +220,9 @@ func TestGroup_Go(t *testing.T) { } assert.Eventually(t, - func() bool { return limit == runningCounter.Load() }, - 100*time.Millisecond, - time.Millisecond) + func() bool { return int64(limit) == runningCounter.Load() }, + 1*time.Second, + 10*time.Millisecond) close(done) err := g.Stop() @@ -253,7 +252,7 @@ func TestGroup_Go(t *testing.T) { assert.Eventually(t, func() bool { return count.Load() == want && logger.String() != "" - }, 100*time.Millisecond, time.Millisecond) + }, 1*time.Second, 10*time.Millisecond) err = g.Stop() require.NoError(t, err) @@ -286,7 +285,7 @@ func TestGroup_Go(t *testing.T) { assert.Eventually(t, func() bool { return count.Load() == want && logger.String() != "" - }, 100*time.Millisecond, time.Millisecond, "not all workloads finished") + }, 1*time.Second, 10*time.Millisecond, "not all workloads finished") assert.Contains(t, logger.String(), wantErr.Error()) From bd538ffb75907bca79066ec66a760dd8f638e3b2 Mon Sep 17 00:00:00 2001 From: Valentin Crettaz Date: Wed, 26 Feb 2025 13:33:13 +0100 Subject: [PATCH 9/9] Update Stack Monitoring data stream to 9 (#42823) * Update Stack Monitoring data stream to 9 * Fix linter issues * Append instead of set by index * Use goimports * Remove empty line Co-authored-by: Anderson Queiroz * Remove empty line Co-authored-by: Anderson Queiroz * Fix logged message Co-authored-by: Anderson Queiroz --------- Co-authored-by: Chris Earle Co-authored-by: Anderson Queiroz --- metricbeat/helper/elastic/elastic.go | 24 ++++++++++++++++------- metricbeat/helper/elastic/elastic_test.go | 10 +++++----- 2 files changed, 22 insertions(+), 12 deletions(-) diff --git a/metricbeat/helper/elastic/elastic.go b/metricbeat/helper/elastic/elastic.go index 25fa5835434f..a952497d9f83 100644 --- a/metricbeat/helper/elastic/elastic.go +++ b/metricbeat/helper/elastic/elastic.go @@ -18,8 +18,11 @@ package elastic import ( + "errors" "fmt" - "strings" + + "golang.org/x/text/cases" + "golang.org/x/text/language" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" @@ -84,7 +87,7 @@ func (p Product) String() string { // MakeXPackMonitoringIndexName method returns the name of the monitoring index for // a given product { elasticsearch, kibana, logstash, beats } func MakeXPackMonitoringIndexName(product Product) string { - const version = "8" + const version = "9" return fmt.Sprintf(".monitoring-%v-%v-mb", product.xPackMonitoringIndexString(), version) } @@ -100,7 +103,7 @@ func ReportErrorForMissingField(field string, product Product, r mb.ReporterV2) // MakeErrorForMissingField returns an error message for the given field being missing in an API // response received from a given product func MakeErrorForMissingField(field string, product Product) error { - return fmt.Errorf("Could not find field '%v' in %v API response", field, strings.Title(product.String())) + return fmt.Errorf("could not find field '%v' in %v API response", field, cases.Title(language.English).String(product.String())) } // IsFeatureAvailable returns whether a feature is available in the current product version @@ -120,7 +123,7 @@ func ReportAndLogError(err error, r mb.ReporterV2, l *logp.Logger) { // for it's date fields: https://github.com/elastic/elasticsearch/pull/36691 func FixTimestampField(m mapstr.M, field string) error { v, err := m.GetValue(field) - if err == mapstr.ErrKeyNotFound { + if errors.Is(err, mapstr.ErrKeyNotFound) { return nil } if err != nil { @@ -161,10 +164,17 @@ func NewModule(base *mb.BaseModule, xpackEnabledMetricsets []string, optionalXpa metricsets := xpackEnabledMetricsets if err == nil && cfgdMetricsets != nil { // Type cast the metricsets to a slice of strings - cfgdMetricsetsSlice := cfgdMetricsets.([]interface{}) - cfgdMetricsetsStrings := make([]string, len(cfgdMetricsetsSlice)) + cfgdMetricsetsSlice, ok := cfgdMetricsets.([]interface{}) + if !ok { + return nil, fmt.Errorf("configured metricsets are not an slice for module %s: %v", moduleName, cfgdMetricsets) + } + + cfgdMetricsetsStrings := make([]string, 0, len(cfgdMetricsetsSlice)) for i := range cfgdMetricsetsSlice { - cfgdMetricsetsStrings[i] = cfgdMetricsetsSlice[i].(string) + asString, ok := cfgdMetricsetsSlice[i].(string) + if ok { + cfgdMetricsetsStrings = append(cfgdMetricsetsStrings, asString) + } } // Add any optional metricsets which are not already configured diff --git a/metricbeat/helper/elastic/elastic_test.go b/metricbeat/helper/elastic/elastic_test.go index be5529928029..e6939cd42583 100644 --- a/metricbeat/helper/elastic/elastic_test.go +++ b/metricbeat/helper/elastic/elastic_test.go @@ -38,22 +38,22 @@ func TestMakeXPackMonitoringIndexName(t *testing.T) { { "Elasticsearch monitoring index", Elasticsearch, - ".monitoring-es-8-mb", + ".monitoring-es-9-mb", }, { "Kibana monitoring index", Kibana, - ".monitoring-kibana-8-mb", + ".monitoring-kibana-9-mb", }, { "Logstash monitoring index", Logstash, - ".monitoring-logstash-8-mb", + ".monitoring-logstash-9-mb", }, { "Beats monitoring index", Beats, - ".monitoring-beats-8-mb", + ".monitoring-beats-9-mb", }, } @@ -86,7 +86,7 @@ func TestReportErrorForMissingField(t *testing.T) { r := MockReporterV2{} err := ReportErrorForMissingField(field, Elasticsearch, r) - expectedError := fmt.Errorf("Could not find field '%v' in Elasticsearch API response", field) + expectedError := fmt.Errorf("could not find field '%v' in Elasticsearch API response", field) assert.Equal(t, expectedError, err) assert.Equal(t, expectedError, currentErr) }