Skip to content

Commit

Permalink
Allow yaml slurm serializer
Browse files Browse the repository at this point in the history
  • Loading branch information
dougnd committed Mar 17, 2024
1 parent dc0b89d commit b2f816d
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 147 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.21
require (
github.com/mattn/go-sqlite3 v1.14.18
golang.org/x/crypto v0.13.0
gopkg.in/yaml.v3 v3.0.1
nhooyr.io/websocket v1.8.10
)

Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,9 @@ golang.org/x/sys v0.12.0 h1:CM0HF96J0hcLAwsHPJZjfdNzs0gftsLfgKt57wWHJ0o=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.12.0 h1:/ZfYdc3zq+q02Rv9vGqTeSItdzZTSNDmfTi0mBAuidU=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
nhooyr.io/websocket v1.8.10 h1:mv4p+MnGrLDcPlBoWsvPP7XCzTYMXP9F9eIGoKbgx7Q=
nhooyr.io/websocket v1.8.10/go.mod h1:rN9OFWIUwuxg4fR5tELlYC04bXYowCP9GX47ivo2l+c=
4 changes: 4 additions & 0 deletions jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type Job struct {
UsedMemory Bytes

Nodes []Node

// Raw holds a scheduler-specific value. The scheduler plugin can use it
// when creating a nodestats session.
Raw interface{}
}

