Skip to content

Commit

Permalink
Make enroll command backoff more conservative (#6983)
Browse files Browse the repository at this point in the history
* Make enroll command backoff consistent with other requests to Fleet

* Use updated backoff only for delayed enroll

* Fix incomplete comment

(cherry picked from commit ef9fc06)

# Conflicts:
#	internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
#	internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
  • Loading branch information
swiatekm committed Mar 4, 2025
1 parent 4e4953a commit b319dce
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 20 deletions.
34 changes: 34 additions & 0 deletions changelog/fragments/1740399399-fleet-backoff.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; an 80ish-character description of the change.
summary: Make enroll command backoff more conservative

# Long description; in case the summary is not enough to describe the change
# this field accommodates a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: |
The plain enroll command now backs off with an initial delay of 5s and a maximum delay of 10 minutes, with jitter applied.
Delayed enroll now uses the same backoff behaviour as other requests to Fleet Server.
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
#pr: https://github.com/owner/repo/1234

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/6761
47 changes: 41 additions & 6 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,15 @@ const fleetStateOnline = "online"
const fleetStateError = "error"
const fleetStateStarting = "starting"

// defaultFleetBackoffSettings holds the default initial and maximum backoff
// durations used for requests to Fleet. RequestBackoff builds its backoff
// from these values.
var defaultFleetBackoffSettings = backoffSettings{
	Init: time.Minute,
	Max:  600 * time.Second,
}

// Default Configuration for the Fleet Gateway.
var defaultGatewaySettings = &fleetGatewaySettings{
<<<<<<< HEAD

Check failure on line 46 in internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: unexpected <<, expected expression

Check failure on line 46 in internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

expected operand, found '<<' (typecheck)
Duration: 1 * time.Second, // time between successful calls
Jitter: 500 * time.Millisecond, // used as a jitter for duration
Backoff: backoffSettings{ // time after a failed call
Expand All @@ -49,6 +56,19 @@ type fleetGatewaySettings struct {
Duration time.Duration `config:"checkin_frequency"`
Jitter time.Duration `config:"jitter"`
Backoff backoffSettings `config:"backoff"`
=======

Check failure on line 59 in internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: unexpected ==, expected field name or embedded type
Duration: 1 * time.Second, // time between successful calls
Jitter: 500 * time.Millisecond, // used as a jitter for duration
ErrConsecutiveUnauthDuration: 1 * time.Hour, // time between calls when the agent exceeds unauthorized response limit
Backoff: &defaultFleetBackoffSettings,
}

type fleetGatewaySettings struct {
Duration time.Duration `config:"checkin_frequency"`
Jitter time.Duration `config:"jitter"`
Backoff *backoffSettings `config:"backoff"`
ErrConsecutiveUnauthDuration time.Duration
>>>>>>> ef9fc067c (Make enroll command backoff more conservative (#6983))

Check failure on line 71 in internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: unexpected >>, expected field name or embedded type

Check failure on line 71 in internal/pkg/agent/application/gateway/fleet/fleet_gateway.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

invalid character U+0023 '#'
}

type backoffSettings struct {
Expand Down Expand Up @@ -133,11 +153,18 @@ func (f *FleetGateway) Actions() <-chan []fleetapi.Action {
}

func (f *FleetGateway) Run(ctx context.Context) error {
backoff := backoff.NewEqualJitterBackoff(
ctx.Done(),
f.settings.Backoff.Init,
f.settings.Backoff.Max,
)
var requestBackoff backoff.Backoff
if f.settings.Backoff == nil {
requestBackoff = RequestBackoff(ctx.Done())
} else {
// this is only used in tests
requestBackoff = backoff.NewEqualJitterBackoff(
ctx.Done(),
f.settings.Backoff.Init,
f.settings.Backoff.Max,
)
}

f.log.Info("Fleet gateway started")
for {
select {
Expand All @@ -151,7 +178,7 @@ func (f *FleetGateway) Run(ctx context.Context) error {
// Execute the checkin call and for any errors returned by the fleet-server API
// the function will retry to communicate with fleet-server with an exponential delay and some
// jitter to help better distribute the load from a fleet of agents.
resp, err := f.doExecute(ctx, backoff)
resp, err := f.doExecute(ctx, requestBackoff)
if err != nil {
continue
}
Expand Down Expand Up @@ -425,3 +452,11 @@ func agentStateToString(state agentclient.State) string {
// Unknown states map to degraded.
return fleetStateDegraded
}

// RequestBackoff returns an equal-jitter backoff built from the default Fleet
// backoff settings (defaultFleetBackoffSettings.Init and .Max). The done
// channel is handed to the backoff unchanged.
func RequestBackoff(done <-chan struct{}) backoff.Backoff {
	initial, ceiling := defaultFleetBackoffSettings.Init, defaultFleetBackoffSettings.Max
	return backoff.NewEqualJitterBackoff(done, initial, ceiling)
}
112 changes: 108 additions & 4 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func TestFleetGateway(t *testing.T) {
agentInfo := &testAgentInfo{}
settings := &fleetGatewaySettings{
Duration: 5 * time.Second,
Backoff: backoffSettings{Init: 1 * time.Second, Max: 5 * time.Second},
Backoff: &backoffSettings{Init: 1 * time.Second, Max: 5 * time.Second},
}

t.Run("send no event and receive no action", withGateway(agentInfo, settings, func(
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestFleetGateway(t *testing.T) {
log,
&fleetGatewaySettings{
Duration: d,
Backoff: backoffSettings{Init: 1 * time.Second, Max: 30 * time.Second},
Backoff: &backoffSettings{Init: 1 * time.Second, Max: 30 * time.Second},
},
agentInfo,
client,
Expand Down Expand Up @@ -382,7 +382,7 @@ func TestRetriesOnFailures(t *testing.T) {
agentInfo := &testAgentInfo{}
settings := &fleetGatewaySettings{
Duration: 5 * time.Second,
Backoff: backoffSettings{Init: 100 * time.Millisecond, Max: 5 * time.Second},
Backoff: &backoffSettings{Init: 100 * time.Millisecond, Max: 5 * time.Second},
}

t.Run("When the gateway fails to communicate with the checkin API we will retry",
Expand Down Expand Up @@ -433,7 +433,7 @@ func TestRetriesOnFailures(t *testing.T) {
t.Run("The retry loop is interruptible",
withGateway(agentInfo, &fleetGatewaySettings{
Duration: 0 * time.Second,
Backoff: backoffSettings{Init: 10 * time.Minute, Max: 20 * time.Minute},
Backoff: &backoffSettings{Init: 10 * time.Minute, Max: 20 * time.Minute},
}, func(
t *testing.T,
gateway coordinator.FleetGateway,
Expand Down Expand Up @@ -564,3 +564,107 @@ func TestAgentStateToString(t *testing.T) {
})
}
}
<<<<<<< HEAD

Check failure on line 567 in internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: non-declaration statement outside function body
=======

// MockScheduler is a test scheduler backed by a caller-supplied time.Ticker.
// It records the most recent duration passed to SetDuration so tests can
// assert on it; the ticker itself is never re-armed by this type.
type MockScheduler struct {
	// Duration is the last value stored by SetDuration (or set at construction).
	Duration time.Duration
	// Ticker supplies the ticks returned by WaitTick.
	Ticker *time.Ticker
}

// WaitTick returns the channel of the underlying ticker.
func (ms *MockScheduler) WaitTick() <-chan time.Time {
	ticker := ms.Ticker
	return ticker.C
}

// SetDuration records d in the Duration field. It does not reset the ticker.
func (ms *MockScheduler) SetDuration(d time.Duration) {
	ms.Duration = d
}

// Stop stops the underlying ticker.
func (ms *MockScheduler) Stop() {
	ticker := ms.Ticker
	ticker.Stop()
}

// TestFleetGatewaySchedulerSwitch checks which duration the gateway leaves on
// its scheduler depending on the check-in responses it receives:
//   - after more than maxUnauthCounter consecutive ErrInvalidAPIKey responses,
//     the scheduler duration is expected to equal
//     defaultGatewaySettings.ErrConsecutiveUnauthDuration;
//   - after a successful response, the scheduler duration is expected to equal
//     defaultGatewaySettings.Duration.
//
// The gateway's scheduler is replaced with a MockScheduler so the last
// duration set can be inspected once the gateway has stopped.
func TestFleetGatewaySchedulerSwitch(t *testing.T) {
	agentInfo := &testAgentInfo{}
	settings := &fleetGatewaySettings{
		Duration: 1 * time.Second,
		Backoff:  &backoffSettings{Init: 1 * time.Millisecond, Max: 2 * time.Millisecond},
	}

	// Shorten the package-level defaults for the duration of this test and
	// restore the saved copy afterwards so other tests see the original values.
	tempSet := *defaultGatewaySettings
	defaultGatewaySettings.Duration = 500 * time.Millisecond
	defaultGatewaySettings.ErrConsecutiveUnauthDuration = 700 * time.Millisecond
	defer func() {
		*defaultGatewaySettings = tempSet
	}()

	t.Run("if unauthorized responses exceed the set limit, the scheduler should be switched to the long-wait scheduler", withGateway(agentInfo, settings, func(
		t *testing.T,
		gateway coordinator.FleetGateway,
		c *testingClient,
		sch *scheduler.Stepper,
	) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// Every check-in fails with an invalid API key error.
		unauth := func(_ http.Header, _ io.Reader) (*http.Response, error) {
			return nil, client.ErrInvalidAPIKey
		}

		clientWaitFn := c.Answer(unauth)
		g, ok := gateway.(*FleetGateway)
		require.True(t, ok)

		ms := &MockScheduler{
			Duration: defaultGatewaySettings.Duration,
			Ticker:   time.NewTicker(defaultGatewaySettings.Duration),
		}
		g.scheduler = ms
		errCh := runFleetGateway(ctx, gateway)

		// Wait for maxUnauthCounter+1 client calls so the limit is exceeded.
		for i := 0; i <= maxUnauthCounter; i++ {
			<-clientWaitFn
		}

		cancel()
		err := <-errCh
		require.NoError(t, err)

		// testify expects (expected, actual); the gateway has stopped, so
		// reading ms.Duration here does not race with it.
		require.Equal(t, defaultGatewaySettings.ErrConsecutiveUnauthDuration, ms.Duration)
	}))

	t.Run("should switch back to short-wait scheduler if the a successful response is received", withGateway(agentInfo, settings, func(
		t *testing.T,
		gateway coordinator.FleetGateway,
		c *testingClient,
		sch *scheduler.Stepper,
	) {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()

		// Every check-in succeeds with an empty action list.
		success := func(_ http.Header, _ io.Reader) (*http.Response, error) {
			resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
			return resp, nil
		}

		clientWaitFn := c.Answer(success)
		g, ok := gateway.(*FleetGateway)
		require.True(t, ok)

		// Start from the long-wait duration so a switch back is observable.
		ms := &MockScheduler{
			Duration: defaultGatewaySettings.ErrConsecutiveUnauthDuration,
			Ticker:   time.NewTicker(defaultGatewaySettings.ErrConsecutiveUnauthDuration),
		}
		g.scheduler = ms
		errCh := runFleetGateway(ctx, gateway)

		<-clientWaitFn

		cancel()
		err := <-errCh
		require.NoError(t, err)

		// testify expects (expected, actual).
		require.Equal(t, defaultGatewaySettings.Duration, ms.Duration)
	}))
}
>>>>>>> ef9fc067c (Make enroll command backoff more conservative (#6983))

Check failure on line 670 in internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

syntax error: non-declaration statement outside function body

Check failure on line 670 in internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

invalid character U+0023 '#' (typecheck)
1 change: 1 addition & 0 deletions internal/pkg/agent/cmd/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,7 @@ func enroll(streams *cli.IOStreams, cmd *cobra.Command) error {
&options,
pathConfigFile,
store,
nil,
)
if err != nil {
return err
Expand Down
28 changes: 18 additions & 10 deletions internal/pkg/agent/cmd/enroll_cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ const (
defaultFleetServerPort = 8220
defaultFleetServerInternalHost = "localhost"
defaultFleetServerInternalPort = 8221
enrollBackoffInit = time.Second
enrollBackoffMax = 10 * time.Second
enrollBackoffInit = time.Second * 5
enrollBackoffMax = time.Minute * 10
)

var (
Expand All @@ -69,13 +69,14 @@ type saver interface {

// enrollCmd is an enroll subcommand that interacts between the Kibana API and the Agent.
type enrollCmd struct {
log *logger.Logger
options *enrollCmdOption
client fleetclient.Sender
configStore saver
remoteConfig remote.Config
agentProc *process.Info
configPath string
log *logger.Logger
options *enrollCmdOption
client fleetclient.Sender
configStore saver
remoteConfig remote.Config
agentProc *process.Info
configPath string
backoffFactory func(done <-chan struct{}) backoff.Backoff

// For testability
daemonReloadFunc func(context.Context) error
Expand Down Expand Up @@ -178,13 +179,20 @@ func newEnrollCmd(
options *enrollCmdOption,
configPath string,
store saver,
backoffFactory func(done <-chan struct{}) backoff.Backoff,
) (*enrollCmd, error) {
if backoffFactory == nil {
backoffFactory = func(done <-chan struct{}) backoff.Backoff {
return backoff.NewEqualJitterBackoff(done, enrollBackoffInit, enrollBackoffMax)
}
}
return &enrollCmd{
log: log,
options: options,
configStore: store,
configPath: configPath,
daemonReloadFunc: daemonReload,
backoffFactory: backoffFactory,
}, nil
}

Expand Down Expand Up @@ -531,7 +539,7 @@ func (c *enrollCmd) enrollWithBackoff(ctx context.Context, persistentConfig map[

signal := make(chan struct{})
defer close(signal)
backExp := backoff.NewExpBackoff(signal, enrollBackoffInit, enrollBackoffMax)
backExp := c.backoffFactory(signal)

for {
retry := false
Expand Down
7 changes: 7 additions & 0 deletions internal/pkg/agent/cmd/enroll_cmd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ func TestEnroll(t *testing.T) {
},
"",
store,
nil,
)
require.NoError(t, err)

Expand Down Expand Up @@ -181,6 +182,7 @@ func TestEnroll(t *testing.T) {
&enrollOptions,
"",
store,
nil,
)
require.NoError(t, err, "could not create enroll command")

Expand Down Expand Up @@ -254,6 +256,7 @@ func TestEnroll(t *testing.T) {
},
"",
store,
nil,
)
require.NoError(t, err)

Expand Down Expand Up @@ -316,6 +319,7 @@ func TestEnroll(t *testing.T) {
},
"",
store,
nil,
)
require.NoError(t, err)

Expand Down Expand Up @@ -380,6 +384,7 @@ func TestEnroll(t *testing.T) {
},
"",
store,
nil,
)
require.NoError(t, err)

Expand Down Expand Up @@ -424,6 +429,7 @@ func TestEnroll(t *testing.T) {
},
"",
store,
nil,
)
require.NoError(t, err)

Expand Down Expand Up @@ -492,6 +498,7 @@ func TestEnroll(t *testing.T) {
},
"",
store,
nil,
)
require.NoError(t, err)

Expand Down
Loading

0 comments on commit b319dce

Please sign in to comment.