Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Netflow code to the logp logging library (targeting main) #42704

Merged
merged 2 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 16 additions & 21 deletions x-pack/filebeat/input/netflow/decoder/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
package config

import (
"io"
"time"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"
"github.com/elastic/elastic-agent-libs/logp"
)

type ActiveSessionsMetric interface {
Expand All @@ -19,7 +19,7 @@ type ActiveSessionsMetric interface {
// Config stores the configuration used by the NetFlow Collector.
type Config struct {
protocols []string
logOutput io.Writer
logOutput *logp.Logger
expiration time.Duration
detectReset bool
fields fields.FieldDict
Expand All @@ -28,21 +28,22 @@ type Config struct {
activeSessionsMetric ActiveSessionsMetric
}

var defaultCfg = Config{
protocols: []string{},
logOutput: io.Discard,
expiration: time.Hour,
detectReset: true,
sharedTemplates: false,
withCache: false,
}

// Defaults returns a configuration object with defaults settings:
// - no protocols are enabled.
// - log output is discarded
// - log output is set to the logger that is passed in.
// - session expiration is checked once every hour.
func Defaults() Config {
return defaultCfg
// - resets are detected.
// - templates are not shared.
// - cache is disabled.
func Defaults(logger *logp.Logger) Config {
return Config{
protocols: []string{},
logOutput: logger,
expiration: time.Hour,
detectReset: true,
sharedTemplates: false,
withCache: false,
}
}

// WithProtocols modifies an existing configuration object to enable the
Expand All @@ -52,12 +53,6 @@ func (c *Config) WithProtocols(protos ...string) *Config {
return c
}

// WithLogOutput sets the output io.Writer for logging.
func (c *Config) WithLogOutput(output io.Writer) *Config {
c.logOutput = output
return c
}

// WithExpiration configures the expiration timeout for sessions and templates.
// A value of zero disables expiration.
func (c *Config) WithExpiration(timeout time.Duration) *Config {
Expand Down Expand Up @@ -121,7 +116,7 @@ func (c *Config) Protocols() []string {
}

// LogOutput returns the io.Writer where logs are to be written.
func (c *Config) LogOutput() io.Writer {
func (c *Config) LogOutput() *logp.Logger {
return c.logOutput
}

Expand Down
6 changes: 4 additions & 2 deletions x-pack/filebeat/input/netflow/decoder/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"net"
"sync"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
Expand Down Expand Up @@ -57,7 +59,7 @@

for _, proto := range p.protos {
if err := proto.Start(); err != nil {
p.stop()

Check failure on line 62 in x-pack/filebeat/input/netflow/decoder/decoder.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `p.stop` is not checked (errcheck)

Check failure on line 62 in x-pack/filebeat/input/netflow/decoder/decoder.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `p.stop` is not checked (errcheck)
return fmt.Errorf("failed to start protocol version %d: %w", proto.Version(), err)
}
}
Expand All @@ -65,7 +67,7 @@
return nil
}

// Stop will stop any background tasks running withing the decoder.

Check failure on line 70 in x-pack/filebeat/input/netflow/decoder/decoder.go

View workflow job for this annotation

GitHub Actions / lint (windows)

`withing` is a misspelling of `within` (misspell)

Check failure on line 70 in x-pack/filebeat/input/netflow/decoder/decoder.go

View workflow job for this annotation

GitHub Actions / lint (linux)

`withing` is a misspelling of `within` (misspell)
func (p *Decoder) Stop() error {
p.mutex.Lock()
defer p.mutex.Unlock()
Expand Down Expand Up @@ -93,8 +95,8 @@
}

// NewConfig returns a new configuration structure to be passed to NewDecoder.
func NewConfig() *config.Config {
cfg := config.Defaults()
func NewConfig(logger *logp.Logger) *config.Config {
cfg := config.Defaults(logger)
return &cfg
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,51 +8,52 @@ import (
"bytes"
"encoding/json"
"fmt"
"log"
"net"
"os"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder"
)

func main() {
decoder, err := decoder.NewDecoder(decoder.NewConfig().
WithLogOutput(os.Stderr).
logger := logp.L().Named("netflow")

decoder, err := decoder.NewDecoder(decoder.NewConfig(logger).
WithProtocols("v1", "v5", "v9", "ipfix"))
if err != nil {
log.Fatal("Failed creating decoder:", err)
logger.Fatal("Failed creating decoder:", err)
}

addr, err := net.ResolveUDPAddr("udp", ":2055")
if err != nil {
log.Fatal("Failed to resolve address:", err)
logger.Fatal("Failed to resolve address:", err)
}

server, err := net.ListenUDP("udp", addr)
if err != nil {
log.Fatalf("Failed to listen on %v: %v", addr, err)
logger.Fatalf("Failed to listen on %v: %v", addr, err)
}
defer server.Close()

if err = server.SetReadBuffer(1 << 16); err != nil {
log.Fatalf("Failed to set read buffer size for socket: %v", err)
logger.Fatalf("Failed to set read buffer size for socket: %v", err)
}

log.Println("Listening on ", server.LocalAddr())
logger.Debug("Listening on ", server.LocalAddr())
buf := make([]byte, 8192)
decBuf := new(bytes.Buffer)
for {
size, remote, err := server.ReadFromUDP(buf)
if err != nil {
log.Println("Error reading from socket:", err)
logger.Debug("Error reading from socket:", err)
continue
}

decBuf.Reset()
decBuf.Write(buf[:size])
records, err := decoder.Read(decBuf, remote)
if err != nil {
log.Printf("warn: Failed reading records from %v: %v\n", remote, err)
logger.Debugf("warn: Failed reading records from %v: %v\n", remote, err)
}

for _, r := range records {
Expand All @@ -63,7 +64,7 @@ func main() {
"data": r.Fields,
})
if err != nil {
log.Fatal(err)
logger.Fatal(err)
}
fmt.Println(string(evt))
}
Expand Down
4 changes: 1 addition & 3 deletions x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
package ipfix

import (
"log"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol"
v9 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/v9"
Expand All @@ -25,11 +23,11 @@
var _ protocol.Protocol = (*IPFixProtocol)(nil)

func init() {
protocol.Registry.Register(ProtocolName, New)

Check failure on line 26 in x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go

View workflow job for this annotation

GitHub Actions / lint (windows)

Error return value of `protocol.Registry.Register` is not checked (errcheck)

Check failure on line 26 in x-pack/filebeat/input/netflow/decoder/ipfix/ipfix.go

View workflow job for this annotation

GitHub Actions / lint (linux)

Error return value of `protocol.Registry.Register` is not checked (errcheck)
}

func New(config config.Config) protocol.Protocol {
logger := log.New(config.LogOutput(), LogPrefix, 0)
logger := config.LogOutput().Named(LogPrefix)
decoder := DecoderIPFIX{
DecoderV9: v9.DecoderV9{Logger: logger, Fields: config.Fields()},
}
Expand Down
16 changes: 11 additions & 5 deletions x-pack/filebeat/input/netflow/decoder/ipfix/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,19 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/fields"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test"
v9 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/v9"
)

func init() {
logp.TestingSetup()
}

func TestMessageWithOptions(t *testing.T) {
rawString := "" +
"000a01e45bf435e1000000a500000000000200480400001000080004000c0004" +
Expand Down Expand Up @@ -67,7 +73,7 @@ func TestMessageWithOptions(t *testing.T) {
"version": uint64(10),
},
}
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
flows, err := proto.OnPacket(bytes.NewBuffer(raw), test.MakeAddress(t, "127.0.0.1:1234"))
assert.NoError(t, err)
if assert.Len(t, flows, 7) {
Expand All @@ -84,7 +90,7 @@ func TestOptionTemplates(t *testing.T) {
key := v9.MakeSessionKey(addr, 1234, false)

t.Run("Single options template", func(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
flows, err := proto.OnPacket(test.MakePacket([]uint16{
// Header
// Version, Length, Ts, SeqNo, Source
Expand Down Expand Up @@ -113,7 +119,7 @@ func TestOptionTemplates(t *testing.T) {
})

t.Run("Multiple options template", func(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
raw := test.MakePacket([]uint16{
// Header
// Version, Count, Ts, SeqNo, Source
Expand Down Expand Up @@ -151,7 +157,7 @@ func TestOptionTemplates(t *testing.T) {
})

t.Run("records discarded", func(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))
raw := test.MakePacket([]uint16{
// Header
// Version, Count, Ts, SeqNo, Source
Expand Down Expand Up @@ -193,7 +199,7 @@ func TestOptionTemplates(t *testing.T) {
func TestCustomFields(t *testing.T) {
addr := test.MakeAddress(t, "127.0.0.1:12345")

conf := config.Defaults()
conf := config.Defaults(logp.L())
conf.WithCustomFields(fields.FieldDict{
fields.Key{EnterpriseID: 0x12345678, FieldID: 33}: &fields.Field{Name: "customField", Decoder: fields.String},
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,16 @@ import (

"github.com/stretchr/testify/assert"

"github.com/elastic/elastic-agent-libs/logp"

"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/config"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
)

func init() {
logp.TestingSetup()
}

type testProto int

func (testProto) Version() uint16 {
Expand Down Expand Up @@ -61,7 +67,7 @@ func TestRegistry_Get(t *testing.T) {
assert.NoError(t, err)
gen, err := registry.Get("my_proto")
assert.NoError(t, err)
assert.Equal(t, testProto(0), gen(config.Defaults()))
assert.Equal(t, testProto(0), gen(config.Defaults(logp.L())))
})
t.Run("two protocols", func(t *testing.T) {
registry := ProtocolRegistry{}
Expand All @@ -71,10 +77,10 @@ func TestRegistry_Get(t *testing.T) {
assert.NoError(t, err)
gen, err := registry.Get("my_proto")
assert.NoError(t, err)
assert.Equal(t, testProto(1), gen(config.Defaults()))
assert.Equal(t, testProto(1), gen(config.Defaults(logp.L())))
gen, err = registry.Get("other_proto")
assert.NoError(t, err)
assert.Equal(t, testProto(2), gen(config.Defaults()))
assert.Equal(t, testProto(2), gen(config.Defaults(logp.L())))
})
t.Run("not registered", func(t *testing.T) {
registry := ProtocolRegistry{}
Expand Down
5 changes: 0 additions & 5 deletions x-pack/filebeat/input/netflow/decoder/test/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ type TestLogWriter struct {
testing.TB
}

func (t TestLogWriter) Write(buf []byte) (int, error) {
t.Log(string(buf))
return len(buf), nil
}

func MakeAddress(t testing.TB, ipPortPair string) net.Addr {
ip, portS, err := net.SplitHostPort(ipPortPair)
if err != nil {
Expand Down
14 changes: 8 additions & 6 deletions x-pack/filebeat/input/netflow/decoder/v1/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"encoding/binary"
"fmt"
"io"
"log"
"net"
"time"

Expand All @@ -18,6 +17,7 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/protocol"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template"
"github.com/elastic/elastic-agent-libs/logp"
)

const (
Expand Down Expand Up @@ -52,21 +52,23 @@ var templateV1 = template.Template{
type ReadHeaderFn func(*bytes.Buffer, net.Addr) (int, time.Time, record.Map, error)

type NetflowProtocol struct {
logger *log.Logger
logger *logp.Logger
flowTemplate *template.Template
version uint16
readHeader ReadHeaderFn
}

func init() {
protocol.Registry.Register(ProtocolName, New)
if err := protocol.Registry.Register(ProtocolName, New); err != nil {
panic(err)
}
}

func New(config config.Config) protocol.Protocol {
return NewProtocol(ProtocolID, &templateV1, readV1Header, log.New(config.LogOutput(), LogPrefix, 0))
return NewProtocol(ProtocolID, &templateV1, readV1Header, config.LogOutput().Named(LogPrefix))
}

func NewProtocol(version uint16, template *template.Template, readHeader ReadHeaderFn, logger *log.Logger) protocol.Protocol {
func NewProtocol(version uint16, template *template.Template, readHeader ReadHeaderFn, logger *logp.Logger) protocol.Protocol {
return &NetflowProtocol{
logger: logger,
flowTemplate: template,
Expand All @@ -90,7 +92,7 @@ func (NetflowProtocol) Stop() error {
func (p *NetflowProtocol) OnPacket(buf *bytes.Buffer, source net.Addr) (flows []record.Record, err error) {
numFlows, timestamp, metadata, err := p.readHeader(buf, source)
if err != nil {
p.logger.Printf("Failed parsing packet: %v", err)
p.logger.Debugf("Failed parsing packet: %v", err)
return nil, fmt.Errorf("error reading netflow header: %w", err)
}
flows, err = p.flowTemplate.Apply(buf, numFlows)
Expand Down
11 changes: 8 additions & 3 deletions x-pack/filebeat/input/netflow/decoder/v1/v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,23 @@ import (
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/record"
template2 "github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/template"
"github.com/elastic/beats/v7/x-pack/filebeat/input/netflow/decoder/test"
"github.com/elastic/elastic-agent-libs/logp"
)

func init() {
logp.TestingSetup()
}

func TestNetflowProtocol_New(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

assert.Nil(t, proto.Start())
assert.Equal(t, uint16(1), proto.Version())
assert.Nil(t, proto.Stop())
}

func TestNetflowProtocol_OnPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00010002000000015bf689f605946fb0" +
"acd910e5c0a8017b00000000000000000000000e00002cfa" +
Expand Down Expand Up @@ -105,7 +110,7 @@ func TestNetflowProtocol_OnPacket(t *testing.T) {
}

func TestNetflowProtocol_BadPacket(t *testing.T) {
proto := New(config.Defaults())
proto := New(config.Defaults(logp.L()))

rawS := "00010002000000015bf689f605"
raw, err := hex.DecodeString(rawS)
Expand Down
Loading
Loading