-
Notifications
You must be signed in to change notification settings - Fork 392
/
Copy pathplugins.go
158 lines (135 loc) · 4.01 KB
/
plugins.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package cmd
import (
"os"
"os/exec"
"path"
"path/filepath"
"strings"
"github.com/distribworks/dkron/v4/dkron"
dkplugin "github.com/distribworks/dkron/v4/plugin"
"github.com/hashicorp/go-plugin"
"github.com/kardianos/osext"
"github.com/sirupsen/logrus"
"github.com/spf13/viper"
)
// embededPlugins lists the plugin names that DiscoverPlugins launches from the
// Dkron executable itself, passing the name as the only argument, and
// registers as executors under that same name.
var embededPlugins = []string{"shell", "http"}
// Plugins holds the plugins loaded by DiscoverPlugins together with the
// settings used to build the logger handed to every plugin client.
type Plugins struct {
// Processors maps a plugin name to the processor dispensed for it.
// DiscoverPlugins replaces this map on every call.
Processors map[string]dkplugin.Processor
// Executors maps a plugin name to the executor dispensed for it.
// DiscoverPlugins replaces this map on every call.
Executors map[string]dkplugin.Executor
// LogLevel is passed to dkron.InitLogger when creating the plugin logger.
LogLevel string
// NodeName is passed to dkron.InitLogger when creating the plugin logger.
NodeName string
}
// DiscoverPlugins finds the plugin binaries located on disk, launches each of
// them and stores the dispensed implementations in p.Processors and
// p.Executors, keyed by plugin name. Both maps are recreated on every call.
//
// We look in the following places for plugins:
//
//  1. The "plugins" directory next to the configuration file in use, or
//     /etc/dkron/plugins when no configuration file was loaded
//  2. The directory where the Dkron executable is installed
//
// Whichever file is discovered LAST wins, so a plugin found next to the
// executable replaces one with the same name found in the configuration path.
// The embedded plugins are loaded after everything else and therefore replace
// any executor discovered on disk under the same name.
//
// An error is returned when a directory cannot be scanned, when the path of
// the running executable cannot be determined, or when a plugin fails to
// start or to dispense its implementation.
func (p *Plugins) DiscoverPlugins() error {
	p.Processors = make(map[string]dkplugin.Processor)
	p.Executors = make(map[string]dkplugin.Executor)

	pluginDir := filepath.Join("/etc", "dkron", "plugins")
	if configFile := viper.ConfigFileUsed(); configFile != "" {
		pluginDir = filepath.Join(filepath.Dir(configFile), "plugins")
	}

	// Look in /etc/dkron/plugins (or the used config path)
	processors, err := plugin.Discover("dkron-processor-*", pluginDir)
	if err != nil {
		return err
	}
	executors, err := plugin.Discover("dkron-executor-*", pluginDir)
	if err != nil {
		return err
	}

	// The executable path is required further down to launch the embedded
	// plugins. Without it that step can only fail with an unrelated exec
	// error, so stop here, before any plugin process has been started, and
	// report the real cause.
	exePath, err := osext.Executable()
	if err != nil {
		logrus.WithError(err).Error("Error loading exe directory")
		return err
	}

	// Next, look in the same directory as the Dkron executable, usually
	// /usr/local/bin. Appending keeps these entries after the ones from the
	// config path, so they win when the names collide.
	exeDir := filepath.Dir(exePath)
	exeProcessors, err := plugin.Discover("dkron-processor-*", exeDir)
	if err != nil {
		return err
	}
	processors = append(processors, exeProcessors...)

	exeExecutors, err := plugin.Discover("dkron-executor-*", exeDir)
	if err != nil {
		return err
	}
	executors = append(executors, exeExecutors...)

	for _, file := range processors {
		pluginName, ok := getPluginName(file)
		if !ok {
			// Not named like a plugin binary; ignore it.
			continue
		}

		raw, err := p.pluginFactory(file, nil, dkplugin.ProcessorPluginName)
		if err != nil {
			return err
		}
		// NOTE(review): unchecked assertion; presumably dkplugin.PluginMap
		// guarantees the dispensed type - confirm.
		p.Processors[pluginName] = raw.(dkplugin.Processor)
	}

	for _, file := range executors {
		pluginName, ok := getPluginName(file)
		if !ok {
			// Not named like a plugin binary; ignore it.
			continue
		}

		raw, err := p.pluginFactory(file, nil, dkplugin.ExecutorPluginName)
		if err != nil {
			return err
		}
		// NOTE(review): unchecked assertion, see the processors loop.
		p.Executors[pluginName] = raw.(dkplugin.Executor)
	}

	// Load the embedded plugins: the Dkron executable is started again with
	// the plugin name as its only argument.
	for _, pluginName := range embededPlugins {
		raw, err := p.pluginFactory(exePath, []string{pluginName}, dkplugin.ExecutorPluginName)
		if err != nil {
			return err
		}
		p.Executors[pluginName] = raw.(dkplugin.Executor)
	}

	return nil
}
// getPluginName extracts the plugin name from the path of a plugin binary.
//
// The file name is expected to look like "foo-bar-baz": everything after the
// second dash is the plugin name ("baz"), with a trailing ".exe" removed. The
// name itself may contain further dashes.
//
// The second result is false when the file name has fewer than three
// dash-separated parts or when the resulting name is empty.
func getPluginName(file string) (string, bool) {
	// Convert OS-specific separators to slashes first: path.Base only splits
	// on "/", so a Windows path with a dash in one of its directories would
	// otherwise be cut at the wrong dashes.
	base := path.Base(filepath.ToSlash(file))

	parts := strings.SplitN(base, "-", 3)
	if len(parts) != 3 {
		return "", false
	}

	// This cleans off the .exe for windows plugins
	name := strings.TrimSuffix(parts[2], ".exe")
	if name == "" {
		// "foo-bar-" or "foo-bar-.exe" carries no usable plugin name.
		return "", false
	}

	return name, true
}
// pluginFactory starts the binary at path with the given args as a managed
// go-plugin client and returns whatever the plugin dispenses for pluginType.
//
// Processor plugins are restricted to the net/rpc protocol and executor
// plugins to gRPC; for any other pluginType the allowed protocols are left
// unset. The plugin's stdout and stderr are synced to those of this process.
//
// An error is returned when the client connection cannot be established or
// when the plugin cannot dispense pluginType.
func (p *Plugins) pluginFactory(path string, args []string, pluginType string) (interface{}, error) {
	// Pick the wire protocol that matches the kind of plugin being loaded.
	var protocols []plugin.Protocol
	switch pluginType {
	case dkplugin.ProcessorPluginName:
		protocols = []plugin.Protocol{plugin.ProtocolNetRPC}
	case dkplugin.ExecutorPluginName:
		protocols = []plugin.Protocol{plugin.ProtocolGRPC}
	}

	// Build the plugin client configuration and init the plugin
	client := plugin.NewClient(&plugin.ClientConfig{
		Cmd:              exec.Command(path, args...),
		HandshakeConfig:  dkplugin.Handshake,
		Managed:          true,
		Plugins:          dkplugin.PluginMap,
		SyncStdout:       os.Stdout,
		SyncStderr:       os.Stderr,
		AllowedProtocols: protocols,
		Logger: &dkron.HCLogAdapter{
			Logger:     dkron.InitLogger(p.LogLevel, p.NodeName),
			LoggerName: "plugins",
		},
	})

	// The protocol client is needed before the concrete implementation can
	// be requested from the plugin.
	protocolClient, err := client.Client()
	if err != nil {
		return nil, err
	}

	dispensed, err := protocolClient.Dispense(pluginType)
	if err != nil {
		return nil, err
	}

	return dispensed, nil
}