Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use component method calls for some of the envelope->weavelet calls. #686

Merged
merged 3 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package weaver

import (
"context"
"fmt"

"github.com/ServiceWeaver/weaver/runtime/protos"
)

// controller is a component hosted in every weavelet. Deployers make calls to this component to
// fetch information about the weavelet, and to make it do various things.
//
// Arguments and results are protobufs to allow deployers to evolve independently of application
// binaries.
type controller interface {
ghemawat marked this conversation as resolved.
Show resolved Hide resolved
// GetHealth returns the health of the weavelet.
GetHealth(context.Context, *protos.GetHealthRequest) (*protos.GetHealthReply, error)

// UpdateComponents updates the weavelet with the latest set of components it
// should be running.
UpdateComponents(context.Context, *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error)
}

// noopController is a no-op implementation of controller. It exists solely to cause
// controller to be registered as a component. The actual implementation is provided
// by internal/weaver/remoteweavelet.go
type noopController struct {
Implements[controller]
}

var _ controller = &noopController{}

// GetHealth implements controller interface.
func (*noopController) GetHealth(context.Context, *protos.GetHealthRequest) (*protos.GetHealthReply, error) {
return nil, fmt.Errorf("controller.GetHealth not implemented")
}

// UpdateComponents implements controller interface.
func (*noopController) UpdateComponents(context.Context, *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error) {
return nil, fmt.Errorf("controller.UpdateComponents not implemented")
}
19 changes: 19 additions & 0 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ github.com/ServiceWeaver/weaver/internal/weaver
github.com/ServiceWeaver/weaver/runtime
github.com/ServiceWeaver/weaver/runtime/codegen
github.com/ServiceWeaver/weaver/runtime/colors
github.com/ServiceWeaver/weaver/runtime/deployers
github.com/ServiceWeaver/weaver/runtime/logging
github.com/ServiceWeaver/weaver/runtime/metrics
github.com/ServiceWeaver/weaver/runtime/protos
Expand Down Expand Up @@ -812,21 +813,31 @@ github.com/ServiceWeaver/weaver/runtime/colors
strings
github.com/ServiceWeaver/weaver/runtime/deployers
context
fmt
github.com/ServiceWeaver/weaver/internal/net/call
log/slog
net
path/filepath
sync
github.com/ServiceWeaver/weaver/runtime/envelope
bufio
context
errors
fmt
github.com/ServiceWeaver/weaver/internal/envelope/conn
github.com/ServiceWeaver/weaver/internal/net/call
github.com/ServiceWeaver/weaver/internal/pipe
github.com/ServiceWeaver/weaver/internal/reflection
github.com/ServiceWeaver/weaver/runtime
github.com/ServiceWeaver/weaver/runtime/codegen
github.com/ServiceWeaver/weaver/runtime/deployers
github.com/ServiceWeaver/weaver/runtime/metrics
github.com/ServiceWeaver/weaver/runtime/protomsg
github.com/ServiceWeaver/weaver/runtime/protos
go.opentelemetry.io/otel/trace
golang.org/x/sync/errgroup
io
log/slog
os
strconv
sync
Expand Down Expand Up @@ -983,12 +994,14 @@ github.com/ServiceWeaver/weaver/weavertest
github.com/ServiceWeaver/weaver/internal/weaver
github.com/ServiceWeaver/weaver/runtime
github.com/ServiceWeaver/weaver/runtime/codegen
github.com/ServiceWeaver/weaver/runtime/deployers
github.com/ServiceWeaver/weaver/runtime/envelope
github.com/ServiceWeaver/weaver/runtime/logging
github.com/ServiceWeaver/weaver/runtime/protos
github.com/google/uuid
golang.org/x/exp/maps
golang.org/x/sync/errgroup
log/slog
os
reflect
regexp
Expand Down Expand Up @@ -1066,17 +1079,22 @@ github.com/ServiceWeaver/weaver/website/blog/deployers/multi
context
flag
fmt
github.com/ServiceWeaver/weaver
github.com/ServiceWeaver/weaver/runtime
github.com/ServiceWeaver/weaver/runtime/colors
github.com/ServiceWeaver/weaver/runtime/envelope
github.com/ServiceWeaver/weaver/runtime/logging
github.com/ServiceWeaver/weaver/runtime/protos
github.com/google/uuid
os
sync
github.com/ServiceWeaver/weaver/website/blog/deployers/pipes
context
flag
fmt
github.com/ServiceWeaver/weaver
github.com/ServiceWeaver/weaver/runtime
github.com/ServiceWeaver/weaver/runtime/deployers
github.com/ServiceWeaver/weaver/runtime/protomsg
github.com/ServiceWeaver/weaver/runtime/protos
github.com/google/uuid
Expand All @@ -1087,6 +1105,7 @@ github.com/ServiceWeaver/weaver/website/blog/deployers/single
context
flag
fmt
github.com/ServiceWeaver/weaver
github.com/ServiceWeaver/weaver/runtime/colors
github.com/ServiceWeaver/weaver/runtime/envelope
github.com/ServiceWeaver/weaver/runtime/logging
Expand Down
2 changes: 2 additions & 0 deletions internal/envelope/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ServiceWeaver/weaver/internal/reflection"
"github.com/ServiceWeaver/weaver/metrics"
"github.com/ServiceWeaver/weaver/runtime/codegen"
"github.com/ServiceWeaver/weaver/runtime/deployers"
"github.com/ServiceWeaver/weaver/runtime/protos"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -106,6 +107,7 @@ func makeConnections(t *testing.T, handler conn.EnvelopeHandler) (*conn.Envelope
App: "app",
DeploymentId: uuid.New().String(),
Id: uuid.New().String(),
ControlSocket: deployers.NewUnixSocketPath(t.TempDir()),
InternalAddress: "localhost:0",
}

