Skip to content

Commit

Permalink
Merge pull request #1388 from kaleido-io/eventstreams
Browse files Browse the repository at this point in the history
Use separate event stream per namespace
  • Loading branch information
nguyer authored Mar 4, 2024
2 parents 2e7263c + dd281b8 commit 5964d51
Show file tree
Hide file tree
Showing 35 changed files with 1,752 additions and 1,593 deletions.
3 changes: 3 additions & 0 deletions db/migrations/postgres/000116_tx_type_not_null.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
BEGIN;
ALTER TABLE messages ALTER COLUMN tx_parent_type DROP NOT NULL;
COMMIT;
5 changes: 5 additions & 0 deletions db/migrations/postgres/000116_tx_type_not_null.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
BEGIN;
UPDATE messages SET tx_parent_type = ''
WHERE tx_parent_type IS NULL;
ALTER TABLE messages ALTER COLUMN tx_parent_type SET NOT NULL;
COMMIT;
4 changes: 4 additions & 0 deletions db/migrations/sqlite/000116_tx_type_not_null.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE messages RENAME COLUMN tx_parent_type TO tx_parent_type_temp;
ALTER TABLE messages ADD COLUMN tx_parent_type VARCHAR(64);
UPDATE messages SET tx_parent_type = tx_parent_type_temp;
ALTER TABLE messages DROP COLUMN tx_parent_type_temp;
5 changes: 5 additions & 0 deletions db/migrations/sqlite/000116_tx_type_not_null.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
UPDATE messages SET tx_parent_type = '' WHERE tx_parent_type IS NULL;
ALTER TABLE messages RENAME COLUMN tx_parent_type TO tx_parent_type_temp;
ALTER TABLE messages ADD COLUMN tx_parent_type VARCHAR(64) DEFAULT '' NOT NULL;
UPDATE messages SET tx_parent_type = tx_parent_type_temp;
ALTER TABLE messages DROP COLUMN tx_parent_type_temp;
646 changes: 75 additions & 571 deletions go.work.sum

Large diffs are not rendered by default.

25 changes: 25 additions & 0 deletions internal/assets/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ type Manager interface {
// From operations.OperationHandler
PrepareOperation(ctx context.Context, op *core.Operation) (*core.PreparedOperation, error)
RunOperation(ctx context.Context, op *core.PreparedOperation) (outputs fftypes.JSONObject, phase core.OpPhase, err error)

// Starts the namespace on each of the configured token plugins
Start(ctx context.Context) error
}

type assetManager struct {
Expand Down Expand Up @@ -170,6 +173,28 @@ func (am *assetManager) GetTokenConnectors(ctx context.Context) []*core.TokenCon
return connectors
}

func (am *assetManager) Start(ctx context.Context) error {
f := database.TokenPoolQueryFactory.NewFilter(ctx).And()
pools, _, err := am.database.GetTokenPools(ctx, am.namespace, f)
if err != nil {
return err
}

for _, plugin := range am.tokens {
activePools := []*core.TokenPool{}
for _, pool := range pools {
if pool.Connector == plugin.ConnectorName() && pool.Active {
activePools = append(activePools, pool)
}
}
err := plugin.StartNamespace(ctx, am.namespace, activePools)
if err != nil {
return err
}
}
return nil
}

