Skip to content

Commit

Permalink
Windows: Fix Windows job assignment to work on Go 1.23+ (#6825)
Browse files Browse the repository at this point in the history
* Windows: Use OpenProcess to assign job objects.

* Dump local agent status when degraded in Fleet.

* Add changelog.

* Make sure no running agentbeat processes are left behind.

* Fix double import

* Silence windows lint errors

* Fix handle leak and separate linter warning.

* Ignore sub-processes when multiple agents should be allowed.

* Remove PROCESS_QUERY_LIMITED_INFORMATION. The handle doesn't need it.

* Fix old variable names.

* Fix typo.
  • Loading branch information
cmacknz authored Feb 13, 2025
1 parent 99696b8 commit 1a2e29e
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: bug-fix

# Change summary; a 80ish characters long description of the change.
summary: Change how Windows process handles are obtained when assigning sub-processes to Job objects.

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
#description:

# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: "elastic-agent"

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/owner/repo/6825

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
#issue: https://github.com/owner/repo/1234
7 changes: 5 additions & 2 deletions pkg/core/process/external_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"os"
"syscall"
"time"

"golang.org/x/sys/windows"
)

const (
Expand All @@ -34,16 +36,17 @@ func externalProcess(proc *os.Process) {

func isWindowsProcessExited(pid int) bool {
const desiredAccess = syscall.STANDARD_RIGHTS_READ | syscall.PROCESS_QUERY_INFORMATION | syscall.SYNCHRONIZE
h, err := syscall.OpenProcess(desiredAccess, false, uint32(pid))
h, err := windows.OpenProcess(desiredAccess, false, uint32(pid)) //nolint:gosec // G115 Conversion from int to uint32 is safe here.
if err != nil {
// failed to open handle, report exited
return true
}
defer windows.CloseHandle(h) //nolint:errcheck // No way to handle errors returned here so safe to ignore.

// get exit code, this returns immediately in case it is still running
// it returns exitCodeStillActive
var ec uint32
if err := syscall.GetExitCodeProcess(h, &ec); err != nil {
if err := windows.GetExitCodeProcess(h, &ec); err != nil {
// failed to contact, report exited
return true
}
Expand Down
26 changes: 19 additions & 7 deletions pkg/core/process/job_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package process

import (
"fmt"
"os"
"unsafe"

Expand Down Expand Up @@ -76,12 +77,23 @@ func (job Job) Assign(p *os.Process) error {
if job == 0 || p == nil {
return nil
}
return windows.AssignProcessToJobObject(
windows.Handle(job),
windows.Handle((*process)(unsafe.Pointer(p)).Handle))
}

type process struct {
Pid int
Handle uintptr
// To assign a process to a job, you need a handle to the process. Since os.Process provides no
// way to obtain it's underlying handle safely, get one with OpenProcess().
// https://learn.microsoft.com/en-us/windows/win32/api/processthreadsapi/nf-processthreadsapi-openprocess
// This requires at least the PROCESS_SET_QUOTA and PROCESS_TERMINATE access rights.
// https://learn.microsoft.com/en-us/windows/win32/api/jobapi2/nf-jobapi2-assignprocesstojobobject
desiredAccess := uint32(windows.PROCESS_SET_QUOTA | windows.PROCESS_TERMINATE)
processHandle, err := windows.OpenProcess(desiredAccess, false, uint32(p.Pid)) //nolint:gosec // G115 Conversion from int to uint32 is safe here.
if err != nil {
return fmt.Errorf("opening process handle: %w", err)
}
defer windows.CloseHandle(processHandle) //nolint:errcheck // No way to handle errors returned here so safe to ignore.

err = windows.AssignProcessToJobObject(windows.Handle(job), processHandle)
if err != nil {
return fmt.Errorf("assigning to job object: %w", err)
}

return nil
}
24 changes: 17 additions & 7 deletions pkg/testing/fixture_install.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (f *Fixture) installFunc(ctx context.Context, installOpts *InstallOpts, sho

// check for running agents before installing, but only if not installed into a namespace whose point is allowing two agents at once.
if installOpts != nil && !installOpts.Develop && installOpts.Namespace == "" {
assert.Empty(f.t, getElasticAgentProcesses(f.t), "there should be no running agent at beginning of Install()")
assert.Empty(f.t, getElasticAgentAndAgentbeatProcesses(f.t), "there should be no running agent at beginning of Install()")
}

switch f.packageFormat {
Expand Down Expand Up @@ -287,19 +287,22 @@ func (f *Fixture) installNoPkgManager(ctx context.Context, installOpts *InstallO

f.t.Cleanup(func() {
// check for running agents after uninstall had a chance to run
processes := getElasticAgentProcesses(f.t)

// there can be a single agent left when using --develop mode
if f.installOpts != nil && f.installOpts.Namespace != "" {
assert.LessOrEqualf(f.t, len(processes), 1, "More than one agent left running at the end of the test when second agent in namespace %s was used: %v", f.installOpts.Namespace, processes)
// Only consider the main agent process and not sub-processes so that we can detect when
// multiple agents are running without needing to know the number of input sub-processes to expect.
agentProcesses := getElasticAgentProcesses(f.t)
assert.LessOrEqualf(f.t, len(agentProcesses), 1, "More than one agent left running at the end of the test when second agent in namespace %s was used: %v", f.installOpts.Namespace, agentProcesses)
// The agent left running has to be the non-development agent. The development agent should be uninstalled first as a convention.
if len(processes) > 0 {
assert.NotContainsf(f.t, processes[0].Cmdline, paths.InstallDirNameForNamespace(f.installOpts.Namespace),
"The agent installed into namespace %s was left running at the end of the test or was not uninstalled first: %v", f.installOpts.Namespace, processes)
if len(agentProcesses) > 0 {
assert.NotContainsf(f.t, agentProcesses[0].Cmdline, paths.InstallDirNameForNamespace(f.installOpts.Namespace),
"The agent installed into namespace %s was left running at the end of the test or was not uninstalled first: %v", f.installOpts.Namespace, agentProcesses)
}
return
}

// If not using an installation namespace, there should be no elastic-agent or agentbeat processes left running.
processes := getElasticAgentAndAgentbeatProcesses(f.t)
assert.Empty(f.t, processes, "there should be no running agent at the end of the test")
})

Expand Down Expand Up @@ -409,6 +412,13 @@ func getElasticAgentProcesses(t *gotesting.T) []runningProcess {
return getProcesses(t, `.*elastic\-agent.*`)
}

// Includes both the main elastic-agent process and the agentbeat sub-processes for ensuring
// that no sub-processes are orphaned from their parent process and left running. This
// primarily tests that Windows Job Object assignment works.
func getElasticAgentAndAgentbeatProcesses(t *gotesting.T) []runningProcess {
return getProcesses(t, `.*(elastic\-agent|agentbeat).*`)
}

func getProcesses(t *gotesting.T, regex string) []runningProcess {
procStats := agentsystemprocess.Stats{
Procs: []string{regex},
Expand Down
9 changes: 8 additions & 1 deletion pkg/testing/tools/check/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func ConnectedToFleet(ctx context.Context, t *testing.T, fixture *integrationtes
// for use with assert.Eventually or require.Eventually.
func FleetAgentStatus(ctx context.Context,
t *testing.T,
fixture *integrationtest.Fixture,
client *kibana.Client,
policyID,
expectedStatus string) func() bool {
Expand All @@ -61,7 +62,13 @@ func FleetAgentStatus(ctx context.Context,
return true
}

t.Logf("Agent fleet status: %s", currentStatus)
agentStatus, err := fixture.ExecStatus(ctx)
if err != nil {
t.Logf("Agent fleet status: %s Error getting local status: %s", currentStatus, err)
return false
}

t.Logf("Agent fleet status: %s Local status: %v", currentStatus, agentStatus)
return false
}
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/testing/tools/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func InstallAgentForPolicyWithToken(ctx context.Context, t *testing.T,
// Wait for Agent to be healthy
require.Eventually(
t,
check.FleetAgentStatus(ctx, t, kibClient, policyID, "online"),
check.FleetAgentStatus(ctx, t, agentFixture, kibClient, policyID, "online"),
timeout,
10*time.Second,
"Elastic Agent status is not online",
Expand Down
2 changes: 0 additions & 2 deletions testing/integration/logs_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
atesting "github.com/elastic/elastic-agent/pkg/testing"
"github.com/elastic/elastic-agent/pkg/testing/define"
"github.com/elastic/elastic-agent/pkg/testing/tools"
"github.com/elastic/elastic-agent/pkg/testing/tools/check"
"github.com/elastic/elastic-agent/pkg/testing/tools/fleettools"
"github.com/elastic/elastic-agent/pkg/testing/tools/testcontext"
"github.com/elastic/elastic-agent/testing/installtest"
Expand Down Expand Up @@ -102,7 +101,6 @@ func TestLogIngestionFleetManaged(t *testing.T) {
createPolicyReq)
require.NoError(t, err)
t.Logf("created policy: %s", policy.ID)
check.ConnectedToFleet(ctx, t, agentFixture, 5*time.Minute)

// 3. Ensure installation is correct.
require.NoError(t, installtest.CheckSuccess(ctx, agentFixture, installOpts.BasePath, &installtest.CheckOpts{Privileged: installOpts.Privileged}))
Expand Down
4 changes: 2 additions & 2 deletions testing/integration/upgrade_fleet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func testUpgradeFleetManagedElasticAgent(
t.Log("Waiting for enrolled Agent status to be online...")
require.Eventually(t,
check.FleetAgentStatus(
ctx, t, kibClient, policyResp.ID, "online"),
ctx, t, startFixture, kibClient, policyResp.ID, "online"),
2*time.Minute,
10*time.Second,
"Agent status is not online")
Expand Down Expand Up @@ -446,7 +446,7 @@ func testUpgradeFleetManagedElasticAgent(
require.NoError(t, err)

t.Log("Waiting for enrolled Agent status to be online...")
require.Eventually(t, check.FleetAgentStatus(ctx, t, kibClient, policyResp.ID, "online"), 10*time.Minute, 15*time.Second, "Agent status is not online")
require.Eventually(t, check.FleetAgentStatus(ctx, t, startFixture, kibClient, policyResp.ID, "online"), 10*time.Minute, 15*time.Second, "Agent status is not online")

// wait for version
require.Eventually(t, func() bool {
Expand Down

0 comments on commit 1a2e29e

Please sign in to comment.