Expand Down
4 changes: 2 additions & 2 deletions internal/envelope/conn/weavelet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type WeaveletHandler interface {

// UpdateComponents updates the set of components the weavelet should be
// running. Currently, the set of components only increases over time.
UpdateComponents(*protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error)
UpdateComponents(context.Context, *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error)

// UpdateRoutingInfo updates a component's routing information.
UpdateRoutingInfo(*protos.UpdateRoutingInfoRequest) (*protos.UpdateRoutingInfoReply, error)
Expand Down Expand Up @@ -200,7 +200,7 @@ func (w *WeaveletConn) handleMessage(handler WeaveletHandler, msg *protos.Envelo
}()
return nil
case msg.UpdateComponentsRequest != nil:
reply, err := handler.UpdateComponents(msg.UpdateComponentsRequest)
reply, err := handler.UpdateComponents(context.Background(), msg.UpdateComponentsRequest)
return w.conn.send(&protos.WeaveletMsg{
Id: -msg.Id,
Error: errstring(err),
Expand Down
2 changes: 1 addition & 1 deletion internal/net/call/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var _ codegen.Stub = &stub{}
func NewStub(name string, reg *codegen.Registration, conn Connection, tracer trace.Tracer, injectRetries int) codegen.Stub {
return &stub{
conn: conn,
methods: makeStubMethods(name, reg), // XXX Remove reg.Name arg from makeStubMethods
methods: makeStubMethods(name, reg),
tracer: tracer,
injectRetries: injectRetries,
}
Expand Down
14 changes: 9 additions & 5 deletions internal/testdeployer/remoteweavelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ServiceWeaver/weaver/internal/weaver"
"github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/codegen"
"github.com/ServiceWeaver/weaver/runtime/deployers"
"github.com/ServiceWeaver/weaver/runtime/envelope"
"github.com/ServiceWeaver/weaver/runtime/logging"
"github.com/ServiceWeaver/weaver/runtime/protomsg"
Expand Down Expand Up @@ -212,6 +213,7 @@ func deployWithInfo(t *testing.T, ctx context.Context, placement map[string][]st
for name := range placement {
info := protomsg.Clone(info)
info.Id = uuid.New().String()
info.ControlSocket = deployers.NewUnixSocketPath(t.TempDir())
weavelet, err := spawn(ctx, info, d)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -244,7 +246,7 @@ func (d *deployer) ActivateComponent(ctx context.Context, req *protos.ActivateCo
replicas := []string{}
for _, name := range d.placedAt[req.Component] {
weavelet := d.weavelets[name]
if _, err := weavelet.wlet.UpdateComponents(components); err != nil {
if _, err := weavelet.wlet.UpdateComponents(ctx, components); err != nil {
return nil, err
}
replicas = append(replicas, weavelet.env.WeaveletInfo().DialAddr)
Expand Down Expand Up @@ -416,6 +418,7 @@ func TestLocalhostWeaveletAddress(t *testing.T) {
d := deployWithInfo(t, context.Background(), colocated, &protos.EnvelopeInfo{
App: "remoteweavelet_test.go",
DeploymentId: fmt.Sprint(os.Getpid()),
ControlSocket: deployers.NewUnixSocketPath(t.TempDir()),
InternalAddress: "localhost:12345",
})
defer d.shutdown()
Expand Down Expand Up @@ -443,6 +446,7 @@ func TestHostnameWeaveletAddress(t *testing.T) {
d := deployWithInfo(t, context.Background(), colocated, &protos.EnvelopeInfo{
App: "remoteweavelet_test.go",
DeploymentId: fmt.Sprint(os.Getpid()),
ControlSocket: deployers.NewUnixSocketPath(t.TempDir()),
InternalAddress: net.JoinHostPort(ips[0].String(), "12345"),
})
defer d.shutdown()
Expand Down Expand Up @@ -488,7 +492,7 @@ func TestFailActivateComponent(t *testing.T) {
return nil, err
}
components := &protos.UpdateComponentsRequest{Components: []string{req.Component}}
if _, err := d.weavelets["1"].wlet.UpdateComponents(components); err != nil {
if _, err := d.weavelets["1"].wlet.UpdateComponents(ctx, components); err != nil {
return nil, err
}
return &protos.ActivateComponentReply{}, nil
Expand Down Expand Up @@ -607,7 +611,7 @@ func TestUpdateMissingComponents(t *testing.T) {

// Update the weavelet with components that don't exist.
components := &protos.UpdateComponentsRequest{Components: []string{"foo", "bar"}}
if _, err := d.weavelets["1"].wlet.UpdateComponents(components); err == nil {
if _, err := d.weavelets["1"].wlet.UpdateComponents(context.Background(), components); err == nil {
t.Fatal("unexpected success")
}

Expand All @@ -623,7 +627,7 @@ func TestUpdateExistingComponents(t *testing.T) {
components := &protos.UpdateComponentsRequest{
Components: []string{componenta, componentb, componentc},
}
if _, err := d.weavelets["1"].wlet.UpdateComponents(components); err != nil {
if _, err := d.weavelets["1"].wlet.UpdateComponents(context.Background(), components); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -756,7 +760,7 @@ func TestUpdateBadRoutingInfo(t *testing.T) {
componentc: d.weavelets["3"],
}
weavelet := weavelets[req.Component]
if _, err := weavelet.wlet.UpdateComponents(components); err != nil {
if _, err := weavelet.wlet.UpdateComponents(ctx, components); err != nil {
return nil, err
}

Expand Down
9 changes: 6 additions & 3 deletions internal/tool/multi/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"log/slog"
"net"
"net/http"
"path/filepath"
"slices"
"sync"
"syscall"
Expand Down Expand Up @@ -60,6 +59,7 @@ type deployer struct {
ctx context.Context
ctxCancel context.CancelFunc
deploymentId string
tmpDir string // Private directory for this weavelet/envelope
udsPath string // Path to Unix domain socket
config *MultiConfig
started time.Time
Expand Down Expand Up @@ -147,7 +147,7 @@ func newDeployer(ctx context.Context, deploymentId string, config *MultiConfig,
}

// Make Unix domain socket listener for serving hosted system components.
udsPath := filepath.Join(tmpDir, "socket")
udsPath := deployers.NewUnixSocketPath(tmpDir)
uds, err := net.Listen("unix", udsPath)
if err != nil {
return nil, err
Expand All @@ -157,6 +157,7 @@ func newDeployer(ctx context.Context, deploymentId string, config *MultiConfig,
d := &deployer{
ctx: ctx,
ctxCancel: cancel,
tmpDir: tmpDir,
udsPath: udsPath,
logger: logger,
caCert: caCert,
Expand Down Expand Up @@ -357,7 +358,9 @@ func (d *deployer) startColocationGroup(g *group) error {
},
},
}
e, err := envelope.NewEnvelope(d.ctx, info, d.config.App)
e, err := envelope.NewEnvelope(d.ctx, info, d.config.App, envelope.Options{
Logger: d.logger,
})
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion internal/tool/ssh/impl/babysitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ func RunBabysitter(ctx context.Context) error {
Sections: info.App.Sections,
RunMain: info.RunMain,
}
e, err := envelope.NewEnvelope(ctx, wlet, info.App)
e, err := envelope.NewEnvelope(ctx, wlet, info.App, envelope.Options{
Logger: b.logger,
})
if err != nil {
return err
}
Expand Down
25 changes: 22 additions & 3 deletions internal/weaver/remoteweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ServiceWeaver/weaver/internal/traceio"
"github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/codegen"
"github.com/ServiceWeaver/weaver/runtime/deployers"
"github.com/ServiceWeaver/weaver/runtime/logging"
"github.com/ServiceWeaver/weaver/runtime/protos"
"github.com/ServiceWeaver/weaver/runtime/retry"
Expand All @@ -55,6 +56,8 @@ type RemoteWeaveletOptions struct {
// coordinates with a deployer over a set of Unix pipes to start other
// components remotely. It is the weavelet used by all deployers, except for
// the single process deployer.
//
// RemoteWeavelet must implement the weaver.Controls component interface.
ghemawat marked this conversation as resolved.
Show resolved Hide resolved
type RemoteWeavelet struct {
ctx context.Context // shuts down the weavelet when canceled
servers *errgroup.Group // background servers
Expand Down Expand Up @@ -140,12 +143,16 @@ func NewRemoteWeavelet(ctx context.Context, regs []*codegen.Registration, bootst
if err != nil {
return nil, err
}
// TODO(mwhittaker): Pass handler to Serve, not NewWeaveletConn.

w.conn, err = conn.NewWeaveletConn(toWeavelet, toEnvelope)
if err != nil {
return nil, fmt.Errorf("new weavelet conn: %w", err)
}
info := w.conn.EnvelopeInfo()
controlSocket, err := net.Listen("unix", info.ControlSocket)
if err != nil {
return nil, err
}

// Set up logging.
w.syslogger = w.logger("weavelet", "serviceweaver/system", "")
Expand Down Expand Up @@ -214,6 +221,13 @@ func NewRemoteWeavelet(ctx context.Context, regs []*codegen.Registration, bootst
return nil
})

// Serve the control component.
servers.Go(func() error {
return deployers.ServeComponents(ctx, controlSocket, w.syslogger, map[string]any{
"github.com/ServiceWeaver/weaver/controller": w,
})
})

// Serve RPC requests from other weavelets.
servers.Go(func() error {
server := &server{Listener: w.conn.Listener(), wlet: w}
Expand Down Expand Up @@ -448,8 +462,13 @@ func (w *RemoteWeavelet) GetLoad(*protos.GetLoadRequest) (*protos.GetLoadReply,
return &protos.GetLoadReply{Load: report}, nil
}

// UpdateComponents implements the conn.WeaverHandler interface.
func (w *RemoteWeavelet) UpdateComponents(req *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error) {
// GetHealth implements the controls.Control interface.
ghemawat marked this conversation as resolved.
Show resolved Hide resolved
func (w *RemoteWeavelet) GetHealth(ctx context.Context, req *protos.GetHealthRequest) (*protos.GetHealthReply, error) {
return &protos.GetHealthReply{Status: protos.HealthStatus_HEALTHY}, nil
}

// UpdateComponents implements controls.Control and conn.WeaverHandler interfaces.
ghemawat marked this conversation as resolved.
Show resolved Hide resolved
func (w *RemoteWeavelet) UpdateComponents(ctx context.Context, req *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error) {
var errs []error
var components []*component
var shortened []string
Expand Down
Loading