Update all usages of fleettools to use the installed Agent ID #7054

Open
wants to merge 10 commits into main
126 changes: 109 additions & 17 deletions pkg/testing/fixture.go
@@ -714,24 +714,100 @@ func (e *ExecErr) Unwrap() error {
return e.err
}

// ExecStatus executes the status subcommand on the prepared Elastic Agent binary.
// It returns the parsed output and the error from the execution. Keep in mind
// the agent exits with status 1 if it's unhealthy, but it still outputs the
// status successfully. An empty AgentStatusOutput and non nil error
// means the output could not be parsed.
// As long as we get some output, we don't return any error.
// It should work with any 8.6+ agent
func (f *Fixture) ExecStatus(ctx context.Context, opts ...process.CmdOption) (AgentStatusOutput, error) {
out, err := f.Exec(ctx, []string{"status", "--output", "json"}, opts...)
status := AgentStatusOutput{}
if uerr := json.Unmarshal(out, &status); uerr != nil {
return AgentStatusOutput{},
fmt.Errorf("could not unmarshal agent status output: %w:\n%s", errors.Join(uerr, err), out)
} else if status.IsZero() {
return status, fmt.Errorf("agent status output is empty: %w", err)
type statusOpts struct {
noRetry bool
retryTimeout time.Duration
retryInterval time.Duration

cmdOptions []process.CmdOption
}

type statusOpt func(*statusOpts)

// WithNoRetry disables the retry logic in ExecStatus function call.
func WithNoRetry() func(opt *statusOpts) {
return func(opt *statusOpts) {
opt.noRetry = true
}
}

return status, nil
// WithRetryTimeout adjusts the retry timeout from the default value of one minute.
func WithRetryTimeout(duration time.Duration) func(opt *statusOpts) {
return func(opt *statusOpts) {
opt.retryTimeout = duration
}
}

// WithRetryInterval adjusts the retry interval from the default value of one second.
func WithRetryInterval(duration time.Duration) func(opt *statusOpts) {
return func(opt *statusOpts) {
opt.retryInterval = duration
}
}

// WithCmdOptions adjusts the options of the command when status is called.
func WithCmdOptions(cmdOptions ...process.CmdOption) func(opt *statusOpts) {
return func(opt *statusOpts) {
opt.cmdOptions = append(opt.cmdOptions, cmdOptions...)
}
}

// ExecStatus executes `elastic-agent status --output=json`.
//
// Returns the parsed output and the error from the execution. Keep in mind the agent exits with status 1 if it's
// unhealthy, but it still outputs the status successfully. This call does require that the Elastic Agent is running
// and communication over the control protocol is working.
//
// By default, retry logic is applied. Use WithNoRetry to disable this behavior. WithRetryTimeout and WithRetryInterval
// can be used to adjust the retry logic timing. The default retry timeout is one minute and the default retry
// interval is one second.
//
// An empty AgentStatusOutput and a non-nil error mean the output could not be parsed. As long as we get some
// output, we don't return an error. It should work with any 8.6+ agent.
func (f *Fixture) ExecStatus(ctx context.Context, opts ...statusOpt) (AgentStatusOutput, error) {
var opt statusOpts
opt.retryTimeout = 1 * time.Minute
opt.retryInterval = 1 * time.Second
for _, o := range opts {
o(&opt)
}

if opt.noRetry {
out, err := f.Exec(ctx, []string{"status", "--output", "json"}, opt.cmdOptions...)
status := AgentStatusOutput{}
if uerr := json.Unmarshal(out, &status); uerr != nil {
return AgentStatusOutput{},
fmt.Errorf("could not unmarshal agent status output: %w:\n%s", errors.Join(uerr, err), out)
} else if status.IsZero() {
return status, fmt.Errorf("agent status output is empty: %w", err)
}
return status, nil
}

ctx, cancel := context.WithTimeout(ctx, opt.retryTimeout)
defer cancel()

var lastErr error
for {
if ctx.Err() != nil {
if errors.Is(ctx.Err(), context.DeadlineExceeded) && lastErr != nil {
// return the last observed error
return AgentStatusOutput{}, fmt.Errorf("agent status returned an error: %w", lastErr)
}
return AgentStatusOutput{}, fmt.Errorf("agent status failed: %w", ctx.Err())
}
out, err := f.Exec(ctx, []string{"status", "--output", "json"}, opt.cmdOptions...)
status := AgentStatusOutput{}
if uerr := json.Unmarshal(out, &status); uerr != nil {
// an unmarshal error means JSON was not output, most likely due to a communication error
lastErr = fmt.Errorf("could not unmarshal agent status output: %w:\n%s", errors.Join(uerr, err), out)
} else if status.IsZero() {
// status is still empty; keep retrying until a populated status is returned
lastErr = fmt.Errorf("agent status output is empty: %w", err)
} else {
return status, nil
}
sleepFor(ctx, opt.retryInterval)
}
}
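
For illustration only (this is not part of the diff), a minimal sketch of how a test might use the new options; the package name, helper name, timings, and the atesting import alias and path are assumptions:

package example

import (
	"context"
	"time"

	atesting "github.com/elastic/elastic-agent/pkg/testing"
)

// waitForStatus is a hypothetical helper: it polls `elastic-agent status
// --output=json` for up to 30 seconds at 2-second intervals instead of the
// default one minute / one second.
func waitForStatus(ctx context.Context, f *atesting.Fixture) (atesting.AgentStatusOutput, error) {
	return f.ExecStatus(ctx,
		atesting.WithRetryTimeout(30*time.Second),
		atesting.WithRetryInterval(2*time.Second),
	)
}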

// ExecInspect executes the inspect subcommand on the prepared Elastic Agent binary.
@@ -799,12 +875,21 @@ func (f *Fixture) ExecDiagnostics(ctx context.Context, cmd ...string) (string, e
return files[0], err
}

// AgentID returns the ID of the installed Elastic Agent.
func (f *Fixture) AgentID(ctx context.Context, opts ...statusOpt) (string, error) {
status, err := f.ExecStatus(ctx, opts...)
if err != nil {
return "", err
}
return status.Info.ID, nil
Member commented:

This is possibly misleading, because even standalone agents have IDs, which are then replaced with one generated by Fleet after enrollment succeeds.

For example, I see the following with a local standalone agent. Notice "is_managed": false there but "id": "913ce739-2c6c-45e9-90f5-2226a14bca70" being populated.

sudo elastic-development-agent status --output=json
{
    "info": {
        "id": "913ce739-2c6c-45e9-90f5-2226a14bca70",
        "version": "9.1.0",
        "commit": "d2047ac48df2f4536ca69a86ad4922b3e264501a",
        "build_time": "2025-02-25 21:52:49 +0000 UTC",
        "snapshot": true,
        "pid": 70294,
        "unprivileged": false,
        "is_managed": false
    },
    "state": 2,
    "message": "Running",

Just looking at the ID at any one point in time is not going to give you a valid ID for making requests to Fleet.

We probably want an explicit entry in the status output for the ID as assigned by Fleet so we can poll for it to be populated. Otherwise I worry there will be race conditions in tests where sometimes the standalone ID is picked up before it is replaced by the one assigned during enrollment.

Contributor Author replied:

It is not misleading, but it does greatly depend on when you ask for the AgentID. You must check after enrollment has occurred. You don't need to worry about it picking up the wrong ID, as long as you call it at the correct time. I think AgentID() is also useful in the standalone case, so I don't think checking for is_managed: true would be correct for this type of call.

}
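
To make the timing point in this thread concrete, here is a hedged sketch (again, not part of the change) of one way a test could avoid the race described above: record the ID before enrollment and poll until the agent reports a different one. The helper name and timeout are invented; imports are context, fmt, time, and the atesting alias from the earlier sketch.

// waitForFleetAssignedID is hypothetical: it polls AgentID until the value
// differs from the ID observed before enrollment, i.e. until the agent has
// picked up the ID assigned by Fleet.
func waitForFleetAssignedID(ctx context.Context, f *atesting.Fixture, preEnrollID string) (string, error) {
	ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
	defer cancel()
	for {
		id, err := f.AgentID(ctx)
		if err == nil && id != "" && id != preEnrollID {
			return id, nil
		}
		select {
		case <-ctx.Done():
			return "", fmt.Errorf("timed out waiting for the Fleet-assigned agent ID: %w", ctx.Err())
		case <-time.After(time.Second):
		}
	}
}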

// IsHealthy checks whether the prepared Elastic Agent reports itself as healthy.
// It returns an error if either the reported state isn't healthy or if it fails
// to fetch the current state. If the status is successfully fetched, but it
// isn't healthy, the error will contain the reported status.
// This function is compatible with any Elastic Agent version 8.6 or later.
func (f *Fixture) IsHealthy(ctx context.Context, opts ...process.CmdOption) error {
func (f *Fixture) IsHealthy(ctx context.Context, opts ...statusOpt) error {
status, err := f.ExecStatus(ctx, opts...)
if err != nil {
return fmt.Errorf("agent status returned an error: %w", err)
@@ -1450,3 +1535,10 @@ func (v *AgentBinaryVersion) String() string {
type AgentVersionOutput struct {
Binary AgentBinaryVersion `yaml:"binary"`
}

func sleepFor(ctx context.Context, amount time.Duration) {
select {
case <-ctx.Done():
case <-time.After(amount):
}
}
8 changes: 6 additions & 2 deletions pkg/testing/tools/check/check.go
@@ -49,10 +49,14 @@ func FleetAgentStatus(ctx context.Context,
t *testing.T,
fixture *integrationtest.Fixture,
client *kibana.Client,
policyID,
expectedStatus string) func() bool {
return func() bool {
currentStatus, err := fleettools.GetAgentStatus(ctx, client, policyID)
agentID, err := fixture.AgentID(ctx)
if err != nil {
t.Errorf("failed to get agent ID: %s", err.Error())
return false
}
currentStatus, err := fleettools.GetAgentStatus(ctx, client, agentID)
if err != nil {
t.Errorf("unable to determine agent status: %s", err.Error())
return false
130 changes: 9 additions & 121 deletions pkg/testing/tools/fleettools/fleet.go
@@ -6,13 +6,8 @@ package fleettools

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"os"

"github.com/gofrs/uuid/v5"

@@ -25,151 +20,44 @@ type EnrollParams struct {
PolicyID string `json:"policy_id"`
}

func extractError(result []byte) error {
var kibanaResult struct {
Message string
Attributes struct {
Objects []struct {
ID string
Error struct {
Message string
}
}
}
}
if err := json.Unmarshal(result, &kibanaResult); err != nil {
return fmt.Errorf("error extracting JSON for error response: %w", err)
}
var errs []error
if kibanaResult.Message != "" {
for _, err := range kibanaResult.Attributes.Objects {
errs = append(errs, fmt.Errorf("id: %s, message: %s", err.ID, err.Error.Message))
}
if len(errs) == 0 {
return fmt.Errorf("%s", kibanaResult.Message)
}
return fmt.Errorf("%s: %w", kibanaResult.Message, errors.Join(errs...))

}
return nil
}

// GetAgentByPolicyIDAndHostnameFromList get an agent by the local_metadata.host.name property, reading from the agents list
func GetAgentByPolicyIDAndHostnameFromList(ctx context.Context, client *kibana.Client, policyID, hostname string) (*kibana.AgentExisting, error) {
params := url.Values{}
params.Add("kuery", fmt.Sprintf(`local_metadata.host.name:"%s" and policy_id:"%s" and active:true`, hostname, policyID))

resp, err := client.Connection.SendWithContext(ctx, http.MethodGet, "/api/fleet/agents", params, nil, nil)
if err != nil {
return nil, fmt.Errorf("error calling list agents API: %w", err)
}
defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %w", err)
}

if resp.StatusCode != http.StatusOK {
return nil, extractError(b)
}
var r kibana.ListAgentsResponse
err = json.Unmarshal(b, &r)
if err != nil {
return nil, fmt.Errorf("unmarshalling response json: %w", err)
}

if len(r.Items) == 0 {
return nil, fmt.Errorf("unable to find agent with hostname [%s] for policy [%s]",
hostname, policyID)
}

if len(r.Items) > 1 {
return nil, fmt.Errorf("found %d agents with hostname [%s] for policy [%s]; expected to find only one, response:\n%s", len(r.Items), hostname, policyID, b)
}

return &r.Items[0], nil
}

func GetAgentIDByHostname(ctx context.Context, client *kibana.Client, policyID, hostname string) (string, error) {
agent, err := GetAgentByPolicyIDAndHostnameFromList(ctx, client, policyID, hostname)
func GetAgentStatus(ctx context.Context, client *kibana.Client, agentID string) (string, error) {
agent, err := client.GetAgent(ctx, kibana.GetAgentRequest{ID: agentID})
if err != nil {
return "", err
}
return agent.Agent.ID, nil
}

func GetAgentStatus(ctx context.Context, client *kibana.Client, policyID string) (string, error) {
hostname, err := os.Hostname()
if err != nil {
return "", err
}

agent, err := GetAgentByPolicyIDAndHostnameFromList(ctx, client, policyID, hostname)
if err != nil {
return "", err
}

return agent.Status, nil
}

func GetAgentVersion(ctx context.Context, client *kibana.Client, policyID string) (string, error) {
hostname, err := os.Hostname()
if err != nil {
return "", err
}

agent, err := GetAgentByPolicyIDAndHostnameFromList(ctx, client, policyID, hostname)
func GetAgentVersion(ctx context.Context, client *kibana.Client, agentID string) (string, error) {
agent, err := client.GetAgent(ctx, kibana.GetAgentRequest{ID: agentID})
if err != nil {
return "", err
}

return agent.Agent.Version, err
return agent.Agent.Version, nil
}

func UnEnrollAgent(ctx context.Context, client *kibana.Client, policyID string) error {
hostname, err := os.Hostname()
if err != nil {
return err
}
agentID, err := GetAgentIDByHostname(ctx, client, policyID, hostname)
if err != nil {
return err
}

func UnEnrollAgent(ctx context.Context, client *kibana.Client, agentID string) error {
unEnrollAgentReq := kibana.UnEnrollAgentRequest{
ID: agentID,
Revoke: true,
}
_, err = client.UnEnrollAgent(ctx, unEnrollAgentReq)
_, err := client.UnEnrollAgent(ctx, unEnrollAgentReq)
if err != nil {
return fmt.Errorf("unable to unenroll agent with ID [%s]: %w", agentID, err)
}

return nil
}

func UpgradeAgent(ctx context.Context, client *kibana.Client, policyID, version string, force bool) error {
// TODO: fix me: this does not work if FQDN is enabled
hostname, err := os.Hostname()
if err != nil {
return err
}
agentID, err := GetAgentIDByHostname(ctx, client, policyID, hostname)
if err != nil {
return err
}

func UpgradeAgent(ctx context.Context, client *kibana.Client, agentID, version string, force bool) error {
upgradeAgentReq := kibana.UpgradeAgentRequest{
ID: agentID,
Version: version,
Force: force,
}
_, err = client.UpgradeAgent(ctx, upgradeAgentReq)
_, err := client.UpgradeAgent(ctx, upgradeAgentReq)
if err != nil {
return fmt.Errorf("unable to upgrade agent with ID [%s]: %w", agentID, err)
}

return nil
}
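
For illustration, a hedged sketch of a caller migrating to the ID-based signatures. The helper name is invented, and the kibana and testify import paths are assumptions based on how the surrounding test code typically imports them:

import (
	"context"
	"testing"

	"github.com/elastic/elastic-agent-libs/kibana"
	"github.com/stretchr/testify/require"

	atesting "github.com/elastic/elastic-agent/pkg/testing"
	"github.com/elastic/elastic-agent/pkg/testing/tools/fleettools"
)

// upgradeInstalledAgent is a hypothetical helper: instead of resolving the
// agent in Fleet by hostname and policy ID, it asks the installed agent for
// its own ID and passes that to the fleettools calls.
func upgradeInstalledAgent(ctx context.Context, t *testing.T, kibClient *kibana.Client, fixture *atesting.Fixture, version string) {
	agentID, err := fixture.AgentID(ctx)
	require.NoError(t, err, "could not read the installed agent's ID")

	status, err := fleettools.GetAgentStatus(ctx, kibClient, agentID)
	require.NoError(t, err, "could not read agent status from Fleet")
	t.Logf("agent %s is %s in Fleet", agentID, status)

	require.NoError(t, fleettools.UpgradeAgent(ctx, kibClient, agentID, version, false),
		"upgrade request failed")
}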

2 changes: 1 addition & 1 deletion pkg/testing/tools/tools.go
@@ -154,7 +154,7 @@ func InstallAgentForPolicyWithToken(ctx context.Context, t *testing.T,
// Wait for Agent to be healthy
require.Eventually(
t,
check.FleetAgentStatus(ctx, t, agentFixture, kibClient, policyID, "online"),
check.FleetAgentStatus(ctx, t, agentFixture, kibClient, "online"),
timeout,
10*time.Second,
"Elastic Agent status is not online",
8 changes: 4 additions & 4 deletions testing/integration/container_cmd_test.go
@@ -181,7 +181,7 @@ func TestContainerCMD(t *testing.T) {
// the agent logs will be present in the error message
// which should help to explain why the agent was not
// healthy.
err = agentFixture.IsHealthy(ctx, withEnv(env))
err = agentFixture.IsHealthy(ctx, atesting.WithCmdOptions(withEnv(env)))
return err == nil
},
5*time.Minute, time.Second,
@@ -265,7 +265,7 @@ func TestContainerCMDWithAVeryLongStatePath(t *testing.T) {
// the agent logs will be present in the error message
// which should help to explain why the agent was not
// healthy.
err = agentFixture.IsHealthy(ctx, withEnv(env))
err = agentFixture.IsHealthy(ctx, atesting.WithCmdOptions(withEnv(env)))
return err == nil
},
1*time.Minute, time.Second,
@@ -369,7 +369,7 @@ func TestContainerCMDEventToStderr(t *testing.T) {
// the agent logs will be present in the error message
// which should help to explain why the agent was not
// healthy.
err := agentFixture.IsHealthy(ctx, withEnv(env))
err := agentFixture.IsHealthy(ctx, atesting.WithCmdOptions(withEnv(env)))
return err == nil
},
2*time.Minute, time.Second,
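
Because IsHealthy now accepts status options instead of raw process.CmdOption values, existing command options are wrapped in atesting.WithCmdOptions as shown above. Below is a hypothetical variant of the same call, additionally shortening the internal retry so that the surrounding Eventually loop controls the overall timeout; agentFixture, ctx, env, withEnv, and t come from the surrounding test:

healthy := func() bool {
	// Keep the environment override and shorten the internal retry so each
	// Eventually attempt returns quickly.
	err := agentFixture.IsHealthy(ctx,
		atesting.WithCmdOptions(withEnv(env)),
		atesting.WithRetryTimeout(10*time.Second),
	)
	return err == nil
}
require.Eventually(t, healthy, 2*time.Minute, time.Second, "agent never became healthy")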
@@ -402,7 +402,7 @@ func createMockESOutput(t *testing.T, info *define.Info) (string, string) {
"%s"
],
"preset": "latency"
}
}
`
// The API will return an error if the output ID/name contains an
// UUID substring, so we replace the '-' by '_' to keep the API happy.