-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathmaintenance.go
195 lines (167 loc) · 5.07 KB
/
maintenance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
// Copyright 2021-2022, Offchain Labs, Inc.
// For license information, see https://github.com/nitro/blob/master/LICENSE
package arbnode
import (
"context"
"fmt"
"strconv"
"strings"
"time"
flag "github.com/spf13/pflag"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/nitro/arbnode/redislock"
"github.com/offchainlabs/nitro/execution"
"github.com/offchainlabs/nitro/util/stopwaiter"
)
// MaintenanceRunner regularly runs maintenance (database compaction plus the
// execution client's own maintenance hook) at the configured time of day, if
// one is configured.
type MaintenanceRunner struct {
	stopwaiter.StopWaiter
	// exec is the execution client whose Maintenance method is invoked
	// alongside the database compactions.
	exec execution.FullExecutionClient
	// config returns the maintenance configuration; it is re-read on every
	// scheduling check.
	config MaintenanceConfigFetcher
	// seqCoordinator may be nil, in which case maintenance runs without any
	// cross-node coordination.
	seqCoordinator *SeqCoordinator
	// dbs are the databases compacted during maintenance.
	dbs []ethdb.Database
	// lastMaintenance is the UTC time at which maintenance was last started
	// (initialised to the construction time of the runner).
	lastMaintenance time.Time
	// lock is used to ensure that, at any given time, only a single node is
	// in maintenance mode. It is only set when seqCoordinator is non-nil.
	lock *redislock.Simple
}
// MaintenanceConfig holds the user-facing maintenance settings together with
// the values derived from them during validation.
type MaintenanceConfig struct {
	// TimeOfDay is the UTC time of day, in 24-hour "HH:MM" form, at which
	// maintenance should run. An empty string leaves maintenance disabled.
	TimeOfDay string `koanf:"time-of-day" reload:"hot"`
	// Lock configures the redis lock used when a sequencer coordinator is
	// present.
	Lock redislock.SimpleCfg `koanf:"lock" reload:"hot"`
	// Generated: the minutes since start of UTC day to compact at
	minutesAfterMidnight int
	// Generated: set to true by parseDbCompactionTime once a non-empty
	// TimeOfDay has been parsed successfully.
	enabled bool
}
// parseDbCompactionTime parses c.TimeOfDay (24-hour "HH:MM") into
// c.minutesAfterMidnight and records in c.enabled whether a maintenance time
// is configured.
//
// An empty TimeOfDay is valid and leaves maintenance disabled. The method
// returns false when TimeOfDay is non-empty but is not exactly two
// colon-separated integers with the hour in [0, 23] and the minute in [0, 59].
func (c *MaintenanceConfig) parseDbCompactionTime() bool {
	if c.TimeOfDay == "" {
		// Make "no time configured" explicit so that re-parsing a config
		// whose TimeOfDay was cleared cannot leave a stale enabled flag.
		c.enabled = false
		return true
	}
	parts := strings.Split(c.TimeOfDay, ":")
	if len(parts) != 2 {
		return false
	}
	// strconv.Atoi accepts a leading sign, so the lower bounds must be
	// checked explicitly; otherwise "-1:30" would be accepted and produce a
	// negative minutesAfterMidnight.
	hours, err := strconv.Atoi(parts[0])
	if err != nil || hours < 0 || hours >= 24 {
		return false
	}
	minutes, err := strconv.Atoi(parts[1])
	if err != nil || minutes < 0 || minutes >= 60 {
		return false
	}
	c.enabled = true
	c.minutesAfterMidnight = hours*60 + minutes
	return true
}
// Validate parses TimeOfDay, populating the derived fields of the config, and
// returns an error describing the expected format if the value cannot be
// parsed.
func (c *MaintenanceConfig) Validate() error {
	if c.parseDbCompactionTime() {
		return nil
	}
	return fmt.Errorf("expected sequencer coordinator db compaction time to be in 24-hour HH:MM format but got \"%v\"", c.TimeOfDay)
}
// MaintenanceConfigAddOptions registers the maintenance flags on f, with every
// flag name placed under prefix: "<prefix>.time-of-day" for the schedule and
// "<prefix>.lock" as the prefix handed to the redis lock's own options.
func MaintenanceConfigAddOptions(prefix string, f *flag.FlagSet) {
	const timeOfDayUsage = "UTC 24-hour time of day to run maintenance (currently only db compaction) at (e.g. 15:00)"

	timeOfDayFlag := prefix + ".time-of-day"
	f.String(timeOfDayFlag, DefaultMaintenanceConfig.TimeOfDay, timeOfDayUsage)

	lockPrefix := prefix + ".lock"
	redislock.AddConfigOptions(lockPrefix, f)
}
// DefaultMaintenanceConfig is the configuration used when nothing is set:
// no time of day (so maintenance stays disabled) and the default redis lock
// settings. The derived fields are left at their zero values.
var DefaultMaintenanceConfig = MaintenanceConfig{
	Lock:      redislock.DefaultCfg,
	TimeOfDay: "",
}
// MaintenanceConfigFetcher returns the maintenance configuration to use. The
// runner calls it on every scheduling check rather than caching the result.
type MaintenanceConfigFetcher func() *MaintenanceConfig
// NewMaintenanceRunner validates the configuration returned by config and
// builds a MaintenanceRunner for the given databases and execution client.
//
// seqCoordinator may be nil. When it is non-nil, a redis lock is created on the
// coordinator's redis client so that maintenance can be serialised across
// nodes.
//
// An error is returned if the configuration is invalid or the redis lock
// cannot be created.
func NewMaintenanceRunner(config MaintenanceConfigFetcher, seqCoordinator *SeqCoordinator, dbs []ethdb.Database, exec execution.FullExecutionClient) (*MaintenanceRunner, error) {
	if err := config().Validate(); err != nil {
		return nil, fmt.Errorf("validating config: %w", err)
	}
	res := &MaintenanceRunner{
		exec:            exec,
		config:          config,
		seqCoordinator:  seqCoordinator,
		dbs:             dbs,
		lastMaintenance: time.Now().UTC(),
	}

	if seqCoordinator != nil {
		// Resolve the lock settings through the fetcher on every call instead
		// of returning a pointer into the config captured at construction:
		// the Lock field is tagged reload:"hot", and a captured snapshot
		// would never observe a config that the fetcher later replaces.
		lockCfg := func() *redislock.SimpleCfg { return &config().Lock }
		alwaysReady := func() bool { return true } // always ready to lock
		rl, err := redislock.NewSimple(seqCoordinator.RedisCoordinator().Client, lockCfg, alwaysReady)
		if err != nil {
			return nil, fmt.Errorf("creating new simple redis lock: %w", err)
		}
		res.lock = rl
	}
	return res, nil
}
// Start starts the embedded StopWaiter with ctxIn and registers
// maybeRunMaintenance to be called iteratively by it.
// NOTE(review): the duration returned by maybeRunMaintenance is presumably
// the delay before the next call — confirm against stopwaiter.
func (mr *MaintenanceRunner) Start(ctxIn context.Context) {
	mr.StopWaiter.Start(ctxIn, mr)
	mr.StopWaiter.CallIteratively(mr.maybeRunMaintenance)
}
// wentPastTimeOfDay reports whether the daily instant timeOfDay, expressed in
// minutes after midnight UTC, occurred in the interval (before, after].
//
// It returns false when after is not later than before. timeOfDay is expected
// to be in [0, 24*60). Both times are interpreted in UTC regardless of the
// location they carry.
func wentPastTimeOfDay(before time.Time, after time.Time, timeOfDay int) bool {
	if !after.After(before) {
		return false
	}
	start := before.UTC()
	// First occurrence of the target time of day strictly after `before`.
	// Comparing full instants (rather than minute-of-day values) keeps
	// sub-minute precision, so an interval of just under 24 hours that ends
	// in the same minute of the day it started in is handled correctly.
	next := time.Date(start.Year(), start.Month(), start.Day(), 0, timeOfDay, 0, 0, time.UTC)
	if !next.After(start) {
		// Already at or past today's occurrence: the next one is tomorrow.
		next = next.AddDate(0, 0, 1)
	}
	return !after.Before(next)
}
// maybeRunMaintenance performs one scheduling check and always returns one
// minute.
//
// Nothing happens unless maintenance is enabled and the configured time of day
// has been passed since lastMaintenance. Without a sequencer coordinator the
// maintenance runs directly. With one, the redis lock must be acquired first,
// and maintenance only runs if both AvoidLockout and TryToHandoffChosenOne
// report success; SeekLockout is called afterwards in either case.
func (mr *MaintenanceRunner) maybeRunMaintenance(ctx context.Context) time.Duration {
	const checkInterval = time.Minute

	cfg := mr.config()
	if !cfg.enabled {
		return checkInterval
	}

	current := time.Now().UTC()
	if !wentPastTimeOfDay(mr.lastMaintenance, current, cfg.minutesAfterMidnight) {
		return checkInterval
	}

	coordinator := mr.seqCoordinator
	if coordinator == nil {
		mr.lastMaintenance = current
		mr.runMaintenance()
		return checkInterval
	}

	if !mr.lock.AttemptLock(ctx) {
		return checkInterval
	}
	defer mr.lock.Release(ctx)

	log.Info("Attempting avoiding lockout and handing off", "targetTime", cfg.TimeOfDay)

	// Avoid lockout for the sequencer and try to hand off before compacting.
	readyForMaintenance := coordinator.AvoidLockout(ctx) && coordinator.TryToHandoffChosenOne(ctx)
	if readyForMaintenance {
		mr.lastMaintenance = current
		mr.runMaintenance()
	}

	// Must run whether or not the hand-off succeeded; it executes before the
	// deferred lock release above.
	coordinator.SeekLockout(ctx)
	return checkInterval
}
// runMaintenance compacts every database in mr.dbs and invokes the execution
// client's Maintenance method, all concurrently, and blocks until each of
// those tasks has reported. Task errors are logged as warnings, not returned.
func (mr *MaintenanceRunner) runMaintenance() {
	log.Info("Compacting databases (this may take a while...)")

	// One task per database plus one for the execution client.
	pending := len(mr.dbs) + 1
	results := make(chan error, len(mr.dbs))

	for i := range mr.dbs {
		target := mr.dbs[i]
		go func() {
			results <- target.Compact(nil, nil)
		}()
	}
	go func() {
		results <- mr.exec.Maintenance()
	}()

	// Drain exactly one result per launched task.
	for ; pending > 0; pending-- {
		if err := <-results; err != nil {
			log.Warn("maintenance error", "err", err)
		}
	}
	log.Info("Done compacting databases")
}