Skip to content

Commit

Permalink
Remote buffer size config (#187)
Browse files Browse the repository at this point in the history
* increase buff limit

* buff size as configuration
  • Loading branch information
tprifti authored Feb 17, 2025
1 parent 267a02f commit 91ef524
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 6 deletions.
21 changes: 19 additions & 2 deletions remote/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ import (
"sync/atomic"

"github.com/anthdm/hollywood/actor"
"storj.io/drpc/drpcmanager"
"storj.io/drpc/drpcmux"
"storj.io/drpc/drpcserver"
"storj.io/drpc/drpcwire"
)

// Config holds the remote configuration.
type Config struct {
TLSConfig *tls.Config
BuffSize int
// Wg *sync.WaitGroup
}

Expand All @@ -32,6 +35,14 @@ func (c Config) WithTLS(tlsconf *tls.Config) Config {
return c
}

// Set the buffer size of the stream reader.
// If not provided, the default buffer size is 4MB
// defined by drpc package
func (c Config) WithBufferSize(size int) Config {
c.BuffSize = size
return c
}

type Remote struct {
addr string
engine *actor.Engine
Expand Down Expand Up @@ -83,10 +94,16 @@ func (r *Remote) Start(e *actor.Engine) error {
if err != nil {
return fmt.Errorf("failed to register remote: %w", err)
}
s := drpcserver.New(mux)
s := drpcserver.NewWithOptions(mux, drpcserver.Options{
Manager: drpcmanager.Options{
Reader: drpcwire.ReaderOptions{
MaximumBufferSize: r.config.BuffSize,
},
},
})

r.streamRouterPID = r.engine.Spawn(
newStreamRouter(r.engine, r.config.TLSConfig),
newStreamRouter(r.engine, r.config.TLSConfig, r.config.BuffSize),
"router", actor.WithInboxSize(1024*1024))
slog.Debug("server started", "listenAddr", r.addr)
r.stopWg = &sync.WaitGroup{}
Expand Down
6 changes: 4 additions & 2 deletions remote/stream_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ type streamRouter struct {
streams map[string]*actor.PID
pid *actor.PID
tlsConfig *tls.Config
buffSize int
}

func newStreamRouter(e *actor.Engine, tlsConfig *tls.Config) actor.Producer {
func newStreamRouter(e *actor.Engine, tlsConfig *tls.Config, buffSize int) actor.Producer {
return func() actor.Receiver {
return &streamRouter{
streams: make(map[string]*actor.PID),
engine: e,
tlsConfig: tlsConfig,
buffSize: buffSize,
}
}
}
Expand Down Expand Up @@ -60,7 +62,7 @@ func (s *streamRouter) deliverStream(msg *streamDeliver) {

swpid, ok = s.streams[address]
if !ok {
swpid = s.engine.SpawnProc(newStreamWriter(s.engine, s.pid, address, s.tlsConfig))
swpid = s.engine.SpawnProc(newStreamWriter(s.engine, s.pid, address, s.tlsConfig, s.buffSize))
s.streams[address] = swpid
}

Expand Down
14 changes: 12 additions & 2 deletions remote/stream_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (

"github.com/anthdm/hollywood/actor"
"storj.io/drpc/drpcconn"
"storj.io/drpc/drpcmanager"
"storj.io/drpc/drpcwire"
)

const (
Expand All @@ -29,9 +31,10 @@ type streamWriter struct {
inbox actor.Inboxer
serializer Serializer
tlsConfig *tls.Config
buffSize int
}

func newStreamWriter(e *actor.Engine, rpid *actor.PID, address string, tlsConfig *tls.Config) actor.Processer {
func newStreamWriter(e *actor.Engine, rpid *actor.PID, address string, tlsConfig *tls.Config, buffSize int) actor.Processer {
return &streamWriter{
writeToAddr: address,
engine: e,
Expand All @@ -40,6 +43,7 @@ func newStreamWriter(e *actor.Engine, rpid *actor.PID, address string, tlsConfig
pid: actor.NewPID(e.Address(), "stream"+"/"+address),
serializer: ProtoSerializer{},
tlsConfig: tlsConfig,
buffSize: buffSize,
}
}

Expand Down Expand Up @@ -151,7 +155,13 @@ func (s *streamWriter) init() {
return
}

conn := drpcconn.New(rawconn)
conn := drpcconn.NewWithOptions(rawconn, drpcconn.Options{
Manager: drpcmanager.Options{
Reader: drpcwire.ReaderOptions{
MaximumBufferSize: s.buffSize,
},
},
})
client := NewDRPCRemoteClient(conn)

stream, err := client.Receive(context.Background())
Expand Down

0 comments on commit 91ef524

Please sign in to comment.