Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

status: map weavelet id to pids for multi, single and ssh deployment. #767

Merged
merged 4 commits into from
May 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion godeps.txt
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,6 @@ github.com/ServiceWeaver/weaver/internal/tool/ssh/impl
os/exec
path/filepath
reflect
slices
sync
syscall
time
Expand Down
4 changes: 2 additions & 2 deletions internal/benchmarks/payload.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 12 additions & 4 deletions internal/status/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,21 @@ var (
deploymentHTML string
deploymentTemplate = template.Must(template.New("deployment").Funcs(template.FuncMap{
"shorten": logging.ShortenComponent,
"pidjoin": func(pids []int64) string {
s := make([]string, len(pids))
for i, x := range pids {
s[i] = fmt.Sprint(x)
"pidjoin": func(replicas []*Replica) string {
s := make([]string, len(replicas))
for i, x := range replicas {
s[i] = fmt.Sprint(x.Pid)
}
return strings.Join(s, ", ")
},
"widjoin": func(replicas []*Replica) string {
s := make([]string, len(replicas))
for i, x := range replicas {
s[i] = x.WeaveletId[0:8]
}
return strings.Join(s, ", ")

},
"age": func(t *timestamppb.Timestamp) string {
return time.Since(t.AsTime()).Truncate(time.Second).String()
},
Expand Down
16 changes: 9 additions & 7 deletions internal/status/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,22 +111,24 @@ func formatComponents(w io.Writer, statuses []*Status) {
title := []colors.Text{{{S: "COMPONENTS", Bold: true}}}
t := colors.NewTabularizer(w, title, colors.PrefixDim)
defer t.Flush()
t.Row("APP", "DEPLOYMENT", "COMPONENT", "REPLICA PIDS")
t.Row("APP", "DEPLOYMENT", "COMPONENT", "REPLICA PIDS", "WEAVELET IDS")
for _, status := range statuses {
sort.Slice(status.Components, func(i, j int) bool {
return status.Components[i].Name < status.Components[j].Name
})
for _, component := range status.Components {
prefix, _ := formatId(status.DeploymentId)
c := logging.ShortenComponent(component.Name)
sort.Slice(component.Pids, func(i, j int) bool {
return component.Pids[i] < component.Pids[j]
sort.Slice(component.Replicas, func(i, j int) bool {
return component.Replicas[i].Pid < component.Replicas[j].Pid
})
pids := make([]string, len(component.Pids))
for i, pid := range component.Pids {
pids[i] = fmt.Sprint(pid)
pids := make([]string, len(component.Replicas))
weaveletIds := make([]string, len(component.Replicas))
for i, replica := range component.Replicas {
pids[i] = fmt.Sprint(replica.Pid)
weaveletIds[i] = replica.WeaveletId[0:8]
}
t.Row(status.App, prefix, c, strings.Join(pids, ", "))
t.Row(status.App, prefix, c, strings.Join(pids, ", "), strings.Join(weaveletIds, ", "))
}
}
}
Expand Down
237 changes: 156 additions & 81 deletions internal/status/status.pb.go

Large diffs are not rendered by default.

12 changes: 9 additions & 3 deletions internal/status/status.proto
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,15 @@ message Status {

// Component describes a Service Weaver component.
message Component {
string name = 1; // component name (e.g., Cache)
repeated int64 pids = 3; // PIDs of component replicas
repeated Method methods = 4; // methods
string name = 1; // component name (e.g., Cache)
repeated Replica replicas = 3; // replica details
repeated Method methods = 4; // methods
}

// Replica stores info related to replica
message Replica {
int64 pid = 1; // replica process id
string weaveletId = 2; // replica weavelet id
}

// Method describes a Component method.
Expand Down
6 changes: 4 additions & 2 deletions internal/status/templates/deployment.html
Original file line number Diff line number Diff line change
Expand Up @@ -152,14 +152,16 @@
<th>Component</th>
<th>Replication</th>
<th>PIDs</th>
<th>Weavelet IDs</th>
</tr>
</thead>
<tbody>
{{range $c := .Components}}
<tr>
<td>{{shorten $c.Name}}</td>
<td>{{len $c.Pids}}</td>
<td>{{pidjoin $c.Pids}}</td>
<td>{{len $c.Replicas}}</td>
<td>{{pidjoin $c.Replicas}}</td>
<td>{{widjoin $c.Replicas}}</td>
</tr>
{{end}}
</tbody>
Expand Down
15 changes: 8 additions & 7 deletions internal/tool/multi/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type deployer struct {
type group struct {
name string // group name
envelopes []*envelope.Envelope // envelopes, one per weavelet
pids []int64 // weavelet pids
replicas []*status.Replica // stores replica info such as pid, weavelet id
started map[string]bool // started components
addresses map[string]bool // weavelet addresses
assignments map[string]*protos.Assignment // assignment, by component
Expand Down Expand Up @@ -352,7 +352,10 @@ func (d *deployer) startColocationGroup(g *group) error {
if !ok {
panic("multi deployer child must be a real process")
}
if err := d.registerReplica(g, e.WeaveletAddress(), pid); err != nil {
// Add replica info to group
g.replicas = append(g.replicas, &status.Replica{Pid: int64(pid), WeaveletId: info.Id})
// Register replica
if err := d.registerReplica(g, e.WeaveletAddress()); err != nil {
return err
}
if err := e.UpdateComponents(components); err != nil {
Expand Down Expand Up @@ -507,15 +510,13 @@ func (d *deployer) activateComponent(req *protos.ActivateComponentRequest) error

// registerReplica registers the information about a colocation group replica
// (i.e., a weavelet).
func (d *deployer) registerReplica(g *group, replicaAddr string, pid int) error {
func (d *deployer) registerReplica(g *group, replicaAddr string) error {
// Update addresses and pids.
if g.addresses[replicaAddr] {
// Replica already registered.
return nil
}
g.addresses[replicaAddr] = true
g.pids = append(g.pids, int64(pid))

// Update all assignments.
replicas := maps.Keys(g.addresses)
for component, assignment := range g.assignments {
Expand Down Expand Up @@ -643,8 +644,8 @@ func (d *deployer) Status(context.Context) (*status.Status, error) {
for _, group := range d.groups {
for component := range group.started {
c := &status.Component{
Name: component,
Pids: slices.Clone(group.pids),
Name: component,
Replicas: group.replicas,
}
components = append(components, c)

Expand Down
4 changes: 2 additions & 2 deletions internal/tool/single/single.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 6 additions & 5 deletions internal/tool/ssh/impl/babysitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func RunBabysitter(ctx context.Context) error {
if !ok {
panic("ssh deployer child must be a real process")
}
if err := b.registerReplica(e.WeaveletAddress(), pid); err != nil {
if err := b.registerReplica(e.WeaveletAddress(), pid, id); err != nil {
return err
}
c := metricsCollector{logger: b.logger, envelope: e, info: info}
Expand Down Expand Up @@ -180,15 +180,16 @@ func (b *babysitter) ActivateComponent(_ context.Context, req *protos.ActivateCo

// registerReplica registers the information about a colocation group replica
// (i.e., a weavelet).
func (b *babysitter) registerReplica(replicaAddr string, pid int) error {
func (b *babysitter) registerReplica(replicaAddr string, pid int, weaveletId string) error {
if err := protomsg.Call(b.ctx, protomsg.CallArgs{
Client: http.DefaultClient,
Addr: b.info.ManagerAddr,
URLPath: registerReplicaURL,
Request: &ReplicaToRegister{
Group: b.info.Group,
Address: replicaAddr,
Pid: int64(pid),
Group: b.info.Group,
Address: replicaAddr,
Pid: int64(pid),
WeaveletId: weaveletId,
},
}); err != nil {
return err
Expand Down
13 changes: 5 additions & 8 deletions internal/tool/ssh/impl/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
"os"
"os/exec"
"path/filepath"
"slices"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -131,7 +130,7 @@ type group struct {
started bool // has this group been started?
addresses map[string]bool // weavelet addresses
routings map[string]*versioned.Versioned[*protos.RoutingInfo] // routing info, by component
pids []int64 // weavelet pids
replicas []*status.Replica // stores replica info such as pid, weavelet id
}

type proxyInfo struct {
Expand Down Expand Up @@ -315,13 +314,10 @@ func (m *manager) Status(ctx context.Context) (*status.Status, error) {
g.components.Lock()
cs := maps.Keys(g.components.Val)
g.components.Unlock()
g.mu.Lock()
flouthoc marked this conversation as resolved.
Show resolved Hide resolved
pids := slices.Clone(g.pids)
g.mu.Unlock()
for _, component := range cs {
c := &status.Component{
Name: component,
Pids: pids,
Name: component,
Replicas: g.replicas,
}
components = append(components, c)

Expand Down Expand Up @@ -411,6 +407,7 @@ func (m *manager) group(component string) *group {
addresses: map[string]bool{},
components: versioned.Version(map[string]bool{}),
routings: map[string]*versioned.Versioned[*protos.RoutingInfo]{},
replicas: []*status.Replica{},
}
m.groups[name] = g
}
Expand Down Expand Up @@ -471,7 +468,7 @@ func (m *manager) registerReplica(_ context.Context, req *ReplicaToRegister) err
return true
}
g.addresses[req.Address] = true
g.pids = append(g.pids, req.Pid)
g.replicas = append(g.replicas, &status.Replica{Pid: req.Pid, WeaveletId: req.WeaveletId})
return false
}
if record() {
Expand Down
24 changes: 17 additions & 7 deletions internal/tool/ssh/impl/ssh.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions internal/tool/ssh/impl/ssh.proto
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ message BabysitterMetrics {
// a given colocation group (i.e., a weavelet).
message ReplicaToRegister {
string group = 1;
string address = 2; // Replica internal address.
int64 pid = 3; // Replica pid.
string address = 2; // Replica internal address.
int64 pid = 3; // Replica pid.
string weaveletId = 4; // Replica weavelet id
}
6 changes: 3 additions & 3 deletions internal/weaver/singleweavelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,15 +426,15 @@ func (w *SingleWeavelet) ServeStatus(ctx context.Context) error {
func (w *SingleWeavelet) Status(context.Context) (*status.Status, error) {
w.mu.Lock()
defer w.mu.Unlock()

pid := int64(os.Getpid())
stats := w.stats.GetStatsStatusz()
var components []*status.Component
for component := range w.components {
c := &status.Component{
Name: component,
Pids: []int64{pid},
Name: component,
Replicas: []*status.Replica{},
}
c.Replicas = append(c.Replicas, &status.Replica{Pid: pid, WeaveletId: w.id})
components = append(components, c)

// TODO(mwhittaker): Unify with ui package and remove duplication.
Expand Down
4 changes: 2 additions & 2 deletions runtime/protos/config.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions runtime/protos/runtime.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions weavertest/internal/protos/ping.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading