Skip to content

Commit

Permalink
cluster: support revision
Browse files Browse the repository at this point in the history
This patch adds `mod_revision` field support to the
cluster etcd/tcs publishers/collectors.

Also, fix a bug with tcs response decoding: use `resp[0][0]`
instead of `resp[0]`.

Part of #315
  • Loading branch information
askalt authored and psergee committed Feb 14, 2024
1 parent 48644aa commit c98976b
Show file tree
Hide file tree
Showing 12 changed files with 232 additions and 121 deletions.
4 changes: 2 additions & 2 deletions cli/cluster/cmd/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func PublishUri(publishCtx PublishCtx, uri *url.URL) error {

if instance == "" {
// The easy case, just publish the configuration as is.
return publisher.Publish(publishCtx.Src)
return publisher.Publish(0, publishCtx.Src)
}

return replaceInstanceConfig(instance, publishCtx.Config, collector, publisher)
Expand All @@ -74,7 +74,7 @@ func PublishCluster(publishCtx PublishCtx, path, instance string) error {

if instance == "" {
// The easy case, just publish the configuration as is.
return publisher.Publish(publishCtx.Src)
return publisher.Publish(0, publishCtx.Src)
}

collector, err := publishCtx.Collectors.NewFile(path)
Expand Down
41 changes: 24 additions & 17 deletions cli/cluster/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,9 @@ func (collector EtcdAllCollector) Collect() ([]integrity.Data, error) {
collected := []integrity.Data{}
for _, kv := range resp.Kvs {
collected = append(collected, integrity.Data{
Source: string(kv.Key),
Value: kv.Value,
Source: string(kv.Key),
Value: kv.Value,
Revision: kv.ModRevision,
})
}

Expand Down Expand Up @@ -188,8 +189,9 @@ func (collector EtcdKeyCollector) Collect() ([]integrity.Data, error) {

collected := []integrity.Data{
{
Source: string(resp.Kvs[0].Key),
Value: resp.Kvs[0].Value,
Source: string(resp.Kvs[0].Key),
Value: resp.Kvs[0].Value,
Revision: resp.Kvs[0].ModRevision,
},
}
return collected, nil
Expand Down Expand Up @@ -221,7 +223,11 @@ func NewEtcdAllDataPublisher(getter EtcdTxnGetter,
}

// Publish publishes the configuration into etcd to the given prefix.
func (publisher EtcdAllDataPublisher) Publish(data []byte) error {
func (publisher EtcdAllDataPublisher) Publish(revision int64, data []byte) error {
if revision != 0 {
return fmt.Errorf("failed to publish data into etcd: target revision %d is not supported",
revision)
}
if data == nil {
return fmt.Errorf("failed to publish data into etcd: data does not exist")
}
Expand Down Expand Up @@ -301,35 +307,29 @@ func (publisher EtcdAllDataPublisher) Publish(data []byte) error {
return nil
}

// EtcdPutter is the interface that wraps put from etcd method.
type EtcdPutter interface {
// Put puts a key-value pair into etcd.
Put(ctx context.Context, key, val string,
opts ...clientv3.OpOption) (*clientv3.PutResponse, error)
}

// EtcdKeyDataPublisher publishes a data into etcd for a prefix and a key.
type EtcdKeyDataPublisher struct {
putter EtcdPutter
getter EtcdTxnGetter
prefix string
key string
timeout time.Duration
}

// NewEtcdKeyDataPublisher creates a new EtcdKeyDataPublisher object to publish
// a data to etcd with the prefix and key during the timeout.
func NewEtcdKeyDataPublisher(putter EtcdPutter,
func NewEtcdKeyDataPublisher(getter EtcdTxnGetter,
prefix, key string, timeout time.Duration) EtcdKeyDataPublisher {
return EtcdKeyDataPublisher{
putter: putter,
getter: getter,
prefix: prefix,
key: key,
timeout: timeout,
}
}

// Publish publishes the configuration into etcd to the given prefix and key.
func (publisher EtcdKeyDataPublisher) Publish(data []byte) error {
// If passed revision is not 0, the data will be published only if target key revision the same.
func (publisher EtcdKeyDataPublisher) Publish(revision int64, data []byte) error {
if data == nil {
return fmt.Errorf("failed to publish data into etcd: data does not exist")
}
Expand All @@ -343,10 +343,17 @@ func (publisher EtcdKeyDataPublisher) Publish(data []byte) error {
defer cancel()
}

_, err := publisher.putter.Put(ctx, key, string(data))
txn := publisher.getter.Txn(ctx)
if revision != 0 {
txn = txn.If(clientv3.Compare(clientv3.ModRevision(key), "=", revision))
}
tresp, err := txn.Then(clientv3.OpPut(key, string(data))).Commit()
if err != nil {
return fmt.Errorf("failed to put data into etcd: %w", err)
}
if !tresp.Succeeded {
return fmt.Errorf("failed to put data into etcd: wrong revision")
}

return nil
}
Expand Down
132 changes: 77 additions & 55 deletions cli/cluster/etcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,23 +79,6 @@ func (getter *MockEtcdTxnGetter) Txn(ctx context.Context) clientv3.Txn {
return getter.TxnRet
}

type MockEtcdPutter struct {
Ctx context.Context
Key string
Val string
Ops []clientv3.OpOption
Err error
}

func (putter *MockEtcdPutter) Put(ctx context.Context, key string, val string,
opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
putter.Ctx = ctx
putter.Key = key
putter.Val = val
putter.Ops = opts
return nil, putter.Err
}

func TestClientKVImplementsEtcdGetter(t *testing.T) {
var (
kv clientv3.KV
Expand Down Expand Up @@ -177,34 +160,40 @@ func TestEtcdAllCollector_Collect_merge(t *testing.T) {
{
Kvs: []*mvccpb.KeyValue{
&mvccpb.KeyValue{
Key: []byte("k"),
Value: []byte("f: a\nb: a\n"),
Key: []byte("k"),
Value: []byte("f: a\nb: a\n"),
ModRevision: 1,
},
},
Expected: []integrity.Data{{
Source: "k",
Value: []byte("f: a\nb: a\n"),
Source: "k",
Value: []byte("f: a\nb: a\n"),
Revision: 1,
}},
},
{
Kvs: []*mvccpb.KeyValue{
&mvccpb.KeyValue{
Key: []byte("k"),
Value: []byte("f: a\nb: a\n"),
Key: []byte("k"),
Value: []byte("f: a\nb: a\n"),
ModRevision: 1,
},
&mvccpb.KeyValue{
Key: []byte("k"),
Value: []byte("f: b\nb: b\nc: b\n"),
Key: []byte("k"),
Value: []byte("f: b\nb: b\nc: b\n"),
ModRevision: 2,
},
},
Expected: []integrity.Data{
{
Source: "k",
Value: []byte("f: a\nb: a\n"),
Source: "k",
Value: []byte("f: a\nb: a\n"),
Revision: 1,
},
{
Source: "k",
Value: []byte("f: b\nb: b\nc: b\n"),
Source: "k",
Value: []byte("f: b\nb: b\nc: b\n"),
Revision: 2,
},
},
},
Expand Down Expand Up @@ -303,14 +292,16 @@ func TestEtcdKeyCollector_Collect_key(t *testing.T) {
mock := &MockEtcdGetter{
Kvs: []*mvccpb.KeyValue{
&mvccpb.KeyValue{
Key: []byte("k"),
Value: []byte("f: a\nb: a\n"),
Key: []byte("k"),
Value: []byte("f: a\nb: a\n"),
ModRevision: 1,
},
},
}
expected := []integrity.Data{{
Source: "k",
Value: []byte("f: a\nb: a\n"),
Source: "k",
Value: []byte("f: a\nb: a\n"),
Revision: 1,
}}

config, err := cluster.NewEtcdKeyCollector(mock, "foo", "key", 0).Collect()
Expand All @@ -324,12 +315,14 @@ func TestEtcdKeyCollector_Collect_too_many(t *testing.T) {
mock := &MockEtcdGetter{
Kvs: []*mvccpb.KeyValue{
&mvccpb.KeyValue{
Key: []byte("k"),
Value: []byte("f: a\nb: a\n"),
Key: []byte("k"),
Value: []byte("f: a\nb: a\n"),
ModRevision: 1,
},
&mvccpb.KeyValue{
Key: []byte("k"),
Value: []byte("f: b\nb: b\nc: b\n"),
Key: []byte("k"),
Value: []byte("f: b\nb: b\nc: b\n"),
ModRevision: 2,
},
},
}
Expand Down Expand Up @@ -364,7 +357,7 @@ func TestEtcdAllDataPublisher_Publish_get_inputs(t *testing.T) {
for _, tc := range cases {
t.Run(tc.Prefix, func(t *testing.T) {
mock := &MockEtcdTxnGetter{}
cluster.NewEtcdAllDataPublisher(mock, tc.Prefix, 0).Publish(data)
cluster.NewEtcdAllDataPublisher(mock, tc.Prefix, 0).Publish(0, data)

assert.NotNil(t, mock.Ctx)
assert.Equal(t, tc.Key, mock.Key)
Expand Down Expand Up @@ -431,7 +424,7 @@ func TestEtcdAllDataPublisher_Publish_txn_inputs(t *testing.T) {
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
publisher := cluster.NewEtcdAllDataPublisher(tc.Mock, "/foo", 0)
publisher.Publish([]byte{})
publisher.Publish(0, []byte{})

assert.Len(t, tc.Mock.TxnRet.IfCs, tc.IfLen)
assert.Len(t, tc.Mock.TxnRet.ThenOps, tc.ThenLen)
Expand Down Expand Up @@ -462,7 +455,7 @@ func TestEtcdDataPublishers_Publish_data_nil(t *testing.T) {

for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
err := tc.Publisher.Publish(nil)
err := tc.Publisher.Publish(0, nil)

assert.EqualError(t, err,
"failed to publish data into etcd: data does not exist")
Expand All @@ -482,7 +475,7 @@ func TestEtcdDataPublishers_Publish_publisher_nil(t *testing.T) {
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
assert.Panics(t, func() {
tc.Publisher.Publish([]byte{})
tc.Publisher.Publish(0, []byte{})
})
})
}
Expand Down Expand Up @@ -520,7 +513,7 @@ func TestEtcdAllDataPublisher_Publish_errors(t *testing.T) {
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
publisher := cluster.NewEtcdAllDataPublisher(tc.Mock, "prefix", 0)
err := publisher.Publish([]byte{})
err := publisher.Publish(0, []byte{})
if tc.Expected != "" {
assert.EqualError(t, err, tc.Expected)
} else {
Expand All @@ -530,14 +523,22 @@ func TestEtcdAllDataPublisher_Publish_errors(t *testing.T) {
}
}

func TestEtcdAllDataPublisher_Publish_revision(t *testing.T) {
mock := &MockEtcdTxnGetter{}
publisher := cluster.NewEtcdAllDataPublisher(mock, "prefix", 0)
err := publisher.Publish(1, []byte{})
assert.EqualError(t, err,
"failed to publish data into etcd: target revision 1 is not supported")
}

func TestEtcdAllDataPublisher_Publish_timeout(t *testing.T) {
cases := []time.Duration{0, 60 * time.Second}

for _, tc := range cases {
t.Run(fmt.Sprint(tc), func(t *testing.T) {
mock := &MockEtcdTxnGetter{}
publisher := cluster.NewEtcdAllDataPublisher(mock, "prefix", tc)
err := publisher.Publish([]byte{})
err := publisher.Publish(0, []byte{})

require.NoError(t, err)
require.NotNil(t, mock.Ctx)
Expand Down Expand Up @@ -574,7 +575,7 @@ func TestEtcdAllDataPublisher_Publish_timeout_exit(t *testing.T) {
timeout := 100 * time.Millisecond
delta := 30 * time.Millisecond
publisher := cluster.NewEtcdAllDataPublisher(mock, "prefix", timeout)
err := publisher.Publish([]byte{})
err := publisher.Publish(0, []byte{})
assert.EqualError(t, err, "context deadline exceeded")
assert.InDelta(t, timeout, time.Since(before), float64(delta))
}
Expand Down Expand Up @@ -605,37 +606,58 @@ func TestEtcdKeyDataPublisher_Publish_inputs(t *testing.T) {

for _, tc := range cases {
t.Run(tc.Expected, func(t *testing.T) {
mock := &MockEtcdPutter{Err: fmt.Errorf("foo")}
mock := &MockEtcdTxnGetter{TxnRet: &MockTxn{Err: fmt.Errorf("foo")}}
publisher := cluster.NewEtcdKeyDataPublisher(mock, tc.Prefix, tc.Key, 0)
publisher.Publish(data)
publisher.Publish(0, data)

assert.NotNil(t, mock.Ctx)
assert.Equal(t, tc.Expected, mock.Key)
assert.Equal(t, data, []byte(mock.Val))
assert.Len(t, mock.Ops, 0)
assert.NotNil(t, mock.CtxTxn)
assert.Equal(t,
[]clientv3.Op{clientv3.OpPut(tc.Expected, string(data))}, mock.TxnRet.ThenOps)
assert.Nil(t, mock.TxnRet.IfCs)
assert.Nil(t, mock.TxnRet.ElseOps)
})
}
}

func TestEtcdKeyDataPublisher_Publish_modRevision(t *testing.T) {
prefix := "/foo"
key := "key"
modRevision := int64(5)
data := []byte("foo bar")
expected := "/foo/config/key"
mock := &MockEtcdTxnGetter{TxnRet: &MockTxn{Err: fmt.Errorf("foo")}}
publisher := cluster.NewEtcdKeyDataPublisher(mock, prefix, key, 0)

publisher.Publish(modRevision, data)
assert.NotNil(t, mock.CtxTxn)
assert.Equal(t,
[]clientv3.Op{clientv3.OpPut(expected, string(data))},
mock.TxnRet.ThenOps)
assert.Equal(t,
[]clientv3.Cmp{clientv3.Compare(clientv3.ModRevision(expected), "=", modRevision)},
mock.TxnRet.IfCs)
assert.Nil(t, mock.TxnRet.ElseOps)
}

func TestEtcdKeyDataPublisher_Publish_error(t *testing.T) {
mock := &MockEtcdPutter{Err: fmt.Errorf("foo")}
mock := &MockEtcdTxnGetter{TxnRet: &MockTxn{Err: fmt.Errorf("foo")}}
publisher := cluster.NewEtcdKeyDataPublisher(mock, "", "", 0)
err := publisher.Publish([]byte{})
err := publisher.Publish(0, []byte{})

assert.EqualError(t, err, "failed to put data into etcd: foo")
}

func TestEtcdKeyDataPublisher_Publish_timeout(t *testing.T) {
cases := []time.Duration{0, 60 * time.Second}
mock := &MockEtcdPutter{Err: fmt.Errorf("foo")}
mock := &MockEtcdTxnGetter{TxnRet: &MockTxn{Err: fmt.Errorf("foo")}}

for _, tc := range cases {
t.Run(fmt.Sprint(tc), func(t *testing.T) {
publisher := cluster.NewEtcdKeyDataPublisher(mock, "", "", tc)
publisher.Publish([]byte{})
publisher.Publish(0, []byte{})

expected := time.Now().Add(tc)
deadline, ok := mock.Ctx.Deadline()
deadline, ok := mock.CtxTxn.Deadline()
if tc == 0 {
assert.False(t, ok)
} else {
Expand Down
6 changes: 5 additions & 1 deletion cli/cluster/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ func NewFileDataPublisher(path string) FileDataPublisher {
}

// Publish publishes the data to a file for the given path.
func (publisher FileDataPublisher) Publish(data []byte) error {
func (publisher FileDataPublisher) Publish(revision int64, data []byte) error {
if revision != 0 {
return fmt.Errorf("failed to publish data into file: target revision %d is not supported",
revision)
}
if publisher.path == "" {
return fmt.Errorf("file path is empty")
}
Expand Down
Loading

0 comments on commit c98976b

Please sign in to comment.