Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

otel: fix flakiness and various issues in TestFBOtelRestartE2E #6819

Draft
wants to merge 16 commits into
base: main
Choose a base branch
from
Draft
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 11 additions & 18 deletions testing/integration/otel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1352,15 +1352,6 @@ func assertMapsEqual(t *testing.T, m1, m2 mapstr.M, ignoredFields []string, msg
assert.Failf(t, msg, "ignored field %q does not exist in either map, please remove it from the ignored fields", f)
}

// If the ignored field exists and is equal in both maps then it shouldn't be ignored
if hasKeyM1 && hasKeyM2 {
valM1, _ := flatM1.GetValue(f)
valM2, _ := flatM2.GetValue(f)
if valM1 == valM2 {
assert.Failf(t, msg, "ignored field %q is equal in both maps, please remove it from the ignored fields", f)
}
}

flatM1.Delete(f)
flatM2.Delete(f)
}
Expand All @@ -1374,7 +1365,7 @@ func TestFBOtelRestartE2E(t *testing.T) {
// It starts a filebeat receiver, waits for some logs and then stops it.
// It then restarts the collector for the remaining of the test.
// At the end it asserts that the unique number of logs in ES is equal to the number of
// lines in the input file. It is likely that there are duplicates due to the restart.
// lines in the input file.
info := define.Require(t, define.Requirements{
Group: Default,
Local: true,
Expand Down Expand Up @@ -1404,7 +1395,8 @@ func TestFBOtelRestartE2E(t *testing.T) {
esApiKey, err := createESApiKey(info.ESClient)
require.NoError(t, err, "error creating API key")
require.True(t, len(esApiKey.Encoded) > 1, "api key is invalid %q", esApiKey)
index := "logs-integration-default"
// Use a unique index to avoid conflicts with other parallel runners
index := strings.ToLower("logs-generic-default-" + randStr(8))
otelConfigTemplate := `receivers:
filebeatreceiver:
filebeat:
Expand Down Expand Up @@ -1442,6 +1434,8 @@ exporters:
flush_timeout: 1s
mapping:
mode: bodymap
logs_dynamic_id:
enabled: true
service:
pipelines:
logs:
Expand Down Expand Up @@ -1492,14 +1486,14 @@ service:
}

_, err = inputFile.Write([]byte(fmt.Sprintf(`{"id": "%d", "message": "%d"}`, i, i)))
require.NoErrorf(t, err, "failed to write line %d to temp file", i)
assert.NoErrorf(t, err, "failed to write line %d to temp file", i)
_, err = inputFile.Write([]byte("\n"))
require.NoErrorf(t, err, "failed to write newline to temp file")
assert.NoErrorf(t, err, "failed to write newline to temp file")
inputLinesCounter.Add(1)
time.Sleep(100 * time.Millisecond)
}
err = inputFile.Close()
require.NoError(t, err, "failed to close input file")
assert.NoError(t, err, "failed to close input file")
}()

t.Cleanup(func() {
Expand All @@ -1519,7 +1513,7 @@ service:
go func() {
err = fixture.RunOtelWithClient(fCtx)
cancel()
require.True(t, errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err)
assert.True(t, err == nil || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled), "unexpected error: %v", err)
close(stoppedCh)
}()

Expand Down Expand Up @@ -1580,9 +1574,8 @@ service:
require.True(t, found, "expected message field in document %q", hit.Source)
msg, ok := message.(string)
require.True(t, ok, "expected message field to be a string, got %T", message)
if _, found := uniqueIngestedLogs[msg]; found {
t.Logf("log line %q was ingested more than once", message)
}
_, found = uniqueIngestedLogs[msg]
require.False(t, found, "found duplicated log message %q", msg)
uniqueIngestedLogs[msg] = struct{}{}
}
actualHits.UniqueHits = len(uniqueIngestedLogs)
Expand Down