diff --git a/cli/cluster/cmd/publish.go b/cli/cluster/cmd/publish.go index 10d67aed0..2ad15a5ad 100644 --- a/cli/cluster/cmd/publish.go +++ b/cli/cluster/cmd/publish.go @@ -55,7 +55,7 @@ func PublishUri(publishCtx PublishCtx, uri *url.URL) error { if instance == "" { // The easy case, just publish the configuration as is. - return publisher.Publish(publishCtx.Src) + return publisher.Publish(0, publishCtx.Src) } return replaceInstanceConfig(instance, publishCtx.Config, collector, publisher) @@ -74,7 +74,7 @@ func PublishCluster(publishCtx PublishCtx, path, instance string) error { if instance == "" { // The easy case, just publish the configuration as is. - return publisher.Publish(publishCtx.Src) + return publisher.Publish(0, publishCtx.Src) } collector, err := publishCtx.Collectors.NewFile(path) diff --git a/cli/cluster/etcd.go b/cli/cluster/etcd.go index 420fe41d6..9be426175 100644 --- a/cli/cluster/etcd.go +++ b/cli/cluster/etcd.go @@ -130,8 +130,9 @@ func (collector EtcdAllCollector) Collect() ([]integrity.Data, error) { collected := []integrity.Data{} for _, kv := range resp.Kvs { collected = append(collected, integrity.Data{ - Source: string(kv.Key), - Value: kv.Value, + Source: string(kv.Key), + Value: kv.Value, + Revision: kv.ModRevision, }) } @@ -188,8 +189,9 @@ func (collector EtcdKeyCollector) Collect() ([]integrity.Data, error) { collected := []integrity.Data{ { - Source: string(resp.Kvs[0].Key), - Value: resp.Kvs[0].Value, + Source: string(resp.Kvs[0].Key), + Value: resp.Kvs[0].Value, + Revision: resp.Kvs[0].ModRevision, }, } return collected, nil @@ -221,7 +223,11 @@ func NewEtcdAllDataPublisher(getter EtcdTxnGetter, } // Publish publishes the configuration into etcd to the given prefix. -func (publisher EtcdAllDataPublisher) Publish(data []byte) error { +func (publisher EtcdAllDataPublisher) Publish(revision int64, data []byte) error { + if revision != 0 { + return fmt.Errorf("failed to publish data into etcd: target revision %d is not supported", + revision) + } if data == nil { return fmt.Errorf("failed to publish data into etcd: data does not exist") } @@ -301,16 +307,9 @@ func (publisher EtcdAllDataPublisher) Publish(data []byte) error { return nil } -// EtcdPutter is the interface that wraps put from etcd method. -type EtcdPutter interface { - // Put puts a key-value pair into etcd. - Put(ctx context.Context, key, val string, - opts ...clientv3.OpOption) (*clientv3.PutResponse, error) -} - // EtcdKeyDataPublisher publishes a data into etcd for a prefix and a key. type EtcdKeyDataPublisher struct { - putter EtcdPutter + getter EtcdTxnGetter prefix string key string timeout time.Duration @@ -318,10 +317,10 @@ type EtcdKeyDataPublisher struct { // NewEtcdKeyDataPublisher creates a new EtcdKeyDataPublisher object to publish // a data to etcd with the prefix and key during the timeout. -func NewEtcdKeyDataPublisher(putter EtcdPutter, +func NewEtcdKeyDataPublisher(getter EtcdTxnGetter, prefix, key string, timeout time.Duration) EtcdKeyDataPublisher { return EtcdKeyDataPublisher{ - putter: putter, + getter: getter, prefix: prefix, key: key, timeout: timeout, @@ -329,7 +328,8 @@ func NewEtcdKeyDataPublisher(putter EtcdPutter, } // Publish publishes the configuration into etcd to the given prefix and key. -func (publisher EtcdKeyDataPublisher) Publish(data []byte) error { +// If passed revision is not 0, the data will be published only if target key revision the same. +func (publisher EtcdKeyDataPublisher) Publish(revision int64, data []byte) error { if data == nil { return fmt.Errorf("failed to publish data into etcd: data does not exist") } @@ -343,10 +343,17 @@ func (publisher EtcdKeyDataPublisher) Publish(data []byte) error { defer cancel() } - _, err := publisher.putter.Put(ctx, key, string(data)) + txn := publisher.getter.Txn(ctx) + if revision != 0 { + txn = txn.If(clientv3.Compare(clientv3.ModRevision(key), "=", revision)) + } + tresp, err := txn.Then(clientv3.OpPut(key, string(data))).Commit() if err != nil { return fmt.Errorf("failed to put data into etcd: %w", err) } + if !tresp.Succeeded { + return fmt.Errorf("failed to put data into etcd: wrong revision") + } return nil } diff --git a/cli/cluster/etcd_test.go b/cli/cluster/etcd_test.go index 9aa94938f..9bbfdf6ac 100644 --- a/cli/cluster/etcd_test.go +++ b/cli/cluster/etcd_test.go @@ -79,23 +79,6 @@ func (getter *MockEtcdTxnGetter) Txn(ctx context.Context) clientv3.Txn { return getter.TxnRet } -type MockEtcdPutter struct { - Ctx context.Context - Key string - Val string - Ops []clientv3.OpOption - Err error -} - -func (putter *MockEtcdPutter) Put(ctx context.Context, key string, val string, - opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { - putter.Ctx = ctx - putter.Key = key - putter.Val = val - putter.Ops = opts - return nil, putter.Err -} - func TestClientKVImplementsEtcdGetter(t *testing.T) { var ( kv clientv3.KV @@ -177,34 +160,40 @@ func TestEtcdAllCollector_Collect_merge(t *testing.T) { { Kvs: []*mvccpb.KeyValue{ &mvccpb.KeyValue{ - Key: []byte("k"), - Value: []byte("f: a\nb: a\n"), + Key: []byte("k"), + Value: []byte("f: a\nb: a\n"), + ModRevision: 1, }, }, Expected: []integrity.Data{{ - Source: "k", - Value: []byte("f: a\nb: a\n"), + Source: "k", + Value: []byte("f: a\nb: a\n"), + Revision: 1, }}, }, { Kvs: []*mvccpb.KeyValue{ &mvccpb.KeyValue{ - Key: []byte("k"), - Value: []byte("f: a\nb: a\n"), + Key: []byte("k"), + Value: []byte("f: a\nb: a\n"), + ModRevision: 1, }, &mvccpb.KeyValue{ - Key: []byte("k"), - Value: []byte("f: b\nb: b\nc: b\n"), + Key: []byte("k"), + Value: []byte("f: b\nb: b\nc: b\n"), + ModRevision: 2, }, }, Expected: []integrity.Data{ { - Source: "k", - Value: []byte("f: a\nb: a\n"), + Source: "k", + Value: []byte("f: a\nb: a\n"), + Revision: 1, }, { - Source: "k", - Value: []byte("f: b\nb: b\nc: b\n"), + Source: "k", + Value: []byte("f: b\nb: b\nc: b\n"), + Revision: 2, }, }, }, @@ -303,14 +292,16 @@ func TestEtcdKeyCollector_Collect_key(t *testing.T) { mock := &MockEtcdGetter{ Kvs: []*mvccpb.KeyValue{ &mvccpb.KeyValue{ - Key: []byte("k"), - Value: []byte("f: a\nb: a\n"), + Key: []byte("k"), + Value: []byte("f: a\nb: a\n"), + ModRevision: 1, }, }, } expected := []integrity.Data{{ - Source: "k", - Value: []byte("f: a\nb: a\n"), + Source: "k", + Value: []byte("f: a\nb: a\n"), + Revision: 1, }} config, err := cluster.NewEtcdKeyCollector(mock, "foo", "key", 0).Collect() @@ -324,12 +315,14 @@ func TestEtcdKeyCollector_Collect_too_many(t *testing.T) { mock := &MockEtcdGetter{ Kvs: []*mvccpb.KeyValue{ &mvccpb.KeyValue{ - Key: []byte("k"), - Value: []byte("f: a\nb: a\n"), + Key: []byte("k"), + Value: []byte("f: a\nb: a\n"), + ModRevision: 1, }, &mvccpb.KeyValue{ - Key: []byte("k"), - Value: []byte("f: b\nb: b\nc: b\n"), + Key: []byte("k"), + Value: []byte("f: b\nb: b\nc: b\n"), + ModRevision: 2, }, }, } @@ -364,7 +357,7 @@ func TestEtcdAllDataPublisher_Publish_get_inputs(t *testing.T) { for _, tc := range cases { t.Run(tc.Prefix, func(t *testing.T) { mock := &MockEtcdTxnGetter{} - cluster.NewEtcdAllDataPublisher(mock, tc.Prefix, 0).Publish(data) + cluster.NewEtcdAllDataPublisher(mock, tc.Prefix, 0).Publish(0, data) assert.NotNil(t, mock.Ctx) assert.Equal(t, tc.Key, mock.Key) @@ -431,7 +424,7 @@ func TestEtcdAllDataPublisher_Publish_txn_inputs(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { publisher := cluster.NewEtcdAllDataPublisher(tc.Mock, "/foo", 0) - publisher.Publish([]byte{}) + publisher.Publish(0, []byte{}) assert.Len(t, tc.Mock.TxnRet.IfCs, tc.IfLen) assert.Len(t, tc.Mock.TxnRet.ThenOps, tc.ThenLen) @@ -462,7 +455,7 @@ func TestEtcdDataPublishers_Publish_data_nil(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - err := tc.Publisher.Publish(nil) + err := tc.Publisher.Publish(0, nil) assert.EqualError(t, err, "failed to publish data into etcd: data does not exist") @@ -482,7 +475,7 @@ func TestEtcdDataPublishers_Publish_publisher_nil(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { assert.Panics(t, func() { - tc.Publisher.Publish([]byte{}) + tc.Publisher.Publish(0, []byte{}) }) }) } @@ -520,7 +513,7 @@ func TestEtcdAllDataPublisher_Publish_errors(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { publisher := cluster.NewEtcdAllDataPublisher(tc.Mock, "prefix", 0) - err := publisher.Publish([]byte{}) + err := publisher.Publish(0, []byte{}) if tc.Expected != "" { assert.EqualError(t, err, tc.Expected) } else { @@ -530,6 +523,14 @@ func TestEtcdAllDataPublisher_Publish_errors(t *testing.T) { } } +func TestEtcdAllDataPublisher_Publish_revision(t *testing.T) { + mock := &MockEtcdTxnGetter{} + publisher := cluster.NewEtcdAllDataPublisher(mock, "prefix", 0) + err := publisher.Publish(1, []byte{}) + assert.EqualError(t, err, + "failed to publish data into etcd: target revision 1 is not supported") +} + func TestEtcdAllDataPublisher_Publish_timeout(t *testing.T) { cases := []time.Duration{0, 60 * time.Second} @@ -537,7 +538,7 @@ func TestEtcdAllDataPublisher_Publish_timeout(t *testing.T) { t.Run(fmt.Sprint(tc), func(t *testing.T) { mock := &MockEtcdTxnGetter{} publisher := cluster.NewEtcdAllDataPublisher(mock, "prefix", tc) - err := publisher.Publish([]byte{}) + err := publisher.Publish(0, []byte{}) require.NoError(t, err) require.NotNil(t, mock.Ctx) @@ -574,7 +575,7 @@ func TestEtcdAllDataPublisher_Publish_timeout_exit(t *testing.T) { timeout := 100 * time.Millisecond delta := 30 * time.Millisecond publisher := cluster.NewEtcdAllDataPublisher(mock, "prefix", timeout) - err := publisher.Publish([]byte{}) + err := publisher.Publish(0, []byte{}) assert.EqualError(t, err, "context deadline exceeded") assert.InDelta(t, timeout, time.Since(before), float64(delta)) } @@ -605,37 +606,58 @@ func TestEtcdKeyDataPublisher_Publish_inputs(t *testing.T) { for _, tc := range cases { t.Run(tc.Expected, func(t *testing.T) { - mock := &MockEtcdPutter{Err: fmt.Errorf("foo")} + mock := &MockEtcdTxnGetter{TxnRet: &MockTxn{Err: fmt.Errorf("foo")}} publisher := cluster.NewEtcdKeyDataPublisher(mock, tc.Prefix, tc.Key, 0) - publisher.Publish(data) + publisher.Publish(0, data) - assert.NotNil(t, mock.Ctx) - assert.Equal(t, tc.Expected, mock.Key) - assert.Equal(t, data, []byte(mock.Val)) - assert.Len(t, mock.Ops, 0) + assert.NotNil(t, mock.CtxTxn) + assert.Equal(t, + []clientv3.Op{clientv3.OpPut(tc.Expected, string(data))}, mock.TxnRet.ThenOps) + assert.Nil(t, mock.TxnRet.IfCs) + assert.Nil(t, mock.TxnRet.ElseOps) }) } } +func TestEtcdKeyDataPublisher_Publish_modRevision(t *testing.T) { + prefix := "/foo" + key := "key" + modRevision := int64(5) + data := []byte("foo bar") + expected := "/foo/config/key" + mock := &MockEtcdTxnGetter{TxnRet: &MockTxn{Err: fmt.Errorf("foo")}} + publisher := cluster.NewEtcdKeyDataPublisher(mock, prefix, key, 0) + + publisher.Publish(modRevision, data) + assert.NotNil(t, mock.CtxTxn) + assert.Equal(t, + []clientv3.Op{clientv3.OpPut(expected, string(data))}, + mock.TxnRet.ThenOps) + assert.Equal(t, + []clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(expected), "=", modRevision)}, + mock.TxnRet.IfCs) + assert.Nil(t, mock.TxnRet.ElseOps) +} + func TestEtcdKeyDataPublisher_Publish_error(t *testing.T) { - mock := &MockEtcdPutter{Err: fmt.Errorf("foo")} + mock := &MockEtcdTxnGetter{TxnRet: &MockTxn{Err: fmt.Errorf("foo")}} publisher := cluster.NewEtcdKeyDataPublisher(mock, "", "", 0) - err := publisher.Publish([]byte{}) + err := publisher.Publish(0, []byte{}) assert.EqualError(t, err, "failed to put data into etcd: foo") } func TestEtcdKeyDataPublisher_Publish_timeout(t *testing.T) { cases := []time.Duration{0, 60 * time.Second} - mock := &MockEtcdPutter{Err: fmt.Errorf("foo")} + mock := &MockEtcdTxnGetter{TxnRet: &MockTxn{Err: fmt.Errorf("foo")}} for _, tc := range cases { t.Run(fmt.Sprint(tc), func(t *testing.T) { publisher := cluster.NewEtcdKeyDataPublisher(mock, "", "", tc) - publisher.Publish([]byte{}) + publisher.Publish(0, []byte{}) expected := time.Now().Add(tc) - deadline, ok := mock.Ctx.Deadline() + deadline, ok := mock.CtxTxn.Deadline() if tc == 0 { assert.False(t, ok) } else { diff --git a/cli/cluster/file.go b/cli/cluster/file.go index ec3a7903e..abef44f7c 100644 --- a/cli/cluster/file.go +++ b/cli/cluster/file.go @@ -55,7 +55,11 @@ func NewFileDataPublisher(path string) FileDataPublisher { } // Publish publishes the data to a file for the given path. -func (publisher FileDataPublisher) Publish(data []byte) error { +func (publisher FileDataPublisher) Publish(revision int64, data []byte) error { + if revision != 0 { + return fmt.Errorf("failed to publish data into file: target revision %d is not supported", + revision) + } if publisher.path == "" { return fmt.Errorf("file path is empty") } diff --git a/cli/cluster/file_test.go b/cli/cluster/file_test.go index ffbdf63db..c2d8e387a 100644 --- a/cli/cluster/file_test.go +++ b/cli/cluster/file_test.go @@ -64,20 +64,27 @@ func TestNewFileDataPublisher(t *testing.T) { } func TestFileDataPublisher_Publish_empty_path(t *testing.T) { - err := cluster.NewFileDataPublisher("").Publish([]byte{}) + err := cluster.NewFileDataPublisher("").Publish(0, []byte{}) assert.EqualError(t, err, "file path is empty") } func TestFileDataPublisher_Publish_empty_data(t *testing.T) { - err := cluster.NewFileDataPublisher("foo").Publish(nil) + err := cluster.NewFileDataPublisher("foo").Publish(0, nil) assert.EqualError(t, err, "failed to publish data into \"foo\": data does not exist") } +func TestFileDataPublisher_Publish_revision(t *testing.T) { + err := cluster.NewFileDataPublisher("foo").Publish(1, []byte{}) + + assert.EqualError(t, err, + "failed to publish data into file: target revision 1 is not supported") +} + func TestFileDataPublisher_Publish_error(t *testing.T) { - err := cluster.NewFileDataPublisher("/some/invalid/path").Publish([]byte{}) + err := cluster.NewFileDataPublisher("/some/invalid/path").Publish(0, []byte{}) assert.Error(t, err) } @@ -87,7 +94,7 @@ func TestFileDataPublisher_Publish_data(t *testing.T) { path := filepath.Join(dir, "testfile") data := []byte("foo") - err := cluster.NewFileDataPublisher(path).Publish(data) + err := cluster.NewFileDataPublisher(path).Publish(0, data) require.NoError(t, err) read, err := os.ReadFile(path) @@ -110,7 +117,7 @@ func TestFileDataPublisher_Publish_data_exist_file(t *testing.T) { originalMode := fi.Mode() data := []byte("foo") - err = cluster.NewFileDataPublisher(path).Publish(data) + err = cluster.NewFileDataPublisher(path).Publish(0, data) require.NoError(t, err) read, err := os.ReadFile(path) diff --git a/cli/cluster/integration_test.go b/cli/cluster/integration_test.go index 33163e88d..a5061374d 100644 --- a/cli/cluster/integration_test.go +++ b/cli/cluster/integration_test.go @@ -167,7 +167,7 @@ func etcdPut(t *testing.T, etcd *clientv3.Client, key, value string) { require.NotNil(t, presp) } -func etcdGet(t *testing.T, etcd *clientv3.Client, key string) []byte { +func etcdGet(t *testing.T, etcd *clientv3.Client, key string) ([]byte, int64) { t.Helper() ctx, cancel := context.WithTimeout(context.Background(), timeout) @@ -177,11 +177,11 @@ func etcdGet(t *testing.T, etcd *clientv3.Client, key string) []byte { require.NoError(t, err) require.NotNil(t, resp) if len(resp.Kvs) == 0 { - return []byte("") + return []byte(""), 0 } require.Len(t, resp.Kvs, 1) - return resp.Kvs[0].Value + return resp.Kvs[0].Value, resp.Kvs[0].ModRevision } type connectEtcdOpts struct { @@ -578,10 +578,11 @@ func TestEtcdDataPublishers_Publish_single(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - err = tc.Publisher.Publish(data) + err = tc.Publisher.Publish(0, data) assert.NoError(t, err) - assert.Equal(t, data, etcdGet(t, etcd, "/foo/config/"+tc.Key)) + actual, _ := etcdGet(t, etcd, "/foo/config/"+tc.Key) + assert.Equal(t, data, actual) }) } } @@ -609,11 +610,12 @@ func TestEtcdDataPublishers_Publish_rewrite(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { - err = tc.Publisher.Publish(oldData) + err = tc.Publisher.Publish(0, oldData) require.NoError(t, err) - err = tc.Publisher.Publish(newData) + err = tc.Publisher.Publish(0, newData) assert.NoError(t, err) - assert.Equal(t, newData, etcdGet(t, etcd, "/foo/config/"+tc.Key)) + actual, _ := etcdGet(t, etcd, "/foo/config/"+tc.Key) + assert.Equal(t, newData, actual) }) } } @@ -632,15 +634,50 @@ func TestEtcdAllDataPublisher_Publish_rewrite_prefix(t *testing.T) { etcdPut(t, etcd, "/foo/config/foo", "zoo") data := []byte("zoo bar foo") - err = cluster.NewEtcdAllDataPublisher(etcd, "/foo/", timeout).Publish(data) + err = cluster.NewEtcdAllDataPublisher(etcd, "/foo/", timeout).Publish(0, data) assert.NoError(t, err) - assert.Equal(t, []byte(""), etcdGet(t, etcd, "/foo/config/")) - assert.Equal(t, []byte(""), etcdGet(t, etcd, "/foo/config/foo")) - assert.Equal(t, data, etcdGet(t, etcd, "/foo/config/all")) + + actual, _ := etcdGet(t, etcd, "/foo/config/") + assert.Equal(t, []byte(""), actual) + + actual, _ = etcdGet(t, etcd, "/foo/config/foo") + assert.Equal(t, []byte(""), actual) + + actual, _ = etcdGet(t, etcd, "/foo/config/all") + assert.Equal(t, data, actual) } -func TestEtcdAllDataPublisher_Publish_ignore_prefix(t *testing.T) { +func TestEtcdKeyDataPublisher_Publish_modRevision_specified(t *testing.T) { + inst := startEtcd(t, httpEndpoint, etcdOpts{}) + defer stopEtcd(t, inst) + + endpoints := []string{httpEndpoint} + etcd, err := cluster.ConnectEtcd(cluster.EtcdOpts{Endpoints: endpoints}) + require.NoError(t, err) + require.NotNil(t, etcd) + defer etcd.Close() + + etcdPut(t, etcd, "/foo/config/key", "bar") + _, modRevision := etcdGet(t, etcd, "/foo/config/key") + + data := []byte("baz") + + publisher := cluster.NewEtcdKeyDataPublisher(etcd, "/foo", "key", timeout) + // Use wrong revision. + err = publisher.Publish(modRevision-1, data) + assert.Errorf(t, err, "failed to put data into etcd: wrong revision") + actual, _ := etcdGet(t, etcd, "/foo/config/key") + assert.Equal(t, []byte("bar"), actual) + + // Use right revision. + err = publisher.Publish(modRevision, data) + assert.NoError(t, err) + actual, _ = etcdGet(t, etcd, "/foo/config/key") + assert.Equal(t, data, actual) +} + +func TestEtcdKeyDataPublisher_Publish_ignore_prefix(t *testing.T) { inst := startEtcd(t, httpEndpoint, etcdOpts{}) defer stopEtcd(t, inst) @@ -654,12 +691,18 @@ func TestEtcdAllDataPublisher_Publish_ignore_prefix(t *testing.T) { etcdPut(t, etcd, "/foo/config/foo", "zoo") data := []byte("zoo bar foo") - err = cluster.NewEtcdKeyDataPublisher(etcd, "/foo/", "all", timeout).Publish(data) + err = cluster.NewEtcdKeyDataPublisher(etcd, "/foo/", "all", timeout).Publish(0, data) assert.NoError(t, err) - assert.Equal(t, []byte("foo"), etcdGet(t, etcd, "/foo/config/")) - assert.Equal(t, []byte("zoo"), etcdGet(t, etcd, "/foo/config/foo")) - assert.Equal(t, data, etcdGet(t, etcd, "/foo/config/all")) + + actual, _ := etcdGet(t, etcd, "/foo/config/") + assert.Equal(t, []byte("foo"), actual) + + actual, _ = etcdGet(t, etcd, "/foo/config/foo") + assert.Equal(t, []byte("zoo"), actual) + + actual, _ = etcdGet(t, etcd, "/foo/config/all") + assert.Equal(t, data, actual) } func TestEtcdAllDataPublisher_collect_publish_collect(t *testing.T) { diff --git a/cli/cluster/tarantool.go b/cli/cluster/tarantool.go index 5b266fe69..efd9d20e6 100644 --- a/cli/cluster/tarantool.go +++ b/cli/cluster/tarantool.go @@ -13,7 +13,7 @@ import ( // tarantoolCall retursns result of a function call via tarantool connector. func tarantoolCall(conn tarantool.Connector, - fun string, args []any, timeout time.Duration) ([]any, error) { + fun string, args []any, timeout time.Duration) ([][]any, error) { req := tarantool.NewCallRequest(fun).Args(args) if timeout != 0 { @@ -24,12 +24,12 @@ func tarantoolCall(conn tarantool.Connector, req = req.Context(ctx) } - response, err := conn.Do(req).Get() - if err != nil { + var result [][]any + if err := conn.Do(req).GetTyped(&result); err != nil { return nil, err } - return response.Data, err + return result, nil } // TarantoolAllCollector collects data from a Tarantool for a whole prefix. @@ -67,8 +67,9 @@ func (collector TarantoolAllCollector) Collect() ([]integrity.Data, error) { collected := []integrity.Data{} for _, data := range resp.Data { collected = append(collected, integrity.Data{ - Source: data.Path, - Value: []byte(data.Value), + Source: data.Path, + Value: []byte(data.Value), + Revision: data.ModRevision, }) } @@ -117,8 +118,9 @@ func (collector TarantoolKeyCollector) Collect() ([]integrity.Data, error) { return []integrity.Data{ { - Source: key, - Value: []byte(resp.Data[0].Value), + Source: key, + Value: []byte(resp.Data[0].Value), + Revision: resp.Data[0].ModRevision, }, }, err } @@ -142,7 +144,12 @@ func NewTarantoolAllDataPublisher(conn tarantool.Connector, } // Publish publishes the configuration into Tarantool to the given prefix. -func (publisher TarantoolAllDataPublisher) Publish(data []byte) error { +func (publisher TarantoolAllDataPublisher) Publish(revision int64, data []byte) error { + if revision != 0 { + return fmt.Errorf( + "failed to publish data into tarantool: target revision %d is not supported", + revision) + } prefix := getConfigPrefix(publisher.prefix) key := prefix + "all" args := []any{ @@ -184,11 +191,21 @@ func NewTarantoolKeyDataPublisher(conn tarantool.Connector, // Publish publishes the configuration into Tarantool to the given prefix and // key. -func (publisher TarantoolKeyDataPublisher) Publish(data []byte) error { +// If passed revision is not 0, the data will be published only if target key revision the same. +func (publisher TarantoolKeyDataPublisher) Publish(revision int64, data []byte) error { key := getConfigPrefix(publisher.prefix) + publisher.key - args := []any{key, string(data)} - _, err := tarantoolCall(publisher.conn, "config.storage.put", args, publisher.timeout) + txn := map[any]any{ + "on_success": []any{ + []any{"put", key, string(data)}, + }, + } + if revision != 0 { + txn["predicates"] = []any{"mod_revision", "==", revision} + } + args := []any{txn} + + _, err := tarantoolCall(publisher.conn, "config.storage.txn", args, publisher.timeout) if err != nil { return fmt.Errorf("failed to put data into tarantool: %w", err) } @@ -197,8 +214,9 @@ func (publisher TarantoolKeyDataPublisher) Publish(data []byte) error { type tarantoolResponse struct { Data []struct { - Path string - Value string + Path string + Value string + ModRevision int64 `mapstructure:"mod_revision"` } } @@ -216,8 +234,9 @@ func tarantoolGet(conn tarantool.Connector, return resp, fmt.Errorf("unexpected response from tarantool: %q", data) } - if err := mapstructure.Decode(data[0], &resp); err != nil { - return resp, fmt.Errorf("failed to map response from tarantool: %q", data[0]) + rawResp := data[0][0] + if err := mapstructure.Decode(rawResp, &resp); err != nil { + return resp, fmt.Errorf("failed to map response from tarantool: %q", rawResp) } return resp, nil diff --git a/cli/cluster/tarantool_test.go b/cli/cluster/tarantool_test.go index 13f80c0fd..0ad14103b 100644 --- a/cli/cluster/tarantool_test.go +++ b/cli/cluster/tarantool_test.go @@ -43,6 +43,13 @@ func TestNewTarantoolDataPublishers(t *testing.T) { } } +func TestAllTarantoolDataPublisher_Publish_revision(t *testing.T) { + publisher := cluster.NewTarantoolAllDataPublisher(nil, "", 0) + err := publisher.Publish(1, []byte{}) + assert.EqualError( + t, err, "failed to publish data into tarantool: target revision 1 is not supported") +} + func TestNewTarantoolDataPublishers_Publish_nil_evaler(t *testing.T) { cases := []struct { Name string @@ -55,7 +62,7 @@ func TestNewTarantoolDataPublishers_Publish_nil_evaler(t *testing.T) { for _, tc := range cases { t.Run(tc.Name, func(t *testing.T) { assert.Panics(t, func() { - tc.Publisher.Publish([]byte{}) + tc.Publisher.Publish(0, []byte{}) }) }) } diff --git a/cli/cluster/yaml.go b/cli/cluster/yaml.go index 62c50641c..b7bd0b99d 100644 --- a/cli/cluster/yaml.go +++ b/cli/cluster/yaml.go @@ -86,5 +86,5 @@ func (publisher YamlConfigPublisher) Publish(config *Config) error { return fmt.Errorf("config does not exist") } - return publisher.publisher.Publish([]byte(config.String())) + return publisher.publisher.Publish(0, []byte(config.String())) } diff --git a/cli/cluster/yaml_test.go b/cli/cluster/yaml_test.go index 6b76df66e..43dd31687 100644 --- a/cli/cluster/yaml_test.go +++ b/cli/cluster/yaml_test.go @@ -230,10 +230,10 @@ func TestYamlCollectorDecorator_unique(t *testing.T) { assert.Error(t, err) } -type dataPublishFunc func(data []byte) error +type dataPublishFunc func(revision int64, data []byte) error -func (f dataPublishFunc) Publish(data []byte) error { - return f(data) +func (f dataPublishFunc) Publish(revision int64, data []byte) error { + return f(revision, data) } func TestNewYamlConfigPublisher(t *testing.T) { @@ -261,7 +261,7 @@ func TestYamlConfigPublisher_Publish_nil_config(t *testing.T) { func TestYamlConfigPublisher_Publish_publish_data(t *testing.T) { var input []byte publisher := cluster.NewYamlConfigPublisher(dataPublishFunc( - func(data []byte) error { + func(revision int64, data []byte) error { input = data return nil })) @@ -283,7 +283,7 @@ zoo: func TestYamlConfigPublisher_Publish_error(t *testing.T) { err := fmt.Errorf("any") publisher := cluster.NewYamlConfigPublisher(dataPublishFunc( - func([]byte) error { + func(int64, []byte) error { return err })) config := cluster.NewConfig() diff --git a/cli/integrity/datacollector.go b/cli/integrity/datacollector.go index ed54e6abe..3161742ae 100644 --- a/cli/integrity/datacollector.go +++ b/cli/integrity/datacollector.go @@ -13,6 +13,8 @@ type Data struct { Source string // Value is data collected. Value []byte + // Revision is data revision. + Revision int64 } // DataCollector interface must be implemented by a source collector. diff --git a/cli/integrity/datapublisher.go b/cli/integrity/datapublisher.go index bffb7f8ca..69574adf4 100644 --- a/cli/integrity/datapublisher.go +++ b/cli/integrity/datapublisher.go @@ -10,7 +10,7 @@ import ( // DataPublisher interface must be implemented by a raw data publisher. type DataPublisher interface { // Publish publishes the interface or returns an error. - Publish(data []byte) error + Publish(revision int64, data []byte) error } // Data publisher factory creates new data publishers.