Skip to content

Commit

Permalink
Use component method calls for some of the envelope->weavelet calls.
Browse files Browse the repository at this point in the history
Previously, all envelope initiated communicaton to the weavelet was
carried over a pair of pipes. This change switches some of that
communication to be based on component method calls. In particular,
the signal to start components is now carried by a method
call (UpdateComponents) to a component hosted by the weavelet.

Main changes
------------

1. Introduce a controller component that is hosted by every remoteweavelet.
2. The envelope picks the name of a Unix domain socket and passes
   it to the weavelet it creates.
3. The envelope creates a stub pointing at this socket and makes calls
   to it to control the weavelet. (Currently only UpdateComponents is
   handled this way.)
4. remoteweavelet creates a controller implementation to handle method
   calls received over this Unix domain socket.

Other changes
-------------

1. envelope.NewEnvelope() now takes an Options struct as an argument.
   This struct can contain an optional Logger and an optional Tracer.
   The supplied Logger and Tracer are used for calls made to the
   controller component.
2. envelope.NewEnvelope() creates a temporary directory and allocates
   a Unix domain socket inside the directory. It also arranges to
   remove the directory when cleaning up (by catching termination
   signals for examples).
3. The socket name is passed in the EnvelopeInfo proto sent to the
   weavelet.
4. RemoteWeavelet creates a controller component and listens for calls
   to it on the supplied socket.
5. Added deployers.NewUnixSocketPath(), which allocates a new socket
   name inside a supplied directory.
6. weavertest multi deployer manages its own socket creation when it
   bypasses envelope.NewEnvelope().
7. website/blog/deployers/deployers_test.go kills child processes
   using SIGTERM instead of SIGKILL. This allows the child process
   a chance to clean up its temporary directories.
8. deployers_test.go logs the stderr and stdout lines it receives
   instead of dropping them on the floor. This can be very helpful
   when things aren't working as expected.
9. The deployer examples in the blog import the weaver package so
   they have access to the controller component registration.
  • Loading branch information
ghemawat committed Dec 14, 2023
1 parent 9f3b5bc commit 7054583
Show file tree
Hide file tree
Showing 24 changed files with 1,057 additions and 405 deletions.
55 changes: 55 additions & 0 deletions controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package weaver

import (
"context"
"fmt"

"github.com/ServiceWeaver/weaver/runtime/protos"
)

// controller is a component hosted in every weavelet. Deployers make calls to this component to
// fetch information about the weavelet, and to make it do various things.
//
// Arguments and results are protobufs to allow deployers to evolve independently of application
// binaries.
type controller interface {
// GetHealth returns the health of the weavelet.
GetHealth(context.Context, *protos.GetHealthRequest) (*protos.GetHealthReply, error)

// UpdateComponents updates the weavelet with the latest set of components it
// should be running.
UpdateComponents(context.Context, *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error)
}

// noopController is a no-op implementation of controller. It exists solely to cause
// controller to be registered as a component. The actual implementation is provided
// by internal/weaver/remoteweavelet.go
type noopController struct {
Implements[controller]
}

var _ controller = &noopController{}

// GetHealth implements controller interface.
func (*noopController) GetHealth(context.Context, *protos.GetHealthRequest) (*protos.GetHealthReply, error) {
return nil, fmt.Errorf("controller.GetHealth not implemented")
}

