From 40c4b1e73c35307baa21fbe64b67e46a247f2fe7 Mon Sep 17 00:00:00 2001 From: John Saigle Date: Thu, 25 Jul 2024 17:14:05 -0400 Subject: [PATCH] node: Add ability to limit Flow Cancel to pairs of chains - Add a concept of "pipes" which consist of a pair of chains - Add field in the Governer that enables flow cancel only for "pipes" which are explicitly enabled - Set the Eth-Sui pipe as the only flow cancel-enabled pair. Sui is the only chain with frequent governor congestion so it is the only one that really needs the Flow Cancel capabilities. Ethereum is the main chain that Sui is interacting with in terms of volume moving in and out of Sui. - Refactor calling sites for flow cancel transfer functions so that all checks are done in one place. (Should help prevent bugs where only one calling site is updated.) --- node/pkg/governor/devnet_config.go | 9 +- node/pkg/governor/flow_cancel_pipes.go | 14 ++ node/pkg/governor/governor.go | 176 ++++++++++++++++++------- node/pkg/governor/governor_db.go | 24 +--- node/pkg/governor/testnet_config.go | 8 +- 5 files changed, 163 insertions(+), 68 deletions(-) create mode 100644 node/pkg/governor/flow_cancel_pipes.go diff --git a/node/pkg/governor/devnet_config.go b/node/pkg/governor/devnet_config.go index ed83636437..5a9cda8547 100644 --- a/node/pkg/governor/devnet_config.go +++ b/node/pkg/governor/devnet_config.go @@ -6,7 +6,7 @@ import ( "github.com/wormhole-foundation/wormhole/sdk/vaa" ) -func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry) { +func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry, []pipe) { gov.logger.Info("setting up devnet config") gov.dayLengthInMinutes = 5 @@ -18,10 +18,15 @@ func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigE } flowCancelTokens := []tokenConfigEntry{} + flowCancelPipes := []pipe{} if gov.flowCancelEnabled { flowCancelTokens = []tokenConfigEntry{ {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usd-coin", decimals: 6, price: 1.001}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 6780118.197035182 } + + flowCancelPipes = []pipe{ + {first: vaa.ChainIDEthereum, second: vaa.ChainIDSui}, + } } chains := []chainConfigEntry{ @@ -29,5 +34,5 @@ func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigE {emitterChainID: vaa.ChainIDEthereum, dailyLimit: 100000}, } - return tokens, flowCancelTokens, chains + return tokens, flowCancelTokens, chains, flowCancelPipes } diff --git a/node/pkg/governor/flow_cancel_pipes.go b/node/pkg/governor/flow_cancel_pipes.go new file mode 100644 index 0000000000..c3f428f0a5 --- /dev/null +++ b/node/pkg/governor/flow_cancel_pipes.go @@ -0,0 +1,14 @@ +package governor + +import "github.com/wormhole-foundation/wormhole/sdk/vaa" + +// FlowCancelPipes returns a list of `pipe`s representing a pair of chains for which flow canceling is enabled. +// In practice this list should contain pairs of chains that have a large amount of volume between each other. +// These are more likely to cause chronic congestion which flow canceling can help to alleviate. +// Pairs of chains that are not frequently congested do not need to enable flow canceling as they should have +// plenty of regular Governor capacity to work with. +func FlowCancelPipes() []pipe { + return []pipe{ + {first: vaa.ChainIDEthereum, second: vaa.ChainIDSui}, + } +} diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 8f6137b1c8..dab5569e84 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -118,8 +118,37 @@ type ( transfers []transfer pending []*pendingEntry } + + // Represents a pair of Governed chains. Ordering is arbitrary. + pipe struct { + first vaa.ChainID + second vaa.ChainID + } ) +// valid checks whether a pipe is valid +func (c *pipe) valid() bool { + // The elements must be different + if c.first == c.second { + return false + } + return true +} + +// equals checks whether two corrdidors are equal. This method exists to demonstrate that the ordering of the +// pipe's elements doesn't matter. It also makes it easier to check whether two chains are 'connected' by a pipe +// without needing to sort or manipulate the elements. +func (c *pipe) equals(c2 *pipe) bool { + if c.first == c2.first && c.second == c2.second { + return true + } + // Ordering doesn't matter + if c.first == c2.second && c2.first == c.second { + return true + } + return false +} + // newTransferFromDbTransfer performs a bounds check on dbTransfer.Value to ensure it can fit into int64. // This should always be the case for normal operation as dbTransfer.Value represents the USD value of a transfer. func newTransferFromDbTransfer(dbTransfer *db.Transfer) (tx transfer, err error) { @@ -131,6 +160,10 @@ func newTransferFromDbTransfer(dbTransfer *db.Transfer) (tx transfer, err error) // addFlowCancelTransfer appends a transfer to a ChainEntry's transfers property. // SECURITY: The calling code is responsible for ensuring that the asset within the transfer is a flow-cancelling asset. +// SECURITY: The calling code is responsible for ensuring that the transfer's source and destination has a matching +// +// flow cancel pipe. +// // SECURITY: This method performs validation to ensure that the Flow Cancel transfer is valid. This is important to // ensure that the Governor usage cannot be lowered due to malicious or invalid transfers. // - the Value must be negative (in order to represent an incoming value) @@ -202,6 +235,9 @@ type ChainGovernor struct { configPublishCounter int64 flowCancelEnabled bool coinGeckoApiKey string + // Pairs of chains for which flow canceling is enabled. Note that an asset may be flow canceling even if + // it was minted on a chain that is not configured to be an 'end' of any of the pipes. + flowCancelPipes []pipe } func NewChainGovernor( @@ -256,18 +292,25 @@ func (gov *ChainGovernor) initConfig() error { configChains := chainList() configTokens := tokenList() flowCancelTokens := []tokenConfigEntry{} + flowCancelPipes := []pipe{} if gov.env == common.UnsafeDevNet { - configTokens, flowCancelTokens, configChains = gov.initDevnetConfig() + configTokens, flowCancelTokens, configChains, flowCancelPipes = gov.initDevnetConfig() } else if gov.env == common.TestNet { - configTokens, flowCancelTokens, configChains = gov.initTestnetConfig() + configTokens, flowCancelTokens, configChains, flowCancelPipes = gov.initTestnetConfig() } else { // mainnet, unit tests, or accountant-mock if gov.flowCancelEnabled { flowCancelTokens = FlowCancelTokenList() + flowCancelPipes = FlowCancelPipes() } } + // We're done with this value for the rest of this function, so write it to the governor struct now + if gov.flowCancelEnabled { + gov.flowCancelPipes = flowCancelPipes + } + for _, ct := range configTokens { addr, err := vaa.StringToAddress(ct.addr) if err != nil { @@ -611,24 +654,13 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now // - This will cause the summed value of Sui to decrease. emitterChainEntry.transfers = append(emitterChainEntry.transfers, transfer) - // Add inverse transfer to destination chain entry if this asset can cancel flows. - key := tokenKey{chain: token.token.chain, addr: token.token.addr} - - tokenEntry := gov.tokens[key] - if tokenEntry != nil { - // Mandatory check to ensure that the token should be able to reduce the Governor limit. - if tokenEntry.flowCancels { - if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok { - if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil { - return false, err - } - } else { - gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", - zap.String("msgID", msg.MessageIDString()), - zap.String("hash", hash), zap.Error(err), - zap.Stringer("target chain", payload.TargetChain), - ) - } + if gov.flowCancelEnabled { + _, err = gov.tryAddFlowCancelTransfer(&transfer) + if err != nil { + gov.logger.Warn("Error when attempting to add a flow cancel transfer", + zap.Error(err), + ) + return false, err } } @@ -636,6 +668,20 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now return true, nil } +// pipeCanFlowCancel checks whether the pipe passed as an argument is present in the list of flow-cancel enabled +// pipes. This method returns false for all values if flow canceling is disabled. +func (gov *ChainGovernor) pipeCanFlowCancel(pipe *pipe) bool { + if !gov.flowCancelEnabled { + return false + } + for _, configuredPipe := range gov.flowCancelPipes { + if pipe.equals(&configuredPipe) { + return true + } + } + return false +} + // IsGovernedMsg determines if the message applies to the governor. It grabs the lock. func (gov *ChainGovernor) IsGovernedMsg(msg *common.MessagePublication) (msgIsGoverned bool, err error) { gov.mutex.Lock() @@ -854,33 +900,16 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP ce.transfers = append(ce.transfers, transfer) gov.msgsSeen[pe.hash] = transferComplete + if gov.flowCancelEnabled { + _, err := gov.tryAddFlowCancelTransfer(&transfer) + if err != nil { + gov.logger.Error("Error when attempting to add a flow cancel transfer", + zap.Error(err), + ) - // Add inverse transfer to destination chain entry if this asset can cancel flows. - key := tokenKey{chain: pe.token.token.chain, addr: pe.token.token.addr} - tokenEntry := gov.tokens[key] - if tokenEntry != nil { - // Mandatory check to ensure that the token should be able to reduce the Governor limit. - if tokenEntry.flowCancels { - if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok { - - if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil { - gov.logger.Warn("could not add flow canceling transfer to destination chain", - zap.String("msgID", dbTransfer.MsgID), - zap.String("hash", pe.hash), - zap.Error(err), - ) - // Process the next pending transfer - continue - } - } else { - gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", - zap.String("msgID", dbTransfer.MsgID), - zap.String("hash", pe.hash), zap.Error(err), - zap.Stringer("target chain", payload.TargetChain), - ) - } } } + } else { delete(gov.msgsSeen, pe.hash) } @@ -924,6 +953,65 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) { return value, nil } +// tryAddFlowCancelTransfer adds inverse transfer to destination chain entry if this asset can cancel flows. +func (gov *ChainGovernor) tryAddFlowCancelTransfer(transfer *transfer) (added bool, err error) { + added = false + + originChain := transfer.dbTransfer.OriginChain + originAddr := transfer.dbTransfer.OriginAddress + hash := transfer.dbTransfer.Hash + target := transfer.dbTransfer.TargetChain + emitter := transfer.dbTransfer.EmitterChain + + pipe := &pipe{emitter, target} + if !gov.pipeCanFlowCancel(pipe) { + return false, nil + } + + key := tokenKey{originChain, originAddr} + tokenEntry := gov.tokens[key] + if tokenEntry == nil { + // Weird but not critical + gov.logger.Warn("Not adding flow cancel transfer because token is not governed", + zap.String("OriginChain", originChain.String()), + zap.String("tokenEntry", originAddr.String()), + zap.String("msgID", transfer.dbTransfer.MsgID), + zap.String("hash", transfer.dbTransfer.Hash), + ) + return false, nil + } + if !tokenEntry.flowCancels { + // Nothing to do in this case + return false, nil + } + + // Ensure destination exists + destinationChainEntry, ok := gov.chains[transfer.dbTransfer.TargetChain] + if !ok { + // Weird that TargetChain does not exist but not relevant for the flow cancel feature. We just + // fail closed here: do not add the flow cancelling transfer. + gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", + zap.String("msgID", transfer.dbTransfer.MsgID), + zap.String("hash", hash), + zap.Stringer("target chain", transfer.dbTransfer.TargetChain), + ) + return false, nil + } + + // Add the 'inverse' transfer to the destination chain entry. + if err := destinationChainEntry.addFlowCancelTransfer(transfer.inverse()); err != nil { + gov.logger.Warn("could not add flow canceling transfer to destination chain", + zap.String("msgID", transfer.dbTransfer.MsgID), + zap.String("hash", transfer.dbTransfer.Hash), + zap.Error(err), + ) + // Process the next pending transfer + return false, err + } + + return true, nil +} + // TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `chainEntry`. In effect, it represents a // chain's "Governor Usage" for a given 24 hour period. // This sum may be reduced by the sum of 'flow cancelling' transfers: that is, transfers of an allow-listed token diff --git a/node/pkg/governor/governor_db.go b/node/pkg/governor/governor_db.go index db928ce868..5304cfc603 100644 --- a/node/pkg/governor/governor_db.go +++ b/node/pkg/governor/governor_db.go @@ -246,26 +246,10 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error { // Reload flow-cancel transfers for the TargetChain. This is important when the node restarts so that a corresponding, // inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` and // `CheckPending` loops but those functions do not capture flow-cancelling when the node is restarted. - tokenEntry := gov.tokens[tk] - if tokenEntry != nil { - // Mandatory check to ensure that the token should be able to reduce the Governor limit. - if tokenEntry.flowCancels { - if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok { - if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil { - gov.logger.Error("could not add flow canceling transfer to destination chain", - zap.String("msgID", xfer.MsgID), - zap.String("hash", xfer.Hash), zap.Error(err), - ) - return err - } - } else { - gov.logger.Error("tried to cancel flow but chain entry for target chain does not exist", - zap.String("msgID", xfer.MsgID), - zap.Stringer("token chain", xfer.OriginChain), - zap.Stringer("token address", xfer.OriginAddress), - zap.Stringer("target chain", xfer.TargetChain), - ) - } + if gov.flowCancelEnabled { + _, err := gov.tryAddFlowCancelTransfer(&transfer) + if err != nil { + return err } } return nil diff --git a/node/pkg/governor/testnet_config.go b/node/pkg/governor/testnet_config.go index 2f01878b9c..f670c5170f 100644 --- a/node/pkg/governor/testnet_config.go +++ b/node/pkg/governor/testnet_config.go @@ -6,7 +6,7 @@ import ( "github.com/wormhole-foundation/wormhole/sdk/vaa" ) -func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry) { +func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry, []pipe) { gov.logger.Info("setting up testnet config") tokens := []tokenConfigEntry{ @@ -15,11 +15,15 @@ func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfig } flowCancelTokens := []tokenConfigEntry{} + flowCancelPipes := []pipe{} if gov.flowCancelEnabled { flowCancelTokens = []tokenConfigEntry{ {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usd-coin", decimals: 6, price: 1.001}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 6780118.197035182 } + flowCancelPipes = []pipe{ + {first: vaa.ChainIDEthereum, second: vaa.ChainIDSui}, + } } chains := []chainConfigEntry{ @@ -28,5 +32,5 @@ func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfig {emitterChainID: vaa.ChainIDFantom, dailyLimit: 1000000}, } - return tokens, flowCancelTokens, chains + return tokens, flowCancelTokens, chains, flowCancelPipes }