Skip to content

Commit

Permalink
[8.15](backport #5375 #5409 #5361) Fix flaky tests (#5410)
Browse files Browse the repository at this point in the history
TestLogIngestionFleetManaged was failing because the namespace generated by the integration tests framework was not unique among different tests and test runs, so collisions would sometimes occur, causing some tests to be flaky.

TestDebLogIngestFleetManaged was failing because it also encountered Beats logging connection errors before receiving the configuration from Elastic-Agent; this message is now also in the allow list.

When testing .deb the AGENT_KEEP_INSTALLED environment variable is respected.

When an integration test fails, the work directory created by the framework is now kept and its path is printed.

createTempDir registers a test cleanup function to remove the folder it created; however, on Windows, this folder sometimes fails to be removed because there are still open file handles for the files within the folder.

We fix this problem by retrying to remove the folder with a maximum overall wait time of 3 seconds. This is a very similar approach to what Go's t.TempDir does.

Fix the flakiness of the TestUpgradeHandler* tests by reworking the
mockUpgradeManager: it now accepts a function for its Upgrade method,
and the implementations used by the tests are goroutine-safe.

TestEnvWithDefault
Now TestEnvWithDefault unsets all environment variables it sets,
allowing it to be run multiple times using -count.

TestContainerCMDEventToStderr
TestContainerCMDEventToStderr did not call agentFixture.Prepare early
enough, leading to an empty STATE_PATH env var, so all state
information was in /usr/share/elastic-agent. This could cause
subsequent tests to fail because they could read
/usr/share/elastic-agent/state/container-paths.yml and use a state
path different from the one set in the test.
  • Loading branch information
mergify[bot] authored Sep 6, 2024
1 parent 83800f0 commit dac3ffe
Show file tree
Hide file tree
Showing 11 changed files with 177 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package handlers

import (
"context"
"errors"
"sync/atomic"
"testing"
"time"

Expand All @@ -26,7 +28,15 @@ import (
)

// mockUpgradeManager is a mock upgrade manager used by the tests in this
// file when constructing the Coordinator.
type mockUpgradeManager struct {
msgChan chan string
// UpgradeFn is the function the Upgrade method delegates to: it receives
// the arguments of Upgrade unchanged and its results are returned as-is.
UpgradeFn func(
ctx context.Context,
version string,
sourceURI string,
action *fleetapi.ActionUpgrade,
details *details.Details,
skipVerifyOverride bool,
skipDefaultPgp bool,
pgpBytes ...string) (reexec.ShutdownCallbackFn, error)
}

func (u *mockUpgradeManager) Upgradeable() bool {
Expand All @@ -37,15 +47,25 @@ func (u *mockUpgradeManager) Reload(rawConfig *config.Config) error {
return nil
}

func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (_ reexec.ShutdownCallbackFn, err error) {
select {
case <-time.After(2 * time.Second):
u.msgChan <- "completed " + version
return nil, nil
case <-ctx.Done():
u.msgChan <- "canceled " + version
return nil, ctx.Err()
}
// Upgrade hands every argument, unchanged, to the UpgradeFn configured on the
// mock and returns whatever that function returns.
func (u *mockUpgradeManager) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, details *details.Details, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) (reexec.ShutdownCallbackFn, error) {
	upgradeFn := u.UpgradeFn
	return upgradeFn(ctx, version, sourceURI, action, details, skipVerifyOverride, skipDefaultPgp, pgpBytes...)
}

func (u *mockUpgradeManager) Ack(ctx context.Context, acker acker.Acker) error {
Expand All @@ -65,7 +85,7 @@ func TestUpgradeHandler(t *testing.T) {
log, _ := logger.New("", false)

agentInfo := &info.AgentInfo{}
msgChan := make(chan string)
upgradeCalledChan := make(chan struct{})

// Create and start the coordinator
c := coordinator.New(
Expand All @@ -75,7 +95,21 @@ func TestUpgradeHandler(t *testing.T) {
agentInfo,
component.RuntimeSpecs{},
nil,
&mockUpgradeManager{msgChan: msgChan},
&mockUpgradeManager{
UpgradeFn: func(
ctx context.Context,
version string,
sourceURI string,
action *fleetapi.ActionUpgrade,
details *details.Details,
skipVerifyOverride bool,
skipDefaultPgp bool,
pgpBytes ...string) (reexec.ShutdownCallbackFn, error) {

upgradeCalledChan <- struct{}{}
return nil, nil
},
},
nil, nil, nil, nil, nil, false)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)
Expand All @@ -86,8 +120,13 @@ func TestUpgradeHandler(t *testing.T) {
ack := noopacker.New()
err := u.Handle(ctx, &a, ack)
require.NoError(t, err)
msg := <-msgChan
require.Equal(t, "completed 8.3.0", msg)

// Make sure this test does not dead lock or wait for too long
select {
case <-time.Tick(50 * time.Millisecond):
t.Fatal("mockUpgradeManager.Upgrade was not called")
case <-upgradeCalledChan:
}
}

func TestUpgradeHandlerSameVersion(t *testing.T) {
Expand All @@ -99,17 +138,37 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
log, _ := logger.New("", false)

agentInfo := &info.AgentInfo{}
msgChan := make(chan string)
upgradeCalledChan := make(chan struct{})

// Create and start the Coordinator
upgradeCalled := atomic.Bool{}
c := coordinator.New(
log,
configuration.DefaultConfiguration(),
logger.DefaultLogLevel,
agentInfo,
component.RuntimeSpecs{},
nil,
&mockUpgradeManager{msgChan: msgChan},
&mockUpgradeManager{
UpgradeFn: func(
ctx context.Context,
version string,
sourceURI string,
action *fleetapi.ActionUpgrade,
details *details.Details,
skipVerifyOverride bool,
skipDefaultPgp bool,
pgpBytes ...string) (reexec.ShutdownCallbackFn, error) {

if upgradeCalled.CompareAndSwap(false, true) {
upgradeCalledChan <- struct{}{}
return nil, nil
}
err := errors.New("mockUpgradeManager.Upgrade called more than once")
t.Error(err.Error())
return nil, err
},
},
nil, nil, nil, nil, nil, false)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)
Expand All @@ -122,8 +181,13 @@ func TestUpgradeHandlerSameVersion(t *testing.T) {
err2 := u.Handle(ctx, &a, ack)
require.NoError(t, err1)
require.NoError(t, err2)
msg := <-msgChan
require.Equal(t, "completed 8.3.0", msg)

// Make sure this test does not dead lock or wait for too long
select {
case <-time.Tick(50 * time.Millisecond):
t.Fatal("mockUpgradeManager.Upgrade was not called")
case <-upgradeCalledChan:
}
}

func TestUpgradeHandlerNewVersion(t *testing.T) {
Expand All @@ -133,9 +197,9 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
defer cancel()

log, _ := logger.New("", false)
upgradeCalledChan := make(chan string)

agentInfo := &info.AgentInfo{}
msgChan := make(chan string)

// Create and start the Coordinator
c := coordinator.New(
Expand All @@ -145,7 +209,27 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
agentInfo,
component.RuntimeSpecs{},
nil,
&mockUpgradeManager{msgChan: msgChan},
&mockUpgradeManager{
UpgradeFn: func(
ctx context.Context,
version string,
sourceURI string,
action *fleetapi.ActionUpgrade,
details *details.Details,
skipVerifyOverride bool,
skipDefaultPgp bool,
pgpBytes ...string) (reexec.ShutdownCallbackFn, error) {

defer func() {
upgradeCalledChan <- version
}()
if version == "8.2.0" {
return nil, errors.New("upgrade to 8.2.0 will always fail")
}

return nil, nil
},
},
nil, nil, nil, nil, nil, false)
//nolint:errcheck // We don't need the termination state of the Coordinator
go c.Run(ctx)
Expand All @@ -156,13 +240,25 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
a2 := fleetapi.ActionUpgrade{Data: fleetapi.ActionUpgradeData{
Version: "8.5.0", SourceURI: "http://localhost"}}
ack := noopacker.New()

checkMsg := func(c <-chan string, expected, errMsg string) {
t.Helper()
// Make sure this test does not dead lock or wait for too long
// For some reason < 1s sometimes makes the test fail.
select {
case <-time.Tick(1300 * time.Millisecond):
t.Fatal("timed out waiting for Upgrade to return")
case msg := <-c:
require.Equal(t, expected, msg, errMsg)
}
}

// Send both upgrade actions, a1 will error before a2 succeeds
err1 := u.Handle(ctx, &a1, ack)
require.NoError(t, err1)
time.Sleep(1 * time.Second)
checkMsg(upgradeCalledChan, "8.2.0", "first call must be with version 8.2.0")

err2 := u.Handle(ctx, &a2, ack)
require.NoError(t, err2)
msg1 := <-msgChan
require.Equal(t, "canceled 8.2.0", msg1)
msg2 := <-msgChan
require.Equal(t, "completed 8.5.0", msg2)
checkMsg(upgradeCalledChan, "8.5.0", "second call to Upgrade must be with version 8.5.0")
}
4 changes: 2 additions & 2 deletions internal/pkg/agent/application/upgrade/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,14 +353,14 @@ func waitForWatcherWithTimeoutCreationFunc(ctx context.Context, log *logger.Logg
return fmt.Errorf("error starting update marker watcher: %w", err)
}

log.Info("waiting up to %s for upgrade watcher to set %s state in upgrade marker", waitTime, details.StateWatching)
log.Infof("waiting up to %s for upgrade watcher to set %s state in upgrade marker", waitTime, details.StateWatching)

for {
select {
case updMarker := <-markerWatcher.Watch():
if updMarker.Details != nil && updMarker.Details.State == details.StateWatching {
// watcher started and it is watching, all good
log.Info("upgrade watcher set %s state in upgrade marker: exiting wait loop", details.StateWatching)
log.Infof("upgrade watcher set %s state in upgrade marker: exiting wait loop", details.StateWatching)
return nil
}

Expand Down
2 changes: 2 additions & 0 deletions internal/pkg/agent/cmd/container_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@ func TestEnvWithDefault(t *testing.T) {
require.Equal(t, def, res)

err := os.Setenv(key1, "key1")
defer os.Unsetenv(key1)
if err != nil {
t.Skipf("could not export env var: %s", err)
}

err = os.Setenv(key2, "key2")
defer os.Unsetenv(key2)
if err != nil {
t.Skipf("could not export env var: %s", err)
}
Expand Down
19 changes: 7 additions & 12 deletions pkg/testing/define/define.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"sync"
"testing"

"github.com/gofrs/uuid/v5"

"github.com/elastic/elastic-agent-libs/kibana"
"github.com/elastic/go-elasticsearch/v8"
"github.com/elastic/go-sysinfo"
Expand Down Expand Up @@ -199,28 +201,21 @@ func getOSInfo() (*types.OSInfo, error) {
// getNamespace is a general namespace that the test can use that will ensure that it
// is unique and won't collide with other tests (even the same test from a different batch).
//
// this function uses a sha256 of the prefix, package and test name, to ensure that the
// This function uses a sha256 of an UUIDv4 to ensure that the
// length of the namespace is not over the 100 byte limit from Fleet
// see: https://www.elastic.co/guide/en/fleet/current/data-streams.html#data-streams-naming-scheme
func getNamespace(t *testing.T, local bool) (string, error) {
prefix := os.Getenv("TEST_DEFINE_PREFIX")
if prefix == "" {
if local {
prefix = "local"
}
if prefix == "" {
return "", errors.New("TEST_DEFINE_PREFIX must be defined by the test runner")
}
nsUUID, err := uuid.NewV4()
if err != nil {
return "", fmt.Errorf("cannot generate UUID V4: %w", err)
}
name := fmt.Sprintf("%s-%s", prefix, t.Name())
hasher := sha256.New()
hasher.Write([]byte(name))
hasher.Write([]byte(nsUUID.String()))

// Fleet API requires the namespace to be lowercased and not contain
// special characters.
namespace := strings.ToLower(base64.URLEncoding.EncodeToString(hasher.Sum(nil)))
namespace = noSpecialCharsRegexp.ReplaceAllString(namespace, "")

return namespace, nil
}

Expand Down
28 changes: 27 additions & 1 deletion pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

"github.com/elastic/elastic-agent/internal/pkg/agent/application/paths"
"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/agent/install"
"github.com/elastic/elastic-agent/pkg/component"
"github.com/elastic/elastic-agent/pkg/control"
"github.com/elastic/elastic-agent/pkg/control/v2/client"
Expand Down Expand Up @@ -206,7 +207,7 @@ func (f *Fixture) Prepare(ctx context.Context, components ...UsableComponent) er
if err != nil {
return err
}
workDir := f.t.TempDir()
workDir := createTempDir(f.t)
finalDir := filepath.Join(workDir, name)
err = ExtractArtifact(f.t, src, workDir)
if err != nil {
Expand Down Expand Up @@ -1196,6 +1197,31 @@ func performConfigure(ctx context.Context, c client.Client, cfg string, timeout
return nil
}

// createTempDir creates a temporary directory named after the running test
// and registers a test cleanup function for it. When the test passes, the
// directory is removed through install.RemovePath; when the test fails, the
// directory is kept for further investigation and its path is logged.
//
// Characters of t.Name() that are path separators, glob wildcards or invalid
// in file names on some platforms are replaced by '-': os.MkdirTemp rejects
// patterns containing a path separator and treats the last '*' in the
// pattern as the placeholder for its random suffix.
//
// The test is aborted with t.Fatalf if the directory cannot be created.
func createTempDir(t *testing.T) string {
	sanitize := func(r rune) rune {
		switch r {
		case '/', '\\', ':', '*', '?', '"', '<', '>', '|':
			return '-'
		}
		return r
	}

	tempDir, err := os.MkdirTemp("", strings.Map(sanitize, t.Name()))
	if err != nil {
		t.Fatalf("failed to make temp directory: %s", err)
	}

	t.Cleanup(func() {
		// Keep the directory around when the test failed so its contents
		// can be inspected.
		if t.Failed() {
			t.Logf("Temporary directory %q preserved for investigation/debugging", tempDir)
			return
		}

		if err := install.RemovePath(tempDir); err != nil {
			t.Errorf("could not remove temp dir '%s': %s", tempDir, err)
		}
	})

	return tempDir
}

type AgentStatusOutput struct {
Info struct {
ID string `json:"id"`
Expand Down
7 changes: 7 additions & 0 deletions pkg/testing/fixture_install.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,7 @@ func (f *Fixture) installDeb(ctx context.Context, installOpts *InstallOpts, opts

f.t.Cleanup(func() {
f.t.Logf("[test %s] Inside fixture installDeb cleanup function", f.t.Name())

uninstallCtx, uninstallCancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer uninstallCancel()
// stop elastic-agent, non fatal if error, might have been stopped before this.
Expand All @@ -424,6 +425,12 @@ func (f *Fixture) installDeb(ctx context.Context, installOpts *InstallOpts, opts
if err != nil {
f.t.Logf("error systemctl stop elastic-agent: %s, output: %s", err, string(out))
}

if keepInstalledFlag() {
f.t.Logf("skipping uninstall; test failed and AGENT_KEEP_INSTALLED=true")
return
}

// apt-get purge elastic-agent
f.t.Logf("running 'sudo apt-get -y -q purge elastic-agent'")
out, err = exec.CommandContext(uninstallCtx, "sudo", "apt-get", "-y", "-q", "purge", "elastic-agent").CombinedOutput()
Expand Down
1 change: 1 addition & 0 deletions pkg/testing/tools/estools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ func PerformQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{
es.Search.WithContext(ctx),
es.Search.WithSize(300),
)

if err != nil {
return Documents{}, fmt.Errorf("error performing ES search: %w", err)
}
Expand Down
Loading

0 comments on commit dac3ffe

Please sign in to comment.