Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move GetMetrics from a pipe RPC to a controller component method. #702

Merged
merged 2 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ func (*noopController) GetHealth(context.Context, *protos.GetHealthRequest) (*pr
return nil, fmt.Errorf("controller.GetHealth not implemented")
}

// GetLoad implements controller nterface.
func (*noopController) GetLoad(context.Context, *protos.GetLoadRequest) (*protos.GetLoadReply, error) {
return nil, fmt.Errorf("controller.GetLoad not implemented")
}

// GetMetrics implements controller nterface.
func (*noopController) GetMetrics(context.Context, *protos.GetMetricsRequest) (*protos.GetMetricsReply, error) {
return nil, fmt.Errorf("controller.GetMetrics not implemented")
Expand Down
3 changes: 3 additions & 0 deletions internal/control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ type Controller interface {
// GetHealth fetches weavelet health information.
GetHealth(context.Context, *protos.GetHealthRequest) (*protos.GetHealthReply, error)

// GetLoad fetches weavelet load information.
GetLoad(context.Context, *protos.GetLoadRequest) (*protos.GetLoadReply, error)

// GetMetrics fetches metrics from the weavelet.
GetMetrics(context.Context, *protos.GetMetricsRequest) (*protos.GetMetricsReply, error)

Expand Down
2 changes: 1 addition & 1 deletion internal/envelope/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func makeConnections(t *testing.T, handler conn.EnvelopeHandler) (*conn.Envelope
panic(err)
}
created <- struct{}{}
err = w.Serve(ctx, nil)
err = w.Serve(ctx)
weaveletDone <- err
}()

Expand Down
30 changes: 0 additions & 30 deletions internal/envelope/conn/envelope_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,36 +249,6 @@ func (e *EnvelopeConn) handleMessage(msg *protos.WeaveletMsg, h EnvelopeHandler)
}
}

// GetLoadRPC gets a load report from the weavelet.
func (e *EnvelopeConn) GetLoadRPC() (*protos.LoadReport, error) {
req := &protos.EnvelopeMsg{GetLoadRequest: &protos.GetLoadRequest{}}
reply, err := e.rpc(req)
if err != nil {
return nil, err
}
if reply.GetLoadReply == nil {
return nil, fmt.Errorf("nil GetLoadReply received from weavelet")
}
return reply.GetLoadReply.Load, nil
}

func (e *EnvelopeConn) rpc(request *protos.EnvelopeMsg) (*protos.WeaveletMsg, error) {
response, err := e.conn.doBlockingRPC(request)
if err != nil {
err := fmt.Errorf("connection to weavelet broken: %w", err)
e.conn.cleanup(err)
return nil, err
}
msg, ok := response.(*protos.WeaveletMsg)
if !ok {
return nil, fmt.Errorf("weavelet response has wrong type %T", response)
}
if msg.Error != "" {
return nil, fmt.Errorf(msg.Error)
}
return msg, nil
}

