-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathorchestrator.go
135 lines (107 loc) · 3.37 KB
/
orchestrator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package orchestrator
import (
"errors"
"fmt"
"log"
"reflect"
)
// Every TransactionalRoute is built from multiple State values that are
// connected by edges. Each edge has a priority and a condition.
//
// To move to the next doAction step, the edges are sorted by priority and the
// first doAction whose condition is satisfied is called. In this scheme, a
// retry/backoff algorithm can be defined as an edge whose priority decreases
// each time it is executed.
//
// The orchestrator hands the context over between registered
// TransactionalRoutes, based on their identifiers.

// DefaultRecoveryRouteId is the route id reserved for the recovery route:
// Register rejects it, and defineRecoveryRoute stores the recovery route
// under it.
const DefaultRecoveryRouteId = "RECOVERY_ROUTE"
// orchestrator keeps the registered routes and the error channel used while
// executing them.
type orchestrator struct {
	// routes holds every registered route, keyed by its route id.
	routes map[string]Route
	// rh is the TransactionalRoute handler.
	// NOTE(review): nothing in the visible part of this file assigns it —
	// confirm it is still needed.
	rh *routeRunner
	// ec is the error channel handed to Exec; Exec passes it on to the
	// route runner.
	ec chan error
}

// defaultRecoveryRoute is the built-in recovery route that is installed when
// no custom recovery route is supplied.
type defaultRecoveryRoute struct{}
// GetRouteId reports the identifier of the built-in recovery route, which is
// always the reserved id "RECOVERY_ROUTE".
func (drr *defaultRecoveryRoute) GetRouteId() string {
	const id = "RECOVERY_ROUTE"
	return id
}
// GetStartState builds the start state of the built-in recovery route: a
// state named "default_recovery_state" with no transitions and an action that
// does nothing and returns nil. A new State is constructed on every call.
func (drr *defaultRecoveryRoute) GetStartState() *State {
	// The default recovery action is a no-op.
	noop := func(_ *context) error {
		return nil
	}

	start := State{
		name:   "default_recovery_state",
		action: noop,
	}
	return &start
}
// GetEndpoints reports the endpoints of the built-in recovery route. It has
// none, so the result is always a nil slice.
func (drr *defaultRecoveryRoute) GetEndpoints() []*Endpoint {
	var none []*Endpoint
	return none
}
// NewOrchestrator creates an orchestrator whose route registry is empty and
// ready to be filled through Register.
func NewOrchestrator() *orchestrator {
	o := new(orchestrator)
	o.routes = map[string]Route{}
	return o
}
// Register adds r to the orchestrator under its unique route id.
//
// It returns an error, leaving the registered routes untouched, when r is nil,
// when its id is the reserved DefaultRecoveryRouteId, or when a route with the
// same id has already been registered.
func (o *orchestrator) Register(r Route) error {
	if r == nil {
		return errors.New("route is nil")
	}

	// Resolve the id once so every check and the insert see the same value.
	id := r.GetRouteId()

	// Check the reserved id first: once the recovery route is installed, the
	// duplicate check would otherwise hide the real reason for the rejection.
	if id == DefaultRecoveryRouteId {
		return errors.New(DefaultRecoveryRouteId + " route id is reserved")
	}
	if o.routes[id] != nil {
		return fmt.Errorf("duplicate route id %s", id)
	}

	o.routes[id] = r
	return nil
}
// Initialization installs the recovery route and then wires the transitions
// between the registered routes (the HierarchicalRoute feature).
//
// recoveryRoute is handed to defineRecoveryRoute unchanged. If that step
// fails, its error is returned and the transitions are not defined; otherwise
// the result of defineHierarchicalRouteTransitions is returned.
func (o *orchestrator) Initialization(recoveryRoute Route) error {
	err := o.defineRecoveryRoute(recoveryRoute)
	if err == nil {
		err = o.defineHierarchicalRouteTransitions()
	}
	return err
}
// Exec starts the execution process at the start state of the route
// registered under from, using ctx as the execution context.
//
// errCh is remembered on the orchestrator and handed to the route runner
// together with ctx. Exec terminates the process via log.Fatalf when from is
// not a registered route, or when no recovery route is stored under
// DefaultRecoveryRouteId (Initialization is what installs it).
func (o *orchestrator) Exec(from string, ctx *context, errCh chan error) {
	entry := o.routes[from]
	if entry == nil {
		log.Fatalf("route %s not found", from)
	}

	// Without this check a missing recovery route surfaces as an opaque nil
	// dereference on the GetStartState call below.
	recovery := o.routes[DefaultRecoveryRouteId]
	if recovery == nil {
		log.Fatalf("recovery route %s not defined; call Initialization before Exec", DefaultRecoveryRouteId)
	}

	o.ec = errCh
	runner := newRouteRunner(entry.GetStartState(), recovery.GetStartState())
	runner.run(ctx, o.ec)
}
// defineHierarchicalRouteTransitions links the registered routes together
// (the HierarchicalRoute feature).
//
// For every endpoint of every registered route it adds an unconditional
// Default-priority transition from the endpoint's state to the start state of
// the route named by the endpoint's To field. When that target route is a
// *TransactionalRoute, it also adds a transition from the target's start state
// back to the endpoint's state, taken only when the context variable
// transactionalRouteStatusHeaderKey equals transactionalRouteStatusRollback.
//
// It returns an error as soon as an endpoint names a route id that is not
// registered; transitions created before that point are not undone.
func (o *orchestrator) defineHierarchicalRouteTransitions() error {
	// The type compared against is the same for every endpoint, so resolve it
	// once instead of once per endpoint.
	transactionalType := reflect.TypeOf(&TransactionalRoute{})

	for _, route := range o.routes {
		for _, e := range route.GetEndpoints() {
			target := o.routes[e.To]
			if target == nil {
				return fmt.Errorf("route id %s not found", e.To)
			}

			// Ask for the start state once so that both transitions refer to
			// the same State even if the route builds a new one per call.
			targetStart := target.GetStartState()

			// Forward edge: always taken.
			e.State.createTransition(targetStart, Default,
				func(ctx context) bool {
					return true
				})

			// Backward edge: only for transactional targets, and only when
			// the context reports a rollback.
			if reflect.TypeOf(target) == transactionalType {
				targetStart.createTransition(e.State, Default,
					func(ctx context) bool {
						return ctx.GetVariable(transactionalRouteStatusHeaderKey) == transactionalRouteStatusRollback
					})
			}
		}
	}
	return nil
}
// defineRecoveryRoute installs route as the recovery route under the reserved
// id DefaultRecoveryRouteId ("RECOVERY_ROUTE").
//
// A nil route selects the built-in defaultRecoveryRoute. A route whose id is
// not the reserved one is rejected with an error and nothing is stored.
func (o *orchestrator) defineRecoveryRoute(route Route) error {
	if route == nil {
		route = new(defaultRecoveryRoute)
	}

	if id := route.GetRouteId(); id != DefaultRecoveryRouteId {
		return errors.New("recovery route id is not 'RECOVERY_ROUTE'")
	}

	o.routes[DefaultRecoveryRouteId] = route
	return nil
}
// shutdown is currently a placeholder: it performs no work and always
// returns nil.
func (o *orchestrator) shutdown() error {
	var err error
	return err
}