From 007b5c6be12b3225aab814fc81ecd35022f3da3f Mon Sep 17 00:00:00 2001 From: Thomas Merritt Date: Tue, 23 May 2023 15:16:52 -0400 Subject: [PATCH] Use multiple UDP sockets for sending statsd packets. --- statsd/options.go | 15 +++++++++++++++ statsd/statsd.go | 6 +++--- statsd/telemetry.go | 2 +- statsd/udp.go | 46 +++++++++++++++++++++++++++++++++------------ 4 files changed, 53 insertions(+), 16 deletions(-) diff --git a/statsd/options.go b/statsd/options.go index 0728a976b..7071a30e6 100644 --- a/statsd/options.go +++ b/statsd/options.go @@ -24,6 +24,7 @@ var ( defaultAggregation = true defaultExtendedAggregation = false defaultOriginDetection = true + defaultUDPSocketCount = 1 ) // Options contains the configuration options for a client. @@ -46,6 +47,7 @@ type Options struct { telemetryAddr string originDetection bool containerID string + udpSocketCount int } func resolveOptions(options []Option) (*Options, error) { @@ -66,6 +68,7 @@ func resolveOptions(options []Option) (*Options, error) { aggregation: defaultAggregation, extendedAggregation: defaultExtendedAggregation, originDetection: defaultOriginDetection, + udpSocketCount: defaultUDPSocketCount, } for _, option := range options { @@ -346,3 +349,15 @@ func WithContainerID(id string) Option { return nil } } + +// WithUDPSocketCount allows setting the number of UDP sockets that statsd packets will be written on. This allows for +// high packet rates on systems with large numbers of cores. +func WithUDPSocketCount(count int) Option { + return func(o *Options) error { + if count < 1 { + return fmt.Errorf("UDPSocketCount must be a positive integer") + } + o.udpSocketCount = count + return nil + } +} diff --git a/statsd/statsd.go b/statsd/statsd.go index 378581b9b..423b8fcbe 100644 --- a/statsd/statsd.go +++ b/statsd/statsd.go @@ -338,7 +338,7 @@ func parseAgentURL(agentURL string) string { return "" } -func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, string, error) { +func createWriter(addr string, writeTimeout time.Duration, socketCount int) (io.WriteCloser, string, error) { addr = resolveAddr(addr) if addr == "" { return nil, "", errors.New("No address passed and autodetection from environment failed") @@ -352,7 +352,7 @@ func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, stri w, err := newUDSWriter(addr[len(UnixAddressPrefix):], writeTimeout) return w, writerNameUDS, err default: - w, err := newUDPWriter(addr, writeTimeout) + w, err := newUDPWriter(addr, writeTimeout, socketCount) return w, writerNameUDP, err } } @@ -365,7 +365,7 @@ func New(addr string, options ...Option) (*Client, error) { return nil, err } - w, writerType, err := createWriter(addr, o.writeTimeout) + w, writerType, err := createWriter(addr, o.writeTimeout, o.udpSocketCount) if err != nil { return nil, err } diff --git a/statsd/telemetry.go b/statsd/telemetry.go index 1e2bc0a3f..2d05f7845 100644 --- a/statsd/telemetry.go +++ b/statsd/telemetry.go @@ -140,7 +140,7 @@ func newTelemetryClient(c *Client, transport string, aggregationEnabled bool) *t } func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration) (*telemetryClient, error) { - telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout) + telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout, 1) if err != nil { return nil, fmt.Errorf("Could not resolve telemetry address: %v", err) } diff --git a/statsd/udp.go b/statsd/udp.go index e2922a911..ad0933de3 100644 --- a/statsd/udp.go +++ b/statsd/udp.go @@ -1,34 +1,56 @@ package statsd import ( + "math/rand" "net" + "sync" "time" ) // udpWriter is an internal class wrapping around management of UDP connection type udpWriter struct { - conn net.Conn + conns []net.Conn + random *rand.Rand + randomLock sync.Mutex } // New returns a pointer to a new udpWriter given an addr in the format "hostname:port". -func newUDPWriter(addr string, _ time.Duration) (*udpWriter, error) { - udpAddr, err := net.ResolveUDPAddr("udp", addr) - if err != nil { - return nil, err +func newUDPWriter(addr string, _ time.Duration, udpSocketCount int) (*udpWriter, error) { + conns := make([]net.Conn, udpSocketCount) + for i := range conns { + udpAddr, err := net.ResolveUDPAddr("udp", addr) + if err != nil { + return nil, err + } + conn, err := net.DialUDP("udp", nil, udpAddr) + if err != nil { + return nil, err + } + conns[i] = conn } - conn, err := net.DialUDP("udp", nil, udpAddr) - if err != nil { - return nil, err - } - writer := &udpWriter{conn: conn} + random := rand.New(rand.NewSource(time.Now().UnixNano())) + writer := &udpWriter{conns: conns, random: random} return writer, nil } // Write data to the UDP connection with no error handling func (w *udpWriter) Write(data []byte) (int, error) { - return w.conn.Write(data) + if len(w.conns) == 1 { + return w.conns[0].Write(data) + } + w.randomLock.Lock() + c := w.random.Intn(len(w.conns)) + w.randomLock.Unlock() + return w.conns[c].Write(data) } func (w *udpWriter) Close() error { - return w.conn.Close() + var err error + for i := range w.conns { + e := w.conns[i].Close() + if e != nil { + err = e + } + } + return err }