// verifyWeaveletInfo verifies the information sent by the weavelet.
func verifyWeaveletInfo(wlet *protos.WeaveletInfo) error {
if wlet == nil {
Expand Down
54 changes: 8 additions & 46 deletions internal/envelope/conn/weavelet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,6 @@ import (
"github.com/ServiceWeaver/weaver/runtime/version"
)

// WeaveletHandler handles messages from the envelope. A handler should not
// block and should not perform RPCs over the pipe. Values passed to the
// handlers are only valid for the duration of the handler's execution.
type WeaveletHandler interface {
// TODO(mwhittaker): Add context.Context to these methods?

// GetLoad returns a load report.
GetLoad(*protos.GetLoadRequest) (*protos.GetLoadReply, error)
}

// WeaveletConn is the weavelet side of the connection between a weavelet and
// an envelope. For more information, refer to runtime/protos/runtime.proto and
// https://serviceweaver.dev/blog/deployers.html.
Expand Down Expand Up @@ -97,24 +87,21 @@ func NewWeaveletConn(r io.ReadCloser, w io.WriteCloser) (*WeaveletConn, error) {
return wc, nil
}

// Serve accepts RPC requests from the envelope. Requests are handled serially
// in the order they are received.
func (w *WeaveletConn) Serve(ctx context.Context, h WeaveletHandler) error {
// Serve handles RPC responses from the envelope.
func (w *WeaveletConn) Serve(ctx context.Context) error {
go func() {
<-ctx.Done()
w.conn.cleanup(ctx.Err())
}()

msg := &protos.EnvelopeMsg{}
for ctx.Err() == nil {
if err := w.conn.recv(msg); err != nil {
return err
}
if err := w.handleMessage(h, msg); err != nil {
return err
}
if err := w.conn.recv(msg); err != nil {
return err
}
return ctx.Err()
// We do not support any requests initiated by the envelope.
err := fmt.Errorf("weavelet_conn: unexpected message %+v", msg)
w.conn.cleanup(err)
return err
}

// EnvelopeInfo returns the EnvelopeInfo received from the envelope.
Expand All @@ -132,31 +119,6 @@ func (w *WeaveletConn) Listener() net.Listener {
return w.lis
}

// handleMessage handles all RPC requests initiated by the envelope. Note that
// this method doesn't handle RPC replies from the envelope.
func (w *WeaveletConn) handleMessage(handler WeaveletHandler, msg *protos.EnvelopeMsg) error {
errstring := func(err error) string {
if err == nil {
return ""
}
return err.Error()
}

switch {
case msg.GetLoadRequest != nil:
reply, err := handler.GetLoad(msg.GetLoadRequest)
return w.conn.send(&protos.WeaveletMsg{
Id: -msg.Id,
Error: errstring(err),
GetLoadReply: reply,
})
default:
err := fmt.Errorf("weavelet_conn: unexpected message %+v", msg)
w.conn.cleanup(err)
return err
}
}

// ActivateComponentRPC ensures that the provided component is running
// somewhere. A call to ActivateComponentRPC also implicitly signals that a
// weavelet is interested in receiving routing info for the component.
Expand Down
8 changes: 4 additions & 4 deletions internal/weaver/remoteweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,9 +217,9 @@ func NewRemoteWeavelet(ctx context.Context, regs []*codegen.Registration, bootst
return nil
})

// Serve deployer API requests on the weavelet conn.
// Handle the weavelet side of the connection with the envelope.
servers.Go(func() error {
if err := w.conn.Serve(ctx, w); err != nil {
if err := w.conn.Serve(ctx); err != nil {
w.syslogger.Error("weavelet conn failed", "err", err)
return err
}
Expand Down Expand Up @@ -450,8 +450,8 @@ func (w *RemoteWeavelet) makeStub(fullName string, reg *codegen.Registration, re
return call.NewStub(fullName, reg, conn, w.tracer, w.opts.InjectRetries), nil
}

// GetLoad implements the conn.WeaveletHandler interface.
func (w *RemoteWeavelet) GetLoad(*protos.GetLoadRequest) (*protos.GetLoadReply, error) {
// GetLoad implements controller nterface.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// GetLoad implements controller nterface.
// GetLoad implements controller interface.

func (w *RemoteWeavelet) GetLoad(context.Context, *protos.GetLoadRequest) (*protos.GetLoadReply, error) {
report := &protos.LoadReport{
Loads: map[string]*protos.LoadReport_ComponentLoad{},
}
Expand Down
7 changes: 6 additions & 1 deletion runtime/envelope/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,12 @@ func (e *Envelope) GetMetrics() ([]*metrics.MetricSnapshot, error) {

// GetLoad gets a load report from the weavelet.
func (e *Envelope) GetLoad() (*protos.LoadReport, error) {
return e.conn.GetLoadRPC()
req := &protos.GetLoadRequest{}
reply, err := e.controller.GetLoad(context.TODO(), req)
if err != nil {
return nil, err
}
return reply.Load, nil
}

// UpdateComponents updates the weavelet with the latest set of components it
Expand Down
4 changes: 2 additions & 2 deletions runtime/envelope/envelope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestMain(m *testing.M) {
return nil
},
"writetraces": func() error { return writeTraces(conn) },
"serve_conn": func() error { return conn.Serve(context.Background(), nil) },
"serve_conn": func() error { return conn.Serve(context.Background()) },
}
fn, ok := cmds[cmd]
if !ok {
Expand All @@ -99,7 +99,7 @@ func TestMain(m *testing.M) {
fmt.Fprintf(os.Stderr, "subprocess: %v\n", err)
os.Exit(1)
}
conn.Serve(context.Background(), nil)
conn.Serve(context.Background())
}

var err error
Expand Down
Loading