diff --git a/node/pkg/governor/devnet_config.go b/node/pkg/governor/devnet_config.go index ed83636437..5a9cda8547 100644 --- a/node/pkg/governor/devnet_config.go +++ b/node/pkg/governor/devnet_config.go @@ -6,7 +6,7 @@ import ( "github.com/wormhole-foundation/wormhole/sdk/vaa" ) -func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry) { +func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry, []pipe) { gov.logger.Info("setting up devnet config") gov.dayLengthInMinutes = 5 @@ -18,10 +18,15 @@ func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigE } flowCancelTokens := []tokenConfigEntry{} + flowCancelPipes := []pipe{} if gov.flowCancelEnabled { flowCancelTokens = []tokenConfigEntry{ {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usd-coin", decimals: 6, price: 1.001}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 6780118.197035182 } + + flowCancelPipes = []pipe{ + {first: vaa.ChainIDEthereum, second: vaa.ChainIDSui}, + } } chains := []chainConfigEntry{ @@ -29,5 +34,5 @@ func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigE {emitterChainID: vaa.ChainIDEthereum, dailyLimit: 100000}, } - return tokens, flowCancelTokens, chains + return tokens, flowCancelTokens, chains, flowCancelPipes } diff --git a/node/pkg/governor/flow_cancel_pipes.go b/node/pkg/governor/flow_cancel_pipes.go new file mode 100644 index 0000000000..c3f428f0a5 --- /dev/null +++ b/node/pkg/governor/flow_cancel_pipes.go @@ -0,0 +1,14 @@ +package governor + +import "github.com/wormhole-foundation/wormhole/sdk/vaa" + +// FlowCancelPipes returns a list of `pipe`s representing a pair of chains for which flow canceling is enabled. +// In practice this list should contain pairs of chains that have a large amount of volume between each other. +// These are more likely to cause chronic congestion which flow canceling can help to alleviate. +// Pairs of chains that are not frequently congested do not need to enable flow canceling as they should have +// plenty of regular Governor capacity to work with. +func FlowCancelPipes() []pipe { + return []pipe{ + {first: vaa.ChainIDEthereum, second: vaa.ChainIDSui}, + } +} diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 8f6137b1c8..dab5569e84 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -118,8 +118,37 @@ type ( transfers []transfer pending []*pendingEntry } + + // Represents a pair of Governed chains. Ordering is arbitrary. + pipe struct { + first vaa.ChainID + second vaa.ChainID + } ) +// valid checks whether a pipe is valid +func (c *pipe) valid() bool { + // The elements must be different + if c.first == c.second { + return false + } + return true +} + +// equals checks whether two corrdidors are equal. This method exists to demonstrate that the ordering of the +// pipe's elements doesn't matter. It also makes it easier to check whether two chains are 'connected' by a pipe +// without needing to sort or manipulate the elements. +func (c *pipe) equals(c2 *pipe) bool { + if c.first == c2.first && c.second == c2.second { + return true + } + // Ordering doesn't matter + if c.first == c2.second && c2.first == c.second { + return true + } + return false +} + // newTransferFromDbTransfer performs a bounds check on dbTransfer.Value to ensure it can fit into int64. // This should always be the case for normal operation as dbTransfer.Value represents the USD value of a transfer. func newTransferFromDbTransfer(dbTransfer *db.Transfer) (tx transfer, err error) { @@ -131,6 +160,10 @@ func newTransferFromDbTransfer(dbTransfer *db.Transfer) (tx transfer, err error) // addFlowCancelTransfer appends a transfer to a ChainEntry's transfers property. // SECURITY: The calling code is responsible for ensuring that the asset within the transfer is a flow-cancelling asset. +// SECURITY: The calling code is responsible for ensuring that the transfer's source and destination has a matching +// +// flow cancel pipe. +// // SECURITY: This method performs validation to ensure that the Flow Cancel transfer is valid. This is important to // ensure that the Governor usage cannot be lowered due to malicious or invalid transfers. // - the Value must be negative (in order to represent an incoming value) @@ -202,6 +235,9 @@ type ChainGovernor struct { configPublishCounter int64 flowCancelEnabled bool coinGeckoApiKey string + // Pairs of chains for which flow canceling is enabled. Note that an asset may be flow canceling even if + // it was minted on a chain that is not configured to be an 'end' of any of the pipes. + flowCancelPipes []pipe } func NewChainGovernor( @@ -256,18 +292,25 @@ func (gov *ChainGovernor) initConfig() error { configChains := chainList() configTokens := tokenList() flowCancelTokens := []tokenConfigEntry{} + flowCancelPipes := []pipe{} if gov.env == common.UnsafeDevNet { - configTokens, flowCancelTokens, configChains = gov.initDevnetConfig() + configTokens, flowCancelTokens, configChains, flowCancelPipes = gov.initDevnetConfig() } else if gov.env == common.TestNet { - configTokens, flowCancelTokens, configChains = gov.initTestnetConfig() + configTokens, flowCancelTokens, configChains, flowCancelPipes = gov.initTestnetConfig() } else { // mainnet, unit tests, or accountant-mock if gov.flowCancelEnabled { flowCancelTokens = FlowCancelTokenList() + flowCancelPipes = FlowCancelPipes() } } + // We're done with this value for the rest of this function, so write it to the governor struct now + if gov.flowCancelEnabled { + gov.flowCancelPipes = flowCancelPipes + } + for _, ct := range configTokens { addr, err := vaa.StringToAddress(ct.addr) if err != nil { @@ -611,24 +654,13 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now // - This will cause the summed value of Sui to decrease. emitterChainEntry.transfers = append(emitterChainEntry.transfers, transfer) - // Add inverse transfer to destination chain entry if this asset can cancel flows. - key := tokenKey{chain: token.token.chain, addr: token.token.addr} - - tokenEntry := gov.tokens[key] - if tokenEntry != nil { - // Mandatory check to ensure that the token should be able to reduce the Governor limit. - if tokenEntry.flowCancels { - if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok { - if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil { - return false, err - } - } else { - gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", - zap.String("msgID", msg.MessageIDString()), - zap.String("hash", hash), zap.Error(err), - zap.Stringer("target chain", payload.TargetChain), - ) - } + if gov.flowCancelEnabled { + _, err = gov.tryAddFlowCancelTransfer(&transfer) + if err != nil { + gov.logger.Warn("Error when attempting to add a flow cancel transfer", + zap.Error(err), + ) + return false, err } } @@ -636,6 +668,20 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now return true, nil } +// pipeCanFlowCancel checks whether the pipe passed as an argument is present in the list of flow-cancel enabled +// pipes. This method returns false for all values if flow canceling is disabled. +func (gov *ChainGovernor) pipeCanFlowCancel(pipe *pipe) bool { + if !gov.flowCancelEnabled { + return false + } + for _, configuredPipe := range gov.flowCancelPipes { + if pipe.equals(&configuredPipe) { + return true + } + } + return false +} + // IsGovernedMsg determines if the message applies to the governor. It grabs the lock. func (gov *ChainGovernor) IsGovernedMsg(msg *common.MessagePublication) (msgIsGoverned bool, err error) { gov.mutex.Lock() @@ -854,33 +900,16 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP ce.transfers = append(ce.transfers, transfer) gov.msgsSeen[pe.hash] = transferComplete + if gov.flowCancelEnabled { + _, err := gov.tryAddFlowCancelTransfer(&transfer) + if err != nil { + gov.logger.Error("Error when attempting to add a flow cancel transfer", + zap.Error(err), + ) - // Add inverse transfer to destination chain entry if this asset can cancel flows. - key := tokenKey{chain: pe.token.token.chain, addr: pe.token.token.addr} - tokenEntry := gov.tokens[key] - if tokenEntry != nil { - // Mandatory check to ensure that the token should be able to reduce the Governor limit. - if tokenEntry.flowCancels { - if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok { - - if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil { - gov.logger.Warn("could not add flow canceling transfer to destination chain", - zap.String("msgID", dbTransfer.MsgID), - zap.String("hash", pe.hash), - zap.Error(err), - ) - // Process the next pending transfer - continue - } - } else { - gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", - zap.String("msgID", dbTransfer.MsgID), - zap.String("hash", pe.hash), zap.Error(err), - zap.Stringer("target chain", payload.TargetChain), - ) - } } } + } else { delete(gov.msgsSeen, pe.hash) } @@ -924,6 +953,65 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) { return value, nil } +// tryAddFlowCancelTransfer adds inverse transfer to destination chain entry if this asset can cancel flows. +func (gov *ChainGovernor) tryAddFlowCancelTransfer(transfer *transfer) (added bool, err error) { + added = false + + originChain := transfer.dbTransfer.OriginChain + originAddr := transfer.dbTransfer.OriginAddress + hash := transfer.dbTransfer.Hash + target := transfer.dbTransfer.TargetChain + emitter := transfer.dbTransfer.EmitterChain + + pipe := &pipe{emitter, target} + if !gov.pipeCanFlowCancel(pipe) { + return false, nil + } + + key := tokenKey{originChain, originAddr} + tokenEntry := gov.tokens[key] + if tokenEntry == nil { + // Weird but not critical + gov.logger.Warn("Not adding flow cancel transfer because token is not governed", + zap.String("OriginChain", originChain.String()), + zap.String("tokenEntry", originAddr.String()), + zap.String("msgID", transfer.dbTransfer.MsgID), + zap.String("hash", transfer.dbTransfer.Hash), + ) + return false, nil + } + if !tokenEntry.flowCancels { + // Nothing to do in this case + return false, nil + } + + // Ensure destination exists + destinationChainEntry, ok := gov.chains[transfer.dbTransfer.TargetChain] + if !ok { + // Weird that TargetChain does not exist but not relevant for the flow cancel feature. We just + // fail closed here: do not add the flow cancelling transfer. + gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", + zap.String("msgID", transfer.dbTransfer.MsgID), + zap.String("hash", hash), + zap.Stringer("target chain", transfer.dbTransfer.TargetChain), + ) + return false, nil + } + + // Add the 'inverse' transfer to the destination chain entry. + if err := destinationChainEntry.addFlowCancelTransfer(transfer.inverse()); err != nil { + gov.logger.Warn("could not add flow canceling transfer to destination chain", + zap.String("msgID", transfer.dbTransfer.MsgID), + zap.String("hash", transfer.dbTransfer.Hash), + zap.Error(err), + ) + // Process the next pending transfer + return false, err + } + + return true, nil +} + // TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `chainEntry`. In effect, it represents a // chain's "Governor Usage" for a given 24 hour period. // This sum may be reduced by the sum of 'flow cancelling' transfers: that is, transfers of an allow-listed token diff --git a/node/pkg/governor/governor_db.go b/node/pkg/governor/governor_db.go index db928ce868..5304cfc603 100644 --- a/node/pkg/governor/governor_db.go +++ b/node/pkg/governor/governor_db.go @@ -246,26 +246,10 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error { // Reload flow-cancel transfers for the TargetChain. This is important when the node restarts so that a corresponding, // inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` and // `CheckPending` loops but those functions do not capture flow-cancelling when the node is restarted. - tokenEntry := gov.tokens[tk] - if tokenEntry != nil { - // Mandatory check to ensure that the token should be able to reduce the Governor limit. - if tokenEntry.flowCancels { - if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok { - if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil { - gov.logger.Error("could not add flow canceling transfer to destination chain", - zap.String("msgID", xfer.MsgID), - zap.String("hash", xfer.Hash), zap.Error(err), - ) - return err - } - } else { - gov.logger.Error("tried to cancel flow but chain entry for target chain does not exist", - zap.String("msgID", xfer.MsgID), - zap.Stringer("token chain", xfer.OriginChain), - zap.Stringer("token address", xfer.OriginAddress), - zap.Stringer("target chain", xfer.TargetChain), - ) - } + if gov.flowCancelEnabled { + _, err := gov.tryAddFlowCancelTransfer(&transfer) + if err != nil { + return err } } return nil diff --git a/node/pkg/governor/testnet_config.go b/node/pkg/governor/testnet_config.go index 2f01878b9c..f670c5170f 100644 --- a/node/pkg/governor/testnet_config.go +++ b/node/pkg/governor/testnet_config.go @@ -6,7 +6,7 @@ import ( "github.com/wormhole-foundation/wormhole/sdk/vaa" ) -func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry) { +func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry, []pipe) { gov.logger.Info("setting up testnet config") tokens := []tokenConfigEntry{ @@ -15,11 +15,15 @@ func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfig } flowCancelTokens := []tokenConfigEntry{} + flowCancelPipes := []pipe{} if gov.flowCancelEnabled { flowCancelTokens = []tokenConfigEntry{ {chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usd-coin", decimals: 6, price: 1.001}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 6780118.197035182 } + flowCancelPipes = []pipe{ + {first: vaa.ChainIDEthereum, second: vaa.ChainIDSui}, + } } chains := []chainConfigEntry{ @@ -28,5 +32,5 @@ func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfig {emitterChainID: vaa.ChainIDFantom, dailyLimit: 1000000}, } - return tokens, flowCancelTokens, chains + return tokens, flowCancelTokens, chains, flowCancelPipes }