Skip to content

Commit

Permalink
Merge branch 'main' into feat/standard-uuid-lib
Browse files Browse the repository at this point in the history
  • Loading branch information
kruskall authored Jul 19, 2024
2 parents 66555cb + 463bbb4 commit b523946
Show file tree
Hide file tree
Showing 19 changed files with 1,210 additions and 153 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix high IO and handling of a corrupted registry log file. {pull}35893[35893]
- Filebeat, when running with Elastic-Agent, reports status for Filestream input. {pull}40121[40121]
- Implement Elastic Agent status and health reporting for Winlog Filebeat input. {pull}40163[40163]
- Fix filestream's registry GC: registry entries will never be removed if clean_inactive is set to "-1". {pull}40258[40258]

*Heartbeat*

Expand All @@ -55,6 +56,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Setting period for counter cache for Prometheus remote_write at least to 60sec {pull}38553[38553]
- Add support of Graphite series 1.1.0+ tagging extension for statsd module. {pull}39619[39619]
- Remove fallback to the node limit for the `kubernetes.pod.cpu.usage.limit.pct` and `kubernetes.pod.memory.usage.limit.pct` metrics calculation
- Add support for Kibana status metricset in v8 format {pull}40275[40275]

*Osquerybeat*

Expand Down Expand Up @@ -178,6 +180,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix behavior of pagetypeinfo metrics {pull}39985[39985]
- Fix query logic for temp and non-temp tablespaces in Oracle module. {issue}38051[38051] {pull}39787[39787]
- Set GCP metrics config period to the default (60s) when the value is below the minimum allowed period. {issue}30434[30434] {pull}40020[40020]
- Add GCP 'instance_id' resource label in ECS cloud fields. {issue}40033[40033] {pull}40062[40062]
- Fix missing metrics from CloudWatch when include_linked_accounts set to false. {issue}40071[40071] {pull}40135[40135]
- Update beat module with apm-server monitoring metrics fields {pull}40127[40127]

Expand Down
2 changes: 1 addition & 1 deletion filebeat/_meta/config/filebeat.inputs.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -432,7 +432,7 @@ filebeat.inputs:

# If the modification time of a file is older than clean_inactive, its state is removed from the registry
# By default this is disabled.
#clean_inactive: 0
#clean_inactive: -1

# Removes the state for files which cannot be found on disk anymore immediately
#clean_removed: true
Expand Down
2 changes: 1 addition & 1 deletion filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -839,7 +839,7 @@ filebeat.inputs:

# If the modification time of a file is older than clean_inactive, its state is removed from the registry
# By default this is disabled.
#clean_inactive: 0
#clean_inactive: -1

# Removes the state for files which cannot be found on disk anymore immediately
#clean_removed: true
Expand Down
16 changes: 9 additions & 7 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@ import (
type config struct {
Reader readerConfig `config:",inline"`

ID string `config:"id"`
Paths []string `config:"paths"`
Close closerConfig `config:"close"`
FileWatcher *conf.Namespace `config:"prospector"`
FileIdentity *conf.Namespace `config:"file_identity"`
CleanInactive time.Duration `config:"clean_inactive" validate:"min=0"`
ID string `config:"id"`
Paths []string `config:"paths"`
Close closerConfig `config:"close"`
FileWatcher *conf.Namespace `config:"prospector"`
FileIdentity *conf.Namespace `config:"file_identity"`

// -1 means that registry will never be cleaned
CleanInactive time.Duration `config:"clean_inactive" validate:"min=-1"`
CleanRemoved bool `config:"clean_removed"`
HarvesterLimit uint32 `config:"harvester_limit" validate:"min=0"`
IgnoreOlder time.Duration `config:"ignore_older"`
Expand Down Expand Up @@ -98,7 +100,7 @@ func defaultConfig() config {
Reader: defaultReaderConfig(),
Paths: []string{},
Close: defaultCloserConfig(),
CleanInactive: 0,
CleanInactive: -1,
CleanRemoved: true,
HarvesterLimit: 0,
IgnoreOlder: 0,
Expand Down
9 changes: 5 additions & 4 deletions filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,11 @@ func Plugin(log *logp.Logger, store loginp.StateStore) input.Plugin {
Info: "filestream input",
Doc: "The filestream input collects logs from the local filestream service",
Manager: &loginp.InputManager{
Logger: log,
StateStore: store,
Type: pluginName,
Configure: configure,
Logger: log,
StateStore: store,
Type: pluginName,
Configure: configure,
DefaultCleanTimeout: -1,
},
}
}
Expand Down
4 changes: 3 additions & 1 deletion filebeat/input/filestream/internal/input-logfile/clean.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,5 +130,7 @@ func checkCleanResource(started, now time.Time, resource *resource) bool {
reference = started
}

return reference.Add(ttl).Before(now) && resource.stored
// if ttl is negative, we never delete the entry
// else check for time elapsed
return ttl >= 0 && reference.Add(ttl).Before(now) && resource.stored
}
20 changes: 20 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/clean_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,24 @@ func TestGCStore(t *testing.T) {
want := map[string]state{}
checkEqualStoreState(t, want, backend.snapshot())
})

