From 99696b87673c84c37d7a5bf5e04e582b11afe9ee Mon Sep 17 00:00:00 2001
From: Kaan Yalti
Date: Thu, 13 Feb 2025 10:49:17 -0500
Subject: [PATCH 1/2] enhancement(5423): added logic to replace the scheduler
 with a long-wait scheduler when the unauthorized response limit is exceeded
 (#6619)

* enhancement(5423): added logic to replace the scheduler with a long-wait scheduler when the unauthorized response limit is exceeded

* enhancement(5423): removed default case from type switch, added unit tests

* enhancement(5423): added blackbox functional tests for gateway Run

* enhancement(5423): added changelog

* enhancement(5423): remove tryReplaceScheduler, update tests

* enhancement(5423): added SetDuration function, added mock scheduler to tests, simplified scheduler usage
---
 ...ceived-too-many-unathorized-responses.yaml |  30 ++++++
 .../gateway/fleet/fleet_gateway.go            |  39 +++----
 .../gateway/fleet/fleet_gateway_test.go       | 102 ++++++++++++++++++
 internal/pkg/scheduler/scheduler.go           |  12 +++
 4 files changed, 165 insertions(+), 18 deletions(-)
 create mode 100644 changelog/fragments/1738199968-update-scheduler-when-received-too-many-unathorized-responses.yaml

diff --git a/changelog/fragments/1738199968-update-scheduler-when-received-too-many-unathorized-responses.yaml b/changelog/fragments/1738199968-update-scheduler-when-received-too-many-unathorized-responses.yaml
new file mode 100644
index 00000000000..182e4fa62da
--- /dev/null
+++ b/changelog/fragments/1738199968-update-scheduler-when-received-too-many-unathorized-responses.yaml
@@ -0,0 +1,30 @@
+# Kind can be one of:
+# - breaking-change: a change to previously-documented behavior
+# - deprecation: functionality that is being removed in a later release
+# - bug-fix: fixes a problem in a previous version
+# - enhancement: extends functionality but does not break or fix existing behavior
+# - feature: new functionality
+# - known-issue: problems that we are aware of in a given version
+# - security: impacts on the security of a product or a user’s deployment.
+# - upgrade: important information for someone upgrading from a prior version
+# - other: does not fit into any of the other categories
+kind: enhancement
+
+# Change summary; an 80ish characters long description of the change.
+summary: Updated the fleet gateway so that when the number of unauthorized fleet responses exceeds the set limit, the gateway starts checking in less frequently instead of unenrolling.
+
+# Long description; in case the summary is not enough to describe the change
+# this field accommodates a description without length limits.
+# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
+#description:
+
+# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
+component: "elastic-agent"
+# PR URL; optional; the PR number that added the changeset.
+# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added.
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+pr: https://github.com/elastic/elastic-agent/pull/6619
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
+issue: https://github.com/elastic/elastic-agent/issues/5428
diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
index a369282fd68..a8980a3e08e 100644
--- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
+++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
@@ -30,15 +30,18 @@ import (
 const maxUnauthCounter int = 6
 
 // Consts for states at fleet checkin
-const fleetStateDegraded = "DEGRADED"
-const fleetStateOnline = "online"
-const fleetStateError = "error"
-const fleetStateStarting = "starting"
+const (
+	fleetStateDegraded = "DEGRADED"
+	fleetStateOnline   = "online"
+	fleetStateError    = "error"
+	fleetStateStarting = "starting"
+)
 
 // Default Configuration for the Fleet Gateway.
 var defaultGatewaySettings = &fleetGatewaySettings{
-	Duration: 1 * time.Second,        // time between successful calls
-	Jitter:   500 * time.Millisecond, // used as a jitter for duration
+	Duration:                     1 * time.Second,        // time between successful calls
+	Jitter:                       500 * time.Millisecond, // used as a jitter for duration
+	ErrConsecutiveUnauthDuration: 1 * time.Hour,          // time between calls when the agent exceeds the unauthorized response limit
 	Backoff: backoffSettings{ // time after a failed call
 		Init: 60 * time.Second,
 		Max:  10 * time.Minute,
@@ -46,9 +49,10 @@ var defaultGatewaySettings = &fleetGatewaySettings{
 }
 
 type fleetGatewaySettings struct {
-	Duration time.Duration   `config:"checkin_frequency"`
-	Jitter   time.Duration   `config:"jitter"`
-	Backoff  backoffSettings `config:"backoff"`
+	Duration                     time.Duration   `config:"checkin_frequency"`
+	Jitter                       time.Duration   `config:"jitter"`
+	Backoff                      backoffSettings `config:"backoff"`
+	ErrConsecutiveUnauthDuration time.Duration
 }
 
 type backoffSettings struct {
@@ -90,7 +94,6 @@ func New(
 	stateFetcher func() coordinator.State,
 	stateStore stateStore,
 ) (*FleetGateway, error) {
-
 	scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
 	return newFleetGatewayWithScheduler(
 		log,
@@ -356,17 +359,17 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
 	resp, took, err := cmd.Execute(ctx, req)
 	if isUnauth(err) {
 		f.unauthCounter++
-
-		if f.shouldUnenroll() {
-			f.log.Warnf("retrieved an invalid api key error '%d' times. Starting to unenroll the elastic agent.", f.unauthCounter)
-			return &fleetapi.CheckinResponse{
-				Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}},
-			}, took, nil
+		if f.shouldUseLongSched() {
+			f.log.Warnf("retrieved an invalid api key error '%d' times, will use the long-wait scheduler", f.unauthCounter)
+			f.scheduler.SetDuration(defaultGatewaySettings.ErrConsecutiveUnauthDuration)
+			return &fleetapi.CheckinResponse{}, took, nil
 		}
 
 		return nil, took, err
 	}
 
+	f.scheduler.SetDuration(defaultGatewaySettings.Duration)
+
 	f.unauthCounter = 0
 
 	if err != nil {
 		return nil, took, err
@@ -384,8 +387,8 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
 	return resp, took, nil
 }
 
-// shouldUnenroll checks if the max number of trying an invalid key is reached
-func (f *FleetGateway) shouldUnenroll() bool {
+// shouldUseLongSched checks whether the max number of invalid API key responses has been reached
+func (f *FleetGateway) shouldUseLongSched() bool {
 	return f.unauthCounter > maxUnauthCounter
 }
 
diff --git a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
index 4fcbc51a231..c5221134007 100644
--- a/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
+++ b/internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
 	"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
 	"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
+	"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
 	"github.com/elastic/elastic-agent/internal/pkg/scheduler"
 	agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
 	"github.com/elastic/elastic-agent/pkg/core/logger"
@@ -564,3 +565,104 @@ func TestAgentStateToString(t *testing.T) {
 		})
 	}
 }
+
+type MockScheduler struct {
+	Duration time.Duration
+	Ticker   *time.Ticker
+}
+
+func (m *MockScheduler) WaitTick() <-chan time.Time {
+	return m.Ticker.C
+}
+
+func (m *MockScheduler) SetDuration(d time.Duration) {
+	m.Duration = d
+}
+
+func (m *MockScheduler) Stop() {
+	m.Ticker.Stop()
+}
+
+func TestFleetGatewaySchedulerSwitch(t *testing.T) {
+	agentInfo := &testAgentInfo{}
+	settings := &fleetGatewaySettings{
+		Duration: 1 * time.Second,
+		Backoff:  backoffSettings{Init: 1 * time.Millisecond, Max: 2 * time.Millisecond},
+	}
+
+	tempSet := *defaultGatewaySettings
+	defaultGatewaySettings.Duration = 500 * time.Millisecond
+	defaultGatewaySettings.ErrConsecutiveUnauthDuration = 700 * time.Millisecond
+	defer func() {
+		*defaultGatewaySettings = tempSet
+	}()
+
+	t.Run("if unauthorized responses exceed the set limit, the scheduler should be switched to the long-wait scheduler", withGateway(agentInfo, settings, func(
+		t *testing.T,
+		gateway coordinator.FleetGateway,
+		c *testingClient,
+		sch *scheduler.Stepper,
+	) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		unauth := func(_ http.Header, _ io.Reader) (*http.Response, error) {
+			return nil, client.ErrInvalidAPIKey
+		}
+
+		clientWaitFn := c.Answer(unauth)
+		g, ok := gateway.(*FleetGateway)
+		require.True(t, ok)
+
+		ms := &MockScheduler{
+			Duration: defaultGatewaySettings.Duration,
+			Ticker:   time.NewTicker(defaultGatewaySettings.Duration),
+		}
+		g.scheduler = ms
+		errCh := runFleetGateway(ctx, gateway)
+
+		for i := 0; i <= maxUnauthCounter; i++ {
+			<-clientWaitFn
+		}
+
+		cancel()
+		err := <-errCh
+		require.NoError(t, err)
+
+		require.Equal(t, defaultGatewaySettings.ErrConsecutiveUnauthDuration, ms.Duration)
+	}))

+	t.Run("should switch back to the short-wait scheduler if a successful response is received", withGateway(agentInfo, settings, func(
+		t *testing.T,
+		gateway coordinator.FleetGateway,
+		c *testingClient,
+		sch *scheduler.Stepper,
+	) {
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		success := func(_ http.Header, _ io.Reader) (*http.Response, error) {
+			resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
+			return resp, nil
+		}
+
+		clientWaitFn := c.Answer(success)
+		g, ok := gateway.(*FleetGateway)
+		require.True(t, ok)
+
+		ms := &MockScheduler{
+			Duration: defaultGatewaySettings.ErrConsecutiveUnauthDuration,
+			Ticker:   time.NewTicker(defaultGatewaySettings.ErrConsecutiveUnauthDuration),
+		}
+		g.scheduler = ms
+		errCh := runFleetGateway(ctx, gateway)
+
+		<-clientWaitFn
+
+		cancel()
+		err := <-errCh
+		require.NoError(t, err)
+
+		require.Equal(t, defaultGatewaySettings.Duration, ms.Duration)
+	}))
+}
diff --git a/internal/pkg/scheduler/scheduler.go b/internal/pkg/scheduler/scheduler.go
index f2b05476c02..65be4b1244d 100644
--- a/internal/pkg/scheduler/scheduler.go
+++ b/internal/pkg/scheduler/scheduler.go
@@ -14,6 +14,7 @@ import (
 type Scheduler interface {
 	WaitTick() <-chan time.Time
 	Stop()
+	SetDuration(time.Duration)
 }
 
 // Stepper is a scheduler where each Tick is manually triggered, this is useful in scenario
@@ -32,6 +33,9 @@ func (s *Stepper) WaitTick() <-chan time.Time {
 	return s.C
 }
 
+// SetDuration sets the wait duration for the scheduler; it is a no-op for the Stepper scheduler.
+func (s *Stepper) SetDuration(_ time.Duration) {}
+
 // Stop is stopping the scheduler, in the case of the Stepper scheduler nothing is done.
 func (s *Stepper) Stop() {}
 
@@ -68,6 +72,10 @@ func (p *Periodic) WaitTick() <-chan time.Time {
 	return rC
 }
 
+func (p *Periodic) SetDuration(d time.Duration) {
+	p.Ticker = time.NewTicker(d)
+}
+
 // Stop stops the internal Ticker.
 // Note this will not close the internal channel is up to the developer to unblock the goroutine
 // using another mechanism.
@@ -123,6 +131,10 @@ func (p *PeriodicJitter) WaitTick() <-chan time.Time {
 	return p.C
 }
 
+func (p *PeriodicJitter) SetDuration(d time.Duration) {
+	p.d = d
+}
+
 // Stop stops the PeriodicJitter scheduler.
 func (p *PeriodicJitter) Stop() {
 	close(p.done)

From 1a2e29e98aa14d28a8eeac874048055631a360ed Mon Sep 17 00:00:00 2001
From: Craig MacKenzie
Date: Thu, 13 Feb 2025 17:35:51 -0500
Subject: [PATCH 2/2] Windows: Fix job assignment to work on Go 1.23+ (#6825)

* Windows: Use OpenProcess to assign job objects.

* Dump local agent status when degraded in Fleet.

* Add changelog.

* Make sure no running agentbeat processes are left behind.

* Fix double import

* Silence windows lint errors

* Fix handle leak and separate linter warning.

* Ignore sub-processes when multiple agents should be allowed.

* Remove PROCESS_QUERY_LIMITED_INFORMATION. The handle doesn't need it.

* Fix old variable names.

* Fix typo.
---
 ...signing-sub-processes-to-Job-objects..yaml | 32 +++++++++++++++++++
 pkg/core/process/external_windows.go          |  7 ++--
 pkg/core/process/job_windows.go               | 26 +++++++++++----
 pkg/testing/fixture_install.go                | 24 ++++++++++----
 pkg/testing/tools/check/check.go              |  9 +++++-
 pkg/testing/tools/tools.go                    |  2 +-
 testing/integration/logs_ingestion_test.go    |  2 --
 testing/integration/upgrade_fleet_test.go     |  4 +--
 8 files changed, 84 insertions(+), 22 deletions(-)
 create mode 100644 changelog/fragments/1739375044-windows-change-how-process-handles-are-obtained-when-assigning-sub-processes-to-Job-objects..yaml

diff --git a/changelog/fragments/1739375044-windows-change-how-process-handles-are-obtained-when-assigning-sub-processes-to-Job-objects..yaml b/changelog/fragments/1739375044-windows-change-how-process-handles-are-obtained-when-assigning-sub-processes-to-Job-objects..yaml
new file mode 100644
index 00000000000..d5105e615cc
--- /dev/null
+++ b/changelog/fragments/1739375044-windows-change-how-process-handles-are-obtained-when-assigning-sub-processes-to-Job-objects..yaml
@@ -0,0 +1,32 @@
+# Kind can be one of:
+# - breaking-change: a change to previously-documented behavior
+# - deprecation: functionality that is being removed in a later release
+# - bug-fix: fixes a problem in a previous version
+# - enhancement: extends functionality but does not break or fix existing behavior
+# - feature: new functionality
+# - known-issue: problems that we are aware of in a given version
+# - security: impacts on the security of a product or a user’s deployment.
+# - upgrade: important information for someone upgrading from a prior version
+# - other: does not fit into any of the other categories
+kind: bug-fix
+
+# Change summary; an 80ish characters long description of the change.
+summary: Change how Windows process handles are obtained when assigning sub-processes to Job objects.
+
+# Long description; in case the summary is not enough to describe the change
+# this field accommodates a description without length limits.
+# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
+#description:
+
+# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
+component: "elastic-agent"
+
+# PR URL; optional; the PR number that added the changeset.
+# If not present, it is automatically filled by the tooling finding the PR where this changelog fragment has been added.
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+pr: https://github.com/elastic/elastic-agent/pull/6825
+
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present, it is automatically filled by the tooling with the issue linked to the PR number.
+#issue: https://github.com/owner/repo/1234
diff --git a/pkg/core/process/external_windows.go b/pkg/core/process/external_windows.go
index 255012dd708..df054bcd421 100644
--- a/pkg/core/process/external_windows.go
+++ b/pkg/core/process/external_windows.go
@@ -10,6 +10,8 @@ import (
 	"os"
 	"syscall"
 	"time"
+
+	"golang.org/x/sys/windows"
 )
 
 const (
@@ -34,16 +36,17 @@ func externalProcess(proc *os.Process) {
 
 func isWindowsProcessExited(pid int) bool {
 	const desiredAccess = syscall.STANDARD_RIGHTS_READ | syscall.PROCESS_QUERY_INFORMATION | syscall.SYNCHRONIZE
-	h, err := syscall.OpenProcess(desiredAccess, false, uint32(pid))
+	h, err := windows.OpenProcess(desiredAccess, false, uint32(pid)) //nolint:gosec // G115 Conversion from int to uint32 is safe here.
 	if err != nil {
 		// failed to open handle, report exited
 		return true
 	}
+	defer windows.CloseHandle(h) //nolint:errcheck // No way to handle errors returned here so safe to ignore.
 
 	// get exit code, this returns immediately in case it is still running
 	// it returns exitCodeStillActive
 	var ec uint32
-	if err := syscall.GetExitCodeProcess(h, &ec); err != nil {
+	if err := windows.GetExitCodeProcess(h, &ec); err != nil {
 		// failed to contact, report exited
 		return true
 	}
diff --git a/pkg/core/process/job_windows.go b/pkg/core/process/job_windows.go
index e0bcfdb1cba..fc55e611010 100644
--- a/pkg/core/process/job_windows.go
+++ b/pkg/core/process/job_windows.go
@@ -7,6 +7,7 @@
 package process
 
 import (
+	"fmt"
 	"os"
 	"unsafe"
 
@@ -76,12 +77,23 @@ func (job Job) Assign(p *os.Process) error {
 	if job == 0 || p == nil {
 		return nil
 	}
-	return windows.AssignProcessToJobObject(
-		windows.Handle(job),
-		windows.Handle((*process)(unsafe.Pointer(p)).Handle))
-}
 
-type process struct {
-	Pid    int
-	Handle uintptr
+	// To assign a process to a job, you need a handle to the process. Since os.Process provides no
+	// way to obtain its underlying handle safely, get one with OpenProcess().
+	// https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-openprocess
+	// This requires at least the PROCESS_SET_QUOTA and PROCESS_TERMINATE access rights.
+	// https://learn.microsoft.com/en-us/windows/win32/api/jobapi2/nf-jobapi2-assignprocesstojobobject
+	desiredAccess := uint32(windows.PROCESS_SET_QUOTA | windows.PROCESS_TERMINATE)
+	processHandle, err := windows.OpenProcess(desiredAccess, false, uint32(p.Pid)) //nolint:gosec // G115 Conversion from int to uint32 is safe here.
+	if err != nil {
+		return fmt.Errorf("opening process handle: %w", err)
+	}
+	defer windows.CloseHandle(processHandle) //nolint:errcheck // No way to handle errors returned here so safe to ignore.
+
+	err = windows.AssignProcessToJobObject(windows.Handle(job), processHandle)
+	if err != nil {
+		return fmt.Errorf("assigning to job object: %w", err)
+	}
+
+	return nil
 }
diff --git a/pkg/testing/fixture_install.go b/pkg/testing/fixture_install.go
index 7988d310815..1c39d592deb 100644
--- a/pkg/testing/fixture_install.go
+++ b/pkg/testing/fixture_install.go
@@ -200,7 +200,7 @@ func (f *Fixture) installFunc(ctx context.Context, installOpts *InstallOpts, sho
 
 	// check for running agents before installing, but only if not installed into a namespace whose point is allowing two agents at once.
 	if installOpts != nil && !installOpts.Develop && installOpts.Namespace == "" {
-		assert.Empty(f.t, getElasticAgentProcesses(f.t), "there should be no running agent at beginning of Install()")
+		assert.Empty(f.t, getElasticAgentAndAgentbeatProcesses(f.t), "there should be no running agent at beginning of Install()")
 	}
 
 	switch f.packageFormat {
@@ -287,19 +287,22 @@ func (f *Fixture) installNoPkgManager(ctx context.Context, installOpts *InstallO
 
 	f.t.Cleanup(func() {
 		// check for running agents after uninstall had a chance to run
-		processes := getElasticAgentProcesses(f.t)
-
 		// there can be a single agent left when using --develop mode
 		if f.installOpts != nil && f.installOpts.Namespace != "" {
-			assert.LessOrEqualf(f.t, len(processes), 1, "More than one agent left running at the end of the test when second agent in namespace %s was used: %v", f.installOpts.Namespace, processes)
+			// Only consider the main agent process and not sub-processes so that we can detect when
+			// multiple agents are running without needing to know the number of input sub-processes to expect.
+			agentProcesses := getElasticAgentProcesses(f.t)
+			assert.LessOrEqualf(f.t, len(agentProcesses), 1, "More than one agent left running at the end of the test when second agent in namespace %s was used: %v", f.installOpts.Namespace, agentProcesses)
 			// The agent left running has to be the non-development agent. The development agent should be uninstalled first as a convention.
-			if len(processes) > 0 {
-				assert.NotContainsf(f.t, processes[0].Cmdline, paths.InstallDirNameForNamespace(f.installOpts.Namespace),
-					"The agent installed into namespace %s was left running at the end of the test or was not uninstalled first: %v", f.installOpts.Namespace, processes)
+			if len(agentProcesses) > 0 {
+				assert.NotContainsf(f.t, agentProcesses[0].Cmdline, paths.InstallDirNameForNamespace(f.installOpts.Namespace),
+					"The agent installed into namespace %s was left running at the end of the test or was not uninstalled first: %v", f.installOpts.Namespace, agentProcesses)
 			}
 			return
 		}
 
+		// If not using an installation namespace, there should be no elastic-agent or agentbeat processes left running.
+		processes := getElasticAgentAndAgentbeatProcesses(f.t)
 		assert.Empty(f.t, processes, "there should be no running agent at the end of the test")
 	})
 
@@ -409,6 +412,13 @@ func getElasticAgentProcesses(t *gotesting.T) []runningProcess {
 	return getProcesses(t, `.*elastic\-agent.*`)
 }
 
+// getElasticAgentAndAgentbeatProcesses includes both the main elastic-agent process and the agentbeat
+// sub-processes, ensuring that no sub-processes are orphaned from their parent process and left running.
+// This primarily tests that Windows Job Object assignment works.
+func getElasticAgentAndAgentbeatProcesses(t *gotesting.T) []runningProcess {
+	return getProcesses(t, `.*(elastic\-agent|agentbeat).*`)
+}
+
 func getProcesses(t *gotesting.T, regex string) []runningProcess {
 	procStats := agentsystemprocess.Stats{
 		Procs: []string{regex},
diff --git a/pkg/testing/tools/check/check.go b/pkg/testing/tools/check/check.go
index 00f2ee92957..90925b4d947 100644
--- a/pkg/testing/tools/check/check.go
+++ b/pkg/testing/tools/check/check.go
@@ -47,6 +47,7 @@ func ConnectedToFleet(ctx context.Context, t *testing.T, fixture *integrationtes
 // for use with assert.Eventually or require.Eventually.
 func FleetAgentStatus(ctx context.Context,
 	t *testing.T,
+	fixture *integrationtest.Fixture,
 	client *kibana.Client,
 	policyID,
 	expectedStatus string) func() bool {
@@ -61,7 +62,13 @@
 			return true
 		}
 
-		t.Logf("Agent fleet status: %s", currentStatus)
+		agentStatus, err := fixture.ExecStatus(ctx)
+		if err != nil {
+			t.Logf("Agent fleet status: %s. Error getting local status: %s", currentStatus, err)
+			return false
+		}
+
+		t.Logf("Agent fleet status: %s. Local status: %v", currentStatus, agentStatus)
 		return false
 	}
 }
diff --git a/pkg/testing/tools/tools.go b/pkg/testing/tools/tools.go
index b36e9a4fc81..df105f20d32 100644
--- a/pkg/testing/tools/tools.go
+++ b/pkg/testing/tools/tools.go
@@ -154,7 +154,7 @@ func InstallAgentForPolicyWithToken(ctx context.Context, t *testing.T,
 	// Wait for Agent to be healthy
 	require.Eventually(
 		t,
-		check.FleetAgentStatus(ctx, t, kibClient, policyID, "online"),
+		check.FleetAgentStatus(ctx, t, agentFixture, kibClient, policyID, "online"),
 		timeout,
 		10*time.Second,
 		"Elastic Agent status is not online",
diff --git a/testing/integration/logs_ingestion_test.go b/testing/integration/logs_ingestion_test.go
index b6bdef4e31b..daefef20bc3 100644
--- a/testing/integration/logs_ingestion_test.go
+++ b/testing/integration/logs_ingestion_test.go
@@ -32,7 +32,6 @@ import (
 	atesting "github.com/elastic/elastic-agent/pkg/testing"
 	"github.com/elastic/elastic-agent/pkg/testing/define"
 	"github.com/elastic/elastic-agent/pkg/testing/tools"
-	"github.com/elastic/elastic-agent/pkg/testing/tools/check"
 	"github.com/elastic/elastic-agent/pkg/testing/tools/fleettools"
 	"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
 	"github.com/elastic/elastic-agent/testing/installtest"
@@ -102,7 +101,6 @@ func TestLogIngestionFleetManaged(t *testing.T) {
 		createPolicyReq)
 	require.NoError(t, err)
 	t.Logf("created policy: %s", policy.ID)
-	check.ConnectedToFleet(ctx, t, agentFixture, 5*time.Minute)
 
 	// 3. Ensure installation is correct.
 	require.NoError(t, installtest.CheckSuccess(ctx, agentFixture, installOpts.BasePath, &installtest.CheckOpts{Privileged: installOpts.Privileged}))
diff --git a/testing/integration/upgrade_fleet_test.go b/testing/integration/upgrade_fleet_test.go
index 156baeffa14..dc78e4a24c0 100644
--- a/testing/integration/upgrade_fleet_test.go
+++ b/testing/integration/upgrade_fleet_test.go
@@ -412,7 +412,7 @@ func testUpgradeFleetManagedElasticAgent(
 	t.Log("Waiting for enrolled Agent status to be online...")
 	require.Eventually(t,
 		check.FleetAgentStatus(
-			ctx, t, kibClient, policyResp.ID, "online"),
+			ctx, t, startFixture, kibClient, policyResp.ID, "online"),
 		2*time.Minute,
 		10*time.Second,
 		"Agent status is not online")
@@ -446,7 +446,7 @@ func testUpgradeFleetManagedElasticAgent(
 	require.NoError(t, err)
 
 	t.Log("Waiting for enrolled Agent status to be online...")
-	require.Eventually(t, check.FleetAgentStatus(ctx, t, kibClient, policyResp.ID, "online"), 10*time.Minute, 15*time.Second, "Agent status is not online")
+	require.Eventually(t, check.FleetAgentStatus(ctx, t, startFixture, kibClient, policyResp.ID, "online"), 10*time.Minute, 15*time.Second, "Agent status is not online")
 
 	// wait for version
 	require.Eventually(t, func() bool {