Skip to content

Commit

Permalink
Merge branch 'main' into bump/go1.23
Browse files Browse the repository at this point in the history
  • Loading branch information
kruskall authored Feb 13, 2025
2 parents d9b5558 + 1a2e29e commit 55126a0
Show file tree
Hide file tree
Showing 12 changed files with 249 additions and 40 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: enhancement

# Change summary; a 80ish characters long description of the change.
summary: Updated the fleet gateway so that when the number of unauthorized fleet responses exceeds the set limit, instead of unenrolling, the gateway starts checking in less frequently.

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: "elastic-agent"
# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/6619
# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/5428
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Change how Windows process handles are obtained when assigning sub-processes to Job objects.

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: "elastic-agent"

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/owner/repo/6825

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
39 changes: 21 additions & 18 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,29 @@ import (
const maxUnauthCounter int = 6

// Consts for states at fleet checkin
const fleetStateDegraded = "DEGRADED"
const fleetStateOnline = "online"
const fleetStateError = "error"
const fleetStateStarting = "starting"
const (
fleetStateDegraded = "DEGRADED"
fleetStateOnline = "online"
fleetStateError = "error"
fleetStateStarting = "starting"
)

// Default Configuration for the Fleet Gateway.
var defaultGatewaySettings = &fleetGatewaySettings{
Duration: 1 * time.Second, // time between successful calls
Jitter: 500 * time.Millisecond, // used as a jitter for duration
Duration: 1 * time.Second, // time between successful calls
Jitter: 500 * time.Millisecond, // used as a jitter for duration
ErrConsecutiveUnauthDuration: 1 * time.Hour, // time between calls when the agent exceeds unauthorized response limit
Backoff: backoffSettings{ // time after a failed call
Init: 60 * time.Second,
Max: 10 * time.Minute,
},
}

type fleetGatewaySettings struct {
Duration time.Duration `config:"checkin_frequency"`
Jitter time.Duration `config:"jitter"`
Backoff backoffSettings `config:"backoff"`
Duration time.Duration `config:"checkin_frequency"`
Jitter time.Duration `config:"jitter"`
Backoff backoffSettings `config:"backoff"`
ErrConsecutiveUnauthDuration time.Duration
}

type backoffSettings struct {
Expand Down Expand Up @@ -90,7 +94,6 @@ func New(
stateFetcher func() coordinator.State,
stateStore stateStore,
) (*FleetGateway, error) {

scheduler := scheduler.NewPeriodicJitter(defaultGatewaySettings.Duration, defaultGatewaySettings.Jitter)
return newFleetGatewayWithScheduler(
log,
Expand Down Expand Up @@ -356,17 +359,17 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
resp, took, err := cmd.Execute(ctx, req)
if isUnauth(err) {
f.unauthCounter++

if f.shouldUnenroll() {
f.log.Warnf("retrieved an invalid api key error '%d' times. Starting to unenroll the elastic agent.", f.unauthCounter)
return &fleetapi.CheckinResponse{
Actions: []fleetapi.Action{&fleetapi.ActionUnenroll{ActionID: "", ActionType: "UNENROLL", IsDetected: true}},
}, took, nil
if f.shouldUseLongSched() {
f.log.Warnf("retrieved an invalid api key error '%d' times. will use long scheduler", f.unauthCounter)
f.scheduler.SetDuration(defaultGatewaySettings.ErrConsecutiveUnauthDuration)
return &fleetapi.CheckinResponse{}, took, nil
}

return nil, took, err
}

f.scheduler.SetDuration(defaultGatewaySettings.Duration)

f.unauthCounter = 0
if err != nil {
return nil, took, err
Expand All @@ -384,8 +387,8 @@ func (f *FleetGateway) execute(ctx context.Context) (*fleetapi.CheckinResponse,
return resp, took, nil
}

// shouldUnenroll checks if the max number of trying an invalid key is reached
func (f *FleetGateway) shouldUnenroll() bool {
// shouldUseLongSched checks if the max number of trying an invalid key is reached
func (f *FleetGateway) shouldUseLongSched() bool {
return f.unauthCounter > maxUnauthCounter
}

Expand Down
102 changes: 102 additions & 0 deletions internal/pkg/agent/application/gateway/fleet/fleet_gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/elastic/elastic-agent/internal/pkg/agent/storage/store"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker/noop"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/client"
"github.com/elastic/elastic-agent/internal/pkg/scheduler"
agentclient "github.com/elastic/elastic-agent/pkg/control/v2/client"
"github.com/elastic/elastic-agent/pkg/core/logger"
Expand Down Expand Up @@ -564,3 +565,104 @@ func TestAgentStateToString(t *testing.T) {
})
}
}

type MockScheduler struct {
Duration time.Duration
Ticker *time.Ticker
}

func (m *MockScheduler) WaitTick() <-chan time.Time {
return m.Ticker.C
}

func (m *MockScheduler) SetDuration(d time.Duration) {
m.Duration = d
}

func (m *MockScheduler) Stop() {
m.Ticker.Stop()
}

func TestFleetGatewaySchedulerSwitch(t *testing.T) {
agentInfo := &testAgentInfo{}
settings := &fleetGatewaySettings{
Duration: 1 * time.Second,
Backoff: backoffSettings{Init: 1 * time.Millisecond, Max: 2 * time.Millisecond},
}

tempSet := *defaultGatewaySettings
defaultGatewaySettings.Duration = 500 * time.Millisecond
defaultGatewaySettings.ErrConsecutiveUnauthDuration = 700 * time.Millisecond
defer func() {
*defaultGatewaySettings = tempSet
}()

t.Run("if unauthorized responses exceed the set limit, the scheduler should be switched to the long-wait scheduler", withGateway(agentInfo, settings, func(
t *testing.T,
gateway coordinator.FleetGateway,
c *testingClient,
sch *scheduler.Stepper,
) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

unauth := func(_ http.Header, _ io.Reader) (*http.Response, error) {
return nil, client.ErrInvalidAPIKey
}

clientWaitFn := c.Answer(unauth)
g, ok := gateway.(*FleetGateway)
require.True(t, ok)

ms := &MockScheduler{
Duration: defaultGatewaySettings.Duration,
Ticker: time.NewTicker(defaultGatewaySettings.Duration),
}
g.scheduler = ms
errCh := runFleetGateway(ctx, gateway)

for i := 0; i <= maxUnauthCounter; i++ {
<-clientWaitFn
}

cancel()
err := <-errCh
require.NoError(t, err)

require.Equal(t, ms.Duration, defaultGatewaySettings.ErrConsecutiveUnauthDuration)
}))

t.Run("should switch back to short-wait scheduler if the a successful response is received", withGateway(agentInfo, settings, func(
t *testing.T,
gateway coordinator.FleetGateway,
c *testingClient,
sch *scheduler.Stepper,
) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

unauth := func(_ http.Header, _ io.Reader) (*http.Response, error) {
resp := wrapStrToResp(http.StatusOK, `{ "actions": [] }`)
return resp, nil
}

clientWaitFn := c.Answer(unauth)
g, ok := gateway.(*FleetGateway)
require.True(t, ok)

ms := &MockScheduler{
Duration: defaultGatewaySettings.ErrConsecutiveUnauthDuration,
Ticker: time.NewTicker(defaultGatewaySettings.ErrConsecutiveUnauthDuration),
}
g.scheduler = ms
errCh := runFleetGateway(ctx, gateway)

<-clientWaitFn

cancel()
err := <-errCh
require.NoError(t, err)

require.Equal(t, ms.Duration, defaultGatewaySettings.Duration)
}))
}
12 changes: 12 additions & 0 deletions internal/pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
type Scheduler interface {
WaitTick() <-chan time.Time
Stop()
SetDuration(time.Duration)
}

// Stepper is a scheduler where each Tick is manually triggered, this is useful in scenario
Expand All @@ -32,6 +33,9 @@ func (s *Stepper) WaitTick() <-chan time.Time {
return s.C
}

// Sets the wait duration for the scheduler. Noop for stepper scheduler
func (s *Stepper) SetDuration(_ time.Duration) {}

// Stop is stopping the scheduler, in the case of the Stepper scheduler nothing is done.
func (s *Stepper) Stop() {}

Expand Down Expand Up @@ -68,6 +72,10 @@ func (p *Periodic) WaitTick() <-chan time.Time {
return rC
}

func (p *Periodic) SetDuration(d time.Duration) {
p.Ticker = time.NewTicker(d)
}

// Stop stops the internal Ticker.
// Note this will not close the internal channel is up to the developer to unblock the goroutine
// using another mechanism.
Expand Down Expand Up @@ -123,6 +131,10 @@ func (p *PeriodicJitter) WaitTick() <-chan time.Time {
return p.C
}

func (p *PeriodicJitter) SetDuration(d time.Duration) {
p.d = d
}

// Stop stops the PeriodicJitter scheduler.
func (p *PeriodicJitter) Stop() {
close(p.done)
Expand Down
7 changes: 5 additions & 2 deletions pkg/core/process/external_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"os"
"syscall"
"time"

"golang.org/x/sys/windows"
)

const (
Expand All @@ -34,16 +36,17 @@ func externalProcess(proc *os.Process) {

func isWindowsProcessExited(pid int) bool {
const desiredAccess = syscall.STANDARD_RIGHTS_READ | syscall.PROCESS_QUERY_INFORMATION | syscall.SYNCHRONIZE
h, err := syscall.OpenProcess(desiredAccess, false, uint32(pid))
h, err := windows.OpenProcess(desiredAccess, false, uint32(pid)) //nolint:gosec // G115 Conversion from int to uint32 is safe here.
if err != nil {
// failed to open handle, report exited
return true
}
defer windows.CloseHandle(h) //nolint:errcheck // No way to handle errors returned here so safe to ignore.

// get exit code, this returns immediately in case it is still running
// it returns exitCodeStillActive
var ec uint32
if err := syscall.GetExitCodeProcess(h, &ec); err != nil {
if err := windows.GetExitCodeProcess(h, &ec); err != nil {
// failed to contact, report exited
return true
}
Expand Down
26 changes: 19 additions & 7 deletions pkg/core/process/job_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package process

import (
"fmt"
"os"
"unsafe"

Expand Down Expand Up @@ -76,12 +77,23 @@ func (job Job) Assign(p *os.Process) error {
if job == 0 || p == nil {
return nil
}
return windows.AssignProcessToJobObject(
windows.Handle(job),
windows.Handle((*process)(unsafe.Pointer(p)).Handle))
}

type process struct {
Pid int
Handle uintptr
// To assign a process to a job, you need a handle to the process. Since os.Process provides no
// way to obtain it's underlying handle safely, get one with OpenProcess().
// https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-openprocess
// This requires at least the PROCESS_SET_QUOTA and PROCESS_TERMINATE access rights.
// https://learn.microsoft.com/en-us/windows/win32/api/jobapi2/nf-jobapi2-assignprocesstojobobject
desiredAccess := uint32(windows.PROCESS_SET_QUOTA | windows.PROCESS_TERMINATE)
processHandle, err := windows.OpenProcess(desiredAccess, false, uint32(p.Pid)) //nolint:gosec // G115 Conversion from int to uint32 is safe here.
if err != nil {
return fmt.Errorf("opening process handle: %w", err)
}
defer windows.CloseHandle(processHandle) //nolint:errcheck // No way to handle errors returned here so safe to ignore.

err = windows.AssignProcessToJobObject(windows.Handle(job), processHandle)
if err != nil {
return fmt.Errorf("assigning to job object: %w", err)
}

return nil
}
Loading

0 comments on commit 55126a0

Please sign in to comment.