t.Run("state never removed with ttl=-1", func(t *testing.T) {

// keep started as a large value
started := time.Now().Add(-1 * time.Hour * 24 * 356) // cleanup process is running for a while already

initState := map[string]state{
"test::key": {
TTL: -1,
Updated: started,
},
}

backend := createSampleStore(t, initState)
store := testOpenStore(t, "test", backend)
defer store.Release()

gcStore(logp.NewLogger("test"), started, store)
checkEqualStoreState(t, initState, backend.snapshot())
})
}
3 changes: 0 additions & 3 deletions filebeat/input/filestream/internal/input-logfile/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ type StateStore interface {

func (cim *InputManager) init() error {
cim.initOnce.Do(func() {
if cim.DefaultCleanTimeout <= 0 {
cim.DefaultCleanTimeout = 30 * time.Minute
}

log := cim.Logger.With("input_type", cim.Type)

Expand Down
62 changes: 61 additions & 1 deletion metricbeat/docs/fields.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -44450,13 +44450,73 @@ alias to: service.version
*`kibana.status.status.overall.state`*::
+
--
Kibana overall state.
Kibana overall state (v7 format).


type: keyword

--

*`kibana.status.status.overall.level`*::
+
--
Kibana overall level (v8 format).


type: keyword

--

*`kibana.status.status.overall.summary`*::
+
--
Kibana overall state in a human-readable format.


type: text

--

*`kibana.status.status.core.elasticsearch.level`*::
+
--
Kibana Elasticsearch client's status.


type: keyword

--

*`kibana.status.status.core.elasticsearch.summary`*::
+
--
Kibana Elasticsearch client's status in a human-readable format.


type: text

--

*`kibana.status.status.core.savedObjects.level`*::
+
--
Kibana Saved Objects client's status.


type: keyword

--

*`kibana.status.status.core.savedObjects.summary`*::
+
--
Kibana Saved Objects client's status in a human-readable format.


type: text

--

[float]
=== metrics

Expand Down
141 changes: 46 additions & 95 deletions metricbeat/mb/testing/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,13 @@ package testing

import (
"context"
"sync"
"testing"
"time"

"github.com/elastic/go-concert/timed"

"github.com/elastic/beats/v7/metricbeat/mb"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)

type TestModule struct {
Expand Down Expand Up @@ -132,25 +132,6 @@ func NewMetricSetsWithRegistry(t testing.TB, config interface{}, registry *mb.Re
return metricsets
}

func NewReportingMetricSet(t testing.TB, config interface{}) mb.ReportingMetricSet {
metricSet := NewMetricSet(t, config)

reportingMetricSet, ok := metricSet.(mb.ReportingMetricSet)
if !ok {
t.Fatal("MetricSet does not implement ReportingMetricSet")
}

return reportingMetricSet
}

// ReportingFetch runs the given reporting metricset and returns all of the
// events and errors that occur during that period.
func ReportingFetch(metricSet mb.ReportingMetricSet) ([]mapstr.M, []error) {
r := &capturingReporter{}
metricSet.Fetch(r)
return r.events, r.errs
}

// NewReportingMetricSetV2 returns a new ReportingMetricSetV2 instance. Then
// you can use ReportingFetchV2 to perform a Fetch operation with the MetricSet.
func NewReportingMetricSetV2(t testing.TB, config interface{}) mb.ReportingMetricSetV2 {
Expand Down Expand Up @@ -186,7 +167,7 @@ func NewReportingMetricSetV2Error(t testing.TB, config interface{}) mb.Reporting
// NewReportingMetricSetV2Errors returns an array of new ReportingMetricSetV2 instances.
func NewReportingMetricSetV2Errors(t testing.TB, config interface{}) []mb.ReportingMetricSetV2Error {
metricSets := NewMetricSets(t, config)
var reportingMetricSets []mb.ReportingMetricSetV2Error
reportingMetricSets := make([]mb.ReportingMetricSetV2Error, 0, len(metricSets))
for _, metricSet := range metricSets {
rMS, ok := metricSet.(mb.ReportingMetricSetV2Error)
if !ok {
Expand Down Expand Up @@ -259,6 +240,41 @@ func ReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error) ([]mb.Event,
return r.events, r.errs
}

// PeriodicReportingFetchV2Error runs the given metricset and returns
// the first batch of events or errors that occur during that period.
//
// `period` is the time between each fetch.
// `timeout` is the maximum time to wait for the first event.
//
// The function tries to fetch the metrics every `period` until it gets
// the first batch of metrics or the `timeout` is reached.
func PeriodicReportingFetchV2Error(metricSet mb.ReportingMetricSetV2Error, period time.Duration, timeout time.Duration) ([]mb.Event, []error) {
	// The context bounds the whole retry loop; cancel is also invoked below
	// to stop early once the first batch of events has been captured.
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	r := &CapturingReporterV2{}
	// NOTE(review): this assumes timed.Periodic invokes the callback every
	// `period` until the context is done or the callback returns a non-nil
	// error — confirm against the go-concert documentation. The error
	// returned by timed.Periodic is deliberately discarded: results are
	// read from the capturing reporter instead.
	_ = timed.Periodic(ctx, period, func() error {
		// Fetch the metrics and store them in the
		// reporter.
		if err := metricSet.Fetch(r); err != nil {
			// Record the failure and propagate it so the periodic loop
			// stops on the first failed fetch instead of retrying.
			r.errs = append(r.errs, err)
			return err
		}

		if len(r.events) > 0 {
			// We have metrics, stop the periodic
			// and return the metrics.
			cancel()
		}

		// No metrics yet, retry again
		// in the next period.
		return nil
	})

	return r.events, r.errs
}

// ReportingFetchV2WithContext runs the given reporting metricset and returns all of the
// events and errors that occur during that period.
func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) ([]mb.Event, []error) {
Expand All @@ -270,71 +286,6 @@ func ReportingFetchV2WithContext(metricSet mb.ReportingMetricSetV2WithContext) (
return r.events, r.errs
}

// NewPushMetricSet instantiates a new PushMetricSet using the given
// configuration. The ModuleFactory and MetricSetFactory are obtained from the
// global Registry.
func NewPushMetricSet(t testing.TB, config interface{}) mb.PushMetricSet {
metricSet := NewMetricSet(t, config)

pushMetricSet, ok := metricSet.(mb.PushMetricSet)
if !ok {
t.Fatal("MetricSet does not implement PushMetricSet")
}

return pushMetricSet
}

type capturingReporter struct {
events []mapstr.M
errs []error
done chan struct{}
}

func (r *capturingReporter) Event(event mapstr.M) bool {
r.events = append(r.events, event)
return true
}

func (r *capturingReporter) ErrorWith(err error, meta mapstr.M) bool {
r.events = append(r.events, meta)
r.errs = append(r.errs, err)
return true
}

func (r *capturingReporter) Error(err error) bool {
r.errs = append(r.errs, err)
return true
}

func (r *capturingReporter) Done() <-chan struct{} {
return r.done
}

// RunPushMetricSet run the given push metricset for the specific amount of time
// and returns all of the events and errors that occur during that period.
func RunPushMetricSet(duration time.Duration, metricSet mb.PushMetricSet) ([]mapstr.M, []error) {
r := &capturingReporter{done: make(chan struct{})}

// Run the metricset.
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
metricSet.Run(r)
}()

// Let it run for some period, then stop it by closing the done channel.
time.AfterFunc(duration, func() {
close(r.done)
})

// Wait for the PushMetricSet to completely stop.
wg.Wait()

// Return all events and errors that were collected.
return r.events, r.errs
}

// NewPushMetricSetV2 instantiates a new PushMetricSetV2 using the given
// configuration. The ModuleFactory and MetricSetFactory are obtained from the
// global Registry.
Expand Down Expand Up @@ -428,16 +379,16 @@ func (r *CapturingPushReporterV2) capture(waitEvents int) []mb.Event {

// BlockingCapture blocks until waitEvents n of events are captured
func (r *CapturingPushReporterV2) BlockingCapture(waitEvents int) []mb.Event {
var events []mb.Event
for {
select {
case e := <-r.eventsC:
events = append(events, e)
if waitEvents > 0 && len(events) >= waitEvents {
return events
}
events := make([]mb.Event, 0, waitEvents)

for e := range r.eventsC {
events = append(events, e)
if waitEvents > 0 && len(events) >= waitEvents {
return events
}
}

return events
}

// RunPushMetricSetV2 run the given push metricset for the specific amount of
Expand Down
Loading

0 comments on commit b523946

Please sign in to comment.