Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x](backport #42848) enhancement(4534): removed encryption and encryption-related logic from segments #42891

Open
wants to merge 2 commits into
base: 8.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 30 additions & 58 deletions libbeat/publisher/queue/diskqueue/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
package diskqueue

import (
"math/rand"
"math/rand/v2"
"testing"
"time"

Expand All @@ -45,7 +45,7 @@ var (
// constant event time
eventTime = time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)

//sample event messages, so size of every frame isn't identical
// sample event messages, so size of every frame isn't identical
msgs = []string{
"192.168.33.1 - - [26/Dec/2016:16:22:00 +0000] \"GET / HTTP/1.1\" 200 484 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_0) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/54.0.2840.98 Safari/537.36\"",
"{\"eventVersion\":\"1.05\",\"userIdentity\":{\"type\":\"IAMUser\",\"principalId\":\"EXAMPLE_ID\",\"arn\":\"arn:aws:iam::0123456789012:user/Alice\",\"accountId\":\"0123456789012\",\"accessKeyId\":\"EXAMPLE_KEY\",\"userName\":\"Alice\",\"sessionContext\":{\"sessionIssuer\":{},\"webIdFederationData\":{},\"attributes\":{\"mfaAuthenticated\":\"true\",\"creationDate\":\"2020-01-08T15:12:16Z\"}},\"invokedBy\":\"signin.amazonaws.com\"},\"eventTime\":\"2020-01-08T20:58:45Z\",\"eventSource\":\"cloudtrail.amazonaws.com\",\"eventName\":\"UpdateTrail\",\"awsRegion\":\"us-west-2\",\"sourceIPAddress\":\"127.0.0.1\",\"userAgent\":\"signin.amazonaws.com\",\"requestParameters\":{\"name\":\"arn:aws:cloudtrail:us-west-2:0123456789012:trail/TEST-trail\",\"s3BucketName\":\"test-cloudtrail-bucket\",\"snsTopicName\":\"\",\"isMultiRegionTrail\":true,\"enableLogFileValidation\":false,\"kmsKeyId\":\"\"},\"responseElements\":{\"name\":\"TEST-trail\",\"s3BucketName\":\"test-cloudtrail-bucket\",\"snsTopicName\":\"\",\"snsTopicARN\":\"\",\"includeGlobalServiceEvents\":true,\"isMultiRegionTrail\":true,\"trailARN\":\"arn:aws:cloudtrail:us-west-2:0123456789012:trail/TEST-trail\",\"logFileValidationEnabled\":false,\"isOrganizationTrail\":false},\"requestID\":\"EXAMPLE-f3da-42d1-84f5-EXAMPLE\",\"eventID\":\"EXAMPLE-b5e9-4846-8407-EXAMPLE\",\"readOnly\":false,\"eventType\":\"AwsApiCall\",\"recipientAccountId\":\"0123456789012\"}",
Expand All @@ -63,7 +63,7 @@ func makePublisherEvent(r *rand.Rand) publisher.Event {
Content: beat.Event{
Timestamp: eventTime,
Fields: mapstr.M{
"message": msgs[r.Intn(len(msgs))],
"message": msgs[r.IntN(len(msgs))],
},
},
}
Expand All @@ -73,12 +73,10 @@ func makePublisherEvent(r *rand.Rand) publisher.Event {
// hold the queue. Location of the temporary directory is stored in
// the queue settings. Call `cleanup` when done with the queue to
// close the queue and remove the temp dir.
func setup(b *testing.B, encrypt bool, compress bool, protobuf bool) (*diskQueue, queue.Producer) {
func setup(b *testing.B, compress bool, protobuf bool) (*diskQueue, queue.Producer) {
s := DefaultSettings()
s.Path = b.TempDir()
if encrypt {
s.EncryptionKey = []byte("testtesttesttest")
}

s.UseCompression = compress
q, err := NewQueue(logp.L(), nil, s, nil)
if err != nil {
Expand Down Expand Up @@ -138,14 +136,14 @@ func produceThenConsume(r *rand.Rand, p queue.Producer, q *diskQueue, num_events

// benchmarkQueue is a wrapper for produceAndConsume, it tries to limit
// timers to just produceAndConsume
func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool, async bool, protobuf bool, b *testing.B) {
func benchmarkQueue(num_events int, batch_size int, compress bool, async bool, protobuf bool, b *testing.B) {
b.ResetTimer()
var err error

for n := 0; n < b.N; n++ {
b.StopTimer()
r := rand.New(rand.NewSource(1))
q, p := setup(b, encrypt, compress, protobuf)
r := rand.New(rand.NewPCG(1, 2))
q, p := setup(b, compress, protobuf)
b.StartTimer()
if async {
if err = produceAndConsume(r, p, q, num_events, batch_size); err != nil {
Expand All @@ -164,76 +162,50 @@ func benchmarkQueue(num_events int, batch_size int, encrypt bool, compress bool,

// Async benchmarks
func BenchmarkAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, false, true, false, b)
benchmarkQueue(1000, 10, false, true, false, b)
}

func BenchmarkAsync100k(b *testing.B) {
benchmarkQueue(100000, 1000, false, false, true, false, b)
}
func BenchmarkEncryptAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, false, true, false, b)
}
func BenchmarkEncryptAsync100k(b *testing.B) {
benchmarkQueue(100000, 1000, true, false, true, false, b)
benchmarkQueue(100000, 1000, false, true, false, b)
}

func BenchmarkCompressAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, true, true, false, b)
benchmarkQueue(1000, 10, true, true, false, b)
}

func BenchmarkCompressAsync100k(b *testing.B) {
benchmarkQueue(100000, 1000, false, true, true, false, b)
}
func BenchmarkEncryptCompressAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, true, true, false, b)
}
func BenchmarkEncryptCompressAsync100k(b *testing.B) {
benchmarkQueue(100000, 1000, true, true, true, false, b)
benchmarkQueue(100000, 1000, true, true, false, b)
}

func BenchmarkProtoAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, false, true, true, b)
benchmarkQueue(1000, 10, false, true, true, b)
}

func BenchmarkProtoAsync100k(b *testing.B) {
benchmarkQueue(100000, 1000, false, false, true, true, b)
}
func BenchmarkEncCompProtoAsync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, true, true, true, b)
}
func BenchmarkEncCompProtoAsync100k(b *testing.B) {
benchmarkQueue(100000, 1000, true, true, true, true, b)
benchmarkQueue(100000, 1000, false, true, true, b)
}

// Sync Benchmarks
func BenchmarkSync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, false, false, false, b)
benchmarkQueue(1000, 10, false, false, false, b)
}