func (j *Job) IsRunning() bool {
Expand Down
190 changes: 101 additions & 89 deletions slurm/sacct.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,69 +10,56 @@ import (
"time"

"github.com/clemsonciti/jobperf"
"gopkg.in/yaml.v3"
)

type sacctResponse struct {
Meta meta `json:"meta"`
Jobs []sacctJob `json:"jobs"`
Warnings []any `json:"warnings"`
Errors []any `json:"errors"`
Meta meta `json:"meta" yaml:"meta"`
Jobs []sacctJob `json:"jobs" yaml:"jobs"`
Warnings []any `json:"warnings" yaml:"warnings"`
Errors []any `json:"errors" yaml:"errors"`
}

type sacctTresUnit struct {
Type string `json:"type"`
Name string `json:"name"`
ID int `json:"id"`
Count int `json:"count"`
Type string `json:"type" yaml:"type"`
Name string `json:"name" yaml:"name"`
ID int `json:"id" yaml:"id"`
Count int `json:"count" yaml:"count"`
}

type sacctTresTaskUnit struct {
sacctTresUnit
Node string `json:"node"`
Task int `json:"task"`
Node string `json:"node" yaml:"node"`
Task int `json:"task" yaml:"task"`
}

type sacctJobTres struct {
Allocated []sacctTresUnit `json:"allocated"`
Requested []sacctTresUnit `json:"requested"`
Allocated []sacctTresUnit `json:"allocated" yaml:"allocated"`
Requested []sacctTresUnit `json:"requested" yaml:"requested"`
}

type sacctJobState struct {
CurrentRaw json.RawMessage `json:"current"`
Reason string `json:"reason"`
}

func (s *sacctJobState) Current() string {
var stringState string
err := json.Unmarshal(s.CurrentRaw, &stringState)
if err == nil {
return stringState
}
var sliceState []string
err = json.Unmarshal(s.CurrentRaw, &sliceState)
if err == nil {
return sliceState[0]
}

return "Unknown"
Current jobStatus `json:"current" yaml:"current"`
Reason string `json:"reason" yaml:"reason"`
}

type sacctJobTime struct {
// Elapsed seems to be in seconds.
Elapsed int `json:"elapsed"`
End optionalValue `json:"end"`
Start optionalValue `json:"start"`
Eligible int `json:"eligible"`
Submission int `json:"submission"`
Suspended int `json:"suspended"`
Limit optionalValue `json:"limit"` // Limit seems to be in minutes.
Total sacctCPUTime `json:"total"`
User sacctCPUTime `json:"user"`
Elapsed int `json:"elapsed" yaml:"elapsed"`
End optionalValue `json:"end" yaml:"end"`
Start optionalValue `json:"start" yaml:"start"`
Eligible int `json:"eligible" yaml:"eligible"`
Submission int `json:"submission" yaml:"submission"`
Suspended int `json:"suspended" yaml:"suspended"`
// Limit seems to be in minutes.
Limit optionalValue `json:"limit" yaml:"limit"`
Total sacctCPUTime `json:"total" yaml:"total"`
User sacctCPUTime `json:"user" yaml:"user"`
}

type sacctCPUTime struct {
Seconds int `json:"seconds"`
Microseconds int `json:"microseconds"`
Seconds int `json:"seconds" yaml:"seconds"`
Microseconds int `json:"microseconds" yaml:"microseconds"`
}

func (t *sacctCPUTime) toDuration() time.Duration {
Expand All @@ -81,40 +68,55 @@ func (t *sacctCPUTime) toDuration() time.Duration {
}

type sacctJobStepTime struct {
Elapsed int `json:"elapsed"`
End optionalValue `json:"end"`
Start optionalValue `json:"start"`
System sacctCPUTime `json:"system"`
User sacctCPUTime `json:"user"`
Elapsed int `json:"elapsed" yaml:"elapsed"`
End optionalValue `json:"end" yaml:"end"`
Start optionalValue `json:"start" yaml:"start"`
System sacctCPUTime `json:"system" yaml:"system"`
User sacctCPUTime `json:"user" yaml:"user"`
}

type sacctJobStepExitCode struct {
StatusRaw json.RawMessage `json:"status"`
ReturnCode optionalValue `json:"return_code"`
Status jobStatus `json:"status" yaml:"status"`
ReturnCode optionalValue `json:"return_code" yaml:"return_code"`
}

func (s *sacctJobStepExitCode) Status() string {
// jobStatus is a status string whose encoded form may be either a single
// string or a list of strings. Its UnmarshalYAML and UnmarshalJSON methods
// accept both forms and keep the first entry when given a list.
type jobStatus string

func (s *jobStatus) UnmarshalYAML(n *yaml.Node) error {
var stringState string
err := json.Unmarshal(s.StatusRaw, &stringState)
if err == nil {
return stringState
if n.Decode(&stringState) == nil {
*s = jobStatus(stringState)
return nil
}
var sliceState []string
err = json.Unmarshal(s.StatusRaw, &sliceState)
if err == nil {
return sliceState[0]
if err := n.Decode(&sliceState); err != nil {
return err
}
*s = jobStatus(sliceState[0])
return nil
}

return "Unknown"
// UnmarshalJSON implements json.Unmarshaler for jobStatus.
//
// The encoded value may be either a single JSON string or a list of strings.
// A plain string is stored as-is; for a list, the first entry becomes the
// status. An empty list carries no status at all, so the value is set to the
// empty string rather than indexing past the end of the slice (which would
// panic). Input that is neither a string nor a list of strings yields the
// decode error from the list attempt.
func (s *jobStatus) UnmarshalJSON(b []byte) error {
	// Try the scalar form first; a failure here just means we fall through to
	// the list form, so the error is intentionally not reported.
	var stringState string
	if json.Unmarshal(b, &stringState) == nil {
		*s = jobStatus(stringState)
		return nil
	}

	var sliceState []string
	if err := json.Unmarshal(b, &sliceState); err != nil {
		return err
	}
	// "[]" decodes successfully into a zero-length slice; guard the index.
	if len(sliceState) == 0 {
		*s = ""
		return nil
	}
	*s = jobStatus(sliceState[0])
	return nil
}

type sacctJobStepNodes struct {
Count int `json:"count"`
Range string `json:"range"`
List []string `json:"list"`
Count int `json:"count" yaml:"count"`
Range string `json:"range" yaml:"range"`
List []string `json:"list" yaml:"list"`
}
type sacctJobStepTasks struct {
Count int `json:"count"`
Count int `json:"count" yaml:"count"`
}
type sacctJobStepInfo struct {
/*
Expand All @@ -123,44 +125,44 @@ type sacctJobStepInfo struct {
StepID string `json:"step_id"`
} `json:"id"`
*/
Name string `json:"name"`
Name string `json:"name" yaml:"name"`
}

type sacctJobStepTresStats struct {
Total []sacctTresUnit `json:"total"`
Average []sacctTresUnit `json:"average"`
Min []sacctTresTaskUnit `json:"min"`
Max []sacctTresTaskUnit `json:"max"`
Total []sacctTresUnit `json:"total" yaml:"total"`
Average []sacctTresUnit `json:"average" yaml:"average"`
Min []sacctTresTaskUnit `json:"min" yaml:"min"`
Max []sacctTresTaskUnit `json:"max" yaml:"max"`
}

type sacctJobStepTres struct {
Requested sacctJobStepTresStats `json:"requested"`
Consumed sacctJobStepTresStats `json:"consumed"`
Allocated []sacctTresUnit `json:"allocated"`
Requested sacctJobStepTresStats `json:"requested" yaml:"requested"`
Consumed sacctJobStepTresStats `json:"consumed" yaml:"consumed"`
Allocated []sacctTresUnit `json:"allocated" yaml:"allocated"`
}

type sacctJobStep struct {
//State string `json:"state"`
Time sacctJobStepTime `json:"time"`
ExitCode sacctJobStepExitCode `json:"exit_code"`
Nodes sacctJobStepNodes `json:"nodes"`
Tasks sacctJobStepTasks `json:"tasks"`
Step sacctJobStepInfo `json:"step"`
Tres sacctJobStepTres `json:"tres"`
Time sacctJobStepTime `json:"time" yaml:"time"`
ExitCode sacctJobStepExitCode `json:"exit_code" yaml:"exit_code"`
Nodes sacctJobStepNodes `json:"nodes" yaml:"nodes"`
Tasks sacctJobStepTasks `json:"tasks" yaml:"tasks"`
Step sacctJobStepInfo `json:"step" yaml:"step"`
Tres sacctJobStepTres `json:"tres" yaml:"tres"`
}

type sacctJob struct {
Account string `json:"account"`
AllocationNodes int `json:"allocation_nodes"`
Cluster string `json:"cluster"`
JobID int `json:"job_id"`
Name string `json:"name"`
Partition string `json:"partition"`
User string `json:"user"`
State sacctJobState `json:"state"`
Steps []sacctJobStep `json:"steps"`
Tres sacctJobTres `json:"tres"`
Time sacctJobTime `json:"time"`
Account string `json:"account" yaml:"account"`
AllocationNodes int `json:"allocation_nodes" yaml:"allocation_nodes"`
Cluster string `json:"cluster" yaml:"cluster"`
JobID int `json:"job_id" yaml:"job_id"`
Name string `json:"name" yaml:"name"`
Partition string `json:"partition" yaml:"partition"`
User string `json:"user" yaml:"user"`
State sacctJobState `json:"state" yaml:"state"`
Steps []sacctJobStep `json:"steps" yaml:"steps"`
Tres sacctJobTres `json:"tres" yaml:"tres"`
Time sacctJobTime `json:"time" yaml:"time"`
}

func getTresByTypeName(tres []sacctTresUnit, typeName string, name string) (bool, int) {
Expand All @@ -180,9 +182,14 @@ func getTresTaskByTypeName(tres []sacctTresTaskUnit, typeName string, name strin
return false, 0
}

func sacctGetJobByID(jobID string) (*jobperf.Job, error) {
func (e jobEngine) sacctGetJobByID(jobID string) (*jobperf.Job, error) {
slog.Debug("fetching job by id", "jobID", jobID, "method", "sacct")
cmd := exec.Command("sacct", "--job", jobID, "--json")
var cmd *exec.Cmd
if e.mode == slurmModeJSON {
cmd = exec.Command("sacct", "--job", jobID, "--json")
} else {
cmd = exec.Command("sacct", "--job", jobID, "--yaml")
}
var out bytes.Buffer
cmd.Stdout = &out
err := cmd.Run()
Expand All @@ -191,24 +198,29 @@ func sacctGetJobByID(jobID string) (*jobperf.Job, error) {
}

var parsed sacctResponse
err = json.Unmarshal(out.Bytes(), &parsed)
if e.mode == slurmModeJSON {
err = json.Unmarshal(out.Bytes(), &parsed)
} else {
err = yaml.Unmarshal(out.Bytes(), &parsed)
}
if err != nil {
return nil, fmt.Errorf("failed to parse sacct response for job id %v: %w", jobID, err)
}
if len(parsed.Jobs) != 1 {
return nil, fmt.Errorf("unexpected number of jobs returned from sacct: %v", len(parsed.Jobs))
}
parsedJob := parsed.Jobs[0]
parsedJob := &parsed.Jobs[0]

jobOut := jobperf.Job{
ID: strconv.Itoa(parsedJob.JobID),
Name: parsedJob.Name,
Owner: parsedJob.User,
State: parsedJob.State.Current(),
State: string(parsedJob.State.Current),
StartTime: time.Unix(parsedJob.Time.Start.number, 0),
Walltime: time.Duration(parsedJob.Time.Limit.number) * time.Minute,
UsedWalltime: time.Duration(parsedJob.Time.Elapsed) * time.Second,
UsedCPUTime: parsedJob.Time.Total.toDuration(),
Raw: parsedJob,
}
_, jobOut.CoresTotal = getTresByTypeName(parsedJob.Tres.Allocated, "cpu", "")
_, memMb := getTresByTypeName(parsedJob.Tres.Allocated, "mem", "")
Expand Down
49 changes: 43 additions & 6 deletions slurm/slurm.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"os/exec"
"os/user"
"strconv"
"time"

"github.com/clemsonciti/jobperf"
Expand All @@ -24,15 +25,37 @@ func IsAvailable() bool {
return false
}

type jobEngine struct{}
// slurmMode selects which structured output format is requested from sacct
// and, correspondingly, which decoder is used to parse that output.
type slurmMode int

const (
// slurmModeJSON runs sacct with --json and decodes with encoding/json. It is
// the zero value of slurmMode, so an engine with no mode set uses JSON.
slurmModeJSON slurmMode = iota
// slurmModeYAML runs sacct with --yaml and decodes with yaml.v3.
slurmModeYAML
)

// jobEngine is the Slurm job engine value returned by NewJobEngine.
type jobEngine struct {
// mode is the sacct output format this engine requests and parses; it is
// chosen by NewJobEngine.
mode slurmMode
}

// NewJobEngine builds the Slurm job engine, picking the sacct output format
// by trial: it runs "sacct --json" and, only if that fails, "sacct --yaml".
// The first invocation that exits without error decides the engine's mode.
//
// If neither invocation succeeds, a zero-value jobEngine is returned; its
// mode is the zero slurmMode, i.e. slurmModeJSON.
func NewJobEngine() jobperf.JobEngine {
	// sacctAccepts reports whether sacct runs successfully with the given
	// format flag. Only the exit status matters here.
	sacctAccepts := func(formatFlag string) bool {
		return exec.Command("sacct", formatFlag).Run() == nil
	}

	// Cases are evaluated top to bottom, so YAML is probed only after JSON
	// has already failed.
	switch {
	case sacctAccepts("--json"):
		slog.Debug("using json slurm mode")
		return jobEngine{mode: slurmModeJSON}
	case sacctAccepts("--yaml"):
		slog.Debug("using yaml slurm mode")
		return jobEngine{mode: slurmModeYAML}
	}

	slog.Debug("both json and yaml fail")
	return jobEngine{}
}

func (_ jobEngine) GetJobByID(jobID string) (*jobperf.Job, error) {
squeueJob, squeueErr := squeueGetJobByID(jobID)
sacctJob, sacctErr := sacctGetJobByID(jobID)
func (e jobEngine) GetJobByID(jobID string) (*jobperf.Job, error) {
squeueJob, squeueErr := e.squeueGetJobByID(jobID)
sacctJob, sacctErr := e.sacctGetJobByID(jobID)
slog.Debug("fetching complete", "squeueErr", squeueErr, "sacctErr", sacctErr)

if squeueErr != nil && sacctErr != nil {
Expand Down Expand Up @@ -71,8 +94,22 @@ func (_ jobEngine) NodeStatsSession(j *jobperf.Job, hostname string) (jobperf.No
"--exact",
"--nodes", "1",
"--ntasks", "1",
"-w", hostname,
ex, "-nodestats"}
"-w", hostname}
if rawSacctJob, ok := j.Raw.(*sacctJob); ok {
if rawSacctJob.Partition != "" {
params = append(params, "-p", rawSacctJob.Partition)
}
}
// The "-time.Second*5" is a fudge factor and may not be needed. I'm not
// sure what the behaviour is if you request a step with a time limit longer
// than the time we have left.
timeLeft := j.Walltime - time.Since(j.StartTime) - time.Second*5
if timeLeft > 0 {
params = append(params, "-t", strconv.Itoa(int(timeLeft.Minutes())))
}

params = append(params,
ex, "-nodestats")
if me.Username == "root" {
cmdName = "sudo"
params = append([]string{"-u", j.Owner, "srun"}, params...)
Expand Down
Loading

0 comments on commit b2f816d

Please sign in to comment.