-
Notifications
You must be signed in to change notification settings - Fork 392
/
Copy pathagent.go
166 lines (143 loc) · 3.78 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package cmd
import (
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"time"
"github.com/distribworks/dkron/v4/dkron"
"github.com/hashicorp/go-plugin"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"github.com/spf13/viper"
)
// Package-level state shared by the agent command.
var (
	// ShutdownCh is watched by handleSignals: a receive on it is handled
	// exactly like an os.Interrupt signal. It is not initialized in this
	// file, so it stays nil (and never fires) unless other code assigns it.
	ShutdownCh chan struct{}

	// agent is the dkron agent instance created by agentRun and used by
	// handleSignals and handleReload.
	agent *dkron.Agent
)
// gracefulTimeout is the timeout handleSignals applies while waiting for a
// graceful shutdown; when it elapses handleSignals gives up and returns a
// non-zero exit code.
const gracefulTimeout = 3 * time.Hour
// agentCmd is the "agent" subcommand: it loads the configuration and then
// hands control to agentRun.
//
// NOTE(review): the previous comment on RunE also carried this command,
// which looks like a note on regenerating the executor gRPC code rather than
// documentation of the command itself — confirm it is still current:
//
//	protoc -I proto/ proto/executor.proto --go_out=plugins=grpc:dkron/
var agentCmd = &cobra.Command{
	Use:   "agent",
	Short: "Start a dkron agent",
	Long: `Start a dkron agent that schedules jobs, listens for executions and runs executors.
It also runs a web UI.`,

	// PreRunE loads the configuration through initConfig; the command and
	// its arguments are not consulted.
	PreRunE: func(_ *cobra.Command, _ []string) error {
		return initConfig()
	},

	// RunE forwards the positional arguments to agentRun and returns its
	// error unchanged.
	RunE: func(_ *cobra.Command, args []string) error {
		return agentRun(args...)
	},
}
// init registers agentCmd under dkronCmd and wires up its flags.
func init() {
	dkronCmd.AddCommand(agentCmd)

	// --config is registered as a persistent flag and stored in cfgFile.
	agentCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file path")

	// Add every dkron configuration flag to the command, then bind the
	// command's flags to viper. The bind error is deliberately discarded.
	flags := agentCmd.Flags()
	flags.AddFlagSet(dkron.ConfigFlagSet())
	_ = viper.BindPFlags(flags)
}
// agentRun discovers the plugins, creates and starts the agent, and then
// blocks in handleSignals until the agent is told to stop.
//
// args is accepted so the function can be called from the cobra RunE hook;
// it is not used.
//
// It returns the error from agent.Start, an "exit status: N" error when
// handleSignals reports a non-zero code, and nil otherwise. A plugin
// discovery failure never returns: it is reported through log.Fatal.
//
// NOTE(review): an earlier comment here promised to clean up managed plugins
// at the end of this function, but no cleanup happens in it; the only
// plugin.CleanupClients call visible in this file is inside handleSignals.
func agentRun(args ...string) error {
	discovered := &Plugins{
		LogLevel: config.LogLevel,
		NodeName: config.NodeName,
	}
	if discoverErr := discovered.DiscoverPlugins(); discoverErr != nil {
		log.Fatal(discoverErr)
	}

	// Hand the discovered processors and executors to the new agent.
	agent = dkron.NewAgent(config, dkron.WithPlugins(dkron.Plugins{
		Processors: discovered.Processors,
		Executors:  discovered.Executors,
	}))
	if startErr := agent.Start(); startErr != nil {
		return startErr
	}

	// Block until a signal (or another exit condition) ends the agent.
	if code := handleSignals(); code != 0 {
		return fmt.Errorf("exit status: %d", code)
	}
	return nil
}
// handleSignals blocks until we get an exit-causing signal and returns the
// exit code: 0 after a graceful shutdown, 1 otherwise.
//
// While waiting it reacts to:
//   - SIGHUP: calls handleReload and keeps waiting;
//   - os.Interrupt, SIGTERM or a receive on ShutdownCh: starts a graceful
//     shutdown;
//   - a receive on agent.RetryJoinCh(): prints the error and returns 1.
//
// A graceful shutdown calls agent.Stop in the background and polls
// agent.GetRunningJobs once per second until no job is running. The polling
// happens in its own goroutine so that the wait can be cut short: another
// signal, or gracefulTimeout elapsing, abandons the wait and returns 1.
// plugin.CleanupClients is called once on every path out of the shutdown
// phase.
func handleSignals() int {
	signalCh := make(chan os.Signal, 4)
	signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP)

	// Wait for a signal; SIGHUP only reloads the configuration and then the
	// wait starts over.
	var sig os.Signal
	for {
		select {
		case sig = <-signalCh:
		case err := <-agent.RetryJoinCh():
			fmt.Println("[ERR] agent: Retry join failed: ", err)
			return 1
		case <-ShutdownCh:
			sig = os.Interrupt
		}
		fmt.Printf("Caught signal: %v\n", sig)

		if sig != syscall.SIGHUP {
			break
		}
		handleReload()
	}

	// Fail fast if not doing a graceful leave
	if sig != syscall.SIGTERM && sig != os.Interrupt {
		return 1
	}

	// Attempt a graceful leave
	log.Info("agent: Gracefully shutting down agent...")
	go func() {
		// NOTE(review): completion of Stop is not awaited, as before; only
		// its error is reported.
		if err := agent.Stop(); err != nil {
			fmt.Printf("Error: %s\n", err)
			log.Error(fmt.Sprintf("Error: %s", err))
		}
	}()

	// gracefulCh is closed by the poller once no job is running; abortCh is
	// closed when this function returns so the poller does not outlive it.
	gracefulCh := make(chan struct{})
	abortCh := make(chan struct{})
	defer close(abortCh)

	go func() {
		ticker := time.NewTicker(1 * time.Second)
		defer ticker.Stop()
		for {
			log.Info("Waiting for jobs to finish...")
			if agent.GetRunningJobs() < 1 {
				log.Info("No jobs left. Exiting.")
				close(gracefulCh)
				return
			}
			select {
			case <-ticker.C:
			case <-abortCh:
				return
			}
		}
	}()

	// Wait for leave or another signal
	select {
	case <-signalCh:
		log.Warn("agent: Received another signal, forcing shutdown")
	case <-time.After(gracefulTimeout):
		log.Warn("agent: Graceful shutdown timed out, forcing shutdown")
	case <-gracefulCh:
		plugin.CleanupClients()
		return 0
	}

	// Forced exit: still kill the plugin clients before giving up.
	plugin.CleanupClients()
	return 1
}
// handleReload is invoked when we should reload our configs, e.g. SIGHUP.
//
// It re-runs initConfig and, when that succeeds, pushes config.Tags to the
// agent. When initConfig fails the error is printed and the agent's tags are
// left untouched, so a broken reload is not silently applied.
//
// NOTE(review): the previous comment stated that reloading the config also
// reloads the notification settings; nothing in this function shows that, so
// confirm against initConfig.
func handleReload() {
	fmt.Println("Reloading configuration...")
	if err := initConfig(); err != nil {
		fmt.Println("[ERR] agent: Reloading configuration failed: ", err)
		return
	}
	agent.UpdateTags(config.Tags)
}
// UnmarshalTags converts a slice of "key=value" strings into a tag map.
//
// Each entry is split at its first "="; everything after it, including any
// further "=" characters, becomes the value (which may be empty). An entry
// without "=" or with an empty key stops the conversion and yields a nil map
// together with an "invalid tag" error. When a key occurs more than once the
// last occurrence wins. A nil or empty input yields an empty, non-nil map.
func UnmarshalTags(tags []string) (map[string]string, error) {
	result := make(map[string]string)
	for _, entry := range tags {
		key, value, found := strings.Cut(entry, "=")
		if !found || key == "" {
			return nil, fmt.Errorf("invalid tag: '%s'", entry)
		}
		result[key] = value
	}
	return result, nil
}