Skip to content

Commit

Permalink
Make profile collection a component method. (#691)
Browse files Browse the repository at this point in the history
Also, stop tracking whether or not CPU profiling is already on
explicitly; let the Go runtime do it.

* Update godeps

* Fix unused function lint error
  • Loading branch information
ghemawat authored Dec 19, 2023
1 parent ded6ee6 commit ca81bd4
Show file tree
Hide file tree
Showing 16 changed files with 818 additions and 757 deletions.
5 changes: 5 additions & 0 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,8 @@ func (*noopController) UpdateComponents(context.Context, *protos.UpdateComponent
func (*noopController) UpdateRoutingInfo(context.Context, *protos.UpdateRoutingInfoRequest) (*protos.UpdateRoutingInfoReply, error) {
// Always fails: the no-op controller does not implement this method.
return nil, fmt.Errorf("controller.UpdateRoutingInfo not implemented")
}

// GetProfile implements controller interface.
//
// It ignores its arguments and always returns a nil reply together with a
// "not implemented" error.
func (*noopController) GetProfile(context.Context, *protos.GetProfileRequest) (*protos.GetProfileReply, error) {
return nil, fmt.Errorf("controller.GetProfile not implemented")
}
5 changes: 2 additions & 3 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ github.com/ServiceWeaver/weaver/internal/env
fmt
strings
github.com/ServiceWeaver/weaver/internal/envelope/conn
bytes
context
fmt
github.com/ServiceWeaver/weaver/internal/queue
Expand All @@ -321,9 +320,7 @@ github.com/ServiceWeaver/weaver/internal/envelope/conn
google.golang.org/protobuf/proto
io
net
runtime/pprof
sync
time
github.com/ServiceWeaver/weaver/internal/files
fmt
os
Expand Down Expand Up @@ -697,6 +694,7 @@ github.com/ServiceWeaver/weaver/internal/versioned
github.com/google/uuid
sync
github.com/ServiceWeaver/weaver/internal/weaver
bytes
context
crypto/tls
crypto/x509
Expand Down Expand Up @@ -744,6 +742,7 @@ github.com/ServiceWeaver/weaver/internal/weaver
os/signal
path/filepath
reflect
runtime/pprof
sort
strings
sync
Expand Down
3 changes: 3 additions & 0 deletions internal/control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ type Controller interface {

// UpdateRoutingInfo updates the weavelet with a component's most recent routing info.
UpdateRoutingInfo(context.Context, *protos.UpdateRoutingInfoRequest) (*protos.UpdateRoutingInfoReply, error)

// GetProfile gets a profile from the weavelet.
GetProfile(context.Context, *protos.GetProfileRequest) (*protos.GetProfileReply, error)
}
13 changes: 0 additions & 13 deletions internal/envelope/conn/envelope_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,6 @@ func (e *EnvelopeConn) GetLoadRPC() (*protos.LoadReport, error) {
return reply.GetLoadReply.Load, nil
}

// GetProfileRPC asks the weavelet for a profile described by req and returns
// the raw profile bytes carried in the reply. There can only be one
// outstanding GetProfileRPC at a time.
//
// An error is returned if the RPC itself fails or if the weavelet's response
// does not contain a GetProfileReply.
func (e *EnvelopeConn) GetProfileRPC(req *protos.GetProfileRequest) ([]byte, error) {
	msg := &protos.EnvelopeMsg{GetProfileRequest: req}
	resp, err := e.rpc(msg)
	switch {
	case err != nil:
		return nil, err
	case resp.GetProfileReply == nil:
		// The RPC succeeded but the weavelet did not attach a profile reply.
		return nil, fmt.Errorf("nil GetProfileReply received from weavelet")
	}
	return resp.GetProfileReply.Data, nil
}

func (e *EnvelopeConn) rpc(request *protos.EnvelopeMsg) (*protos.WeaveletMsg, error) {
response, err := e.conn.doBlockingRPC(request)
if err != nil {
Expand Down
44 changes: 0 additions & 44 deletions internal/envelope/conn/weavelet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@
package conn

import (
"bytes"
"context"
"fmt"
"io"
"net"
"runtime/pprof"
"time"

"github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/metrics"
"github.com/ServiceWeaver/weaver/runtime/protomsg"
"github.com/ServiceWeaver/weaver/runtime/protos"
"github.com/ServiceWeaver/weaver/runtime/version"
)
Expand Down Expand Up @@ -176,22 +172,6 @@ func (w *WeaveletConn) handleMessage(handler WeaveletHandler, msg *protos.Envelo
Error: errstring(err),
GetLoadReply: reply,
})
case msg.GetProfileRequest != nil:
// This is a blocking call, and therefore we process it in a separate
// goroutine. Note that this will cause profiling requests to be
// processed out-of-order w.r.t. other messages.
id := msg.Id
req := protomsg.Clone(msg.GetProfileRequest)
go func() {
data, err := Profile(req)
// Reply with profile data.
w.conn.send(&protos.WeaveletMsg{
Id: -id,
Error: errstring(err),
GetProfileReply: &protos.GetProfileReply{Data: data},
})
}()
return nil
default:
err := fmt.Errorf("weavelet_conn: unexpected message %+v", msg)
w.conn.cleanup(err)
Expand Down Expand Up @@ -304,27 +284,3 @@ func (w *WeaveletConn) SendLogEntry(entry *protos.LogEntry) error {
func (w *WeaveletConn) SendTraceSpans(spans *protos.TraceSpans) error {
// Wrap the spans in a WeaveletMsg and hand it to the underlying connection.
return w.conn.send(&protos.WeaveletMsg{TraceSpans: spans})
}

// Profile collects a profile of the current process, as selected by
// req.ProfileType, and returns the serialized profile bytes.
//
// A heap profile is written immediately. A CPU profile is collected by
// profiling for req.CpuDurationNs nanoseconds (which must be non-zero) while
// the calling goroutine sleeps. Any other profile type is an error.
func Profile(req *protos.GetProfileRequest) ([]byte, error) {
	var out bytes.Buffer

	// Heap profiles need no sampling window; write and return right away.
	if req.ProfileType == protos.ProfileType_Heap {
		if err := pprof.WriteHeapProfile(&out); err != nil {
			return nil, err
		}
		return out.Bytes(), nil
	}

	if req.ProfileType != protos.ProfileType_CPU {
		return nil, fmt.Errorf("unspecified profile collection type")
	}
	if req.CpuDurationNs == 0 {
		return nil, fmt.Errorf("invalid zero duration for the CPU profile collection")
	}

	window := time.Duration(req.CpuDurationNs) * time.Nanosecond
	if err := pprof.StartCPUProfile(&out); err != nil {
		return nil, err
	}
	time.Sleep(window)
	// Stop before reading the buffer so that the profile has been fully written.
	pprof.StopCPUProfile()
	return out.Bytes(), nil
}
42 changes: 42 additions & 0 deletions internal/testdeployer/remoteweavelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ServiceWeaver/weaver/runtime/logging"
"github.com/ServiceWeaver/weaver/runtime/protomsg"
"github.com/ServiceWeaver/weaver/runtime/protos"
"github.com/google/pprof/profile"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -806,3 +807,44 @@ func TestUpdateBadRoutingInfo(t *testing.T) {
t.Fatal(activateErr)
}
}

func TestProfile(t *testing.T) {
// Ensure things are started.
ctx := context.Background()
placement := map[string][]string{
"1": {componenta},
"2": {componentb, componentc},
"3": {componentb, componentc},
}
d := deploy(t, ctx, placement)
defer d.shutdown()
testComponents(d)

target := d.weavelets["2"]
for _, typ := range []protos.ProfileType{protos.ProfileType_Heap, protos.ProfileType_CPU} {
typ := typ
t.Run(typ.String(), func(t *testing.T) {
// Send a profiling request and wait for a reply.
start := time.Now()
req := &protos.GetProfileRequest{
ProfileType: typ,
CpuDurationNs: int64(100 * time.Millisecond / time.Nanosecond),
}
reply, err := target.wlet.GetProfile(ctx, req)
if err != nil {
t.Fatal(err)
}
end := time.Now()

// Small sanity check of the profile.
p, err := profile.ParseData(reply.Data)
if err != nil {
t.Fatal(err)
}
when := time.Unix(0, p.TimeNanos)
if when.Before(start) || end.Before(when) {
t.Errorf("profile timestamp %v is not in profiling time range [%v,%v]", when, start, end)
}
})
}
}
1 change: 1 addition & 0 deletions internal/tool/multi/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ func runProfiling(_ context.Context, req *protos.GetProfileRequest, processes ma
for _, envelopes := range processes {
group := make([]func() ([]byte, error), 0, len(envelopes))
for _, e := range envelopes {
e := e
group = append(group, func() ([]byte, error) {
return e.GetProfile(req)
})
Expand Down
55 changes: 55 additions & 0 deletions internal/weaver/profile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package weaver

import (
"bytes"
"context"
"fmt"
"runtime/pprof"
"time"

"github.com/ServiceWeaver/weaver/runtime/protos"
)

// getProfile collects a profile of this process and returns its serialized
// bytes.
//
// The kind of profile is selected by req.ProfileType:
//   - ProfileType_Heap: a heap profile is written immediately.
//   - ProfileType_CPU: CPU profiling runs for req.CpuDurationNs nanoseconds,
//     which must be positive. If ctx is done before the duration elapses,
//     profiling is stopped early and ctx.Err() is returned.
//
// Any other profile type results in an error. Errors reported by
// runtime/pprof (for example, when CPU profiling cannot be started) are
// returned unchanged.
func getProfile(ctx context.Context, req *protos.GetProfileRequest) ([]byte, error) {
	var buf bytes.Buffer
	switch req.ProfileType {
	case protos.ProfileType_Heap:
		if err := pprof.WriteHeapProfile(&buf); err != nil {
			return nil, err
		}
	case protos.ProfileType_CPU:
		// Reject negative durations as well as zero: a negative wait would
		// fire immediately and yield an effectively empty profile.
		if req.CpuDurationNs <= 0 {
			return nil, fmt.Errorf("invalid non-positive duration %d for the CPU profile collection", req.CpuDurationNs)
		}
		dur := time.Duration(req.CpuDurationNs) * time.Nanosecond
		if err := pprof.StartCPUProfile(&buf); err != nil {
			return nil, err
		}

		// Use an explicit timer so it can be released as soon as we stop
		// waiting, rather than lingering until dur elapses after a cancel.
		timer := time.NewTimer(dur)
		var ctxErr error
		select {
		case <-ctx.Done():
			ctxErr = ctx.Err()
		case <-timer.C:
			// All done.
		}
		timer.Stop()

		// Stop profiling on every path, and before buf is read, so that the
		// profile has been completely written.
		pprof.StopCPUProfile()
		if ctxErr != nil {
			return nil, ctxErr
		}
	default:
		return nil, fmt.Errorf("unspecified profile collection type")
	}
	return buf.Bytes(), nil
}
11 changes: 10 additions & 1 deletion internal/weaver/remoteweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (w *RemoteWeavelet) UpdateComponents(ctx context.Context, req *protos.Updat
return &protos.UpdateComponentsReply{}, errors.Join(errs...)
}

// UpdateRoutingInfo implements the conn.WeaverHandler interface.
// UpdateRoutingInfo implements controller.UpdateRoutingInfo.
func (w *RemoteWeavelet) UpdateRoutingInfo(ctx context.Context, req *protos.UpdateRoutingInfoRequest) (reply *protos.UpdateRoutingInfoReply, err error) {
if req.RoutingInfo == nil {
w.syslogger.Error("Failed to update nil routing info")
Expand Down Expand Up @@ -571,6 +571,15 @@ func (w *RemoteWeavelet) UpdateRoutingInfo(ctx context.Context, req *protos.Upda
return &protos.UpdateRoutingInfoReply{}, nil
}

// GetProfile implements controller.GetProfile.
//
// It collects the profile described by req by calling getProfile and wraps
// the resulting bytes in a GetProfileReply. If collection fails, the reply is
// nil and the error from getProfile is returned as is.
func (w *RemoteWeavelet) GetProfile(ctx context.Context, req *protos.GetProfileRequest) (*protos.GetProfileReply, error) {
data, err := getProfile(ctx, req)
if err != nil {
return nil, err
}
return &protos.GetProfileReply{Data: data}, nil
}

// Info returns the EnvelopeInfo received from the envelope.
func (w *RemoteWeavelet) Info() *protos.EnvelopeInfo {
return w.conn.EnvelopeInfo()
Expand Down
5 changes: 2 additions & 3 deletions internal/weaver/singleweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/ServiceWeaver/weaver/internal/config"
"github.com/ServiceWeaver/weaver/internal/env"
"github.com/ServiceWeaver/weaver/internal/envelope/conn"
imetrics "github.com/ServiceWeaver/weaver/internal/metrics"
"github.com/ServiceWeaver/weaver/internal/status"
"github.com/ServiceWeaver/weaver/internal/tool/single"
Expand Down Expand Up @@ -494,8 +493,8 @@ func (w *SingleWeavelet) Metrics(context.Context) (*status.Metrics, error) {
}

// Profile implements the status.Server interface.
func (w *SingleWeavelet) Profile(_ context.Context, req *protos.GetProfileRequest) (*protos.GetProfileReply, error) {
data, err := conn.Profile(req)
func (w *SingleWeavelet) Profile(ctx context.Context, req *protos.GetProfileRequest) (*protos.GetProfileReply, error) {
data, err := getProfile(ctx, req)
return &protos.GetProfileReply{Data: data}, err
}

Expand Down
25 changes: 4 additions & 21 deletions runtime/envelope/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ type Envelope struct {
stdoutPipe io.ReadCloser // stdout pipe from the weavelet
stderrPipe io.ReadCloser // stderr pipe from the weavelet
controller control.Controller // Stub that talks to the weavelet controller

mu sync.Mutex // guards the following fields
profiling bool // are we currently collecting a profile?
}

// Options contains optional arguments for the envelope.
Expand Down Expand Up @@ -302,20 +299,6 @@ func (e *Envelope) Serve(h EnvelopeHandler) error {
return stopErr
}

// toggleProfiling atomically flips e.profiling, but only when its current
// value equals expected. It reports whether the flip happened; on a mismatch
// e.profiling is left untouched and false is returned.
func (e *Envelope) toggleProfiling(expected bool) bool {
	e.mu.Lock()
	defer e.mu.Unlock()

	matched := e.profiling == expected
	if matched {
		// The current value equals expected, so the flipped value is !expected.
		e.profiling = !expected
	}
	return matched
}

// Pid returns the process id of the subprocess.
//
// NOTE(review): this dereferences e.cmd.Process without a nil check, so it
// presumably must only be called once the subprocess has been started —
// confirm against the code that populates e.cmd.
func (e *Envelope) Pid() int {
return e.cmd.Process.Pid
}

// GetProfile gets a profile from the weavelet.
func (e *Envelope) GetProfile(req *protos.GetProfileRequest) ([]byte, error) {
if ok := e.toggleProfiling(false); !ok {
return nil, fmt.Errorf("profiling already in progress")
reply, err := e.controller.GetProfile(context.TODO(), req)
if err != nil {
return nil, err
}
defer e.toggleProfiling(true)
return e.conn.GetProfileRPC(req)
return reply.Data, nil
}

// GetMetrics returns a weavelet's metrics.
Expand Down
Loading

0 comments on commit ca81bd4

Please sign in to comment.