Skip to content

Commit

Permalink
cluster: add integrity integration
Browse files Browse the repository at this point in the history
The patch adds a possibility of dependency injection of integrity
collectors and publishers into the cluster module and commands.

The global variables are now used instead of a command context. These
places are marked with `TODO` label and should be refactored later.
  • Loading branch information
oleg-jukovec authored and LeonidVas committed Dec 15, 2023
1 parent 8676f41 commit 19eefb1
Show file tree
Hide file tree
Showing 18 changed files with 419 additions and 114 deletions.
32 changes: 23 additions & 9 deletions cli/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,8 @@ func Instantiate(cluster ClusterConfig, name string) *Config {

// collectEtcdConfig collects a configuration from etcd with options from
// the cluster configuration.
func collectEtcdConfig(clusterConfig ClusterConfig) (*Config, error) {
func collectEtcdConfig(collectors CollectorFactory,
clusterConfig ClusterConfig) (*Config, error) {
etcdConfig := clusterConfig.Config.Etcd
opts := EtcdOpts{
Endpoints: etcdConfig.Endpoints,
Expand Down Expand Up @@ -328,7 +329,11 @@ func collectEtcdConfig(clusterConfig ClusterConfig) (*Config, error) {
}
defer etcd.Close()

etcdCollector := NewEtcdAllCollector(etcd, etcdConfig.Prefix, opts.Timeout)
etcdCollector, err := collectors.NewEtcd(etcd, etcdConfig.Prefix, "", opts.Timeout)
if err != nil {
return nil, fmt.Errorf("failed to create etcd collector: %w", err)
}

etcdRawConfig, err := etcdCollector.Collect()
if err != nil {
return nil, fmt.Errorf("unable to get config from etcd: %w", err)
Expand All @@ -338,7 +343,8 @@ func collectEtcdConfig(clusterConfig ClusterConfig) (*Config, error) {

// collectTarantoolConfig collects a configuration from tarantool config
// storage with options from the tarantool configuration.
func collectTarantoolConfig(clusterConfig ClusterConfig) (*Config, error) {
func collectTarantoolConfig(collectors CollectorFactory,
clusterConfig ClusterConfig) (*Config, error) {
tarantoolConfig := clusterConfig.Config.Storage
var opts []connector.ConnectOpts
for _, endpoint := range tarantoolConfig.Endpoints {
Expand Down Expand Up @@ -379,9 +385,13 @@ func collectTarantoolConfig(clusterConfig ClusterConfig) (*Config, error) {
}
defer pool.Close()

tarantoolCollector := NewTarantoolAllCollector(pool,
tarantoolConfig.Prefix,
tarantoolCollector, err := collectors.NewTarantool(pool,
tarantoolConfig.Prefix, "",
time.Duration(tarantoolConfig.Timeout*float64(time.Second)))
if err != nil {
return nil, fmt.Errorf("failed to create tarantool config storage collector: %w", err)
}

tarantoolRawConfig, err := tarantoolCollector.Collect()
if err != nil {
return nil, fmt.Errorf("failed to get config from tarantool config storage: %w", err)
Expand All @@ -393,7 +403,7 @@ func collectTarantoolConfig(clusterConfig ClusterConfig) (*Config, error) {
// a config file. It uses a a config file, etcd and default environment
// variables as sources. The function returns a cluster config as is, without
// merging of settings from scopes: global, group, replicaset, instance.
func GetClusterConfig(path string) (ClusterConfig, error) {
func GetClusterConfig(collectors CollectorFactory, path string) (ClusterConfig, error) {
ret := ClusterConfig{}
if path == "" {
return ret, fmt.Errorf("a configuration file must be set")
Expand All @@ -408,7 +418,11 @@ func GetClusterConfig(path string) (ClusterConfig, error) {
}
config.Merge(mainEnvConfig)

collector := NewFileCollector(path)
collector, err := collectors.NewFile(path)
if err != nil {
return ret, fmt.Errorf("failed to create a file collector: %w", err)
}

fileConfig, err := collector.Collect()
if err != nil {
fmtErr := "unable to get cluster config from %q: %w"
Expand All @@ -421,15 +435,15 @@ func GetClusterConfig(path string) (ClusterConfig, error) {
return ret, fmt.Errorf("unable to parse cluster config from file: %w", err)
}
if len(clusterConfig.Config.Etcd.Endpoints) > 0 {
etcdConfig, err := collectEtcdConfig(clusterConfig)
etcdConfig, err := collectEtcdConfig(collectors, clusterConfig)
if err != nil {
return ret, err
}
config.Merge(etcdConfig)
}

if len(clusterConfig.Config.Storage.Endpoints) > 0 {
tarantoolConfig, err := collectTarantoolConfig(clusterConfig)
tarantoolConfig, err := collectTarantoolConfig(collectors, clusterConfig)
if err != nil {
return ret, err
}
Expand Down
21 changes: 14 additions & 7 deletions cli/cluster/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ func TestInstantiate_inherbit(t *testing.T) {
}

func TestGetClusterConfig_path(t *testing.T) {
config, err := cluster.GetClusterConfig("testdata/app/config.yaml")
collectors := cluster.NewCollectorFactory()
config, err := cluster.GetClusterConfig(collectors, "testdata/app/config.yaml")

require.NoError(t, err)
assert.Equal(t, `app:
Expand Down Expand Up @@ -426,7 +427,8 @@ too: 3
func TestGetClusterConfig_environment(t *testing.T) {
os.Setenv("TT_WAL_DIR_DEFAULT", "envdir")
os.Setenv("TT_WAL_MODE_DEFAULT", "envmode")
config, err := cluster.GetClusterConfig("testdata/app/config.yaml")
collectors := cluster.NewCollectorFactory()
config, err := cluster.GetClusterConfig(collectors, "testdata/app/config.yaml")

os.Unsetenv("TT_WAL_DIR_DEFAULT")
os.Unsetenv("TT_WAL_MODE_DEFAULT")
Expand Down Expand Up @@ -464,22 +466,25 @@ wal:
}

func TestGetClusterConfig_invalid_apppath(t *testing.T) {
config, err := cluster.GetClusterConfig("some/non/exist")
collectors := cluster.NewCollectorFactory()
config, err := cluster.GetClusterConfig(collectors, "some/non/exist")

assert.Error(t, err)
assert.NotNil(t, config)
}

func TestGetClusterConfig_nopath(t *testing.T) {
config, err := cluster.GetClusterConfig("")
collectors := cluster.NewCollectorFactory()
config, err := cluster.GetClusterConfig(collectors, "")
expected := "a configuration file must be set"

assert.EqualError(t, err, expected)
assert.NotNil(t, config)
}

func TestGetInstanceConfig_file(t *testing.T) {
cconfig, err := cluster.GetClusterConfig("testdata/app/config.yaml")
collectors := cluster.NewCollectorFactory()
cconfig, err := cluster.GetClusterConfig(collectors, "testdata/app/config.yaml")
require.NoError(t, err)
config, err := cluster.GetInstanceConfig(cconfig, "c")

Expand All @@ -498,7 +503,8 @@ zoo: 2
}

func TestGetInstanceConfig_environment(t *testing.T) {
cconfig, err := cluster.GetClusterConfig("testdata/app/config.yaml")
collectors := cluster.NewCollectorFactory()
cconfig, err := cluster.GetClusterConfig(collectors, "testdata/app/config.yaml")
require.NoError(t, err)
os.Setenv("TT_WAL_DIR", "envdir")
config, err := cluster.GetInstanceConfig(cconfig, "c")
Expand All @@ -519,7 +525,8 @@ zoo: 2
}

func TestGetInstanceConfig_noinstance(t *testing.T) {
cconfig, err := cluster.GetClusterConfig("testdata/app/config.yaml")
collectors := cluster.NewCollectorFactory()
cconfig, err := cluster.GetClusterConfig(collectors, "testdata/app/config.yaml")
require.NoError(t, err)
_, err = cluster.GetInstanceConfig(cconfig, "unknown")
expected := "an instance \"unknown\" not found"
Expand Down
47 changes: 34 additions & 13 deletions cli/cluster/cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,10 @@ func connectEtcd(uriOpts UriOpts, connOpts connectOpts) (*clientv3.Client, error
}

// createPublisher creates a new data publisher and collector based on UriOpts.
func createPublisherAndCollector(connOpts connectOpts,
func createPublisherAndCollector(
publishers cluster.DataPublisherFactory,
collectors cluster.CollectorFactory,
connOpts connectOpts,
opts UriOpts) (cluster.DataPublisher, cluster.Collector, func(), error) {
prefix, key, timeout := opts.Prefix, opts.Key, opts.Timeout

Expand All @@ -193,13 +196,22 @@ func createPublisherAndCollector(connOpts connectOpts,
var (
publisher cluster.DataPublisher
collector cluster.Collector
err error
)
if key == "" {
publisher = cluster.NewTarantoolAllDataPublisher(conn, prefix, timeout)
collector = cluster.NewTarantoolAllCollector(conn, prefix, timeout)
} else {
publisher = cluster.NewTarantoolKeyDataPublisher(conn, prefix, key, timeout)
collector = cluster.NewTarantoolKeyCollector(conn, prefix, key, timeout)
if publishers != nil {
publisher, err = publishers.NewTarantool(conn, prefix, key, timeout)
if err != nil {
return nil, nil, nil,
fmt.Errorf("failed to create tarantool config storage publisher: %w", err)
}
}

if collectors != nil {
collector, err = collectors.NewTarantool(conn, prefix, key, timeout)
if err != nil {
return nil, nil, nil,
fmt.Errorf("failed to create tarantool config storage collector: %w", err)
}
}
return publisher, collector, func() { conn.Close() }, nil
}
Expand All @@ -209,13 +221,22 @@ func createPublisherAndCollector(connOpts connectOpts,
var (
publisher cluster.DataPublisher
collector cluster.Collector
err error
)
if key == "" {
publisher = cluster.NewEtcdAllDataPublisher(etcdcli, prefix, timeout)
collector = cluster.NewEtcdAllCollector(etcdcli, prefix, timeout)
} else {
publisher = cluster.NewEtcdKeyDataPublisher(etcdcli, prefix, key, timeout)
collector = cluster.NewEtcdKeyCollector(etcdcli, prefix, key, timeout)
if publishers != nil {
publisher, err = publishers.NewEtcd(etcdcli, prefix, key, timeout)
if err != nil {
return nil, nil, nil,
fmt.Errorf("failed to create etcd publisher: %w", err)
}
}

if collectors != nil {
collector, err = collectors.NewEtcd(etcdcli, prefix, key, timeout)
if err != nil {
return nil, nil, nil,
fmt.Errorf("failed to create etcd collector: %w", err)
}
}
return publisher, collector, func() { etcdcli.Close() }, nil
}
Expand Down
21 changes: 18 additions & 3 deletions cli/cluster/cmd/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ type PublishCtx struct {
// Force defines whether the publish should be forced and a validation step
// is omitted.
Force bool
// Publishers defines a used data publishers factory.
Publishers cluster.DataPublisherFactory
// Collectors defines a used collectors factory.
Collectors cluster.CollectorFactory
// Src is a raw data to publish.
Src []byte
// Config is a parsed raw data configuration to publish.
Expand All @@ -39,7 +43,10 @@ func PublishUri(publishCtx PublishCtx, uri *url.URL) error {
Username: publishCtx.Username,
Password: publishCtx.Password,
}
publisher, collector, cancel, err := createPublisherAndCollector(connOpts, uriOpts)
publisher, collector, cancel, err := createPublisherAndCollector(
publishCtx.Publishers,
publishCtx.Collectors,
connOpts, uriOpts)
if err != nil {
return err
}
Expand All @@ -59,13 +66,21 @@ func PublishCluster(publishCtx PublishCtx, path, instance string) error {
return err
}

publisher := cluster.NewFileDataPublisher(path)
publisher, err := publishCtx.Publishers.NewFile(path)
if err != nil {
return fmt.Errorf("failed to create a file publisher: %w", err)
}

if instance == "" {
// The easy case, just publish the configuration as is.
return publisher.Publish(publishCtx.Src)
}

collector := cluster.NewFileCollector(path)
collector, err := publishCtx.Collectors.NewFile(path)
if err != nil {
return fmt.Errorf("failed to create a file collector: %w", err)
}

return replaceInstanceConfig(instance, publishCtx.Config, collector, publisher)
}

Expand Down
9 changes: 7 additions & 2 deletions cli/cluster/cmd/show.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ type ShowCtx struct {
Username string
// Password defines a password for connection.
Password string
// Collectors defines a used collectors factory.
Collectors cluster.CollectorFactory
// Validate defines whether the command will check the showed
// configuration.
Validate bool
Expand All @@ -29,7 +31,10 @@ func ShowUri(showCtx ShowCtx, uri *url.URL) error {
Username: showCtx.Username,
Password: showCtx.Password,
}
_, collector, cancel, err := createPublisherAndCollector(connOpts, uriOpts)
_, collector, cancel, err := createPublisherAndCollector(
nil,
showCtx.Collectors,
connOpts, uriOpts)
if err != nil {
return err
}
Expand All @@ -50,7 +55,7 @@ func ShowUri(showCtx ShowCtx, uri *url.URL) error {

// ShowCluster shows a full cluster configuration for a configuration path.
func ShowCluster(showCtx ShowCtx, path, name string) error {
config, err := cluster.GetClusterConfig(path)
config, err := cluster.GetClusterConfig(showCtx.Collectors, path)
if err != nil {
return fmt.Errorf("failed to get a cluster configuration: %w", err)
}
Expand Down
54 changes: 54 additions & 0 deletions cli/cluster/collectorfactory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package cluster

import (
"time"

clientv3 "go.etcd.io/etcd/client/v3"

"github.com/tarantool/tt/cli/connector"
)

// CollectorFactory creates new data collectors.
type CollectorFactory interface {
// NewFile creates a new data collector to collect configuration from a file.
NewFile(path string) (Collector, error)
// NewEtcd creates a new data collector to collect configuration from etcd.
NewEtcd(etcdcli *clientv3.Client,
prefix, key string, timeout time.Duration) (Collector, error)
// NewTarantool creates a new data collector to collect configuration from
// tarantool config storage.
NewTarantool(conn connector.Connector,
prefix, key string, timeout time.Duration) (Collector, error)
}

// collectorsFactory is a type that implements a default CollectorFactory.
type collectorsFactory struct{}

// NewCollectorFactory creates a new CollectorFactory.
func NewCollectorFactory() CollectorFactory {
return collectorsFactory{}
}

// NewFiler creates a new file configuration collector.
func (factory collectorsFactory) NewFile(path string) (Collector, error) {
return NewFileCollector(path), nil
}

// NewEtcd creates a new etcd configuration collector.
func (factory collectorsFactory) NewEtcd(etcdcli *clientv3.Client,
prefix, key string, timeout time.Duration) (Collector, error) {
if key == "" {
return NewEtcdAllCollector(etcdcli, prefix, timeout), nil
}
return NewEtcdKeyCollector(etcdcli, prefix, key, timeout), nil
}

// NewTarantool creates creates a new tarantool config storage configuration
// collector.
func (factory collectorsFactory) NewTarantool(conn connector.Connector,
prefix, key string, timeout time.Duration) (Collector, error) {
if key == "" {
return NewTarantoolAllCollector(conn, prefix, timeout), nil
}
return NewTarantoolKeyCollector(conn, prefix, key, timeout), nil
}
Loading

0 comments on commit 19eefb1

Please sign in to comment.