Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(plan): use lanes to start services with dependencies #437

6 changes: 3 additions & 3 deletions internals/daemon/api_services.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func v1PostServices(c *Command, r *http.Request, _ *UserState) Response {
if err != nil {
break
}
taskSet, err = servstate.Start(st, services)
taskSet, err = servstate.Start(st, services, servmgr)
case "stop":
services, err = servmgr.StopOrder(payload.Services)
if err != nil {
Expand All @@ -134,7 +134,7 @@ func v1PostServices(c *Command, r *http.Request, _ *UserState) Response {
break
}
var startTasks *state.TaskSet
startTasks, err = servstate.Start(st, services)
startTasks, err = servstate.Start(st, services, servmgr)
if err != nil {
break
}
Expand All @@ -154,7 +154,7 @@ func v1PostServices(c *Command, r *http.Request, _ *UserState) Response {
break
}
var startTasks *state.TaskSet
startTasks, err = servstate.Start(st, startNames)
startTasks, err = servstate.Start(st, startNames, servmgr)
if err != nil {
break
}
Expand Down
2 changes: 1 addition & 1 deletion internals/overlord/servstate/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1820,7 +1820,7 @@ func (s *S) testServiceLogs(c *C, outputs map[string]string) {

func (s *S) startServices(c *C, services []string) *state.Change {
s.st.Lock()
ts, err := servstate.Start(s.st, services)
ts, err := servstate.Start(s.st, services, s.manager)
c.Check(err, IsNil)
chg := s.st.NewChange("test", "Start test")
chg.AddAll(ts)
Expand Down
69 changes: 63 additions & 6 deletions internals/overlord/servstate/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,85 @@ import (
"fmt"

"github.com/canonical/pebble/internals/overlord/state"
"github.com/canonical/pebble/internals/plan"
)

// ServiceRequest holds the details required to perform service tasks.
type ServiceRequest struct {
Name string
}

func get_or_create_lane(s *state.State, service *plan.Service, service_lane_mapping map[string]int) int {
IronCore864 marked this conversation as resolved.
Show resolved Hide resolved
// if the service has been mapped to a lane
if lane, ok := service_lane_mapping[service.Name]; ok {
return lane
}

// if any dependency has been mapped to a lane
all_dependencies := append(append(service.Requires, service.Before...), service.After...)
for _, dependency := range all_dependencies {
if lane, ok := service_lane_mapping[dependency]; ok {
return lane
}
}

// neither the service itself nor any of its dependencies is mapped to an existing lane
return s.NewLane()
}

func joinLane(s *state.State, task *state.Task, service *plan.Service, service_lane_mapping map[string]int, lane_tasks_mapping map[int][]*state.Task) {
lane := get_or_create_lane(s, service, service_lane_mapping)

task.JoinLane(lane)

// map task to lane
if _, ok := lane_tasks_mapping[lane]; !ok {
lane_tasks_mapping[lane] = nil
}
lane_tasks_mapping[lane] = append(lane_tasks_mapping[lane], task)

// map the service's dependencies to the same lane
service_lane_mapping[service.Name] = lane
all_dependencies := append(append(service.Requires, service.Before...), service.After...)
for _, dependency := range all_dependencies {
service_lane_mapping[dependency] = lane
}
}

func handleWaitFor(lane_tasks_mapping map[int][]*state.Task) {
for _, tasks := range lane_tasks_mapping {
for i := 1; i < len(tasks); i++ {
tasks[i].WaitFor(tasks[i-1])
}
}
}

// Start creates and returns a task set for starting the given services.
func Start(s *state.State, services []string) (*state.TaskSet, error) {
func Start(s *state.State, names []string, m *ServiceManager) (*state.TaskSet, error) {
IronCore864 marked this conversation as resolved.
Show resolved Hide resolved
services := m.getPlan().Services
service_lane_mapping := make(map[string]int)
lane_tasks_mapping := make(map[int][]*state.Task)

var tasks []*state.Task
for _, name := range services {

for _, name := range names {
service, ok := services[name]
if !ok {
return nil, fmt.Errorf("service %q does not exist", name)
}

task := s.NewTask("start", fmt.Sprintf("Start service %q", name))
req := ServiceRequest{
Name: name,
}
task.Set("service-request", &req)
if len(tasks) > 0 {
// TODO Allow non-dependent services to start in parallel.
task.WaitFor(tasks[len(tasks)-1])
}
joinLane(s, task, service, service_lane_mapping, lane_tasks_mapping)

tasks = append(tasks, task)
}

handleWaitFor(lane_tasks_mapping)

return state.NewTaskSet(tasks...), nil
}

Expand Down
109 changes: 107 additions & 2 deletions internals/overlord/servstate/request_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2014-2020 Canonical Ltd
// Copyright (c) 2014-2024 Canonical Ltd
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License version 3 as
Expand All @@ -18,13 +18,73 @@ import (
. "gopkg.in/check.v1"

"github.com/canonical/pebble/internals/overlord/servstate"
. "github.com/canonical/pebble/internals/testutil"
)

func (s *S) TestStart(c *C) {
s.newServiceManager(c)
layer := `
services:
one:
override: replace
command: /bin/sh -c "echo one; sleep 10"
startup: enabled

two:
override: replace
command: /bin/sh -c "echo two; sleep 10"
startup: enabled
`
s.planAddLayer(c, layer)
s.planChanged(c)

s.st.Lock()
defer s.st.Unlock()

tset, err := servstate.Start(s.st, []string{"one", "two"}, s.manager)
c.Assert(err, IsNil)

tasks := tset.Tasks()
c.Assert(len(tasks), Equals, 2)

c.Assert(tasks[0].Kind(), Equals, "start")
req, err := servstate.TaskServiceRequest(tasks[0])
c.Assert(err, IsNil)
c.Assert(req.Name, Equals, "one")

c.Assert(tasks[1].Kind(), Equals, "start")
req, err = servstate.TaskServiceRequest(tasks[1])
c.Assert(err, IsNil)
c.Assert(req.Name, Equals, "two")

c.Assert(tasks[0].Lanes()[0], IntNotEqual, tasks[1].Lanes()[0])
}
IronCore864 marked this conversation as resolved.
Show resolved Hide resolved

func (s *S) TestStartInTheSameLaneAfter(c *C) {
s.newServiceManager(c)
layer := `
services:
one:
override: replace
command: /bin/sh -c "echo one; sleep 10"
startup: enabled

two:
override: replace
command: /bin/sh -c "echo two; sleep 10"
startup: enabled
requires:
- one
after:
- one
`
s.planAddLayer(c, layer)
s.planChanged(c)

s.st.Lock()
defer s.st.Unlock()

tset, err := servstate.Start(s.st, []string{"one", "two"})
tset, err := servstate.Start(s.st, []string{"one", "two"}, s.manager)
c.Assert(err, IsNil)

tasks := tset.Tasks()
Expand All @@ -39,6 +99,51 @@ func (s *S) TestStart(c *C) {
req, err = servstate.TaskServiceRequest(tasks[1])
c.Assert(err, IsNil)
c.Assert(req.Name, Equals, "two")

c.Assert(tasks[0].Lanes()[0], Equals, tasks[1].Lanes()[0])
}

func (s *S) TestStartInTheSameLaneBefore(c *C) {
s.newServiceManager(c)
layer := `
services:
one:
override: replace
command: /bin/sh -c "echo one; sleep 10"
startup: enabled
requires:
- two
before:
- two

two:
override: replace
command: /bin/sh -c "echo two; sleep 10"
startup: enabled
`
s.planAddLayer(c, layer)
s.planChanged(c)

s.st.Lock()
defer s.st.Unlock()

tset, err := servstate.Start(s.st, []string{"one", "two"}, s.manager)
c.Assert(err, IsNil)

tasks := tset.Tasks()
c.Assert(len(tasks), Equals, 2)

c.Assert(tasks[0].Kind(), Equals, "start")
req, err := servstate.TaskServiceRequest(tasks[0])
c.Assert(err, IsNil)
c.Assert(req.Name, Equals, "one")

c.Assert(tasks[1].Kind(), Equals, "start")
req, err = servstate.TaskServiceRequest(tasks[1])
c.Assert(err, IsNil)
c.Assert(req.Name, Equals, "two")

c.Assert(tasks[0].Lanes()[0], Equals, tasks[1].Lanes()[0])
}

func (s *S) TestStop(c *C) {
Expand Down
Loading