// UpdateComponents implements controller interface.
func (*noopController) UpdateComponents(context.Context, *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error) {
return nil, fmt.Errorf("controller.UpdateComponents not implemented")
}
19 changes: 19 additions & 0 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,7 @@ github.com/ServiceWeaver/weaver/internal/weaver
github.com/ServiceWeaver/weaver/runtime
github.com/ServiceWeaver/weaver/runtime/codegen
github.com/ServiceWeaver/weaver/runtime/colors
github.com/ServiceWeaver/weaver/runtime/deployers
github.com/ServiceWeaver/weaver/runtime/logging
github.com/ServiceWeaver/weaver/runtime/metrics
github.com/ServiceWeaver/weaver/runtime/protos
Expand Down Expand Up @@ -812,21 +813,31 @@ github.com/ServiceWeaver/weaver/runtime/colors
strings
github.com/ServiceWeaver/weaver/runtime/deployers
context
fmt
github.com/ServiceWeaver/weaver/internal/net/call
log/slog
net
path/filepath
sync
github.com/ServiceWeaver/weaver/runtime/envelope
bufio
context
errors
fmt
github.com/ServiceWeaver/weaver/internal/envelope/conn
github.com/ServiceWeaver/weaver/internal/net/call
github.com/ServiceWeaver/weaver/internal/pipe
github.com/ServiceWeaver/weaver/internal/reflection
github.com/ServiceWeaver/weaver/runtime
github.com/ServiceWeaver/weaver/runtime/codegen
github.com/ServiceWeaver/weaver/runtime/deployers
github.com/ServiceWeaver/weaver/runtime/metrics
github.com/ServiceWeaver/weaver/runtime/protomsg
github.com/ServiceWeaver/weaver/runtime/protos
go.opentelemetry.io/otel/trace
golang.org/x/sync/errgroup
io
log/slog
os
strconv
sync
Expand Down Expand Up @@ -983,12 +994,14 @@ github.com/ServiceWeaver/weaver/weavertest
github.com/ServiceWeaver/weaver/internal/weaver
github.com/ServiceWeaver/weaver/runtime
github.com/ServiceWeaver/weaver/runtime/codegen
github.com/ServiceWeaver/weaver/runtime/deployers
github.com/ServiceWeaver/weaver/runtime/envelope
github.com/ServiceWeaver/weaver/runtime/logging
github.com/ServiceWeaver/weaver/runtime/protos
github.com/google/uuid
golang.org/x/exp/maps
golang.org/x/sync/errgroup
log/slog
os
reflect
regexp
Expand Down Expand Up @@ -1066,17 +1079,22 @@ github.com/ServiceWeaver/weaver/website/blog/deployers/multi
context
flag
fmt
github.com/ServiceWeaver/weaver
github.com/ServiceWeaver/weaver/runtime
github.com/ServiceWeaver/weaver/runtime/colors
github.com/ServiceWeaver/weaver/runtime/envelope
github.com/ServiceWeaver/weaver/runtime/logging
github.com/ServiceWeaver/weaver/runtime/protos
github.com/google/uuid
os
sync
github.com/ServiceWeaver/weaver/website/blog/deployers/pipes
context
flag
fmt
github.com/ServiceWeaver/weaver
github.com/ServiceWeaver/weaver/runtime
github.com/ServiceWeaver/weaver/runtime/deployers
github.com/ServiceWeaver/weaver/runtime/protomsg
github.com/ServiceWeaver/weaver/runtime/protos
github.com/google/uuid
Expand All @@ -1087,6 +1105,7 @@ github.com/ServiceWeaver/weaver/website/blog/deployers/single
context
flag
fmt
github.com/ServiceWeaver/weaver
github.com/ServiceWeaver/weaver/runtime/colors
github.com/ServiceWeaver/weaver/runtime/envelope
github.com/ServiceWeaver/weaver/runtime/logging
Expand Down
2 changes: 2 additions & 0 deletions internal/envelope/conn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ServiceWeaver/weaver/internal/reflection"
"github.com/ServiceWeaver/weaver/metrics"
"github.com/ServiceWeaver/weaver/runtime/codegen"
"github.com/ServiceWeaver/weaver/runtime/deployers"
"github.com/ServiceWeaver/weaver/runtime/protos"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -106,6 +107,7 @@ func makeConnections(t *testing.T, handler conn.EnvelopeHandler) (*conn.Envelope
App: "app",
DeploymentId: uuid.New().String(),
Id: uuid.New().String(),
ControlSocket: deployers.NewUnixSocketPath(t.TempDir()),
InternalAddress: "localhost:0",
}

