Skip to content

Commit

Permalink
add extended runtime test
Browse files Browse the repository at this point in the history
  • Loading branch information
fearful-symmetry committed Jan 25, 2024
1 parent 65a68f8 commit 9516c4d
Show file tree
Hide file tree
Showing 5 changed files with 1,053 additions and 2 deletions.
10 changes: 10 additions & 0 deletions magefile.go
Original file line number Diff line number Diff line change
Expand Up @@ -1600,6 +1600,14 @@ func (Integration) TestBeatServerless(ctx context.Context, beatname string) erro
return integRunner(ctx, false, "TestBeatsServerless")
}

func (Integration) TestExtendedRuntime(ctx context.Context) error {
err := os.Setenv("TEST_EXTENDED", "true")
if err != nil {
return fmt.Errorf("error setting binary name: %w", err)
}
return integRunner(ctx, false, "TestAgentLong")
}

// TestOnRemote shouldn't be called locally (called on remote host to perform testing)
func (Integration) TestOnRemote(ctx context.Context) error {
mg.Deps(Build.TestBinaries)
Expand Down Expand Up @@ -1826,6 +1834,8 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche
extraEnv["AGENT_KEEP_INSTALLED"] = os.Getenv("AGENT_KEEP_INSTALLED")
}

extraEnv["TEST_EXTENDED"] = os.Getenv("TEST_EXTENDED")

// these following two env vars are currently not used by anything, but can be used in the future to test beats or
// other binaries, see https://github.com/elastic/elastic-agent/pull/3258
binaryName := os.Getenv("TEST_BINARY_NAME")
Expand Down
7 changes: 5 additions & 2 deletions pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,10 @@ func (f *Fixture) ExecStatus(ctx context.Context, opts ...process.CmdOption) (Ag
}, uerr))
}

return status, err
if err != nil {
return status, fmt.Errorf("error running command (output: %s): %w", string(out), err)
}
return status, nil
}

