From d35660d2c767f569683c68018604da348d5669f9 Mon Sep 17 00:00:00 2001 From: Kaan Yalti Date: Tue, 25 Feb 2025 12:05:36 -0500 Subject: [PATCH] enhancement(4534): removed encryption and encrytion related logic from segments (#42848) * enhancement(4534): removed encryption and encrytion related logic from segments * enhancement(4534): removed encryption related tests and config * enhancement(4534): updated documentation * enhancement(4534): ran make update * enhancement(4534): updated changelog * enhancement(4534): replaces rand with rand/v2 * enhancement(4534): remove commented config var (cherry picked from commit 42c4aa0af9f1d6f28e98a675dadc824fe2c60c11) --- CHANGELOG.next.asciidoc | 1 + .../queue/diskqueue/benchmark_test.go | 88 ++++------ .../queue/diskqueue/compression_test.go | 21 ++- libbeat/publisher/queue/diskqueue/config.go | 3 - .../diskqueue/docs/on-disk-structures.md | 12 +- .../queue/diskqueue/docs/schemaV2.pic | 2 - .../queue/diskqueue/docs/schemaV2.svg | 18 +- .../queue/diskqueue/enc_compress_test.go | 76 -------- .../publisher/queue/diskqueue/encryption.go | 166 ------------------ .../queue/diskqueue/encryption_test.go | 75 -------- libbeat/publisher/queue/diskqueue/segments.go | 79 +-------- .../queue/diskqueue/segments_test.go | 72 +------- 12 files changed, 70 insertions(+), 543 deletions(-) delete mode 100644 libbeat/publisher/queue/diskqueue/enc_compress_test.go delete mode 100644 libbeat/publisher/queue/diskqueue/encryption.go delete mode 100644 libbeat/publisher/queue/diskqueue/encryption_test.go diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 379109d6732f..3ee2511f9c9f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -18,6 +18,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix metrics not being ingested, due to "Limit of total fields [10000] has been exceeded while adding new fields [...]". The total fields limit has been increased to 12500. No significant performance impact on Elasticsearch is anticipated. {pull}41640[41640] - Fix templates and docs to use correct `--` version of command line arguments. {issue}42038[42038] {pull}42060[42060] - removed support for a single `-` to precede multi-letter command line arguments. Use `--` instead. {issue}42117[42117] {pull}42209[42209] +- Removed encryption from diskqueue V2 for fips compliance {issue}4534[4534]{pull}42848[42848] *Auditbeat* diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go index 7665c4fd780b..c84868732a8f 100644 --- a/libbeat/publisher/queue/diskqueue/benchmark_test.go +++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go @@ -30,7 +30,7 @@ package diskqueue import ( - "math/rand" + "math/rand/v2" "testing" "time" @@ -45,7 +45,7 @@ var ( // constant event time eventTime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC) - //sample event messages, so size of every frame isn't identical + // sample event messages, so size of every frame isn't identical msgs = []string{ "192.168.33.1 - - [26/Dec/2016:16:22:00 +0000] \"GET / HTTP/1.1\" 200 484 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36\"", "{\"eventVersion\":\"1.05\",\"userIdentity\":{\"type\":\"IAMUser\",\"principalId\":\"EXAMPLE_ID\",\"arn\":\"arn:aws:iam::0123456789012:user/Alice\",\"accountId\":\"0123456789012\",\"accessKeyId\":\"EXAMPLE_KEY\",\"userName\":\"Alice\",\"sessionContext\":{\"sessionIssuer\":{},\"webIdFederationData\":{},\"attributes\":{\"mfaAuthenticated\":\"true\",\"creationDate\":\"2020-01-08T15:12:16Z\"}},\"invokedBy\":\"signin.amazonaws.com\"},\"eventTime\":\"2020-01-08T20:58:45Z\",\"eventSource\":\"cloudtrail.amazonaws.com\",\"eventName\":\"UpdateTrail\",\"awsRegion\":\"us-west-2\",\"sourceIPAddress\":\"127.0.0.1\",\"userAgent\":\"signin.amazonaws.com\",\"requestParameters\":{\"name\":\"arn:aws:cloudtrail:us-west-2:0123456789012:trail/TEST-trail\",\"s3BucketName\":\"test-cloudtrail-bucket\",\"snsTopicName\":\"\",\"isMultiRegionTrail\":true,\"enableLogFileValidation\":false,\"kmsKeyId\":\"\"},\"responseElements\":{\"name\":\"TEST-trail\",\"s3BucketName\":\"test-cloudtrail-bucket\",\"snsTopicName\":\"\",\"snsTopicARN\":\"\",\"includeGlobalServiceEvents\":true,\"isMultiRegionTrail\":true,\"trailARN\":\"arn:aws:cloudtrail:us-west-2:0123456789012:trail/TEST-trail\",\"logFileValidationEnabled\":false,\"isOrganizationTrail\":false},\"requestID\":\"EXAMPLE-f3da-42d1-84f5-EXAMPLE\",\"eventID\":\"EXAMPLE-b5e9-4846-8407-EXAMPLE\",\"readOnly\":false,\"eventType\":\"AwsApiCall\",\"recipientAccountId\":\"0123456789012\"}", @@ -63,7 +63,7 @@ func makePublisherEvent(r *rand.Rand) publisher.Event { Content: beat.Event{ Timestamp: eventTime, Fields: mapstr.M{ - "message": msgs[r.Intn(len(msgs))], + "message": msgs[r.IntN(len(msgs))], }, }, } @@ -73,12 +73,10 @@ func makePublisherEvent(r *rand.Rand) publisher.Event { // hold the queue. Location of the temporary directory is stored in // the queue settings. Call `cleanup` when done with the queue to // close the queue and remove the temp dir. -func setup(b *testing.B, encrypt bool, compress bool, protobuf bool) (*diskQueue, queue.Producer) { +func setup(b *testing.B, compress bool, protobuf bool) (*diskQueue, queue.Producer) { s := DefaultSettings() s.Path = b.TempDir() - if encrypt { - s.EncryptionKey = []byte("testtesttesttest") - } + s.UseCompression = compress q, err := NewQueue(logp.L(), nil, s, nil) if err != nil { @@ -138,14 +136,14 @@ func produceThenConsume(r *rand.Rand, p queue.Producer, q *diskQueue, num_events // benchmarkQueue is a wrapper for produceAndConsume, it tries to limit // timers to just produceAndConsume -func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool, async bool, protobuf bool, b *testing.B) { +func benchmarkQueue(num_events int, batch_size int, compress bool, async bool, protobuf bool, b *testing.B) { b.ResetTimer() var err error for n := 0; n < b.N; n++ { b.StopTimer() - r := rand.New(rand.NewSource(1)) - q, p := setup(b, encrypt, compress, protobuf) + r := rand.New(rand.NewPCG(1, 2)) + q, p := setup(b, compress, protobuf) b.StartTimer() if async { if err = produceAndConsume(r, p, q, num_events, batch_size); err != nil { @@ -164,76 +162,50 @@ func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool, // Async benchmarks func BenchmarkAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, false, true, false, b) + benchmarkQueue(1000, 10, false, true, false, b) } + func BenchmarkAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, false, true, false, b) -} -func BenchmarkEncryptAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, false, true, false, b) -} -func BenchmarkEncryptAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, false, true, false, b) + benchmarkQueue(100000, 1000, false, true, false, b) } + func BenchmarkCompressAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, true, true, false, b) + benchmarkQueue(1000, 10, true, true, false, b) } + func BenchmarkCompressAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, true, true, false, b) -} -func BenchmarkEncryptCompressAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, true, true, false, b) -} -func BenchmarkEncryptCompressAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, true, true, false, b) + benchmarkQueue(100000, 1000, true, true, false, b) } + func BenchmarkProtoAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, false, true, true, b) + benchmarkQueue(1000, 10, false, true, true, b) } + func BenchmarkProtoAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, false, true, true, b) -} -func BenchmarkEncCompProtoAsync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, true, true, true, b) -} -func BenchmarkEncCompProtoAsync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, true, true, true, b) + benchmarkQueue(100000, 1000, false, true, true, b) } // Sync Benchmarks func BenchmarkSync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, false, false, false, b) + benchmarkQueue(1000, 10, false, false, false, b) } + func BenchmarkSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, false, false, false, b) -} -func BenchmarkEncryptSync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, false, false, false, b) -} -func BenchmarkEncryptSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, false, false, false, b) + benchmarkQueue(100000, 1000, false, false, false, b) } + func BenchmarkCompressSync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, true, false, false, b) + benchmarkQueue(1000, 10, true, false, false, b) } + func BenchmarkCompressSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, true, false, false, b) -} -func BenchmarkEncryptCompressSync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, true, false, false, b) -} -func BenchmarkEncryptCompressSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, true, false, false, b) + benchmarkQueue(100000, 1000, true, false, false, b) } + func BenchmarkProtoSync1k(b *testing.B) { - benchmarkQueue(1000, 10, false, false, false, true, b) + benchmarkQueue(1000, 10, false, false, true, b) } + func BenchmarkProtoSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, false, false, false, true, b) -} -func BenchmarkEncCompProtoSync1k(b *testing.B) { - benchmarkQueue(1000, 10, true, true, false, true, b) -} -func BenchmarkEncCompProtoSync100k(b *testing.B) { - benchmarkQueue(100000, 1000, true, true, false, true, b) + benchmarkQueue(100000, 1000, false, false, true, b) } diff --git a/libbeat/publisher/queue/diskqueue/compression_test.go b/libbeat/publisher/queue/diskqueue/compression_test.go index 2b684e0f28dd..e4e89cf7f3d6 100644 --- a/libbeat/publisher/queue/diskqueue/compression_test.go +++ b/libbeat/publisher/queue/diskqueue/compression_test.go @@ -47,7 +47,8 @@ func TestCompressionReader(t *testing.T) { 0x00, 0x00, 0x80, 0x61, 0x62, 0x63, 0x0a, 0x00, 0x00, 0x00, 0x00, 0x6c, - 0x3e, 0x7b, 0x08, 0x00}, + 0x3e, 0x7b, 0x08, 0x00, + }, }, "abc compressed with pierrec lz4": { plaintext: []byte("abc"), @@ -57,7 +58,8 @@ func TestCompressionReader(t *testing.T) { 0x00, 0x00, 0x80, 0x61, 0x62, 0x63, 0x00, 0x00, 0x00, 0x00, 0xff, 0x53, - 0xd1, 0x32}, + 0xd1, 0x32, + }, }, } @@ -85,7 +87,8 @@ func TestCompressionWriter(t *testing.T) { 0x00, 0x00, 0x80, 0x61, 0x62, 0x63, 0x00, 0x00, 0x00, 0x00, 0xff, 0x53, - 0xd1, 0x32}, + 0xd1, 0x32, + }, }, } @@ -100,6 +103,16 @@ func TestCompressionWriter(t *testing.T) { } } +func NopWriteCloseSyncer(w io.WriteCloser) WriteCloseSyncer { + return nopWriteCloseSyncer{w} +} + +type nopWriteCloseSyncer struct { + io.WriteCloser +} + +func (nopWriteCloseSyncer) Sync() error { return nil } + func TestCompressionRoundTrip(t *testing.T) { tests := map[string]struct { plaintext []byte @@ -141,7 +154,7 @@ func TestCompressionSync(t *testing.T) { src1 := bytes.NewReader(tc.plaintext) _, err := io.Copy(cw, src1) assert.Nil(t, err, name) - //prior to v4.1.15 of pierrec/lz4 there was a + // prior to v4.1.15 of pierrec/lz4 there was a // bug that prevented writing after a Flush. // The call to Sync here exercises Flush. err = cw.Sync() diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go index 56e9d35e08d2..309292e025c4 100644 --- a/libbeat/publisher/queue/diskqueue/config.go +++ b/libbeat/publisher/queue/diskqueue/config.go @@ -64,9 +64,6 @@ type Settings struct { RetryInterval time.Duration MaxRetryInterval time.Duration - // EncryptionKey is used to encrypt data if SchemaVersion 2 is used. - EncryptionKey []byte - // UseCompression enables or disables LZ4 compression UseCompression bool } diff --git a/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md b/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md index 81d83e922b1a..e51dd1b89fb6 100644 --- a/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md +++ b/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md @@ -58,21 +58,11 @@ a count of the number of frames in the segment, which is an unsigned flags, which signify options. The size of options is 32-bits in little-endian format. -If no fields are set in the options field, then un-encrypted frames -follow the header. - -If the options field has the first bit set, then encryption is -enabled. In which case, the next 128-bits are the initialization -vector and the rest of the file is encrypted frames. +If no fields are set in the options field, then uncompressed frames follow the header. If the options field has the second bit set, then compression is enabled. In which case, LZ4 compressed frames follow the header. -If both the first and second bit of the options field are set, then -both compression and encryption are enabled. The next 128-bits are -the initialization vector and the rest of the file is LZ4 compressed -frames. - If the options field has the third bit set, then Google Protobuf is used to serialize the data in the frame instead of CBOR. diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic b/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic index 7cf0a4425c4c..44bbfd6ab504 100644 --- a/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic +++ b/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic @@ -2,5 +2,3 @@ boxht = 0.25 VERSION: box "version (uint32)" wid 4; COUNT: box "count (uint32)" wid 4 with .nw at VERSION.sw; OPTIONS: box "options (uint32)" wid 4 with .nw at COUNT.sw; -IV: box "initialization vector (128 bits)" wid 4 ht 1 with .nw at OPTIONS.sw -FRAME: box "Encrypted Frames" dashed wid 4 ht 2 with .nw at IV.sw; \ No newline at end of file diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg b/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg index 76f5a51feccf..85928aba47e6 100644 --- a/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg +++ b/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg @@ -1,13 +1,9 @@ - - -version (uint32) - -count (uint32) - -options (uint32) - -initialization vector (128 bits) - -Encrypted Frames + + +version (uint32) + +count (uint32) + +options (uint32) diff --git a/libbeat/publisher/queue/diskqueue/enc_compress_test.go b/libbeat/publisher/queue/diskqueue/enc_compress_test.go deleted file mode 100644 index 637765c24cb5..000000000000 --- a/libbeat/publisher/queue/diskqueue/enc_compress_test.go +++ /dev/null @@ -1,76 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package diskqueue - -import ( - "bytes" - "io" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestEncryptionCompressionRoundTrip(t *testing.T) { - tests := map[string]struct { - plaintext []byte - }{ - "1 rune": {plaintext: []byte("a")}, - "16 runes": {plaintext: []byte("bbbbbbbbbbbbbbbb")}, - "17 runes": {plaintext: []byte("ccccccccccccccccc")}, - "small json": {plaintext: []byte("{\"message\":\"2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA\"}")}, - "large json": {plaintext: []byte("{\"message\":\"{\\\"CacheCacheStatus\\\":\\\"hit\\\",\\\"CacheResponseBytes\\\":26888,\\\"CacheResponseStatus\\\":200,\\\"CacheTieredFill\\\":true,\\\"ClientASN\\\":1136,\\\"ClientCountry\\\":\\\"nl\\\",\\\"ClientDeviceType\\\":\\\"desktop\\\",\\\"ClientIP\\\":\\\"89.160.20.156\\\",\\\"ClientIPClass\\\":\\\"noRecord\\\",\\\"ClientRequestBytes\\\":5324,\\\"ClientRequestHost\\\":\\\"eqlplayground.io\\\",\\\"ClientRequestMethod\\\":\\\"GET\\\",\\\"ClientRequestPath\\\":\\\"/40865/bundles/plugin/securitySolution/8.0.0/securitySolution.chunk.9.js\\\",\\\"ClientRequestProtocol\\\":\\\"HTTP/1.1\\\",\\\"ClientRequestReferer\\\":\\\"https://eqlplayground.io/s/eqldemo/app/security/timelines/default?sourcerer=(default:!(.siem-signals-eqldemo))&timerange=(global:(linkTo:!(),timerange:(from:%272021-03-03T19:55:15.519Z%27,fromStr:now-24h,kind:relative,to:%272021-03-04T19:55:15.519Z%27,toStr:now)),timeline:(linkTo:!(),timerange:(from:%272020-03-04T19:55:28.684Z%27,fromStr:now-1y,kind:relative,to:%272021-03-04T19:55:28.692Z%27,toStr:now)))&timeline=(activeTab:eql,graphEventId:%27%27,id:%2769f93840-7d23-11eb-866c-79a0609409ba%27,isOpen:!t)\\\",\\\"ClientRequestURI\\\":\\\"/40865/bundles/plugin/securitySolution/8.0.0/securitySolution.chunk.9.js\\\",\\\"ClientRequestUserAgent\\\":\\\"Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/91.0.4472.124Safari/537.36\\\",\\\"ClientSSLCipher\\\":\\\"NONE\\\",\\\"ClientSSLProtocol\\\":\\\"none\\\",\\\"ClientSrcPort\\\":0,\\\"ClientXRequestedWith\\\":\\\"\\\",\\\"EdgeColoCode\\\":\\\"33.147.138.217\\\",\\\"EdgeColoID\\\":20,\\\"EdgeEndTimestamp\\\":1625752958875000000,\\\"EdgePathingOp\\\":\\\"wl\\\",\\\"EdgePathingSrc\\\":\\\"macro\\\",\\\"EdgePathingStatus\\\":\\\"nr\\\",\\\"EdgeRateLimitAction\\\":\\\"\\\",\\\"EdgeRateLimitID\\\":0,\\\"EdgeRequestHost\\\":\\\"eqlplayground.io\\\",\\\"EdgeResponseBytes\\\":24743,\\\"EdgeResponseCompressionRatio\\\":0,\\\"EdgeResponseContentType\\\":\\\"application/javascript\\\",\\\"EdgeResponseStatus\\\":200,\\\"EdgeServerIP\\\":\\\"89.160.20.156\\\",\\\"EdgeStartTimestamp\\\":1625752958812000000,\\\"FirewallMatchesActions\\\":[],\\\"FirewallMatchesRuleIDs\\\":[],\\\"FirewallMatchesSources\\\":[],\\\"OriginIP\\\":\\\"\\\",\\\"OriginResponseBytes\\\":0,\\\"OriginResponseHTTPExpires\\\":\\\"\\\",\\\"OriginResponseHTTPLastModified\\\":\\\"\\\",\\\"OriginResponseStatus\\\":0,\\\"OriginResponseTime\\\":0,\\\"OriginSSLProtocol\\\":\\\"unknown\\\",\\\"ParentRayID\\\":\\\"66b9d9f88b5b4c4f\\\",\\\"RayID\\\":\\\"66b9d9f890ae4c4f\\\",\\\"SecurityLevel\\\":\\\"off\\\",\\\"WAFAction\\\":\\\"unknown\\\",\\\"WAFFlags\\\":\\\"0\\\",\\\"WAFMatchedVar\\\":\\\"\\\",\\\"WAFProfile\\\":\\\"unknown\\\",\\\"WAFRuleID\\\":\\\"\\\",\\\"WAFRuleMessage\\\":\\\"\\\",\\\"WorkerCPUTime\\\":0,\\\"WorkerStatus\\\":\\\"unknown\\\",\\\"WorkerSubrequest\\\":true,\\\"WorkerSubrequestCount\\\":0,\\\"ZoneID\\\":393347122}\"}")}, - } - - for name, tc := range tests { - pr, pw := io.Pipe() - key := []byte("keykeykeykeykeyk") - src := bytes.NewReader(tc.plaintext) - var dst bytes.Buffer - var tEncBuf bytes.Buffer - var tCompBuf bytes.Buffer - - go func() { - ew, err := NewEncryptionWriter(NopWriteCloseSyncer(pw), key) - assert.Nil(t, err, name) - cw := NewCompressionWriter(ew) - _, err = io.Copy(cw, src) - assert.Nil(t, err, name) - err = cw.Close() - assert.Nil(t, err, name) - }() - - ter := io.TeeReader(pr, &tEncBuf) - er, err := NewEncryptionReader(io.NopCloser(ter), key) - assert.Nil(t, err, name) - - tcr := io.TeeReader(er, &tCompBuf) - - cr := NewCompressionReader(io.NopCloser(tcr)) - - _, err = io.Copy(&dst, cr) - assert.Nil(t, err, name) - // Check round trip worked - assert.Equal(t, tc.plaintext, dst.Bytes(), name) - // Check that cipher text and plaintext don't match - assert.NotEqual(t, tc.plaintext, tEncBuf.Bytes(), name) - // Check that compressed text and plaintext don't match - assert.NotEqual(t, tc.plaintext, tCompBuf.Bytes(), name) - // Check that compressed text and ciphertext don't match - assert.NotEqual(t, tEncBuf.Bytes(), tCompBuf.Bytes(), name) - } -} diff --git a/libbeat/publisher/queue/diskqueue/encryption.go b/libbeat/publisher/queue/diskqueue/encryption.go deleted file mode 100644 index 497442017eb4..000000000000 --- a/libbeat/publisher/queue/diskqueue/encryption.go +++ /dev/null @@ -1,166 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package diskqueue - -import ( - "bytes" - "crypto/aes" - "crypto/cipher" - "crypto/rand" - "fmt" - "io" -) - -const ( - // KeySize is 128-bit - KeySize = 16 -) - -// EncryptionReader allows reading from a AES-128-CTR stream -type EncryptionReader struct { - src io.ReadCloser - stream cipher.Stream - block cipher.Block - iv []byte - ciphertext []byte -} - -// NewEncryptionReader returns a new AES-128-CTR decrypter -func NewEncryptionReader(r io.ReadCloser, key []byte) (*EncryptionReader, error) { - if len(key) != KeySize { - return nil, fmt.Errorf("key must be %d bytes long", KeySize) - } - - er := &EncryptionReader{} - er.src = r - - // turn key into block & save - block, err := aes.NewCipher(key) - if err != nil { - return nil, err - } - er.block = block - - // read IV from the io.ReadCloser - iv := make([]byte, aes.BlockSize) - if _, err := io.ReadFull(er.src, iv); err != nil { - return nil, err - } - er.iv = iv - - // create Stream - er.stream = cipher.NewCTR(block, iv) - - return er, nil -} - -func (er *EncryptionReader) Read(buf []byte) (int, error) { - if cap(er.ciphertext) >= len(buf) { - er.ciphertext = er.ciphertext[:len(buf)] - } else { - er.ciphertext = make([]byte, len(buf)) - } - n, err := er.src.Read(er.ciphertext) - if err != nil { - return n, err - } - er.stream.XORKeyStream(buf, er.ciphertext) - return n, nil -} - -func (er *EncryptionReader) Close() error { - return er.src.Close() -} - -// Reset Sets up stream again, assumes that caller has already set the -// src to the iv -func (er *EncryptionReader) Reset() error { - iv := make([]byte, aes.BlockSize) - if _, err := io.ReadFull(er.src, iv); err != nil { - return err - } - if !bytes.Equal(iv, er.iv) { - return fmt.Errorf("different iv, something is wrong") - } - - // create Stream - er.stream = cipher.NewCTR(er.block, iv) - return nil -} - -// EncryptionWriter allows writing to a AES-128-CTR stream -type EncryptionWriter struct { - dst WriteCloseSyncer - stream cipher.Stream - ciphertext []byte -} - -// NewEncryptionWriter returns a new AES-128-CTR stream encryptor -func NewEncryptionWriter(w WriteCloseSyncer, key []byte) (*EncryptionWriter, error) { - if len(key) != KeySize { - return nil, fmt.Errorf("key must be %d bytes long", KeySize) - } - - ew := &EncryptionWriter{} - - // turn key into block - block, err := aes.NewCipher(key) - if err != nil { - return nil, err - } - - // create random IV - iv := make([]byte, aes.BlockSize) - if _, err := io.ReadFull(rand.Reader, iv); err != nil { - return nil, err - } - - // create stream - stream := cipher.NewCTR(block, iv) - - //write IV - n, err := w.Write(iv) - if err != nil { - return nil, err - } - if n != len(iv) { - return nil, io.ErrShortWrite - } - - ew.dst = w - ew.stream = stream - return ew, nil -} - -func (ew *EncryptionWriter) Write(buf []byte) (int, error) { - if cap(ew.ciphertext) >= len(buf) { - ew.ciphertext = ew.ciphertext[:len(buf)] - } else { - ew.ciphertext = make([]byte, len(buf)) - } - ew.stream.XORKeyStream(ew.ciphertext, buf) - return ew.dst.Write(ew.ciphertext) -} - -func (ew *EncryptionWriter) Close() error { - return ew.dst.Close() -} - -func (ew *EncryptionWriter) Sync() error { - return ew.dst.Sync() -} diff --git a/libbeat/publisher/queue/diskqueue/encryption_test.go b/libbeat/publisher/queue/diskqueue/encryption_test.go deleted file mode 100644 index fb956d699b16..000000000000 --- a/libbeat/publisher/queue/diskqueue/encryption_test.go +++ /dev/null @@ -1,75 +0,0 @@ -// Licensed to Elasticsearch B.V. under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Elasticsearch B.V. licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package diskqueue - -import ( - "bytes" - "crypto/aes" - "io" - "testing" - - "github.com/stretchr/testify/assert" -) - -func NopWriteCloseSyncer(w io.WriteCloser) WriteCloseSyncer { - return nopWriteCloseSyncer{w} -} - -type nopWriteCloseSyncer struct { - io.WriteCloser -} - -func (nopWriteCloseSyncer) Sync() error { return nil } - -func TestEncryptionRoundTrip(t *testing.T) { - tests := map[string]struct { - plaintext []byte - }{ - "8 bits": {plaintext: []byte("a")}, - "128 bits": {plaintext: []byte("bbbbbbbbbbbbbbbb")}, - "136 bits": {plaintext: []byte("ccccccccccccccccc")}, - } - for name, tc := range tests { - pr, pw := io.Pipe() - src := bytes.NewReader(tc.plaintext) - var dst bytes.Buffer - var teeBuf bytes.Buffer - key := []byte("kkkkkkkkkkkkkkkk") - - go func() { - //NewEncryptionWriter writes iv, so needs to be in go routine - ew, err := NewEncryptionWriter(NopWriteCloseSyncer(pw), key) - assert.Nil(t, err, name) - _, err = io.Copy(ew, src) - assert.Nil(t, err, name) - ew.Close() - }() - - tr := io.TeeReader(pr, &teeBuf) - er, err := NewEncryptionReader(io.NopCloser(tr), key) - assert.Nil(t, err, name) - _, err = io.Copy(&dst, er) - assert.Nil(t, err, name) - // Check round trip worked - assert.Equal(t, tc.plaintext, dst.Bytes(), name) - // Check that iv & cipher text were written - assert.Equal(t, len(tc.plaintext)+aes.BlockSize, teeBuf.Len(), name) - // Check that cipher text and plaintext don't match - assert.NotEqual(t, tc.plaintext, teeBuf.Bytes()[aes.BlockSize:], name) - } -} diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index 7e3661f6e5b4..11eeb9991c6a 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -33,7 +33,6 @@ import ( // diskQueueSegments encapsulates segment-related queue metadata. type diskQueueSegments struct { - // A list of the segments that have not yet been completely written, sorted // by increasing segment ID. When the first entry has been completely // written, it is removed from this list and appended to reading. @@ -132,7 +131,7 @@ type segmentHeader struct { // Only present in schema version >= 1. frameCount uint32 - // options holds flags to enable features, for example encryption. + // options holds flags to enable features, for example compression. options uint32 } @@ -151,7 +150,7 @@ const currentSegmentVersion = 2 const segmentHeaderSize = 12 const ( - ENABLE_ENCRYPTION uint32 = 1 << iota // 0x1 + _ uint32 = 1 << iota // 0x1 ENABLE_COMPRESSION // 0x2 ENABLE_PROTOBUF // 0x4 ) @@ -256,28 +255,14 @@ func (segment *queueSegment) getReader(queueSettings Settings) (*segmentReader, sr.serializationFormat = SerializationCBOR } - if (header.options & ENABLE_ENCRYPTION) == ENABLE_ENCRYPTION { - sr.er, err = NewEncryptionReader(sr.src, queueSettings.EncryptionKey) - if err != nil { - sr.src.Close() - return nil, fmt.Errorf("couldn't create encryption reader: %w", err) - } - } if (header.options & ENABLE_COMPRESSION) == ENABLE_COMPRESSION { - if sr.er != nil { - sr.cr = NewCompressionReader(sr.er) - } else { - sr.cr = NewCompressionReader(sr.src) - } + sr.cr = NewCompressionReader(sr.src) } return sr, nil } -// getWriter sets up the segmentWriter. The order of encryption and -// compression is important. If both options are enabled we want -// encrypted compressed data not compressed encrypted data. This is -// because encryption will mask the repetions in the data making -// compression much less effective. getWriter should only be called +// getWriter sets up the segmentWriter. +// getWriter should only be called. // from the writer loop. func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter, error) { var options uint32 @@ -287,10 +272,6 @@ func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter, return nil, err } - if len(queueSettings.EncryptionKey) > 0 { - options = options | ENABLE_ENCRYPTION - } - if queueSettings.UseCompression { options = options | ENABLE_COMPRESSION } @@ -302,20 +283,8 @@ func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter, return nil, err } - if (options & ENABLE_ENCRYPTION) == ENABLE_ENCRYPTION { - sw.ew, err = NewEncryptionWriter(sw.dst, queueSettings.EncryptionKey) - if err != nil { - sw.dst.Close() - return nil, fmt.Errorf("couldn't create encryption writer: %w", err) - } - } - if (options & ENABLE_COMPRESSION) == ENABLE_COMPRESSION { - if sw.ew != nil { - sw.cw = NewCompressionWriter(sw.ew) - } else { - sw.cw = NewCompressionWriter(sw.dst) - } + sw.cw = NewCompressionWriter(sw.dst) } return sw, nil @@ -474,7 +443,6 @@ func (segments *diskQueueSegments) sizeOnDisk() uint64 { // less compressable. type segmentReader struct { src io.ReadSeekCloser - er *EncryptionReader cr *CompressionReader serializationFormat SerializationFormat } @@ -483,9 +451,6 @@ func (r *segmentReader) Read(p []byte) (int, error) { if r.cr != nil { return r.cr.Read(p) } - if r.er != nil { - return r.er.Read(p) - } return r.src.Read(p) } @@ -493,9 +458,6 @@ func (r *segmentReader) Close() error { if r.cr != nil { return r.cr.Close() } - if r.er != nil { - return r.er.Close() - } return r.src.Close() } @@ -508,31 +470,12 @@ func (r *segmentReader) Seek(offset int64, whence int) (int64, error) { if _, err := r.src.Seek(segmentHeaderSize, io.SeekStart); err != nil { return 0, fmt.Errorf("could not seek past segment header: %w", err) } - if r.er != nil { - if err := r.er.Reset(); err != nil { - return 0, fmt.Errorf("could not reset encryption: %w", err) - } - } if err := r.cr.Reset(); err != nil { return 0, fmt.Errorf("could not reset compression: %w", err) } written, err := io.CopyN(io.Discard, r.cr, (offset+int64(whence))-segmentHeaderSize) return written + segmentHeaderSize, err } - if r.er != nil { - //can't seek before segment header - if (offset + int64(whence)) < segmentHeaderSize { - return 0, fmt.Errorf("illegal seek offset %d, whence %d", offset, whence) - } - if _, err := r.src.Seek(segmentHeaderSize, io.SeekStart); err != nil { - return 0, fmt.Errorf("could not seek past segment header: %w", err) - } - if err := r.er.Reset(); err != nil { - return 0, fmt.Errorf("could not reset encryption: %w", err) - } - written, err := io.CopyN(io.Discard, r.er, (offset+int64(whence))-segmentHeaderSize) - return written + segmentHeaderSize, err - } return r.src.Seek(offset, whence) } @@ -545,7 +488,6 @@ func (r *segmentReader) Seek(offset int64, whence int) (int64, error) { // data less compressable. type segmentWriter struct { dst *os.File - ew *EncryptionWriter cw *CompressionWriter } @@ -553,9 +495,6 @@ func (w *segmentWriter) Write(p []byte) (int, error) { if w.cw != nil { return w.cw.Write(p) } - if w.ew != nil { - return w.ew.Write(p) - } return w.dst.Write(p) } @@ -563,9 +502,6 @@ func (w *segmentWriter) Close() error { if w.cw != nil { return w.cw.Close() } - if w.ew != nil { - return w.ew.Close() - } return w.dst.Close() } @@ -573,9 +509,6 @@ func (w *segmentWriter) Sync() error { if w.cw != nil { return w.cw.Sync() } - if w.ew != nil { - return w.ew.Sync() - } return w.dst.Sync() } diff --git a/libbeat/publisher/queue/diskqueue/segments_test.go b/libbeat/publisher/queue/diskqueue/segments_test.go index b80bba22e895..44721debeefd 100644 --- a/libbeat/publisher/queue/diskqueue/segments_test.go +++ b/libbeat/publisher/queue/diskqueue/segments_test.go @@ -27,43 +27,25 @@ import ( func TestSegmentsRoundTrip(t *testing.T) { tests := map[string]struct { id segmentID - encrypt bool compress bool plaintext []byte }{ - "No Encryption or Compression": { + "No Compression": { id: 0, - encrypt: false, compress: false, plaintext: []byte("no encryption or compression"), }, - "Encryption Only": { - id: 1, - encrypt: true, - compress: false, - plaintext: []byte("encryption only"), - }, - "Compression Only": { + "With Compression": { id: 2, - encrypt: false, compress: true, plaintext: []byte("compression only"), }, - "Encryption and Compression": { - id: 3, - encrypt: true, - compress: true, - plaintext: []byte("encryption and compression"), - }, } dir := t.TempDir() for name, tc := range tests { dst := make([]byte, len(tc.plaintext)) settings := DefaultSettings() settings.Path = dir - if tc.encrypt { - settings.EncryptionKey = []byte("keykeykeykeykeyk") - } settings.UseCompression = tc.compress qs := &queueSegment{ id: tc.id, @@ -86,7 +68,7 @@ func TestSegmentsRoundTrip(t *testing.T) { assert.Equal(t, len(dst), n, name) - //make sure we read back what we wrote + // make sure we read back what we wrote assert.Equal(t, tc.plaintext, dst, name) _, err = sr.Read(dst) @@ -101,31 +83,16 @@ func TestSegmentsRoundTrip(t *testing.T) { func TestSegmentReaderSeek(t *testing.T) { tests := map[string]struct { id segmentID - encrypt bool compress bool plaintexts [][]byte }{ - "No Encryption or compression": { + "No Compression": { id: 0, - encrypt: false, - compress: false, - plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, - }, - "Encryption Only": { - id: 1, - encrypt: true, compress: false, plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, }, - "Compression Only": { + "With Compression": { id: 2, - encrypt: false, - compress: true, - plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, - }, - "Encryption and Compression": { - id: 3, - encrypt: true, compress: true, plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, }, @@ -134,9 +101,6 @@ func TestSegmentReaderSeek(t *testing.T) { for name, tc := range tests { settings := DefaultSettings() settings.Path = dir - if tc.encrypt { - settings.EncryptionKey = []byte("keykeykeykeykeyk") - } settings.UseCompression = tc.compress qs := &queueSegment{ @@ -154,7 +118,7 @@ func TestSegmentReaderSeek(t *testing.T) { sw.Close() sr, err := qs.getReader(settings) assert.Nil(t, err, name) - //seek to second data piece + // seek to second data piece n, err := sr.Seek(segmentHeaderSize+int64(len(tc.plaintexts[0])), io.SeekStart) assert.Nil(t, err, name) assert.Equal(t, segmentHeaderSize+int64(len(tc.plaintexts[0])), n, name) @@ -171,35 +135,18 @@ func TestSegmentReaderSeek(t *testing.T) { func TestSegmentReaderSeekLocations(t *testing.T) { tests := map[string]struct { id segmentID - encrypt bool compress bool plaintexts [][]byte location int64 }{ - "No Encryption or Compression": { + "No Compression": { id: 0, - encrypt: false, compress: false, plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, location: -1, }, - "Encryption": { - id: 1, - encrypt: true, - compress: false, - plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, - location: 2, - }, "Compression": { id: 1, - encrypt: false, - compress: true, - plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, - location: 2, - }, - "Encryption and Compression": { - id: 1, - encrypt: true, compress: true, plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, location: 2, @@ -209,9 +156,6 @@ func TestSegmentReaderSeekLocations(t *testing.T) { for name, tc := range tests { settings := DefaultSettings() settings.Path = dir - if tc.encrypt { - settings.EncryptionKey = []byte("keykeykeykeykeyk") - } settings.UseCompression = tc.compress qs := &queueSegment{ id: tc.id, @@ -226,7 +170,7 @@ func TestSegmentReaderSeekLocations(t *testing.T) { sw.Close() sr, err := qs.getReader(settings) assert.Nil(t, err, name) - //seek to location + // seek to location _, err = sr.Seek(tc.location, io.SeekStart) assert.NotNil(t, err, name) }