Expand Down
4 changes: 2 additions & 2 deletions internal/envelope/conn/weavelet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type WeaveletHandler interface {

// UpdateComponents updates the set of components the weavelet should be
// running. Currently, the set of components only increases over time.
UpdateComponents(*protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error)
UpdateComponents(context.Context, *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error)

// UpdateRoutingInfo updates a component's routing information.
UpdateRoutingInfo(*protos.UpdateRoutingInfoRequest) (*protos.UpdateRoutingInfoReply, error)
Expand Down Expand Up @@ -200,7 +200,7 @@ func (w *WeaveletConn) handleMessage(handler WeaveletHandler, msg *protos.Envelo
}()
return nil
case msg.UpdateComponentsRequest != nil:
reply, err := handler.UpdateComponents(msg.UpdateComponentsRequest)
reply, err := handler.UpdateComponents(context.Background(), msg.UpdateComponentsRequest)
return w.conn.send(&protos.WeaveletMsg{
Id: -msg.Id,
Error: errstring(err),
Expand Down
2 changes: 1 addition & 1 deletion internal/net/call/stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ var _ codegen.Stub = &stub{}
func NewStub(name string, reg *codegen.Registration, conn Connection, tracer trace.Tracer, injectRetries int) codegen.Stub {
return &stub{
conn: conn,
methods: makeStubMethods(name, reg), // XXX Remove reg.Name arg from makeStubMethods
methods: makeStubMethods(name, reg),
tracer: tracer,
injectRetries: injectRetries,
}
Expand Down
14 changes: 9 additions & 5 deletions internal/testdeployer/remoteweavelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/ServiceWeaver/weaver/internal/weaver"
"github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/codegen"
"github.com/ServiceWeaver/weaver/runtime/deployers"
"github.com/ServiceWeaver/weaver/runtime/envelope"
"github.com/ServiceWeaver/weaver/runtime/logging"
"github.com/ServiceWeaver/weaver/runtime/protomsg"
Expand Down Expand Up @@ -212,6 +213,7 @@ func deployWithInfo(t *testing.T, ctx context.Context, placement map[string][]st
for name := range placement {
info := protomsg.Clone(info)
info.Id = uuid.New().String()
info.ControlSocket = deployers.NewUnixSocketPath(t.TempDir())
weavelet, err := spawn(ctx, info, d)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -244,7 +246,7 @@ func (d *deployer) ActivateComponent(ctx context.Context, req *protos.ActivateCo
replicas := []string{}
for _, name := range d.placedAt[req.Component] {
weavelet := d.weavelets[name]
if _, err := weavelet.wlet.UpdateComponents(components); err != nil {
if _, err := weavelet.wlet.UpdateComponents(ctx, components); err != nil {
return nil, err
}
replicas = append(replicas, weavelet.env.WeaveletInfo().DialAddr)
Expand Down Expand Up @@ -416,6 +418,7 @@ func TestLocalhostWeaveletAddress(t *testing.T) {
d := deployWithInfo(t, context.Background(), colocated, &protos.EnvelopeInfo{
App: "remoteweavelet_test.go",
DeploymentId: fmt.Sprint(os.Getpid()),
ControlSocket: deployers.NewUnixSocketPath(t.TempDir()),
InternalAddress: "localhost:12345",
})
defer d.shutdown()
Expand Down Expand Up @@ -443,6 +446,7 @@ func TestHostnameWeaveletAddress(t *testing.T) {
d := deployWithInfo(t, context.Background(), colocated, &protos.EnvelopeInfo{
App: "remoteweavelet_test.go",
DeploymentId: fmt.Sprint(os.Getpid()),
ControlSocket: deployers.NewUnixSocketPath(t.TempDir()),
InternalAddress: net.JoinHostPort(ips[0].String(), "12345"),
})
defer d.shutdown()
Expand Down Expand Up @@ -488,7 +492,7 @@ func TestFailActivateComponent(t *testing.T) {
return nil, err
}
components := &protos.UpdateComponentsRequest{Components: []string{req.Component}}
if _, err := d.weavelets["1"].wlet.UpdateComponents(components); err != nil {
if _, err := d.weavelets["1"].wlet.UpdateComponents(ctx, components); err != nil {
return nil, err
}
return &protos.ActivateComponentReply{}, nil
Expand Down Expand Up @@ -607,7 +611,7 @@ func TestUpdateMissingComponents(t *testing.T) {

// Update the weavelet with components that don't exist.
components := &protos.UpdateComponentsRequest{Components: []string{"foo", "bar"}}
if _, err := d.weavelets["1"].wlet.UpdateComponents(components); err == nil {
if _, err := d.weavelets["1"].wlet.UpdateComponents(context.Background(), components); err == nil {
t.Fatal("unexpected success")
}

Expand All @@ -623,7 +627,7 @@ func TestUpdateExistingComponents(t *testing.T) {
components := &protos.UpdateComponentsRequest{
Components: []string{componenta, componentb, componentc},
}
if _, err := d.weavelets["1"].wlet.UpdateComponents(components); err != nil {
if _, err := d.weavelets["1"].wlet.UpdateComponents(context.Background(), components); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -756,7 +760,7 @@ func TestUpdateBadRoutingInfo(t *testing.T) {
componentc: d.weavelets["3"],
}
weavelet := weavelets[req.Component]
if _, err := weavelet.wlet.UpdateComponents(components); err != nil {
if _, err := weavelet.wlet.UpdateComponents(ctx, components); err != nil {
return nil, err
}

Expand Down
9 changes: 6 additions & 3 deletions internal/tool/multi/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"log/slog"
"net"
"net/http"
"path/filepath"
"slices"
"sync"
"syscall"
Expand Down Expand Up @@ -60,6 +59,7 @@ type deployer struct {
ctx context.Context
ctxCancel context.CancelFunc
deploymentId string
tmpDir string // Private directory for this weavelet/envelope
udsPath string // Path to Unix domain socket
config *MultiConfig
started time.Time
Expand Down Expand Up @@ -147,7 +147,7 @@ func newDeployer(ctx context.Context, deploymentId string, config *MultiConfig,
}

// Make Unix domain socket listener for serving hosted system components.
udsPath := filepath.Join(tmpDir, "socket")
udsPath := deployers.NewUnixSocketPath(tmpDir)
uds, err := net.Listen("unix", udsPath)
if err != nil {
return nil, err
Expand All @@ -157,6 +157,7 @@ func newDeployer(ctx context.Context, deploymentId string, config *MultiConfig,
d := &deployer{
ctx: ctx,
ctxCancel: cancel,
tmpDir: tmpDir,
udsPath: udsPath,
logger: logger,
caCert: caCert,
Expand Down Expand Up @@ -357,7 +358,9 @@ func (d *deployer) startColocationGroup(g *group) error {
},
},
}
e, err := envelope.NewEnvelope(d.ctx, info, d.config.App)
e, err := envelope.NewEnvelope(d.ctx, info, d.config.App, envelope.Options{
Logger: d.logger,
})
if err != nil {
return err
}
Expand Down
4 changes: 3 additions & 1 deletion internal/tool/ssh/impl/babysitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ func RunBabysitter(ctx context.Context) error {
Sections: info.App.Sections,
RunMain: info.RunMain,
}
e, err := envelope.NewEnvelope(ctx, wlet, info.App)
e, err := envelope.NewEnvelope(ctx, wlet, info.App, envelope.Options{
Logger: b.logger,
})
if err != nil {
return err
}
Expand Down
25 changes: 22 additions & 3 deletions internal/weaver/remoteweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/ServiceWeaver/weaver/internal/traceio"
"github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/codegen"
"github.com/ServiceWeaver/weaver/runtime/deployers"
"github.com/ServiceWeaver/weaver/runtime/logging"
"github.com/ServiceWeaver/weaver/runtime/protos"
"github.com/ServiceWeaver/weaver/runtime/retry"
Expand All @@ -55,6 +56,8 @@ type RemoteWeaveletOptions struct {
// coordinates with a deployer over a set of Unix pipes to start other
// components remotely. It is the weavelet used by all deployers, except for
// the single process deployer.
//
// RemoteWeavelet must implement the weaver.Controls component interface.
type RemoteWeavelet struct {
ctx context.Context // shuts down the weavelet when canceled
servers *errgroup.Group // background servers
Expand Down Expand Up @@ -140,12 +143,16 @@ func NewRemoteWeavelet(ctx context.Context, regs []*codegen.Registration, bootst
if err != nil {
return nil, err
}
// TODO(mwhittaker): Pass handler to Serve, not NewWeaveletConn.

w.conn, err = conn.NewWeaveletConn(toWeavelet, toEnvelope)
if err != nil {
return nil, fmt.Errorf("new weavelet conn: %w", err)
}
info := w.conn.EnvelopeInfo()
controlSocket, err := net.Listen("unix", info.ControlSocket)
if err != nil {
return nil, err
}

// Set up logging.
w.syslogger = w.logger("weavelet", "serviceweaver/system", "")
Expand Down Expand Up @@ -214,6 +221,13 @@ func NewRemoteWeavelet(ctx context.Context, regs []*codegen.Registration, bootst
return nil
})

// Serve the control component.
servers.Go(func() error {
return deployers.ServeComponents(ctx, controlSocket, w.syslogger, map[string]any{
"github.com/ServiceWeaver/weaver/controller": w,
})
})

// Serve RPC requests from other weavelets.
servers.Go(func() error {
server := &server{Listener: w.conn.Listener(), wlet: w}
Expand Down Expand Up @@ -448,8 +462,13 @@ func (w *RemoteWeavelet) GetLoad(*protos.GetLoadRequest) (*protos.GetLoadReply,
return &protos.GetLoadReply{Load: report}, nil
}

// UpdateComponents implements the conn.WeaverHandler interface.
func (w *RemoteWeavelet) UpdateComponents(req *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error) {
// GetHealth implements the controls.Control interface.
func (w *RemoteWeavelet) GetHealth(ctx context.Context, req *protos.GetHealthRequest) (*protos.GetHealthReply, error) {
return &protos.GetHealthReply{Status: protos.HealthStatus_HEALTHY}, nil
}

// UpdateComponents implements controls.Control and conn.WeaverHandler interfaces.
func (w *RemoteWeavelet) UpdateComponents(ctx context.Context, req *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error) {
var errs []error
var components []*component
var shortened []string
Expand Down
Loading

0 comments on commit 7054583

Please sign in to comment.