Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use multiple UDP sockets for sending statsd packets. #279

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions statsd/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var (
defaultAggregation = true
defaultExtendedAggregation = false
defaultOriginDetection = true
defaultUDPSocketCount = 1
)

// Options contains the configuration options for a client.
Expand All @@ -46,6 +47,7 @@ type Options struct {
telemetryAddr string
originDetection bool
containerID string
udpSocketCount int
}

func resolveOptions(options []Option) (*Options, error) {
Expand All @@ -66,6 +68,7 @@ func resolveOptions(options []Option) (*Options, error) {
aggregation: defaultAggregation,
extendedAggregation: defaultExtendedAggregation,
originDetection: defaultOriginDetection,
udpSocketCount: defaultUDPSocketCount,
}

for _, option := range options {
Expand Down Expand Up @@ -346,3 +349,15 @@ func WithContainerID(id string) Option {
return nil
}
}

// WithUDPSocketCount allows setting the number of UDP sockets that statsd packets will be written on. This allows for
// high packet rates on systems with large numbers of cores.
func WithUDPSocketCount(count int) Option {
return func(o *Options) error {
if count < 1 {
return fmt.Errorf("UDPSocketCount must be a positive integer")
}
o.udpSocketCount = count
return nil
}
}
6 changes: 3 additions & 3 deletions statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ func parseAgentURL(agentURL string) string {
return ""
}

func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, string, error) {
func createWriter(addr string, writeTimeout time.Duration, socketCount int) (io.WriteCloser, string, error) {
addr = resolveAddr(addr)
if addr == "" {
return nil, "", errors.New("No address passed and autodetection from environment failed")
Expand All @@ -352,7 +352,7 @@ func createWriter(addr string, writeTimeout time.Duration) (io.WriteCloser, stri
w, err := newUDSWriter(addr[len(UnixAddressPrefix):], writeTimeout)
return w, writerNameUDS, err
default:
w, err := newUDPWriter(addr, writeTimeout)
w, err := newUDPWriter(addr, writeTimeout, socketCount)
return w, writerNameUDP, err
}
}
Expand All @@ -365,7 +365,7 @@ func New(addr string, options ...Option) (*Client, error) {
return nil, err
}

w, writerType, err := createWriter(addr, o.writeTimeout)
w, writerType, err := createWriter(addr, o.writeTimeout, o.udpSocketCount)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion statsd/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func newTelemetryClient(c *Client, transport string, aggregationEnabled bool) *t
}

func newTelemetryClientWithCustomAddr(c *Client, transport string, telemetryAddr string, aggregationEnabled bool, pool *bufferPool, writeTimeout time.Duration) (*telemetryClient, error) {
telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout)
telemetryWriter, _, err := createWriter(telemetryAddr, writeTimeout, 1)
if err != nil {
return nil, fmt.Errorf("Could not resolve telemetry address: %v", err)
}
Expand Down
46 changes: 34 additions & 12 deletions statsd/udp.go
Original file line number Diff line number Diff line change
@@ -1,34 +1,56 @@
package statsd

import (
"math/rand"
"net"
"sync"
"time"
)

// udpWriter is an internal class wrapping around management of UDP connection
type udpWriter struct {
conn net.Conn
conns []net.Conn
random *rand.Rand
randomLock sync.Mutex
}

// New returns a pointer to a new udpWriter given an addr in the format "hostname:port".
func newUDPWriter(addr string, _ time.Duration) (*udpWriter, error) {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
func newUDPWriter(addr string, _ time.Duration, udpSocketCount int) (*udpWriter, error) {
conns := make([]net.Conn, udpSocketCount)
for i := range conns {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
conns[i] = conn
}
conn, err := net.DialUDP("udp", nil, udpAddr)
if err != nil {
return nil, err
}
writer := &udpWriter{conn: conn}
random := rand.New(rand.NewSource(time.Now().UnixNano()))
writer := &udpWriter{conns: conns, random: random}
return writer, nil
}

// Write data to the UDP connection with no error handling
func (w *udpWriter) Write(data []byte) (int, error) {
return w.conn.Write(data)
if len(w.conns) == 1 {
return w.conns[0].Write(data)
}
w.randomLock.Lock()
c := w.random.Intn(len(w.conns))
w.randomLock.Unlock()
return w.conns[c].Write(data)
}

func (w *udpWriter) Close() error {
return w.conn.Close()
var err error
for i := range w.conns {
e := w.conns[i].Close()
if e != nil {
err = e
}
}
return err
}