func BenchmarkSync100k(b *testing.B) {
benchmarkQueue(100000, 1000, false, false, false, false, b)
}
func BenchmarkEncryptSync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, false, false, false, b)
}
func BenchmarkEncryptSync100k(b *testing.B) {
benchmarkQueue(100000, 1000, true, false, false, false, b)
benchmarkQueue(100000, 1000, false, false, false, b)
}

func BenchmarkCompressSync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, true, false, false, b)
benchmarkQueue(1000, 10, true, false, false, b)
}

func BenchmarkCompressSync100k(b *testing.B) {
benchmarkQueue(100000, 1000, false, true, false, false, b)
}
func BenchmarkEncryptCompressSync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, true, false, false, b)
}
func BenchmarkEncryptCompressSync100k(b *testing.B) {
benchmarkQueue(100000, 1000, true, true, false, false, b)
benchmarkQueue(100000, 1000, true, false, false, b)
}

func BenchmarkProtoSync1k(b *testing.B) {
benchmarkQueue(1000, 10, false, false, false, true, b)
benchmarkQueue(1000, 10, false, false, true, b)
}

func BenchmarkProtoSync100k(b *testing.B) {
benchmarkQueue(100000, 1000, false, false, false, true, b)
}
func BenchmarkEncCompProtoSync1k(b *testing.B) {
benchmarkQueue(1000, 10, true, true, false, true, b)
}
func BenchmarkEncCompProtoSync100k(b *testing.B) {
benchmarkQueue(100000, 1000, true, true, false, true, b)
benchmarkQueue(100000, 1000, false, false, true, b)
}
21 changes: 17 additions & 4 deletions libbeat/publisher/queue/diskqueue/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ func TestCompressionReader(t *testing.T) {
0x00, 0x00, 0x80, 0x61,
0x62, 0x63, 0x0a, 0x00,
0x00, 0x00, 0x00, 0x6c,
0x3e, 0x7b, 0x08, 0x00},
0x3e, 0x7b, 0x08, 0x00,
},
},
"abc compressed with pierrec lz4": {
plaintext: []byte("abc"),
Expand All @@ -57,7 +58,8 @@ func TestCompressionReader(t *testing.T) {
0x00, 0x00, 0x80, 0x61,
0x62, 0x63, 0x00, 0x00,
0x00, 0x00, 0xff, 0x53,
0xd1, 0x32},
0xd1, 0x32,
},
},
}

