From 7c8a152f6d2d7182c58fecff3bde09ea92349552 Mon Sep 17 00:00:00 2001 From: Sanjay Ghemawat Date: Mon, 8 Jan 2024 10:19:05 -0800 Subject: [PATCH] Rename control components. (#704) 1. Rename the control.Controller component to control.WeaverControl (to distinguish it from the DeployerControl component). Consistently rename corresponding types in other packages. 2. Introduce a control.DeployerControl component that will hold methods implemented by deployers. Remove the logger component and move LogBatch into control.DeployerControl. 3. Do not make the multi deployer provide a full component implementation (by using "weaver generate"). Instead, just export its override under the non-overridden component name. --- controller.go | 66 ----- logger.go => deployerControl.go | 31 ++- godeps.txt | 5 +- internal/control/deployer.go | 31 +++ .../control/{controller.go => weavelet.go} | 7 +- internal/tool/multi/deployer.go | 99 +++++--- internal/tool/multi/logger.go | 58 ----- internal/tool/multi/weaver_gen.go | 234 ------------------ internal/weaver/remoteweavelet.go | 10 +- runtime/envelope/envelope.go | 26 +- weaveletControl.go | 66 +++++ weaver.go | 1 - weaver_gen.go | 180 +++++++------- weavertest/deployer.go | 21 +- 14 files changed, 296 insertions(+), 539 deletions(-) delete mode 100644 controller.go rename logger.go => deployerControl.go (52%) create mode 100644 internal/control/deployer.go rename internal/control/{controller.go => weavelet.go} (89%) delete mode 100644 internal/tool/multi/logger.go delete mode 100644 internal/tool/multi/weaver_gen.go create mode 100644 weaveletControl.go diff --git a/controller.go b/controller.go deleted file mode 100644 index 29ddc90dc..000000000 --- a/controller.go +++ /dev/null @@ -1,66 +0,0 @@ -// Copyright 2023 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package weaver - -import ( - "context" - "fmt" - - "github.com/ServiceWeaver/weaver/internal/control" - "github.com/ServiceWeaver/weaver/runtime/protos" -) - -// controller is a component hosted in every weavelet. Deployers make calls to this component to -// fetch information about the weavelet, and to make it do various things. -type controller control.Controller - -// noopController is a no-op implementation of controller. It exists solely to cause -// controller to be registered as a component. The actual implementation is provided -// by internal/weaver/remoteweavelet.go -type noopController struct { - Implements[controller] -} - -var _ controller = &noopController{} - -// UpdateComponents implements controller interface. -func (*noopController) UpdateComponents(context.Context, *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error) { - return nil, fmt.Errorf("controller.UpdateComponents not implemented") -} - -// UpdateRoutingInfo implements controller interface. -func (*noopController) UpdateRoutingInfo(context.Context, *protos.UpdateRoutingInfoRequest) (*protos.UpdateRoutingInfoReply, error) { - return nil, fmt.Errorf("controller.UpdateRoutingInfo not implemented") -} - -// GetHealth implements controller nterface. -func (*noopController) GetHealth(context.Context, *protos.GetHealthRequest) (*protos.GetHealthReply, error) { - return nil, fmt.Errorf("controller.GetHealth not implemented") -} - -// GetLoad implements controller nterface. -func (*noopController) GetLoad(context.Context, *protos.GetLoadRequest) (*protos.GetLoadReply, error) { - return nil, fmt.Errorf("controller.GetLoad not implemented") -} - -// GetMetrics implements controller nterface. -func (*noopController) GetMetrics(context.Context, *protos.GetMetricsRequest) (*protos.GetMetricsReply, error) { - return nil, fmt.Errorf("controller.GetMetrics not implemented") -} - -// GetProfile implements controller nterface. -func (*noopController) GetProfile(context.Context, *protos.GetProfileRequest) (*protos.GetProfileReply, error) { - return nil, fmt.Errorf("controller.GetProfile not implemented") -} diff --git a/logger.go b/deployerControl.go similarity index 52% rename from logger.go rename to deployerControl.go index 45eb02731..d23ac6707 100644 --- a/logger.go +++ b/deployerControl.go @@ -1,4 +1,4 @@ -// Copyright 2023 Google LLC +// Copyright 2024 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -19,36 +19,35 @@ import ( "fmt" "os" + "github.com/ServiceWeaver/weaver/internal/control" "github.com/ServiceWeaver/weaver/runtime/colors" "github.com/ServiceWeaver/weaver/runtime/logging" "github.com/ServiceWeaver/weaver/runtime/protos" ) -// Logger is a component used by the Service Weaver implementation for saving -// log entries. This component is overridden by various deployers to customize -// how logs are stored. The default implementation writes log entries to os.Stderr -// and is hosted in every weavelet. -type Logger interface { - LogBatch(context.Context, *protos.LogEntryBatch) error -} +// deployerControl is a component hosted in every deployer. Weavelets make calls to this +// component to interact with the deployer. +type deployerControl control.DeployerControl -type stderrLogger struct { - Implements[Logger] +// localDeployerControl is the implementation of deployerControl for local execution. It is +// overridden by remote deployers. +type localDeployerControl struct { + Implements[deployerControl] pp *logging.PrettyPrinter } -var _ Logger = &stderrLogger{} +var _ deployerControl = &localDeployerControl{} -// Init initializes the default Logger component. -func (logger *stderrLogger) Init(ctx context.Context) error { - logger.pp = logging.NewPrettyPrinter(colors.Enabled()) +// Init initializes the local deployerControl component. +func (local *localDeployerControl) Init(ctx context.Context) error { + local.pp = logging.NewPrettyPrinter(colors.Enabled()) return nil } // LogBatch logs a list of entries. -func (logger *stderrLogger) LogBatch(ctx context.Context, batch *protos.LogEntryBatch) error { +func (local *localDeployerControl) LogBatch(ctx context.Context, batch *protos.LogEntryBatch) error { for _, entry := range batch.Entries { - fmt.Fprintln(os.Stderr, logger.pp.Format(entry)) + fmt.Fprintln(os.Stderr, local.pp.Format(entry)) } return nil } diff --git a/godeps.txt b/godeps.txt index 05dca0acc..82d7c3d7b 100644 --- a/godeps.txt +++ b/godeps.txt @@ -550,11 +550,10 @@ github.com/ServiceWeaver/weaver/internal/tool/multi errors flag fmt - github.com/ServiceWeaver/weaver + github.com/ServiceWeaver/weaver/internal/control github.com/ServiceWeaver/weaver/internal/metrics github.com/ServiceWeaver/weaver/internal/must github.com/ServiceWeaver/weaver/internal/proxy - github.com/ServiceWeaver/weaver/internal/reflection github.com/ServiceWeaver/weaver/internal/routing github.com/ServiceWeaver/weaver/internal/status github.com/ServiceWeaver/weaver/internal/tool @@ -576,8 +575,6 @@ github.com/ServiceWeaver/weaver/internal/tool/multi github.com/ServiceWeaver/weaver/runtime/traces github.com/ServiceWeaver/weaver/runtime/version github.com/google/uuid - go.opentelemetry.io/otel/codes - go.opentelemetry.io/otel/trace golang.org/x/exp/maps golang.org/x/sync/errgroup google.golang.org/protobuf/reflect/protoreflect diff --git a/internal/control/deployer.go b/internal/control/deployer.go new file mode 100644 index 000000000..22ef7290e --- /dev/null +++ b/internal/control/deployer.go @@ -0,0 +1,31 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package control + +import ( + "context" + + "github.com/ServiceWeaver/weaver/runtime/protos" +) + +// DeployerControl is the interface for the weaver.deployerControl component. It is +// present in its own package so other packages do not need to copy the interface +// definition. +// +// Arguments and results are protobufs to allow deployers to evolve independently +// of application binaries. +type DeployerControl interface { + LogBatch(context.Context, *protos.LogEntryBatch) error +} diff --git a/internal/control/controller.go b/internal/control/weavelet.go similarity index 89% rename from internal/control/controller.go rename to internal/control/weavelet.go index e5b482970..ef8005dbf 100644 --- a/internal/control/controller.go +++ b/internal/control/weavelet.go @@ -20,12 +20,13 @@ import ( "github.com/ServiceWeaver/weaver/runtime/protos" ) -// Controller is the interface for the weaver.controller component. It is present in -// its own package so other packages do not need to copy the interface definition. +// WeaveletControl is the interface for the weaver.weaveletControl component. It is +// present in its own package so other packages do not need to copy the interface +// definition. // // Arguments and results are protobufs to allow deployers to evolve independently of // application binaries. -type Controller interface { +type WeaveletControl interface { // UpdateComponents updates the weavelet with the latest set of components it // should be running. UpdateComponents(context.Context, *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error) diff --git a/internal/tool/multi/deployer.go b/internal/tool/multi/deployer.go index 6d0834952..a39f2ec5b 100644 --- a/internal/tool/multi/deployer.go +++ b/internal/tool/multi/deployer.go @@ -23,20 +23,21 @@ import ( "log/slog" "net" "net/http" + "os" "slices" "sync" "syscall" "time" - "github.com/ServiceWeaver/weaver" + "github.com/ServiceWeaver/weaver/internal/control" imetrics "github.com/ServiceWeaver/weaver/internal/metrics" "github.com/ServiceWeaver/weaver/internal/proxy" - "github.com/ServiceWeaver/weaver/internal/reflection" "github.com/ServiceWeaver/weaver/internal/routing" "github.com/ServiceWeaver/weaver/internal/status" "github.com/ServiceWeaver/weaver/internal/tool/certs" "github.com/ServiceWeaver/weaver/runtime" "github.com/ServiceWeaver/weaver/runtime/bin" + "github.com/ServiceWeaver/weaver/runtime/colors" "github.com/ServiceWeaver/weaver/runtime/deployers" "github.com/ServiceWeaver/weaver/runtime/envelope" "github.com/ServiceWeaver/weaver/runtime/graph" @@ -54,21 +55,25 @@ import ( // The default number of times a component is replicated. const defaultReplication = 2 +// Path name for the deployer control component we implement. +const deployerControlPath = "github.com/ServiceWeaver/weaver/deployerControl" + // A deployer manages an application deployment. type deployer struct { - ctx context.Context - ctxCancel context.CancelFunc - deploymentId string - tmpDir string // Private directory for this weavelet/envelope - udsPath string // Path to Unix domain socket - config *MultiConfig - started time.Time - logger *slog.Logger - caCert *x509.Certificate - caKey crypto.PrivateKey - running errgroup.Group - loggerComponent *logger - traceDB *traces.DB + ctx context.Context + ctxCancel context.CancelFunc + deploymentId string + tmpDir string // Private directory for this weavelet/envelope + udsPath string // Path to Unix domain socket + config *MultiConfig + started time.Time + logger *slog.Logger + caCert *x509.Certificate + caKey crypto.PrivateKey + running errgroup.Group + logsDB *logging.FileStore + printer *logging.PrettyPrinter + traceDB *traces.DB // statsProcessor tracks and computes stats to be rendered on the /statusz page. statsProcessor *imetrics.StatsProcessor @@ -80,6 +85,8 @@ type deployer struct { } +var _ control.DeployerControl = &deployer{} + // A group contains information about a co-location group. type group struct { name string // group name @@ -119,7 +126,7 @@ func newDeployer(ctx context.Context, deploymentId string, config *MultiConfig, if err != nil { return nil, fmt.Errorf("cannot create log storage: %w", err) } - loggerComponent := newLogger(logsDB) + printer := logging.NewPrettyPrinter(colors.Enabled()) logger := slog.New(&logging.LogHandler{ Opts: logging.Options{ App: config.App.Name, @@ -127,9 +134,7 @@ func newDeployer(ctx context.Context, deploymentId string, config *MultiConfig, Weavelet: uuid.NewString(), Attrs: []string{"serviceweaver/system", ""}, }, - // Local log entries are relayed directly to loggerComponent - // without going through any stubs. - Write: loggerComponent.log, + Write: func(e *protos.LogEntry) { log(logsDB, printer, e) }, }) var caCert *x509.Certificate var caKey crypto.PrivateKey @@ -155,20 +160,21 @@ func newDeployer(ctx context.Context, deploymentId string, config *MultiConfig, ctx, cancel := context.WithCancel(ctx) d := &deployer{ - ctx: ctx, - ctxCancel: cancel, - tmpDir: tmpDir, - udsPath: udsPath, - logger: logger, - caCert: caCert, - caKey: caKey, - loggerComponent: loggerComponent, - traceDB: traceDB, - statsProcessor: imetrics.NewStatsProcessor(), - deploymentId: deploymentId, - config: config, - started: time.Now(), - proxies: map[string]*proxyInfo{}, + ctx: ctx, + ctxCancel: cancel, + tmpDir: tmpDir, + udsPath: udsPath, + logger: logger, + caCert: caCert, + caKey: caKey, + logsDB: logsDB, + printer: printer, + traceDB: traceDB, + statsProcessor: imetrics.NewStatsProcessor(), + deploymentId: deploymentId, + config: config, + started: time.Now(), + proxies: map[string]*proxyInfo{}, } // Form co-location groups. @@ -191,10 +197,10 @@ func newDeployer(ctx context.Context, deploymentId string, config *MultiConfig, return err }) - // Start a goroutine that serves calls to system components like multiLogger. + // Start a goroutine that serves calls to system components like deployerControl. d.running.Go(func() error { err := deployers.ServeComponents(d.ctx, uds, d.logger, map[string]any{ - reflection.ComponentName[multiLogger](): loggerComponent, + deployerControlPath: d, }) d.stop(err) return err @@ -350,10 +356,10 @@ func (d *deployer) startColocationGroup(g *group) error { Mtls: d.config.Mtls, InternalAddress: "localhost:0", Redirects: []*protos.EnvelopeInfo_Redirect{ - // Override the builtin logger. + // Supply custom deployer control component { - Component: reflection.ComponentName[weaver.Logger](), - Target: reflection.ComponentName[multiLogger](), + Component: deployerControlPath, + Target: deployerControlPath, Address: "unix://" + d.udsPath, }, }, @@ -565,9 +571,17 @@ func (d *deployer) registerReplica(g *group, info *protos.WeaveletInfo, pid int) return nil } +// LogBatch implements the control.DeployerControl interface. +func (d *deployer) LogBatch(ctx context.Context, batch *protos.LogEntryBatch) error { + for _, entry := range batch.Entries { + log(d.logsDB, d.printer, entry) + } + return nil +} + // HandleLogEntry implements the envelope.EnvelopeHandler interface. func (d *deployer) HandleLogEntry(_ context.Context, entry *protos.LogEntry) error { - d.loggerComponent.log(entry) + log(d.logsDB, d.printer, entry) return nil } @@ -769,3 +783,10 @@ func runProfiling(_ context.Context, req *protos.GetProfileRequest, processes ma data, err := profiling.ProfileGroups(groups) return &protos.GetProfileReply{Data: data}, err } + +func log(db *logging.FileStore, printer *logging.PrettyPrinter, e *protos.LogEntry) { + if !logging.IsSystemGenerated(e) { + fmt.Fprintln(os.Stderr, printer.Format(e)) + } + db.Add(e) +} diff --git a/internal/tool/multi/logger.go b/internal/tool/multi/logger.go deleted file mode 100644 index 3d2d3065d..000000000 --- a/internal/tool/multi/logger.go +++ /dev/null @@ -1,58 +0,0 @@ -// Copyright 2023 Google LLC -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package multi - -import ( - "context" - "fmt" - "os" - - "github.com/ServiceWeaver/weaver" - "github.com/ServiceWeaver/weaver/runtime/colors" - "github.com/ServiceWeaver/weaver/runtime/logging" - "github.com/ServiceWeaver/weaver/runtime/protos" -) - -// multiLogger overrides weaver.Logger component when running under the multi deployer. -type multiLogger weaver.Logger - -type logger struct { - weaver.Implements[multiLogger] - logsDB *logging.FileStore - printer *logging.PrettyPrinter -} - -var _ multiLogger = &logger{} - -func newLogger(logsDB *logging.FileStore) *logger { - return &logger{ - logsDB: logsDB, - printer: logging.NewPrettyPrinter(colors.Enabled()), - } -} - -func (l *logger) LogBatch(ctx context.Context, batch *protos.LogEntryBatch) error { - for _, e := range batch.Entries { - l.log(e) - } - return nil -} - -func (l *logger) log(e *protos.LogEntry) { - if !logging.IsSystemGenerated(e) { - fmt.Fprintln(os.Stderr, l.printer.Format(e)) - } - l.logsDB.Add(e) -} diff --git a/internal/tool/multi/weaver_gen.go b/internal/tool/multi/weaver_gen.go deleted file mode 100644 index df3b917a2..000000000 --- a/internal/tool/multi/weaver_gen.go +++ /dev/null @@ -1,234 +0,0 @@ -// Code generated by "weaver generate". DO NOT EDIT. -//go:build !ignoreWeaverGen - -package multi - -import ( - "context" - "errors" - "github.com/ServiceWeaver/weaver" - "github.com/ServiceWeaver/weaver/runtime/codegen" - "github.com/ServiceWeaver/weaver/runtime/protos" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" - "reflect" -) - -func init() { - codegen.Register(codegen.Registration{ - Name: "github.com/ServiceWeaver/weaver/internal/tool/multi/multiLogger", - Iface: reflect.TypeOf((*multiLogger)(nil)).Elem(), - Impl: reflect.TypeOf(logger{}), - LocalStubFn: func(impl any, caller string, tracer trace.Tracer) any { - return multiLogger_local_stub{impl: impl.(multiLogger), tracer: tracer, logBatchMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/internal/tool/multi/multiLogger", Method: "LogBatch", Remote: false})} - }, - ClientStubFn: func(stub codegen.Stub, caller string) any { - return multiLogger_client_stub{stub: stub, logBatchMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/internal/tool/multi/multiLogger", Method: "LogBatch", Remote: true})} - }, - ServerStubFn: func(impl any, addLoad func(uint64, float64)) codegen.Server { - return multiLogger_server_stub{impl: impl.(multiLogger), addLoad: addLoad} - }, - ReflectStubFn: func(caller func(string, context.Context, []any, []any) error) any { - return multiLogger_reflect_stub{caller: caller} - }, - RefData: "", - }) -} - -// weaver.InstanceOf checks. -var _ weaver.InstanceOf[multiLogger] = (*logger)(nil) - -// weaver.Router checks. -var _ weaver.Unrouted = (*logger)(nil) - -// Local stub implementations. - -type multiLogger_local_stub struct { - impl multiLogger - tracer trace.Tracer - logBatchMetrics *codegen.MethodMetrics -} - -// Check that multiLogger_local_stub implements the multiLogger interface. -var _ multiLogger = (*multiLogger_local_stub)(nil) - -func (s multiLogger_local_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { - // Update metrics. - begin := s.logBatchMetrics.Begin() - defer func() { s.logBatchMetrics.End(begin, err != nil, 0, 0) }() - span := trace.SpanFromContext(ctx) - if span.SpanContext().IsValid() { - // Create a child span for this method. - ctx, span = s.tracer.Start(ctx, "multi.multiLogger.LogBatch", trace.WithSpanKind(trace.SpanKindInternal)) - defer func() { - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } - span.End() - }() - } - - return s.impl.LogBatch(ctx, a0) -} - -// Client stub implementations. - -type multiLogger_client_stub struct { - stub codegen.Stub - logBatchMetrics *codegen.MethodMetrics -} - -// Check that multiLogger_client_stub implements the multiLogger interface. -var _ multiLogger = (*multiLogger_client_stub)(nil) - -func (s multiLogger_client_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { - // Update metrics. - var requestBytes, replyBytes int - begin := s.logBatchMetrics.Begin() - defer func() { s.logBatchMetrics.End(begin, err != nil, requestBytes, replyBytes) }() - - span := trace.SpanFromContext(ctx) - if span.SpanContext().IsValid() { - // Create a child span for this method. - ctx, span = s.stub.Tracer().Start(ctx, "multi.multiLogger.LogBatch", trace.WithSpanKind(trace.SpanKindClient)) - } - - defer func() { - // Catch and return any panics detected during encoding/decoding/rpc. - if err == nil { - err = codegen.CatchPanics(recover()) - if err != nil { - err = errors.Join(weaver.RemoteCallError, err) - } - } - - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } - span.End() - - }() - - // Encode arguments. - enc := codegen.NewEncoder() - serviceweaver_enc_ptr_LogEntryBatch_fec9a5d4(enc, a0) - var shardKey uint64 - - // Call the remote method. - requestBytes = len(enc.Data()) - var results []byte - results, err = s.stub.Run(ctx, 0, enc.Data(), shardKey) - replyBytes = len(results) - if err != nil { - err = errors.Join(weaver.RemoteCallError, err) - return - } - - // Decode the results. - dec := codegen.NewDecoder(results) - err = dec.Error() - return -} - -// Note that "weaver generate" will always generate the error message below. -// Everything is okay. The error message is only relevant if you see it when -// you run "go build" or "go run". -var _ codegen.LatestVersion = codegen.Version[[0][20]struct{}](` - -ERROR: You generated this file with 'weaver generate' (devel) (codegen -version v0.20.0). The generated code is incompatible with the version of the -github.com/ServiceWeaver/weaver module that you're using. The weaver module -version can be found in your go.mod file or by running the following command. - - go list -m github.com/ServiceWeaver/weaver - -We recommend updating the weaver module and the 'weaver generate' command by -running the following. - - go get github.com/ServiceWeaver/weaver@latest - go install github.com/ServiceWeaver/weaver/cmd/weaver@latest - -Then, re-run 'weaver generate' and re-build your code. If the problem persists, -please file an issue at https://github.com/ServiceWeaver/weaver/issues. - -`) - -// Server stub implementations. - -type multiLogger_server_stub struct { - impl multiLogger - addLoad func(key uint64, load float64) -} - -// Check that multiLogger_server_stub implements the codegen.Server interface. -var _ codegen.Server = (*multiLogger_server_stub)(nil) - -// GetStubFn implements the codegen.Server interface. -func (s multiLogger_server_stub) GetStubFn(method string) func(ctx context.Context, args []byte) ([]byte, error) { - switch method { - case "LogBatch": - return s.logBatch - default: - return nil - } -} - -func (s multiLogger_server_stub) logBatch(ctx context.Context, args []byte) (res []byte, err error) { - // Catch and return any panics detected during encoding/decoding/rpc. - defer func() { - if err == nil { - err = codegen.CatchPanics(recover()) - } - }() - - // Decode arguments. - dec := codegen.NewDecoder(args) - var a0 *protos.LogEntryBatch - a0 = serviceweaver_dec_ptr_LogEntryBatch_fec9a5d4(dec) - - // TODO(rgrandl): The deferred function above will recover from panics in the - // user code: fix this. - // Call the local method. - appErr := s.impl.LogBatch(ctx, a0) - - // Encode the results. - enc := codegen.NewEncoder() - enc.Error(appErr) - return enc.Data(), nil -} - -// Reflect stub implementations. - -type multiLogger_reflect_stub struct { - caller func(string, context.Context, []any, []any) error -} - -// Check that multiLogger_reflect_stub implements the multiLogger interface. -var _ multiLogger = (*multiLogger_reflect_stub)(nil) - -func (s multiLogger_reflect_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { - err = s.caller("LogBatch", ctx, []any{a0}, []any{}) - return -} - -// Encoding/decoding implementations. - -func serviceweaver_enc_ptr_LogEntryBatch_fec9a5d4(enc *codegen.Encoder, arg *protos.LogEntryBatch) { - if arg == nil { - enc.Bool(false) - } else { - enc.Bool(true) - enc.EncodeProto(arg) - } -} - -func serviceweaver_dec_ptr_LogEntryBatch_fec9a5d4(dec *codegen.Decoder) *protos.LogEntryBatch { - if !dec.Bool() { - return nil - } - var res protos.LogEntryBatch - dec.DecodeProto(&res) - return &res -} diff --git a/internal/weaver/remoteweavelet.go b/internal/weaver/remoteweavelet.go index ec4fd2913..fc139f3c9 100644 --- a/internal/weaver/remoteweavelet.go +++ b/internal/weaver/remoteweavelet.go @@ -79,7 +79,7 @@ type RemoteWeavelet struct { listeners map[string]*listener // listeners, by name } -var _ control.Controller = (*RemoteWeavelet)(nil) +var _ control.WeaveletControl = (*RemoteWeavelet)(nil) type redirect struct { component *component @@ -229,7 +229,7 @@ func NewRemoteWeavelet(ctx context.Context, regs []*codegen.Registration, bootst // Serve the control component. servers.Go(func() error { return deployers.ServeComponents(ctx, controlSocket, w.syslogger, map[string]any{ - "github.com/ServiceWeaver/weaver/controller": w, + "github.com/ServiceWeaver/weaver/weaveletControl": w, }) }) @@ -661,9 +661,9 @@ func (w *RemoteWeavelet) repeatedly(ctx context.Context, errMsg string, f func() } func (w *RemoteWeavelet) getLoggerFunction() (func(context.Context, *protos.LogEntryBatch) error, error) { - // If an override is found for the logger component, use it. - const loggerPath = "github.com/ServiceWeaver/weaver/Logger" - r, ok := w.redirects[loggerPath] + // If an override is found for the deployer control component, use it. + const overridePath = "github.com/ServiceWeaver/weaver/deployerControl" + r, ok := w.redirects[overridePath] if !ok { // For now, fall back to sending over the pipe to the weavelet. // TODO(sanjay): Make the default write to os.Stderr once all deployers diff --git a/runtime/envelope/envelope.go b/runtime/envelope/envelope.go index 18fd0e251..6a0083af0 100644 --- a/runtime/envelope/envelope.go +++ b/runtime/envelope/envelope.go @@ -112,11 +112,11 @@ type Envelope struct { tmpDir string weavelet *protos.EnvelopeInfo config *protos.AppConfig - conn *conn.EnvelopeConn // conn to weavelet - cmd *pipe.Cmd // command that started the weavelet - stdoutPipe io.ReadCloser // stdout pipe from the weavelet - stderrPipe io.ReadCloser // stderr pipe from the weavelet - controller control.Controller // Stub that talks to the weavelet controller + conn *conn.EnvelopeConn // conn to weavelet + cmd *pipe.Cmd // command that started the weavelet + stdoutPipe io.ReadCloser // stdout pipe from the weavelet + stderrPipe io.ReadCloser // stderr pipe from the weavelet + controller control.WeaveletControl // Stub that talks to the weavelet controller // State needed to process metric updates. metricsMu sync.Mutex @@ -163,7 +163,7 @@ func NewEnvelope(ctx context.Context, wlet *protos.EnvelopeInfo, config *protos. wlet = protomsg.Clone(wlet) wlet.ControlSocket = deployers.NewUnixSocketPath(tmpDir) - controller, err := getController(ctx, wlet.ControlSocket, options) + controller, err := getWeaveletControlStub(ctx, wlet.ControlSocket, options) if err != nil { return nil, err } @@ -241,8 +241,8 @@ func NewEnvelope(ctx context.Context, wlet *protos.EnvelopeInfo, config *protos. return e, nil } -// Controller returns the controller component for the weavelet managed by this envelope. -func (e *Envelope) Controller() control.Controller { return e.controller } +// WeaveletControl returns the controller component for the weavelet managed by this envelope. +func (e *Envelope) WeaveletControl() control.WeaveletControl { return e.controller } // Serve accepts incoming messages from the weavelet. RPC requests are handled // serially in the order they are received. Serve blocks until the connection @@ -409,10 +409,10 @@ func dropNewline(line []byte) []byte { return line } -// getController returns a controller that forwards calls to the controller component -// in the weavelet at the specified socket. -func getController(ctx context.Context, socket string, options Options) (control.Controller, error) { - const controllerName = "github.com/ServiceWeaver/weaver/controller" +// getWeaveletControlStub returns a control.WeaveletControl that forwards calls to the controller +// component in the weavelet at the specified socket. +func getWeaveletControlStub(ctx context.Context, socket string, options Options) (control.WeaveletControl, error) { + const controllerName = "github.com/ServiceWeaver/weaver/weaveletControl" controllerReg, ok := codegen.Find(controllerName) if !ok { return nil, fmt.Errorf("controller component (%s) not found", controllerName) @@ -427,5 +427,5 @@ func getController(ctx context.Context, socket string, options Options) (control // We skip waitUntilReady() and rely on automatic retries of methods stub := call.NewStub(controllerName, controllerReg, conn, options.Tracer, 0) obj := controllerReg.ClientStubFn(stub, "envelope") - return obj.(control.Controller), nil + return obj.(control.WeaveletControl), nil } diff --git a/weaveletControl.go b/weaveletControl.go new file mode 100644 index 000000000..800bd288e --- /dev/null +++ b/weaveletControl.go @@ -0,0 +1,66 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package weaver + +import ( + "context" + "fmt" + + "github.com/ServiceWeaver/weaver/internal/control" + "github.com/ServiceWeaver/weaver/runtime/protos" +) + +// weaveletControl is a component hosted in every weavelet. Deployers make calls to this component +// to fetch information about the weavelet, and to make it do various things. +type weaveletControl control.WeaveletControl + +// noopWeaveletControl is a no-op implementation of weaveletControl. It exists solely to cause +// weaveletControl to be registered as a component. The actual implementation is provided +// by internal/weaver/remoteweavelet.go +type noopWeaveletControl struct { + Implements[weaveletControl] +} + +var _ weaveletControl = &noopWeaveletControl{} + +// UpdateComponents implements weaveletControl interface. +func (*noopWeaveletControl) UpdateComponents(context.Context, *protos.UpdateComponentsRequest) (*protos.UpdateComponentsReply, error) { + return nil, fmt.Errorf("weaveletControl.UpdateComponents not implemented") +} + +// UpdateRoutingInfo implements weaveletControl interface. +func (*noopWeaveletControl) UpdateRoutingInfo(context.Context, *protos.UpdateRoutingInfoRequest) (*protos.UpdateRoutingInfoReply, error) { + return nil, fmt.Errorf("weaveletControl.UpdateRoutingInfo not implemented") +} + +// GetHealth implements weaveletControl nterface. +func (*noopWeaveletControl) GetHealth(context.Context, *protos.GetHealthRequest) (*protos.GetHealthReply, error) { + return nil, fmt.Errorf("weaveletControl.GetHealth not implemented") +} + +// GetLoad implements weaveletControl nterface. +func (*noopWeaveletControl) GetLoad(context.Context, *protos.GetLoadRequest) (*protos.GetLoadReply, error) { + return nil, fmt.Errorf("weaveletControl.GetLoad not implemented") +} + +// GetMetrics implements weaveletControl nterface. +func (*noopWeaveletControl) GetMetrics(context.Context, *protos.GetMetricsRequest) (*protos.GetMetricsReply, error) { + return nil, fmt.Errorf("weaveletControl.GetMetrics not implemented") +} + +// GetProfile implements weaveletControl nterface. +func (*noopWeaveletControl) GetProfile(context.Context, *protos.GetProfileRequest) (*protos.GetProfileReply, error) { + return nil, fmt.Errorf("weaveletControl.GetProfile not implemented") +} diff --git a/weaver.go b/weaver.go index 6f1a53af6..f048440ce 100644 --- a/weaver.go +++ b/weaver.go @@ -43,7 +43,6 @@ import ( //go:generate ./dev/protoc.sh internal/status/status.proto //go:generate ./dev/protoc.sh internal/tool/single/single.proto -//go:generate ./dev/protoc.sh internal/tool/multi/multi.proto //go:generate ./dev/protoc.sh internal/tool/ssh/impl/ssh.proto //go:generate ./dev/protoc.sh runtime/protos/runtime.proto //go:generate ./dev/protoc.sh runtime/protos/config.proto diff --git a/weaver_gen.go b/weaver_gen.go index 4decef01c..285ed4a2c 100644 --- a/weaver_gen.go +++ b/weaver_gen.go @@ -15,70 +15,70 @@ import ( func init() { codegen.Register(codegen.Registration{ - Name: "github.com/ServiceWeaver/weaver/Logger", - Iface: reflect.TypeOf((*Logger)(nil)).Elem(), - Impl: reflect.TypeOf(stderrLogger{}), + Name: "github.com/ServiceWeaver/weaver/deployerControl", + Iface: reflect.TypeOf((*deployerControl)(nil)).Elem(), + Impl: reflect.TypeOf(localDeployerControl{}), LocalStubFn: func(impl any, caller string, tracer trace.Tracer) any { - return logger_local_stub{impl: impl.(Logger), tracer: tracer, logBatchMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/Logger", Method: "LogBatch", Remote: false})} + return deployerControl_local_stub{impl: impl.(deployerControl), tracer: tracer, logBatchMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/deployerControl", Method: "LogBatch", Remote: false})} }, ClientStubFn: func(stub codegen.Stub, caller string) any { - return logger_client_stub{stub: stub, logBatchMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/Logger", Method: "LogBatch", Remote: true})} + return deployerControl_client_stub{stub: stub, logBatchMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/deployerControl", Method: "LogBatch", Remote: true})} }, ServerStubFn: func(impl any, addLoad func(uint64, float64)) codegen.Server { - return logger_server_stub{impl: impl.(Logger), addLoad: addLoad} + return deployerControl_server_stub{impl: impl.(deployerControl), addLoad: addLoad} }, ReflectStubFn: func(caller func(string, context.Context, []any, []any) error) any { - return logger_reflect_stub{caller: caller} + return deployerControl_reflect_stub{caller: caller} }, RefData: "", }) codegen.Register(codegen.Registration{ - Name: "github.com/ServiceWeaver/weaver/controller", - Iface: reflect.TypeOf((*controller)(nil)).Elem(), - Impl: reflect.TypeOf(noopController{}), + Name: "github.com/ServiceWeaver/weaver/weaveletControl", + Iface: reflect.TypeOf((*weaveletControl)(nil)).Elem(), + Impl: reflect.TypeOf(noopWeaveletControl{}), LocalStubFn: func(impl any, caller string, tracer trace.Tracer) any { - return controller_local_stub{impl: impl.(controller), tracer: tracer, getHealthMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "GetHealth", Remote: false}), getLoadMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "GetLoad", Remote: false}), getMetricsMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "GetMetrics", Remote: false}), getProfileMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "GetProfile", Remote: false}), updateComponentsMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "UpdateComponents", Remote: false}), updateRoutingInfoMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "UpdateRoutingInfo", Remote: false})} + return weaveletControl_local_stub{impl: impl.(weaveletControl), tracer: tracer, getHealthMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "GetHealth", Remote: false}), getLoadMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "GetLoad", Remote: false}), getMetricsMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "GetMetrics", Remote: false}), getProfileMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "GetProfile", Remote: false}), updateComponentsMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "UpdateComponents", Remote: false}), updateRoutingInfoMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "UpdateRoutingInfo", Remote: false})} }, ClientStubFn: func(stub codegen.Stub, caller string) any { - return controller_client_stub{stub: stub, getHealthMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "GetHealth", Remote: true}), getLoadMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "GetLoad", Remote: true}), getMetricsMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "GetMetrics", Remote: true}), getProfileMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "GetProfile", Remote: true}), updateComponentsMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "UpdateComponents", Remote: true}), updateRoutingInfoMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/controller", Method: "UpdateRoutingInfo", Remote: true})} + return weaveletControl_client_stub{stub: stub, getHealthMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "GetHealth", Remote: true}), getLoadMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "GetLoad", Remote: true}), getMetricsMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "GetMetrics", Remote: true}), getProfileMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "GetProfile", Remote: true}), updateComponentsMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "UpdateComponents", Remote: true}), updateRoutingInfoMetrics: codegen.MethodMetricsFor(codegen.MethodLabels{Caller: caller, Component: "github.com/ServiceWeaver/weaver/weaveletControl", Method: "UpdateRoutingInfo", Remote: true})} }, ServerStubFn: func(impl any, addLoad func(uint64, float64)) codegen.Server { - return controller_server_stub{impl: impl.(controller), addLoad: addLoad} + return weaveletControl_server_stub{impl: impl.(weaveletControl), addLoad: addLoad} }, ReflectStubFn: func(caller func(string, context.Context, []any, []any) error) any { - return controller_reflect_stub{caller: caller} + return weaveletControl_reflect_stub{caller: caller} }, RefData: "", }) } // weaver.InstanceOf checks. -var _ InstanceOf[Logger] = (*stderrLogger)(nil) -var _ InstanceOf[controller] = (*noopController)(nil) +var _ InstanceOf[deployerControl] = (*localDeployerControl)(nil) +var _ InstanceOf[weaveletControl] = (*noopWeaveletControl)(nil) // weaver.Router checks. -var _ Unrouted = (*stderrLogger)(nil) -var _ Unrouted = (*noopController)(nil) +var _ Unrouted = (*localDeployerControl)(nil) +var _ Unrouted = (*noopWeaveletControl)(nil) // Local stub implementations. -type logger_local_stub struct { - impl Logger +type deployerControl_local_stub struct { + impl deployerControl tracer trace.Tracer logBatchMetrics *codegen.MethodMetrics } -// Check that logger_local_stub implements the Logger interface. -var _ Logger = (*logger_local_stub)(nil) +// Check that deployerControl_local_stub implements the deployerControl interface. +var _ deployerControl = (*deployerControl_local_stub)(nil) -func (s logger_local_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { +func (s deployerControl_local_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { // Update metrics. begin := s.logBatchMetrics.Begin() defer func() { s.logBatchMetrics.End(begin, err != nil, 0, 0) }() span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.tracer.Start(ctx, "weaver.Logger.LogBatch", trace.WithSpanKind(trace.SpanKindInternal)) + ctx, span = s.tracer.Start(ctx, "weaver.deployerControl.LogBatch", trace.WithSpanKind(trace.SpanKindInternal)) defer func() { if err != nil { span.RecordError(err) @@ -91,8 +91,8 @@ func (s logger_local_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatc return s.impl.LogBatch(ctx, a0) } -type controller_local_stub struct { - impl controller +type weaveletControl_local_stub struct { + impl weaveletControl tracer trace.Tracer getHealthMetrics *codegen.MethodMetrics getLoadMetrics *codegen.MethodMetrics @@ -102,17 +102,17 @@ type controller_local_stub struct { updateRoutingInfoMetrics *codegen.MethodMetrics } -// Check that controller_local_stub implements the controller interface. -var _ controller = (*controller_local_stub)(nil) +// Check that weaveletControl_local_stub implements the weaveletControl interface. +var _ weaveletControl = (*weaveletControl_local_stub)(nil) -func (s controller_local_stub) GetHealth(ctx context.Context, a0 *protos.GetHealthRequest) (r0 *protos.GetHealthReply, err error) { +func (s weaveletControl_local_stub) GetHealth(ctx context.Context, a0 *protos.GetHealthRequest) (r0 *protos.GetHealthReply, err error) { // Update metrics. begin := s.getHealthMetrics.Begin() defer func() { s.getHealthMetrics.End(begin, err != nil, 0, 0) }() span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.tracer.Start(ctx, "weaver.controller.GetHealth", trace.WithSpanKind(trace.SpanKindInternal)) + ctx, span = s.tracer.Start(ctx, "weaver.weaveletControl.GetHealth", trace.WithSpanKind(trace.SpanKindInternal)) defer func() { if err != nil { span.RecordError(err) @@ -125,14 +125,14 @@ func (s controller_local_stub) GetHealth(ctx context.Context, a0 *protos.GetHeal return s.impl.GetHealth(ctx, a0) } -func (s controller_local_stub) GetLoad(ctx context.Context, a0 *protos.GetLoadRequest) (r0 *protos.GetLoadReply, err error) { +func (s weaveletControl_local_stub) GetLoad(ctx context.Context, a0 *protos.GetLoadRequest) (r0 *protos.GetLoadReply, err error) { // Update metrics. begin := s.getLoadMetrics.Begin() defer func() { s.getLoadMetrics.End(begin, err != nil, 0, 0) }() span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.tracer.Start(ctx, "weaver.controller.GetLoad", trace.WithSpanKind(trace.SpanKindInternal)) + ctx, span = s.tracer.Start(ctx, "weaver.weaveletControl.GetLoad", trace.WithSpanKind(trace.SpanKindInternal)) defer func() { if err != nil { span.RecordError(err) @@ -145,14 +145,14 @@ func (s controller_local_stub) GetLoad(ctx context.Context, a0 *protos.GetLoadRe return s.impl.GetLoad(ctx, a0) } -func (s controller_local_stub) GetMetrics(ctx context.Context, a0 *protos.GetMetricsRequest) (r0 *protos.GetMetricsReply, err error) { +func (s weaveletControl_local_stub) GetMetrics(ctx context.Context, a0 *protos.GetMetricsRequest) (r0 *protos.GetMetricsReply, err error) { // Update metrics. begin := s.getMetricsMetrics.Begin() defer func() { s.getMetricsMetrics.End(begin, err != nil, 0, 0) }() span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.tracer.Start(ctx, "weaver.controller.GetMetrics", trace.WithSpanKind(trace.SpanKindInternal)) + ctx, span = s.tracer.Start(ctx, "weaver.weaveletControl.GetMetrics", trace.WithSpanKind(trace.SpanKindInternal)) defer func() { if err != nil { span.RecordError(err) @@ -165,14 +165,14 @@ func (s controller_local_stub) GetMetrics(ctx context.Context, a0 *protos.GetMet return s.impl.GetMetrics(ctx, a0) } -func (s controller_local_stub) GetProfile(ctx context.Context, a0 *protos.GetProfileRequest) (r0 *protos.GetProfileReply, err error) { +func (s weaveletControl_local_stub) GetProfile(ctx context.Context, a0 *protos.GetProfileRequest) (r0 *protos.GetProfileReply, err error) { // Update metrics. begin := s.getProfileMetrics.Begin() defer func() { s.getProfileMetrics.End(begin, err != nil, 0, 0) }() span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.tracer.Start(ctx, "weaver.controller.GetProfile", trace.WithSpanKind(trace.SpanKindInternal)) + ctx, span = s.tracer.Start(ctx, "weaver.weaveletControl.GetProfile", trace.WithSpanKind(trace.SpanKindInternal)) defer func() { if err != nil { span.RecordError(err) @@ -185,14 +185,14 @@ func (s controller_local_stub) GetProfile(ctx context.Context, a0 *protos.GetPro return s.impl.GetProfile(ctx, a0) } -func (s controller_local_stub) UpdateComponents(ctx context.Context, a0 *protos.UpdateComponentsRequest) (r0 *protos.UpdateComponentsReply, err error) { +func (s weaveletControl_local_stub) UpdateComponents(ctx context.Context, a0 *protos.UpdateComponentsRequest) (r0 *protos.UpdateComponentsReply, err error) { // Update metrics. begin := s.updateComponentsMetrics.Begin() defer func() { s.updateComponentsMetrics.End(begin, err != nil, 0, 0) }() span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.tracer.Start(ctx, "weaver.controller.UpdateComponents", trace.WithSpanKind(trace.SpanKindInternal)) + ctx, span = s.tracer.Start(ctx, "weaver.weaveletControl.UpdateComponents", trace.WithSpanKind(trace.SpanKindInternal)) defer func() { if err != nil { span.RecordError(err) @@ -205,14 +205,14 @@ func (s controller_local_stub) UpdateComponents(ctx context.Context, a0 *protos. return s.impl.UpdateComponents(ctx, a0) } -func (s controller_local_stub) UpdateRoutingInfo(ctx context.Context, a0 *protos.UpdateRoutingInfoRequest) (r0 *protos.UpdateRoutingInfoReply, err error) { +func (s weaveletControl_local_stub) UpdateRoutingInfo(ctx context.Context, a0 *protos.UpdateRoutingInfoRequest) (r0 *protos.UpdateRoutingInfoReply, err error) { // Update metrics. begin := s.updateRoutingInfoMetrics.Begin() defer func() { s.updateRoutingInfoMetrics.End(begin, err != nil, 0, 0) }() span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.tracer.Start(ctx, "weaver.controller.UpdateRoutingInfo", trace.WithSpanKind(trace.SpanKindInternal)) + ctx, span = s.tracer.Start(ctx, "weaver.weaveletControl.UpdateRoutingInfo", trace.WithSpanKind(trace.SpanKindInternal)) defer func() { if err != nil { span.RecordError(err) @@ -227,15 +227,15 @@ func (s controller_local_stub) UpdateRoutingInfo(ctx context.Context, a0 *protos // Client stub implementations. -type logger_client_stub struct { +type deployerControl_client_stub struct { stub codegen.Stub logBatchMetrics *codegen.MethodMetrics } -// Check that logger_client_stub implements the Logger interface. -var _ Logger = (*logger_client_stub)(nil) +// Check that deployerControl_client_stub implements the deployerControl interface. +var _ deployerControl = (*deployerControl_client_stub)(nil) -func (s logger_client_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { +func (s deployerControl_client_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { // Update metrics. var requestBytes, replyBytes int begin := s.logBatchMetrics.Begin() @@ -244,7 +244,7 @@ func (s logger_client_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBat span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.stub.Tracer().Start(ctx, "weaver.Logger.LogBatch", trace.WithSpanKind(trace.SpanKindClient)) + ctx, span = s.stub.Tracer().Start(ctx, "weaver.deployerControl.LogBatch", trace.WithSpanKind(trace.SpanKindClient)) } defer func() { @@ -285,7 +285,7 @@ func (s logger_client_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBat return } -type controller_client_stub struct { +type weaveletControl_client_stub struct { stub codegen.Stub getHealthMetrics *codegen.MethodMetrics getLoadMetrics *codegen.MethodMetrics @@ -295,10 +295,10 @@ type controller_client_stub struct { updateRoutingInfoMetrics *codegen.MethodMetrics } -// Check that controller_client_stub implements the controller interface. -var _ controller = (*controller_client_stub)(nil) +// Check that weaveletControl_client_stub implements the weaveletControl interface. +var _ weaveletControl = (*weaveletControl_client_stub)(nil) -func (s controller_client_stub) GetHealth(ctx context.Context, a0 *protos.GetHealthRequest) (r0 *protos.GetHealthReply, err error) { +func (s weaveletControl_client_stub) GetHealth(ctx context.Context, a0 *protos.GetHealthRequest) (r0 *protos.GetHealthReply, err error) { // Update metrics. var requestBytes, replyBytes int begin := s.getHealthMetrics.Begin() @@ -307,7 +307,7 @@ func (s controller_client_stub) GetHealth(ctx context.Context, a0 *protos.GetHea span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.stub.Tracer().Start(ctx, "weaver.controller.GetHealth", trace.WithSpanKind(trace.SpanKindClient)) + ctx, span = s.stub.Tracer().Start(ctx, "weaver.weaveletControl.GetHealth", trace.WithSpanKind(trace.SpanKindClient)) } defer func() { @@ -349,7 +349,7 @@ func (s controller_client_stub) GetHealth(ctx context.Context, a0 *protos.GetHea return } -func (s controller_client_stub) GetLoad(ctx context.Context, a0 *protos.GetLoadRequest) (r0 *protos.GetLoadReply, err error) { +func (s weaveletControl_client_stub) GetLoad(ctx context.Context, a0 *protos.GetLoadRequest) (r0 *protos.GetLoadReply, err error) { // Update metrics. var requestBytes, replyBytes int begin := s.getLoadMetrics.Begin() @@ -358,7 +358,7 @@ func (s controller_client_stub) GetLoad(ctx context.Context, a0 *protos.GetLoadR span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.stub.Tracer().Start(ctx, "weaver.controller.GetLoad", trace.WithSpanKind(trace.SpanKindClient)) + ctx, span = s.stub.Tracer().Start(ctx, "weaver.weaveletControl.GetLoad", trace.WithSpanKind(trace.SpanKindClient)) } defer func() { @@ -400,7 +400,7 @@ func (s controller_client_stub) GetLoad(ctx context.Context, a0 *protos.GetLoadR return } -func (s controller_client_stub) GetMetrics(ctx context.Context, a0 *protos.GetMetricsRequest) (r0 *protos.GetMetricsReply, err error) { +func (s weaveletControl_client_stub) GetMetrics(ctx context.Context, a0 *protos.GetMetricsRequest) (r0 *protos.GetMetricsReply, err error) { // Update metrics. var requestBytes, replyBytes int begin := s.getMetricsMetrics.Begin() @@ -409,7 +409,7 @@ func (s controller_client_stub) GetMetrics(ctx context.Context, a0 *protos.GetMe span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.stub.Tracer().Start(ctx, "weaver.controller.GetMetrics", trace.WithSpanKind(trace.SpanKindClient)) + ctx, span = s.stub.Tracer().Start(ctx, "weaver.weaveletControl.GetMetrics", trace.WithSpanKind(trace.SpanKindClient)) } defer func() { @@ -451,7 +451,7 @@ func (s controller_client_stub) GetMetrics(ctx context.Context, a0 *protos.GetMe return } -func (s controller_client_stub) GetProfile(ctx context.Context, a0 *protos.GetProfileRequest) (r0 *protos.GetProfileReply, err error) { +func (s weaveletControl_client_stub) GetProfile(ctx context.Context, a0 *protos.GetProfileRequest) (r0 *protos.GetProfileReply, err error) { // Update metrics. var requestBytes, replyBytes int begin := s.getProfileMetrics.Begin() @@ -460,7 +460,7 @@ func (s controller_client_stub) GetProfile(ctx context.Context, a0 *protos.GetPr span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.stub.Tracer().Start(ctx, "weaver.controller.GetProfile", trace.WithSpanKind(trace.SpanKindClient)) + ctx, span = s.stub.Tracer().Start(ctx, "weaver.weaveletControl.GetProfile", trace.WithSpanKind(trace.SpanKindClient)) } defer func() { @@ -502,7 +502,7 @@ func (s controller_client_stub) GetProfile(ctx context.Context, a0 *protos.GetPr return } -func (s controller_client_stub) UpdateComponents(ctx context.Context, a0 *protos.UpdateComponentsRequest) (r0 *protos.UpdateComponentsReply, err error) { +func (s weaveletControl_client_stub) UpdateComponents(ctx context.Context, a0 *protos.UpdateComponentsRequest) (r0 *protos.UpdateComponentsReply, err error) { // Update metrics. var requestBytes, replyBytes int begin := s.updateComponentsMetrics.Begin() @@ -511,7 +511,7 @@ func (s controller_client_stub) UpdateComponents(ctx context.Context, a0 *protos span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.stub.Tracer().Start(ctx, "weaver.controller.UpdateComponents", trace.WithSpanKind(trace.SpanKindClient)) + ctx, span = s.stub.Tracer().Start(ctx, "weaver.weaveletControl.UpdateComponents", trace.WithSpanKind(trace.SpanKindClient)) } defer func() { @@ -553,7 +553,7 @@ func (s controller_client_stub) UpdateComponents(ctx context.Context, a0 *protos return } -func (s controller_client_stub) UpdateRoutingInfo(ctx context.Context, a0 *protos.UpdateRoutingInfoRequest) (r0 *protos.UpdateRoutingInfoReply, err error) { +func (s weaveletControl_client_stub) UpdateRoutingInfo(ctx context.Context, a0 *protos.UpdateRoutingInfoRequest) (r0 *protos.UpdateRoutingInfoReply, err error) { // Update metrics. var requestBytes, replyBytes int begin := s.updateRoutingInfoMetrics.Begin() @@ -562,7 +562,7 @@ func (s controller_client_stub) UpdateRoutingInfo(ctx context.Context, a0 *proto span := trace.SpanFromContext(ctx) if span.SpanContext().IsValid() { // Create a child span for this method. - ctx, span = s.stub.Tracer().Start(ctx, "weaver.controller.UpdateRoutingInfo", trace.WithSpanKind(trace.SpanKindClient)) + ctx, span = s.stub.Tracer().Start(ctx, "weaver.weaveletControl.UpdateRoutingInfo", trace.WithSpanKind(trace.SpanKindClient)) } defer func() { @@ -629,16 +629,16 @@ please file an issue at https://github.com/ServiceWeaver/weaver/issues. // Server stub implementations. -type logger_server_stub struct { - impl Logger +type deployerControl_server_stub struct { + impl deployerControl addLoad func(key uint64, load float64) } -// Check that logger_server_stub implements the codegen.Server interface. -var _ codegen.Server = (*logger_server_stub)(nil) +// Check that deployerControl_server_stub implements the codegen.Server interface. +var _ codegen.Server = (*deployerControl_server_stub)(nil) // GetStubFn implements the codegen.Server interface. -func (s logger_server_stub) GetStubFn(method string) func(ctx context.Context, args []byte) ([]byte, error) { +func (s deployerControl_server_stub) GetStubFn(method string) func(ctx context.Context, args []byte) ([]byte, error) { switch method { case "LogBatch": return s.logBatch @@ -647,7 +647,7 @@ func (s logger_server_stub) GetStubFn(method string) func(ctx context.Context, a } } -func (s logger_server_stub) logBatch(ctx context.Context, args []byte) (res []byte, err error) { +func (s deployerControl_server_stub) logBatch(ctx context.Context, args []byte) (res []byte, err error) { // Catch and return any panics detected during encoding/decoding/rpc. defer func() { if err == nil { @@ -671,16 +671,16 @@ func (s logger_server_stub) logBatch(ctx context.Context, args []byte) (res []by return enc.Data(), nil } -type controller_server_stub struct { - impl controller +type weaveletControl_server_stub struct { + impl weaveletControl addLoad func(key uint64, load float64) } -// Check that controller_server_stub implements the codegen.Server interface. -var _ codegen.Server = (*controller_server_stub)(nil) +// Check that weaveletControl_server_stub implements the codegen.Server interface. +var _ codegen.Server = (*weaveletControl_server_stub)(nil) // GetStubFn implements the codegen.Server interface. -func (s controller_server_stub) GetStubFn(method string) func(ctx context.Context, args []byte) ([]byte, error) { +func (s weaveletControl_server_stub) GetStubFn(method string) func(ctx context.Context, args []byte) ([]byte, error) { switch method { case "GetHealth": return s.getHealth @@ -699,7 +699,7 @@ func (s controller_server_stub) GetStubFn(method string) func(ctx context.Contex } } -func (s controller_server_stub) getHealth(ctx context.Context, args []byte) (res []byte, err error) { +func (s weaveletControl_server_stub) getHealth(ctx context.Context, args []byte) (res []byte, err error) { // Catch and return any panics detected during encoding/decoding/rpc. defer func() { if err == nil { @@ -724,7 +724,7 @@ func (s controller_server_stub) getHealth(ctx context.Context, args []byte) (res return enc.Data(), nil } -func (s controller_server_stub) getLoad(ctx context.Context, args []byte) (res []byte, err error) { +func (s weaveletControl_server_stub) getLoad(ctx context.Context, args []byte) (res []byte, err error) { // Catch and return any panics detected during encoding/decoding/rpc. defer func() { if err == nil { @@ -749,7 +749,7 @@ func (s controller_server_stub) getLoad(ctx context.Context, args []byte) (res [ return enc.Data(), nil } -func (s controller_server_stub) getMetrics(ctx context.Context, args []byte) (res []byte, err error) { +func (s weaveletControl_server_stub) getMetrics(ctx context.Context, args []byte) (res []byte, err error) { // Catch and return any panics detected during encoding/decoding/rpc. defer func() { if err == nil { @@ -774,7 +774,7 @@ func (s controller_server_stub) getMetrics(ctx context.Context, args []byte) (re return enc.Data(), nil } -func (s controller_server_stub) getProfile(ctx context.Context, args []byte) (res []byte, err error) { +func (s weaveletControl_server_stub) getProfile(ctx context.Context, args []byte) (res []byte, err error) { // Catch and return any panics detected during encoding/decoding/rpc. defer func() { if err == nil { @@ -799,7 +799,7 @@ func (s controller_server_stub) getProfile(ctx context.Context, args []byte) (re return enc.Data(), nil } -func (s controller_server_stub) updateComponents(ctx context.Context, args []byte) (res []byte, err error) { +func (s weaveletControl_server_stub) updateComponents(ctx context.Context, args []byte) (res []byte, err error) { // Catch and return any panics detected during encoding/decoding/rpc. defer func() { if err == nil { @@ -824,7 +824,7 @@ func (s controller_server_stub) updateComponents(ctx context.Context, args []byt return enc.Data(), nil } -func (s controller_server_stub) updateRoutingInfo(ctx context.Context, args []byte) (res []byte, err error) { +func (s weaveletControl_server_stub) updateRoutingInfo(ctx context.Context, args []byte) (res []byte, err error) { // Catch and return any panics detected during encoding/decoding/rpc. defer func() { if err == nil { @@ -851,51 +851,51 @@ func (s controller_server_stub) updateRoutingInfo(ctx context.Context, args []by // Reflect stub implementations. -type logger_reflect_stub struct { +type deployerControl_reflect_stub struct { caller func(string, context.Context, []any, []any) error } -// Check that logger_reflect_stub implements the Logger interface. -var _ Logger = (*logger_reflect_stub)(nil) +// Check that deployerControl_reflect_stub implements the deployerControl interface. +var _ deployerControl = (*deployerControl_reflect_stub)(nil) -func (s logger_reflect_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { +func (s deployerControl_reflect_stub) LogBatch(ctx context.Context, a0 *protos.LogEntryBatch) (err error) { err = s.caller("LogBatch", ctx, []any{a0}, []any{}) return } -type controller_reflect_stub struct { +type weaveletControl_reflect_stub struct { caller func(string, context.Context, []any, []any) error } -// Check that controller_reflect_stub implements the controller interface. -var _ controller = (*controller_reflect_stub)(nil) +// Check that weaveletControl_reflect_stub implements the weaveletControl interface. +var _ weaveletControl = (*weaveletControl_reflect_stub)(nil) -func (s controller_reflect_stub) GetHealth(ctx context.Context, a0 *protos.GetHealthRequest) (r0 *protos.GetHealthReply, err error) { +func (s weaveletControl_reflect_stub) GetHealth(ctx context.Context, a0 *protos.GetHealthRequest) (r0 *protos.GetHealthReply, err error) { err = s.caller("GetHealth", ctx, []any{a0}, []any{&r0}) return } -func (s controller_reflect_stub) GetLoad(ctx context.Context, a0 *protos.GetLoadRequest) (r0 *protos.GetLoadReply, err error) { +func (s weaveletControl_reflect_stub) GetLoad(ctx context.Context, a0 *protos.GetLoadRequest) (r0 *protos.GetLoadReply, err error) { err = s.caller("GetLoad", ctx, []any{a0}, []any{&r0}) return } -func (s controller_reflect_stub) GetMetrics(ctx context.Context, a0 *protos.GetMetricsRequest) (r0 *protos.GetMetricsReply, err error) { +func (s weaveletControl_reflect_stub) GetMetrics(ctx context.Context, a0 *protos.GetMetricsRequest) (r0 *protos.GetMetricsReply, err error) { err = s.caller("GetMetrics", ctx, []any{a0}, []any{&r0}) return } -func (s controller_reflect_stub) GetProfile(ctx context.Context, a0 *protos.GetProfileRequest) (r0 *protos.GetProfileReply, err error) { +func (s weaveletControl_reflect_stub) GetProfile(ctx context.Context, a0 *protos.GetProfileRequest) (r0 *protos.GetProfileReply, err error) { err = s.caller("GetProfile", ctx, []any{a0}, []any{&r0}) return } -func (s controller_reflect_stub) UpdateComponents(ctx context.Context, a0 *protos.UpdateComponentsRequest) (r0 *protos.UpdateComponentsReply, err error) { +func (s weaveletControl_reflect_stub) UpdateComponents(ctx context.Context, a0 *protos.UpdateComponentsRequest) (r0 *protos.UpdateComponentsReply, err error) { err = s.caller("UpdateComponents", ctx, []any{a0}, []any{&r0}) return } -func (s controller_reflect_stub) UpdateRoutingInfo(ctx context.Context, a0 *protos.UpdateRoutingInfoRequest) (r0 *protos.UpdateRoutingInfoReply, err error) { +func (s weaveletControl_reflect_stub) UpdateRoutingInfo(ctx context.Context, a0 *protos.UpdateRoutingInfoRequest) (r0 *protos.UpdateRoutingInfoReply, err error) { err = s.caller("UpdateRoutingInfo", ctx, []any{a0}, []any{&r0}) return } diff --git a/weavertest/deployer.go b/weavertest/deployer.go index 30a1f76f2..f7ecc54ff 100644 --- a/weavertest/deployer.go +++ b/weavertest/deployer.go @@ -74,18 +74,18 @@ type deployer struct { // A group contains information about a co-location group. type group struct { - name string // group name - controllers []control.Controller // weavelet controllers - components map[string]bool // started components - addresses map[string]bool // weavelet addresses - subscribers map[string][]control.Controller // routing info subscribers, by component + name string // group name + controllers []control.WeaveletControl // weavelet controllers + components map[string]bool // started components + addresses map[string]bool // weavelet addresses + subscribers map[string][]control.WeaveletControl // routing info subscribers, by component } // handler handles a connection to a weavelet. type handler struct { *deployer group *group - controller control.Controller + controller control.WeaveletControl subscribed map[string]bool // routing info subscriptions, by component } @@ -400,11 +400,12 @@ func (d *deployer) startGroup(g *group) error { if err := d.registerReplica(g, e.WeaveletInfo()); err != nil { return err } - if _, err := e.Controller().UpdateComponents(d.ctx, update); err != nil { + wc := e.WeaveletControl() + if _, err := wc.UpdateComponents(d.ctx, update); err != nil { return err } - handler.controller = e.Controller() - g.controllers = append(g.controllers, e.Controller()) + handler.controller = wc + g.controllers = append(g.controllers, wc) } return nil } @@ -430,7 +431,7 @@ func (d *deployer) group(component string) *group { name: name, components: map[string]bool{}, addresses: map[string]bool{}, - subscribers: map[string][]control.Controller{}, + subscribers: map[string][]control.WeaveletControl{}, } d.groups[name] = g }