Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make profile collection a component method. #691

Merged
merged 3 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,8 @@ func (*noopController) UpdateComponents(context.Context, *protos.UpdateComponent
func (*noopController) UpdateRoutingInfo(context.Context, *protos.UpdateRoutingInfoRequest) (*protos.UpdateRoutingInfoReply, error) {
return nil, fmt.Errorf("controller.UpdateRoutingInfo not implemented")
}

// GetProfile implements controller nterface.
func (*noopController) GetProfile(context.Context, *protos.GetProfileRequest) (*protos.GetProfileReply, error) {
return nil, fmt.Errorf("controller.GetProfile not implemented")
}
5 changes: 2 additions & 3 deletions godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,6 @@ github.com/ServiceWeaver/weaver/internal/env
fmt
strings
github.com/ServiceWeaver/weaver/internal/envelope/conn
bytes
context
fmt
github.com/ServiceWeaver/weaver/internal/queue
Expand All @@ -321,9 +320,7 @@ github.com/ServiceWeaver/weaver/internal/envelope/conn
google.golang.org/protobuf/proto
io
net
runtime/pprof
sync
time
github.com/ServiceWeaver/weaver/internal/files
fmt
os
Expand Down Expand Up @@ -697,6 +694,7 @@ github.com/ServiceWeaver/weaver/internal/versioned
github.com/google/uuid
sync
github.com/ServiceWeaver/weaver/internal/weaver
bytes
context
crypto/tls
crypto/x509
Expand Down Expand Up @@ -744,6 +742,7 @@ github.com/ServiceWeaver/weaver/internal/weaver
os/signal
path/filepath
reflect
runtime/pprof
sort
strings
sync
Expand Down
3 changes: 3 additions & 0 deletions internal/control/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@ type Controller interface {

// UpdateRoutingInfo updates the weavelet with a component's most recent routing info.
UpdateRoutingInfo(context.Context, *protos.UpdateRoutingInfoRequest) (*protos.UpdateRoutingInfoReply, error)

// GetProfile gets a profile from the weavelet.
GetProfile(context.Context, *protos.GetProfileRequest) (*protos.GetProfileReply, error)
}
13 changes: 0 additions & 13 deletions internal/envelope/conn/envelope_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,6 @@ func (e *EnvelopeConn) GetLoadRPC() (*protos.LoadReport, error) {
return reply.GetLoadReply.Load, nil
}

// GetProfileRPC gets a profile from the weavelet. There can only be one
// outstanding GetProfileRPC at a time.
func (e *EnvelopeConn) GetProfileRPC(req *protos.GetProfileRequest) ([]byte, error) {
reply, err := e.rpc(&protos.EnvelopeMsg{GetProfileRequest: req})
if err != nil {
return nil, err
}
if reply.GetProfileReply == nil {
return nil, fmt.Errorf("nil GetProfileReply received from weavelet")
}
return reply.GetProfileReply.Data, nil
}

func (e *EnvelopeConn) rpc(request *protos.EnvelopeMsg) (*protos.WeaveletMsg, error) {
response, err := e.conn.doBlockingRPC(request)
if err != nil {
Expand Down
44 changes: 0 additions & 44 deletions internal/envelope/conn/weavelet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,13 @@
package conn

import (
"bytes"
"context"
"fmt"
"io"
"net"
"runtime/pprof"
"time"

"github.com/ServiceWeaver/weaver/runtime"
"github.com/ServiceWeaver/weaver/runtime/metrics"
"github.com/ServiceWeaver/weaver/runtime/protomsg"
"github.com/ServiceWeaver/weaver/runtime/protos"
"github.com/ServiceWeaver/weaver/runtime/version"
)
Expand Down Expand Up @@ -176,22 +172,6 @@ func (w *WeaveletConn) handleMessage(handler WeaveletHandler, msg *protos.Envelo
Error: errstring(err),
GetLoadReply: reply,
})
case msg.GetProfileRequest != nil:
// This is a blocking call, and therefore we process it in a separate
// goroutine. Note that this will cause profiling requests to be
// processed out-of-order w.r.t. other messages.
id := msg.Id
req := protomsg.Clone(msg.GetProfileRequest)
go func() {
data, err := Profile(req)
// Reply with profile data.
w.conn.send(&protos.WeaveletMsg{
Id: -id,
Error: errstring(err),
GetProfileReply: &protos.GetProfileReply{Data: data},
})
}()
return nil
default:
err := fmt.Errorf("weavelet_conn: unexpected message %+v", msg)
w.conn.cleanup(err)
Expand Down Expand Up @@ -304,27 +284,3 @@ func (w *WeaveletConn) SendLogEntry(entry *protos.LogEntry) error {
func (w *WeaveletConn) SendTraceSpans(spans *protos.TraceSpans) error {
return w.conn.send(&protos.WeaveletMsg{TraceSpans: spans})
}

// Profile collects profiles for the weavelet.
func Profile(req *protos.GetProfileRequest) ([]byte, error) {
var buf bytes.Buffer
switch req.ProfileType {
case protos.ProfileType_Heap:
if err := pprof.WriteHeapProfile(&buf); err != nil {
return nil, err
}
case protos.ProfileType_CPU:
if req.CpuDurationNs == 0 {
return nil, fmt.Errorf("invalid zero duration for the CPU profile collection")
}
dur := time.Duration(req.CpuDurationNs) * time.Nanosecond
if err := pprof.StartCPUProfile(&buf); err != nil {
return nil, err
}
time.Sleep(dur)
pprof.StopCPUProfile()
default:
return nil, fmt.Errorf("unspecified profile collection type")
}
return buf.Bytes(), nil
}
42 changes: 42 additions & 0 deletions internal/testdeployer/remoteweavelet_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/ServiceWeaver/weaver/runtime/logging"
"github.com/ServiceWeaver/weaver/runtime/protomsg"
"github.com/ServiceWeaver/weaver/runtime/protos"
"github.com/google/pprof/profile"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -806,3 +807,44 @@ func TestUpdateBadRoutingInfo(t *testing.T) {
t.Fatal(activateErr)
}
}

func TestProfile(t *testing.T) {
// Ensure things are started.
ctx := context.Background()
placement := map[string][]string{
"1": {componenta},
"2": {componentb, componentc},
"3": {componentb, componentc},
}
d := deploy(t, ctx, placement)
defer d.shutdown()
testComponents(d)

target := d.weavelets["2"]
for _, typ := range []protos.ProfileType{protos.ProfileType_Heap, protos.ProfileType_CPU} {
typ := typ
t.Run(typ.String(), func(t *testing.T) {
// Send a profiling request and wait for a reply.
start := time.Now()
req := &protos.GetProfileRequest{
ProfileType: typ,
CpuDurationNs: int64(100 * time.Millisecond / time.Nanosecond),
}
reply, err := target.wlet.GetProfile(ctx, req)
if err != nil {
t.Fatal(err)
}
end := time.Now()

// Small sanity check of the profile.
p, err := profile.ParseData(reply.Data)
if err != nil {
t.Fatal(err)
}
when := time.Unix(0, p.TimeNanos)
if when.Before(start) || end.Before(when) {
t.Errorf("profile timestamp %v is not in profiling time range [%v,%v]", when, start, end)
}
})
}
}
1 change: 1 addition & 0 deletions internal/tool/multi/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,7 @@ func runProfiling(_ context.Context, req *protos.GetProfileRequest, processes ma
for _, envelopes := range processes {
group := make([]func() ([]byte, error), 0, len(envelopes))
for _, e := range envelopes {
e := e
group = append(group, func() ([]byte, error) {
return e.GetProfile(req)
})
Expand Down
55 changes: 55 additions & 0 deletions internal/weaver/profile.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package weaver

import (
"bytes"
"context"
"fmt"
"runtime/pprof"
"time"

"github.com/ServiceWeaver/weaver/runtime/protos"
)

// getProfile collects a profile of this process.
func getProfile(ctx context.Context, req *protos.GetProfileRequest) ([]byte, error) {
var buf bytes.Buffer
switch req.ProfileType {
case protos.ProfileType_Heap:
if err := pprof.WriteHeapProfile(&buf); err != nil {
return nil, err
}
case protos.ProfileType_CPU:
if req.CpuDurationNs == 0 {
return nil, fmt.Errorf("invalid zero duration for the CPU profile collection")
}
dur := time.Duration(req.CpuDurationNs) * time.Nanosecond
if err := pprof.StartCPUProfile(&buf); err != nil {
return nil, err
}
select {
case <-ctx.Done():
pprof.StopCPUProfile()
return nil, ctx.Err()
case <-time.After(dur):
// All done
}
pprof.StopCPUProfile()
default:
return nil, fmt.Errorf("unspecified profile collection type")
}
return buf.Bytes(), nil
}
11 changes: 10 additions & 1 deletion internal/weaver/remoteweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (w *RemoteWeavelet) UpdateComponents(ctx context.Context, req *protos.Updat
return &protos.UpdateComponentsReply{}, errors.Join(errs...)
}

// UpdateRoutingInfo implements the conn.WeaverHandler interface.
// UpdateRoutingInfo implements controller.UpdateRoutingInfo.
func (w *RemoteWeavelet) UpdateRoutingInfo(ctx context.Context, req *protos.UpdateRoutingInfoRequest) (reply *protos.UpdateRoutingInfoReply, err error) {
if req.RoutingInfo == nil {
w.syslogger.Error("Failed to update nil routing info")
Expand Down Expand Up @@ -571,6 +571,15 @@ func (w *RemoteWeavelet) UpdateRoutingInfo(ctx context.Context, req *protos.Upda
return &protos.UpdateRoutingInfoReply{}, nil
}

// GetProfile implements controller.GetProfile.
func (w *RemoteWeavelet) GetProfile(ctx context.Context, req *protos.GetProfileRequest) (*protos.GetProfileReply, error) {
data, err := getProfile(ctx, req)
if err != nil {
return nil, err
}
return &protos.GetProfileReply{Data: data}, nil
}

// Info returns the EnvelopeInfo received from the envelope.
func (w *RemoteWeavelet) Info() *protos.EnvelopeInfo {
return w.conn.EnvelopeInfo()
Expand Down
5 changes: 2 additions & 3 deletions internal/weaver/singleweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (

"github.com/ServiceWeaver/weaver/internal/config"
"github.com/ServiceWeaver/weaver/internal/env"
"github.com/ServiceWeaver/weaver/internal/envelope/conn"
imetrics "github.com/ServiceWeaver/weaver/internal/metrics"
"github.com/ServiceWeaver/weaver/internal/status"
"github.com/ServiceWeaver/weaver/internal/tool/single"
Expand Down Expand Up @@ -494,8 +493,8 @@ func (w *SingleWeavelet) Metrics(context.Context) (*status.Metrics, error) {
}

// Profile implements the status.Server interface.
func (w *SingleWeavelet) Profile(_ context.Context, req *protos.GetProfileRequest) (*protos.GetProfileReply, error) {
data, err := conn.Profile(req)
func (w *SingleWeavelet) Profile(ctx context.Context, req *protos.GetProfileRequest) (*protos.GetProfileReply, error) {
data, err := getProfile(ctx, req)
return &protos.GetProfileReply{Data: data}, err
}

Expand Down
25 changes: 4 additions & 21 deletions runtime/envelope/envelope.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,6 @@ type Envelope struct {
stdoutPipe io.ReadCloser // stdout pipe from the weavelet
stderrPipe io.ReadCloser // stderr pipe from the weavelet
controller control.Controller // Stub that talks to the weavelet controller

mu sync.Mutex // guards the following fields
profiling bool // are we currently collecting a profile?
}

// Options contains optional arguments for the envelope.
Expand Down Expand Up @@ -302,20 +299,6 @@ func (e *Envelope) Serve(h EnvelopeHandler) error {
return stopErr
}

// toggleProfiling compares the value of e.profiling to the given expected
// value, and if they are the same, toggles the value of e.profiling and
// returns true; otherwise, it leaves the value of e.profiling unchanged
// and returns false.
func (e *Envelope) toggleProfiling(expected bool) bool {
e.mu.Lock()
defer e.mu.Unlock()
if e.profiling != expected {
return false
}
e.profiling = !e.profiling
return true
}

// Pid returns the process id of the subprocess.
func (e *Envelope) Pid() int {
return e.cmd.Process.Pid
Expand All @@ -337,11 +320,11 @@ func (e *Envelope) GetHealth() protos.HealthStatus {

// GetProfile gets a profile from the weavelet.
func (e *Envelope) GetProfile(req *protos.GetProfileRequest) ([]byte, error) {
if ok := e.toggleProfiling(false); !ok {
return nil, fmt.Errorf("profiling already in progress")
reply, err := e.controller.GetProfile(context.TODO(), req)
if err != nil {
return nil, err
}
defer e.toggleProfiling(true)
return e.conn.GetProfileRPC(req)
return reply.Data, nil
}

// GetMetrics returns a weavelet's metrics.
Expand Down
Loading