Expand Down Expand Up @@ -85,7 +87,8 @@ func TestCompressionWriter(t *testing.T) {
0x00, 0x00, 0x80, 0x61,
0x62, 0x63, 0x00, 0x00,
0x00, 0x00, 0xff, 0x53,
0xd1, 0x32},
0xd1, 0x32,
},
},
}

Expand All @@ -100,6 +103,16 @@ func TestCompressionWriter(t *testing.T) {
}
}

// NopWriteCloseSyncer wraps an io.WriteCloser so it satisfies
// WriteCloseSyncer; the added Sync is a no-op. Useful in tests where
// no durable flush-to-disk semantics are needed.
func NopWriteCloseSyncer(wc io.WriteCloser) WriteCloseSyncer {
	return nopWriteCloseSyncer{WriteCloser: wc}
}

// nopWriteCloseSyncer embeds an io.WriteCloser and contributes a
// do-nothing Sync to complete the WriteCloseSyncer interface.
type nopWriteCloseSyncer struct {
	io.WriteCloser
}

// Sync implements WriteCloseSyncer. It performs no work and always
// reports success.
func (nopWriteCloseSyncer) Sync() error { return nil }

func TestCompressionRoundTrip(t *testing.T) {
tests := map[string]struct {
plaintext []byte
Expand Down Expand Up @@ -141,7 +154,7 @@ func TestCompressionSync(t *testing.T) {
src1 := bytes.NewReader(tc.plaintext)
_, err := io.Copy(cw, src1)
assert.Nil(t, err, name)
//prior to v4.1.15 of pierrec/lz4 there was a
// prior to v4.1.15 of pierrec/lz4 there was a
// bug that prevented writing after a Flush.
// The call to Sync here exercises Flush.
err = cw.Sync()
Expand Down
3 changes: 0 additions & 3 deletions libbeat/publisher/queue/diskqueue/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,6 @@ type Settings struct {
RetryInterval time.Duration
MaxRetryInterval time.Duration

// EncryptionKey is used to encrypt data if SchemaVersion 2 is used.
EncryptionKey []byte

// UseCompression enables or disables LZ4 compression
UseCompression bool
}
Expand Down
12 changes: 1 addition & 11 deletions libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,11 @@ a count of the number of frames in the segment, which is an unsigned
flags, which signify options. The size of options is 32-bits in
little-endian format.

If no fields are set in the options field, then un-encrypted frames
follow the header.

If the options field has the first bit set, then encryption is
enabled. In which case, the next 128-bits are the initialization
vector and the rest of the file is encrypted frames.
If no fields are set in the options field, then uncompressed frames follow the header.

If the options field has the second bit set, then compression is
enabled. In which case, LZ4 compressed frames follow the header.

If both the first and second bit of the options field are set, then
both compression and encryption are enabled. The next 128-bits are
the initialization vector and the rest of the file is LZ4 compressed
frames.

If the options field has the third bit set, then Google Protobuf is
used to serialize the data in the frame instead of CBOR.

Expand Down
2 changes: 0 additions & 2 deletions libbeat/publisher/queue/diskqueue/docs/schemaV2.pic
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,3 @@ boxht = 0.25
VERSION: box "version (uint32)" wid 4;
COUNT: box "count (uint32)" wid 4 with .nw at VERSION.sw;
OPTIONS: box "options (uint32)" wid 4 with .nw at COUNT.sw;
IV: box "initialization vector (128 bits)" wid 4 ht 1 with .nw at OPTIONS.sw
FRAME: box "Encrypted Frames" dashed wid 4 ht 2 with .nw at IV.sw;
18 changes: 7 additions & 11 deletions libbeat/publisher/queue/diskqueue/docs/schemaV2.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
76 changes: 0 additions & 76 deletions libbeat/publisher/queue/diskqueue/enc_compress_test.go

This file was deleted.

Loading
Loading