// ExecInspect executes to inspect subcommand on the prepared Elastic Agent binary.
Expand Down Expand Up @@ -735,7 +738,7 @@ func (f *Fixture) ExecVersion(ctx context.Context, opts ...process.CmdOption) (A
func (f *Fixture) IsHealthy(ctx context.Context, opts ...process.CmdOption) error {
status, err := f.ExecStatus(ctx, opts...)
if err != nil {
return fmt.Errorf("agent status returned and error: %w", err)
return fmt.Errorf("agent status returned an error: %w", err)
}

if status.State != int(cproto.State_HEALTHY) {
Expand Down
36 changes: 36 additions & 0 deletions pkg/testing/tools/estools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,41 @@ func DeletePipelines(ctx context.Context, client elastictransport.Interface, nam
return nil
}

func FindMatchingLogLinesForAgent(client elastictransport.Interface, agentID, line string) (Documents, error) {
return FindMatchingLogLinesForAgentWithContext(context.Background(), client, agentID, line)
}

func FindMatchingLogLinesForAgentWithContext(ctx context.Context, client elastictransport.Interface, agentID, line string) (Documents, error) {
queryRaw := map[string]interface{}{
"query": map[string]interface{}{
"bool": map[string]interface{}{
"must": []map[string]interface{}{
{
"match_phrase": map[string]interface{}{
"message": line,
},
},
{
"term": map[string]interface{}{
"agent.id": map[string]interface{}{
"value": agentID,
},
},
},
},
},
},
}

var buf bytes.Buffer
err := json.NewEncoder(&buf).Encode(queryRaw)
if err != nil {
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
}

// FindMatchingLogLinesWithContext returns any logs with message fields that match the given line
func FindMatchingLogLinesWithContext(ctx context.Context, client elastictransport.Interface, namespace, line string) (Documents, error) {
queryRaw := map[string]interface{}{
Expand Down Expand Up @@ -538,6 +573,7 @@ func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
es.Search.WithContext(ctx),
es.Search.WithSize(2000),
)
if err != nil {
return Documents{}, fmt.Errorf("error performing ES search: %w", err)
Expand Down
213 changes: 213 additions & 0 deletions testing/integration/agent_long_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
package integration

import (
"cmp"
"context"
"encoding/json"
"fmt"
"os"
"testing"
"time"

"github.com/elastic/elastic-agent-libs/kibana"

Check failure on line 12 in testing/integration/agent_long_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

File is not `goimports`-ed with -local github.com/elastic (goimports)
atesting "github.com/elastic/elastic-agent/pkg/testing"
"github.com/elastic/elastic-agent/pkg/testing/define"
"github.com/elastic/elastic-agent/pkg/testing/tools"
"github.com/elastic/elastic-agent/pkg/testing/tools/estools"
"github.com/google/uuid"
"github.com/mitchellh/mapstructure"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"golang.org/x/exp/slices"
)

type ExtendedRunner struct {
suite.Suite
info *define.Info
agentFixture *atesting.Fixture

ESHost string
}

type ComponentMetrics struct {
Memory MemoryMetrics `mapstructure:"memstats"`
Handles HandlesMetrics `mapstructure:"handles"`
}

// TestComponent is used as a key in our map of component metrics
type TestComponent struct {
Binary string `mapstructure:"binary"`
Dataset string `mapstructure:"dataset"`
ID string `mapstructure:"id"`
CompType string `mapstructure:"type"`
}

type MemoryMetrics struct {
GcNext uint64 `mapstructure:"gc_next"`
MemoryAlloc uint64 `mapstructure:"memory_alloc"`
MemorySys uint64 `mapstructure:"memory_sys"`
MemoryTotal uint64 `mapstructure:"memory_total"`
RSS uint64 `mapstructure:"rss"`
}

type HandlesMetrics struct {
Open uint64 `mapstructure:"open"`
Limit HandlesLimits `mapstructure:"limit"`
}

type HandlesLimits struct {
Hard uint `mapstructure:"hard"`
Soft uint64 `mapstructure:"soft"`
}

func TestAgentLong(t *testing.T) {
if os.Getenv("TEST_EXTENDED") == "" {
t.Skipf("not running extended test unless TEST_EXTENDED is set")
}
info := define.Require(t, define.Requirements{
Group: "fleet",
Stack: &define.Stack{},
Local: false, // requires Agent installation
Sudo: true, // requires Agent installation
OS: []define.OS{
{Type: define.Linux},
},
})

suite.Run(t, &ExtendedRunner{info: info})
}

func (runner *ExtendedRunner) SetupSuite() {
policyUUID := uuid.New().String()

installOpts := atesting.InstallOpts{
NonInteractive: true,
Force: true,
}

fixture, err := define.NewFixture(runner.T(), define.Version())
require.NoError(runner.T(), err)
runner.agentFixture = fixture

ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
defer cancel()

basePolicy := kibana.AgentPolicy{
Name: "test-policy-" + policyUUID,
Namespace: "default",
Description: "Test policy " + policyUUID,
MonitoringEnabled: []kibana.MonitoringEnabledOption{
kibana.MonitoringEnabledLogs,
kibana.MonitoringEnabledMetrics,
},
}

policyResp, err := tools.InstallAgentWithPolicy(ctx, runner.T(), installOpts, runner.agentFixture, runner.info.KibanaClient, basePolicy)
require.NoError(runner.T(), err)

systemPackage := kibana.PackagePolicyRequest{}

jsonRaw, err := os.ReadFile("agent_long_test_base_system_integ.json")
require.NoError(runner.T(), err)

err = json.Unmarshal(jsonRaw, &systemPackage)
require.NoError(runner.T(), err)

systemPackage.ID = policyUUID
systemPackage.PolicyID = policyResp.ID
systemPackage.Namespace = "default"
systemPackage.Name = fmt.Sprintf("system-long-test-%s", policyUUID)
systemPackage.Vars = map[string]interface{}{}

runner.T().Logf("Installing fleet package....")
_, err = runner.info.KibanaClient.InstallFleetPackage(ctx, systemPackage)
require.NoError(runner.T(), err, "error creating fleet package")

//runner.T().Logf("Got policy response :%#v", pkgResp)

}

func (runner *ExtendedRunner) TestAgentLong() {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour)
defer cancel()

runtime := os.Getenv("LONG_TEST_RUNTIME")
if runtime == "" {
runtime = "120m"
}

testDuration, err := time.ParseDuration(runtime)
require.NoError(runner.T(), err)

timer := time.NewTimer(testDuration)

// time to perform a health check
ticker := time.Tick(time.Minute)

done := false
for {
if done {
break
}
select {
case <-timer.C:
done = true
case <-ticker:
err := runner.agentFixture.IsHealthy(ctx)
require.NoError(runner.T(), err)
}
}

status, err := runner.agentFixture.ExecStatus(ctx)
require.NoError(runner.T(), err)
runner.T().Logf("Looking for logs matching agent ID %s", status.Info.ID)

docs, err := estools.FindMatchingLogLinesForAgent(runner.info.ESClient, status.Info.ID, "last 30s")
require.NoError(runner.T(), err)

// iterate over hits, compile metrics based on the component
componentCollection := map[TestComponent][]ComponentMetrics{}
for _, doc := range docs.Hits.Hits {

componentData := doc.Source["component"].(map[string]interface{})
toComponent := TestComponent{}
err := mapstructure.Decode(componentData, &toComponent)
require.NoError(runner.T(), err)

monitoringData := doc.Source["monitoring"].(map[string]interface{})["metrics"].(map[string]interface{})["beat"].(map[string]interface{})
metrics := ComponentMetrics{}
err = mapstructure.Decode(monitoringData, &metrics)
require.NoError(runner.T(), err)

if foundComp, ok := componentCollection[toComponent]; ok {
updated := append(foundComp, metrics)
componentCollection[toComponent] = updated
} else {
componentCollection[toComponent] = []ComponentMetrics{metrics}
}
}

// after we get all the metrics, sort by memory/handles to see if we've passed a threshhold

Check failure on line 190 in testing/integration/agent_long_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

`threshhold` is a misspelling of `threshold` (misspell)
for comp, metrics := range componentCollection {
// we're using Sys from `runtime.ReadMemStats` for this. From the `runtime` godoc:
//
//Sys is the sum of the XSys fields below.
//Sys measures the virtual address space reserved by the Go runtime for the heap, stacks, and other internal data structures.
//It's likely that not all of the virtual address space is backed by physical memory at any given moment, though in general it all was at some point.

// At some point in the future, this is where we'll fail the test if our metrics go over a given threshhold.

Check failure on line 198 in testing/integration/agent_long_test.go

View workflow job for this annotation

GitHub Actions / lint (ubuntu-latest)

`threshhold` is a misspelling of `threshold` (misspell)
slices.SortFunc(metrics, func(a, b ComponentMetrics) int { return cmp.Compare(a.Memory.MemorySys, b.Memory.MemorySys) })
highestMem := metrics[len(metrics)-1].Memory.MemorySys
runner.T().Logf("Top memory usage for %s: %d bytes", comp.Dataset, highestMem)

highestHandleOpen := metrics[len(metrics)-1].Handles.Open
slices.SortFunc(metrics, func(a, b ComponentMetrics) int { return cmp.Compare(a.Handles.Open, b.Handles.Open) })

runner.T().Logf("Top count of open handles for %s: %d", comp.Dataset, highestHandleOpen)
if softLimit := metrics[len(metrics)-1].Handles.Limit.Soft; softLimit > 0 {
limitPct := (float64(highestHandleOpen) / float64(softLimit)) * 100
runner.T().Logf("Percent of handle soft limit: %f", limitPct)
}
}

}
Loading

0 comments on commit 9516c4d

Please sign in to comment.