From 95c25c95f7f27cf1a761937401507adadb3bc311 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Tue, 11 Feb 2025 13:51:02 -0300 Subject: [PATCH 1/6] otel: fix flaky behavior on TestFBOtelRestartE2E This test starts the collector with a timeout, but the error returned is not always a context cancelled, sometimes it returns err == nil, which is also fine, just not handled properly. While at it, fix some other issues I found while testing: - Using require inside a goroutine calls runtime.GoExit on failure, meaning the test exits immediatelly without doing any cleanup. Use assert in those cases. --- testing/integration/otel_test.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 937dbdd365..c6d4efa6f2 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1176,6 +1176,8 @@ exporters: flush_timeout: 1s mapping: mode: bodymap + logs_dynamic_id: + enabled: true service: pipelines: logs: @@ -1226,14 +1228,14 @@ service: } _, err = inputFile.Write([]byte(fmt.Sprintf(`{"id": "%d", "message": "%d"}`, i, i))) - require.NoErrorf(t, err, "failed to write line %d to temp file", i) + assert.NoErrorf(t, err, "failed to write line %d to temp file", i) _, err = inputFile.Write([]byte("\n")) - require.NoErrorf(t, err, "failed to write newline to temp file") + assert.NoErrorf(t, err, "failed to write newline to temp file") inputLinesCounter.Add(1) time.Sleep(100 * time.Millisecond) } err = inputFile.Close() - require.NoError(t, err, "failed to close input file") + assert.NoError(t, err, "failed to close input file") }() t.Cleanup(func() { @@ -1253,7 +1255,7 @@ service: go func() { err = fixture.RunOtelWithClient(fCtx) cancel() - require.True(t, errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err) + assert.True(t, err == nil || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err) close(stoppedCh) }() @@ -1314,9 +1316,8 @@ service: require.True(t, found, "expected message field in document %q", hit.Source) msg, ok := message.(string) require.True(t, ok, "expected message field to be a string, got %T", message) - if _, found := uniqueIngestedLogs[msg]; found { - t.Logf("log line %q was ingested more than once", message) - } + _, found = uniqueIngestedLogs[msg] + require.False(t, found, "found duplicated log message %q", msg) uniqueIngestedLogs[msg] = struct{}{} } actualHits.UniqueHits = len(uniqueIngestedLogs) From fc472e4b49b12bc4ce49658fb5592fbc17e58c75 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 20 Feb 2025 11:52:11 -0300 Subject: [PATCH 2/6] don't fail if ignored field is equal --- testing/integration/otel_test.go | 9 --------- 1 file changed, 9 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index f4ad589e34..31edf5cd90 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1352,15 +1352,6 @@ func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f) } - // If the ignored field exists and is equal in both maps then it shouldn't be ignored - if hasKeyM1 && hasKeyM2 { - valM1, _ := flatM1.GetValue(f) - valM2, _ := flatM2.GetValue(f) - if valM1 == valM2 { - assert.Failf(t, msg, "ignored field %q is equal in both maps, please remove it from the ignored fields", f) - } - } - flatM1.Delete(f) flatM2.Delete(f) } From ac211c6277b299cd33081f6e5dd14d1bdd100b65 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Fri, 21 Feb 2025 08:56:17 -0300 Subject: [PATCH 3/6] use a different index name to avoid conflicts --- testing/integration/otel_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 31edf5cd90..c7081f2758 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1365,7 +1365,7 @@ func TestFBOtelRestartE2E(t *testing.T) { // It starts a filebeat receiver, waits for some logs and then stops it. // It then restarts the collector for the remaining of the test. // At the end it asserts that the unique number of logs in ES is equal to the number of - // lines in the input file. It is likely that there are duplicates due to the restart. + // lines in the input file. info := define.Require(t, define.Requirements{ Group: Default, Local: true, @@ -1395,7 +1395,8 @@ func TestFBOtelRestartE2E(t *testing.T) { esApiKey, err := createESApiKey(info.ESClient) require.NoError(t, err, "error creating API key") require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey) - index := "logs-integration-default" + // Use a unique index to avoid conflicts with other parallel runners + index := strings.ToLower("logs-generic-default-" + randStr(8)) otelConfigTemplate := `receivers: filebeatreceiver: filebeat: From 2080d6f52003f889c20d1acb3dd06159e61a7709 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 27 Feb 2025 08:25:13 -0300 Subject: [PATCH 4/6] Update testing/integration/otel_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Paolo ChilĂ  --- testing/integration/otel_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index c7081f2758..47f4630493 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1574,8 +1574,7 @@ service: require.True(t, found, "expected message field in document %q", hit.Source) msg, ok := message.(string) require.True(t, ok, "expected message field to be a string, got %T", message) - _, found = uniqueIngestedLogs[msg] - require.False(t, found, "found duplicated log message %q", msg) + require.NotContainsf(uniqueIngestedLogs, msg, "found duplicated log message %q", msg) uniqueIngestedLogs[msg] = struct{}{} } actualHits.UniqueHits = len(uniqueIngestedLogs) From c42edc5b9da42688bce0b66a1499839411ae5d9e Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 27 Feb 2025 08:31:01 -0300 Subject: [PATCH 5/6] Update testing/integration/otel_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Paolo ChilĂ  --- testing/integration/otel_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index 47f4630493..f76e456594 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1488,7 +1488,7 @@ service: _, err = inputFile.Write([]byte(fmt.Sprintf(`{"id": "%d", "message": "%d"}`, i, i))) assert.NoErrorf(t, err, "failed to write line %d to temp file", i) _, err = inputFile.Write([]byte("\n")) - assert.NoErrorf(t, err, "failed to write newline to temp file") + assert.NoError(t, err, "failed to write newline to temp file") inputLinesCounter.Add(1) time.Sleep(100 * time.Millisecond) } From 4a6beef3e49c029b9bbe76852765cd9207145bb3 Mon Sep 17 00:00:00 2001 From: Mauri de Souza Meneguzzo Date: Thu, 27 Feb 2025 08:46:50 -0300 Subject: [PATCH 6/6] use assert.Conditionf for error check --- testing/integration/otel_test.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/testing/integration/otel_test.go b/testing/integration/otel_test.go index f76e456594..90c9ba0e52 100644 --- a/testing/integration/otel_test.go +++ b/testing/integration/otel_test.go @@ -1513,7 +1513,9 @@ service: go func() { err = fixture.RunOtelWithClient(fCtx) cancel() - assert.True(t, err == nil || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err) + assert.Conditionf(t, func() bool { + return err == nil || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) + }, "unexpected error: %v", err) close(stoppedCh) }() @@ -1574,7 +1576,7 @@ service: require.True(t, found, "expected message field in document %q", hit.Source) msg, ok := message.(string) require.True(t, ok, "expected message field to be a string, got %T", message) - require.NotContainsf(uniqueIngestedLogs, msg, "found duplicated log message %q", msg) + require.NotContainsf(t, uniqueIngestedLogs, msg, "found duplicated log message %q", msg) uniqueIngestedLogs[msg] = struct{}{} } actualHits.UniqueHits = len(uniqueIngestedLogs)