Skip to content

Commit

Permalink
Move GetHealth from a pipe RPC to a controller component method.
Browse files Browse the repository at this point in the history
  • Loading branch information
ghemawat committed Dec 27, 2023
1 parent abde5ca commit 0d0be48
Show file tree
Hide file tree
Showing 13 changed files with 346 additions and 236 deletions.
5 changes: 5 additions & 0 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ func (*noopController) UpdateRoutingInfo(context.Context, *protos.UpdateRoutingI
return nil, fmt.Errorf("controller.UpdateRoutingInfo not implemented")
}

// GetHealth implements controller nterface.
func (*noopController) GetHealth(context.Context, *protos.GetHealthRequest) (*protos.GetHealthReply, error) {
return nil, fmt.Errorf("controller.GetHealth not implemented")
}

// GetProfile implements controller nterface.
func (*noopController) GetProfile(context.Context, *protos.GetProfileRequest) (*protos.GetProfileReply, error) {
return nil, fmt.Errorf("controller.GetProfile not implemented")
Expand Down
3 changes: 3 additions & 0 deletions internal/control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type Controller interface {
// UpdateRoutingInfo updates the weavelet with a component's most recent routing info.
UpdateRoutingInfo(context.Context, *protos.UpdateRoutingInfoRequest) (*protos.UpdateRoutingInfoReply, error)

// GetHealth fetches weavelet health information.
GetHealth(context.Context, *protos.GetHealthRequest) (*protos.GetHealthReply, error)

// GetProfile gets a profile from the weavelet.
GetProfile(context.Context, *protos.GetProfileRequest) (*protos.GetProfileReply, error)
}
17 changes: 2 additions & 15 deletions internal/envelope/conn/envelope_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,8 @@ func NewEnvelopeConn(ctx context.Context, r io.ReadCloser, w io.WriteCloser, inf
// }
//
// If an EnvelopeHandler, invoked by handleMessage, calls an RPC on the
// weavelet (e.g., GetHealth), then it will block forever, as the RPC
// response will never be read by conn.recv.
// weavelet, then it will block forever, as the RPC response will never
// be read by conn.recv.
e.running.Go(func() error {
for {
msg := &protos.WeaveletMsg{}
Expand Down Expand Up @@ -265,19 +265,6 @@ func (e *EnvelopeConn) GetMetricsRPC() ([]*metrics.MetricSnapshot, error) {
return e.metrics.Import(reply.GetMetricsReply.Update)
}

// GetHealthRPC gets a weavelet's health.
func (e *EnvelopeConn) GetHealthRPC() (protos.HealthStatus, error) {
req := &protos.EnvelopeMsg{GetHealthRequest: &protos.GetHealthRequest{}}
reply, err := e.rpc(req)
if err != nil {
return protos.HealthStatus_UNHEALTHY, err
}
if reply.GetHealthReply == nil {
return protos.HealthStatus_UNHEALTHY, fmt.Errorf("nil HealthStatusReply received from weavelet")
}
return reply.GetHealthReply.Status, nil
}

// GetLoadRPC gets a load report from the weavelet.
func (e *EnvelopeConn) GetLoadRPC() (*protos.LoadReport, error) {
req := &protos.EnvelopeMsg{GetLoadRequest: &protos.GetLoadRequest{}}
Expand Down
5 changes: 0 additions & 5 deletions internal/envelope/conn/weavelet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,6 @@ func (w *WeaveletConn) handleMessage(handler WeaveletHandler, msg *protos.Envelo
Id: -msg.Id,
GetMetricsReply: &protos.GetMetricsReply{Update: update},
})
case msg.GetHealthRequest != nil:
return w.conn.send(&protos.WeaveletMsg{
Id: -msg.Id,
GetHealthReply: &protos.GetHealthReply{Status: protos.HealthStatus_HEALTHY},
})
case msg.GetLoadRequest != nil:
reply, err := handler.GetLoad(msg.GetLoadRequest)
return w.conn.send(&protos.WeaveletMsg{
Expand Down
5 changes: 5 additions & 0 deletions internal/weaver/remoteweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,11 @@ func (w *RemoteWeavelet) UpdateRoutingInfo(ctx context.Context, req *protos.Upda
return &protos.UpdateRoutingInfoReply{}, nil
}

// GetHealth implements controller.GetHealth.
func (w *RemoteWeavelet) GetHealth(ctx context.Context, req *protos.GetHealthRequest) (*protos.GetHealthReply, error) {
return &protos.GetHealthReply{Status: protos.HealthStatus_HEALTHY}, nil
}

// GetProfile implements controller.GetProfile.
func (w *RemoteWeavelet) GetProfile(ctx context.Context, req *protos.GetProfileRequest) (*protos.GetProfileReply, error) {
data, err := getProfile(ctx, req)
Expand Down
6 changes: 3 additions & 3 deletions runtime/envelope/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,11 +311,11 @@ func (e *Envelope) WeaveletInfo() *protos.WeaveletInfo {

// GetHealth returns the health status of the weavelet.
func (e *Envelope) GetHealth() protos.HealthStatus {
status, err := e.conn.GetHealthRPC()
reply, err := e.controller.GetHealth(context.TODO(), &protos.GetHealthRequest{})
if err != nil {
return protos.HealthStatus_UNHEALTHY
return protos.HealthStatus_UNKNOWN
}
return status
return reply.Status
}

// GetProfile gets a profile from the weavelet.
Expand Down
5 changes: 2 additions & 3 deletions runtime/envelope/envelope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,9 +432,8 @@ func TestRPCBeforeServe(t *testing.T) {
if err != nil {
t.Fatal(err)
}
health := e.GetHealth()
if health != protos.HealthStatus_HEALTHY {
t.Fatalf("expected healthy, got %v", health)
if _, err := e.GetMetrics(); err != nil {
t.Fatalf("expected metrics, got %v", err)
}
h := &handlerForTest{logSaver: testSaver(t)}

Expand Down
366 changes: 170 additions & 196 deletions runtime/protos/runtime.pb.go

Large diffs are not rendered by default.

6 changes: 2 additions & 4 deletions runtime/protos/runtime.proto
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ message EnvelopeMsg {
EnvelopeInfo envelope_info = 2;

// Envelope initiated RPC requests.
GetHealthRequest get_health_request = 3;
GetMetricsRequest get_metrics_request = 4;
GetLoadRequest get_load_request = 5;

Expand All @@ -110,7 +109,7 @@ message EnvelopeMsg {
VerifyClientCertificateReply verify_client_certificate_reply = 13;
VerifyServerCertificateReply verify_server_certificate_reply = 14;

reserved 6, 7, 8;
reserved 3, 6, 7, 8;
}

// WeaveletMsg is a message sent by a weavelet to an envelope.
Expand All @@ -130,7 +129,6 @@ message WeaveletMsg {

// Envelope initiated RPC replies.
string error = 5; // non-nil on error
GetHealthReply get_health_reply = 6;
GetMetricsReply get_metrics_reply = 7;
GetLoadReply get_load_reply = 8;

Expand All @@ -142,7 +140,7 @@ message WeaveletMsg {
VerifyClientCertificateRequest verify_client_certificate_request = 15;
VerifyServerCertificateRequest verify_server_certificate_request = 16;

reserved 9, 10, 11;
reserved 6, 9, 10, 11;
}

// EnvelopeInfo is the information provided by an envelope to a weavelet during
Expand Down
2 changes: 1 addition & 1 deletion runtime/protos/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestDeployerVersion(t *testing.T) {
got := fmt.Sprintf("%x", h.Sum(nil))

// If runtime.proto has changed, the deployer API version may need updating.
const want = "ebf594eced1e857f5cd24af3d95988e82260a61baf2e86100ee85ac6ac8d6ea3"
const want = "a4c589df11159cafac26f242d5be7a55116c399b2e2a3e75db347a42b5b9e991"
if got != want {
t.Fatalf(`Unexpected SHA-256 hash of runtime.proto: got %s, want %s. If this change is meaningful, REMEMBER TO UPDATE THE DEPLOYER API VERSION in runtime/version/version.go.`, got, want)
}
Expand Down
151 changes: 146 additions & 5 deletions weaver_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0d0be48

Please sign in to comment.