func (am *assetManager) getDefaultTokenConnector(ctx context.Context) (string, error) {
tokenConnectors := am.GetTokenConnectors(ctx)
if len(tokenConnectors) != 1 {
Expand Down
84 changes: 84 additions & 0 deletions internal/assets/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package assets
import (
"context"
"errors"
"fmt"
"testing"
"time"

Expand Down Expand Up @@ -160,3 +161,86 @@ func TestGetTokenConnectors(t *testing.T) {
assert.Equal(t, 1, len(connectors))
assert.Equal(t, "magic-tokens", connectors[0].Name)
}

func TestStart(t *testing.T) {
coreconfig.Reset()
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mim := &identitymanagermocks.Manager{}
msa := &syncasyncmocks.Bridge{}
mbm := &broadcastmocks.Manager{}
mpm := &privatemessagingmocks.Manager{}
mti := &tokenmocks.Plugin{}
mm := &metricsmocks.Manager{}
mom := &operationmocks.Manager{}
mcm := &contractmocks.Manager{}
cmi := &cachemocks.Manager{}
cmi.On("GetCache", mock.Anything).Return(nil, nil)
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)
mdi.On("GetTokenPools", mock.Anything, mock.Anything, mock.Anything).Return([]*core.TokenPool{
{
Connector: "hot_tokens",
Active: true,
},
}, nil, nil)
mti.On("StartNamespace", mock.Anything, mock.Anything, mock.Anything).Return(nil)
mti.On("ConnectorName").Return("hot_tokens")
txHelper, _ := txcommon.NewTransactionHelper(context.Background(), "ns1", mdi, mdm, cmi)
am, err := NewAssetManager(context.Background(), "ns1", "blockchain_plugin", mdi, map[string]tokens.Plugin{"magic-tokens": mti}, mim, msa, mbm, mpm, mm, mom, mcm, txHelper, cmi)
assert.NoError(t, err)
err = am.Start(context.Background())
assert.NoError(t, err)
}

func TestStartDBError(t *testing.T) {
coreconfig.Reset()
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mim := &identitymanagermocks.Manager{}
msa := &syncasyncmocks.Bridge{}
mbm := &broadcastmocks.Manager{}
mpm := &privatemessagingmocks.Manager{}
mti := &tokenmocks.Plugin{}
mm := &metricsmocks.Manager{}
mom := &operationmocks.Manager{}
mcm := &contractmocks.Manager{}
cmi := &cachemocks.Manager{}
cmi.On("GetCache", mock.Anything).Return(nil, nil)
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)
mdi.On("GetTokenPools", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop"))
txHelper, _ := txcommon.NewTransactionHelper(context.Background(), "ns1", mdi, mdm, cmi)
am, err := NewAssetManager(context.Background(), "ns1", "blockchain_plugin", mdi, map[string]tokens.Plugin{"magic-tokens": mti}, mim, msa, mbm, mpm, mm, mom, mcm, txHelper, cmi)
assert.NoError(t, err)
err = am.Start(context.Background())
assert.Regexp(t, "pop", err)
}

func TestStartError(t *testing.T) {
coreconfig.Reset()
mdi := &databasemocks.Plugin{}
mdm := &datamocks.Manager{}
mim := &identitymanagermocks.Manager{}
msa := &syncasyncmocks.Bridge{}
mbm := &broadcastmocks.Manager{}
mpm := &privatemessagingmocks.Manager{}
mti := &tokenmocks.Plugin{}
mm := &metricsmocks.Manager{}
mom := &operationmocks.Manager{}
mcm := &contractmocks.Manager{}
cmi := &cachemocks.Manager{}
cmi.On("GetCache", mock.Anything).Return(nil, nil)
mom.On("RegisterHandler", mock.Anything, mock.Anything, mock.Anything)
mdi.On("GetTokenPools", mock.Anything, mock.Anything, mock.Anything).Return([]*core.TokenPool{
{
Connector: "hot_tokens",
Active: true,
},
}, nil, nil)
mti.On("StartNamespace", mock.Anything, mock.Anything, mock.Anything).Return(fmt.Errorf("pop"))
mti.On("ConnectorName").Return("hot_tokens")
txHelper, _ := txcommon.NewTransactionHelper(context.Background(), "ns1", mdi, mdm, cmi)
am, err := NewAssetManager(context.Background(), "ns1", "blockchain_plugin", mdi, map[string]tokens.Plugin{"magic-tokens": mti}, mim, msa, mbm, mpm, mm, mom, mcm, txHelper, cmi)
assert.NoError(t, err)
err = am.Start(context.Background())
assert.Regexp(t, "pop", err)
}
3 changes: 3 additions & 0 deletions internal/assets/token_approval.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ func (am *assetManager) NewApproval(approval *core.TokenApprovalInput) syncasync
approval: approval,
idempotentSubmit: approval.IdempotencyKey != "",
}
if approval.Namespace == "" {
approval.Namespace = am.namespace
}
sender.setDefaults()
return sender
}
Expand Down
9 changes: 9 additions & 0 deletions internal/assets/token_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ func (am *assetManager) validateTransfer(ctx context.Context, transfer *core.Tok

func (am *assetManager) MintTokens(ctx context.Context, transfer *core.TokenTransferInput, waitConfirm bool) (out *core.TokenTransfer, err error) {
transfer.Type = core.TokenTransferTypeMint
if transfer.Namespace == "" {
transfer.Namespace = am.namespace
}

sender := am.NewTransfer(transfer)
if am.metrics.IsMetricsEnabled() {
Expand All @@ -135,6 +138,9 @@ func (am *assetManager) MintTokens(ctx context.Context, transfer *core.TokenTran

func (am *assetManager) BurnTokens(ctx context.Context, transfer *core.TokenTransferInput, waitConfirm bool) (out *core.TokenTransfer, err error) {
transfer.Type = core.TokenTransferTypeBurn
if transfer.Namespace == "" {
transfer.Namespace = am.namespace
}

sender := am.NewTransfer(transfer)
if am.metrics.IsMetricsEnabled() {
Expand All @@ -150,6 +156,9 @@ func (am *assetManager) BurnTokens(ctx context.Context, transfer *core.TokenTran

func (am *assetManager) TransferTokens(ctx context.Context, transfer *core.TokenTransferInput, waitConfirm bool) (out *core.TokenTransfer, err error) {
transfer.Type = core.TokenTransferTypeTransfer
if transfer.Namespace == "" {
transfer.Namespace = am.namespace
}

sender := am.NewTransfer(transfer)
if am.metrics.IsMetricsEnabled() {
Expand Down
Loading

0 comments on commit 5964d51

Please sign in to comment.