Skip to content

Commit

Permalink
node: Add ability to limit Flow Cancel to pairs of chains
Browse files Browse the repository at this point in the history
- Add a concept of "pipes" which consist of a pair of chains
- Add field in the Governer that enables flow cancel only for "pipes"
  which are explicitly enabled
- Set the Eth-Sui pipe as the only flow cancel-enabled pair. Sui is the
  only chain with frequent governor congestion so it is the only one
  that really needs the Flow Cancel capabilities. Ethereum is the main
  chain that Sui is interacting with in terms of volume moving in and
  out of Sui.
- Refactor calling sites for flow cancel transfer functions so that all
  checks are done in one place. (Should help prevent bugs where only one
  calling site is updated.)
  • Loading branch information
johnsaigle committed Jan 21, 2025
1 parent 4f0e46f commit 40c4b1e
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 68 deletions.
9 changes: 7 additions & 2 deletions node/pkg/governor/devnet_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)

func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry) {
func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry, []pipe) {
gov.logger.Info("setting up devnet config")

gov.dayLengthInMinutes = 5
Expand All @@ -18,16 +18,21 @@ func (gov *ChainGovernor) initDevnetConfig() ([]tokenConfigEntry, []tokenConfigE
}

flowCancelTokens := []tokenConfigEntry{}
flowCancelPipes := []pipe{}
if gov.flowCancelEnabled {
flowCancelTokens = []tokenConfigEntry{
{chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usd-coin", decimals: 6, price: 1.001}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 6780118.197035182
}

flowCancelPipes = []pipe{
{first: vaa.ChainIDEthereum, second: vaa.ChainIDSui},
}
}

chains := []chainConfigEntry{
{emitterChainID: vaa.ChainIDSolana, dailyLimit: 100, bigTransactionSize: 75},
{emitterChainID: vaa.ChainIDEthereum, dailyLimit: 100000},
}

return tokens, flowCancelTokens, chains
return tokens, flowCancelTokens, chains, flowCancelPipes
}
14 changes: 14 additions & 0 deletions node/pkg/governor/flow_cancel_pipes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package governor

import "github.com/wormhole-foundation/wormhole/sdk/vaa"

// FlowCancelPipes returns a list of `pipe`s representing a pair of chains for which flow canceling is enabled.
// In practice this list should contain pairs of chains that have a large amount of volume between each other.
// These are more likely to cause chronic congestion which flow canceling can help to alleviate.
// Pairs of chains that are not frequently congested do not need to enable flow canceling as they should have
// plenty of regular Governor capacity to work with.
func FlowCancelPipes() []pipe {
return []pipe{
{first: vaa.ChainIDEthereum, second: vaa.ChainIDSui},
}
}
176 changes: 132 additions & 44 deletions node/pkg/governor/governor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,37 @@ type (
transfers []transfer
pending []*pendingEntry
}

// Represents a pair of Governed chains. Ordering is arbitrary.
pipe struct {
first vaa.ChainID
second vaa.ChainID
}
)

// valid checks whether a pipe is valid
func (c *pipe) valid() bool {
// The elements must be different
if c.first == c.second {
return false
}
return true
}

// equals checks whether two corrdidors are equal. This method exists to demonstrate that the ordering of the
// pipe's elements doesn't matter. It also makes it easier to check whether two chains are 'connected' by a pipe
// without needing to sort or manipulate the elements.
func (c *pipe) equals(c2 *pipe) bool {
if c.first == c2.first && c.second == c2.second {
return true
}
// Ordering doesn't matter
if c.first == c2.second && c2.first == c.second {
return true
}
return false
}

// newTransferFromDbTransfer performs a bounds check on dbTransfer.Value to ensure it can fit into int64.
// This should always be the case for normal operation as dbTransfer.Value represents the USD value of a transfer.
func newTransferFromDbTransfer(dbTransfer *db.Transfer) (tx transfer, err error) {
Expand All @@ -131,6 +160,10 @@ func newTransferFromDbTransfer(dbTransfer *db.Transfer) (tx transfer, err error)

// addFlowCancelTransfer appends a transfer to a ChainEntry's transfers property.
// SECURITY: The calling code is responsible for ensuring that the asset within the transfer is a flow-cancelling asset.
// SECURITY: The calling code is responsible for ensuring that the transfer's source and destination has a matching
//
// flow cancel pipe.
//
// SECURITY: This method performs validation to ensure that the Flow Cancel transfer is valid. This is important to
// ensure that the Governor usage cannot be lowered due to malicious or invalid transfers.
// - the Value must be negative (in order to represent an incoming value)
Expand Down Expand Up @@ -202,6 +235,9 @@ type ChainGovernor struct {
configPublishCounter int64
flowCancelEnabled bool
coinGeckoApiKey string
// Pairs of chains for which flow canceling is enabled. Note that an asset may be flow canceling even if
// it was minted on a chain that is not configured to be an 'end' of any of the pipes.
flowCancelPipes []pipe
}

func NewChainGovernor(
Expand Down Expand Up @@ -256,18 +292,25 @@ func (gov *ChainGovernor) initConfig() error {
configChains := chainList()
configTokens := tokenList()
flowCancelTokens := []tokenConfigEntry{}
flowCancelPipes := []pipe{}

if gov.env == common.UnsafeDevNet {
configTokens, flowCancelTokens, configChains = gov.initDevnetConfig()
configTokens, flowCancelTokens, configChains, flowCancelPipes = gov.initDevnetConfig()
} else if gov.env == common.TestNet {
configTokens, flowCancelTokens, configChains = gov.initTestnetConfig()
configTokens, flowCancelTokens, configChains, flowCancelPipes = gov.initTestnetConfig()
} else {
// mainnet, unit tests, or accountant-mock
if gov.flowCancelEnabled {
flowCancelTokens = FlowCancelTokenList()
flowCancelPipes = FlowCancelPipes()
}
}

// We're done with this value for the rest of this function, so write it to the governor struct now
if gov.flowCancelEnabled {
gov.flowCancelPipes = flowCancelPipes
}

for _, ct := range configTokens {
addr, err := vaa.StringToAddress(ct.addr)
if err != nil {
Expand Down Expand Up @@ -611,31 +654,34 @@ func (gov *ChainGovernor) ProcessMsgForTime(msg *common.MessagePublication, now
// - This will cause the summed value of Sui to decrease.
emitterChainEntry.transfers = append(emitterChainEntry.transfers, transfer)

// Add inverse transfer to destination chain entry if this asset can cancel flows.
key := tokenKey{chain: token.token.chain, addr: token.token.addr}

tokenEntry := gov.tokens[key]
if tokenEntry != nil {
// Mandatory check to ensure that the token should be able to reduce the Governor limit.
if tokenEntry.flowCancels {
if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok {
if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil {
return false, err
}
} else {
gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist",
zap.String("msgID", msg.MessageIDString()),
zap.String("hash", hash), zap.Error(err),
zap.Stringer("target chain", payload.TargetChain),
)
}
if gov.flowCancelEnabled {
_, err = gov.tryAddFlowCancelTransfer(&transfer)
if err != nil {
gov.logger.Warn("Error when attempting to add a flow cancel transfer",
zap.Error(err),
)
return false, err
}
}

gov.msgsSeen[hash] = transferComplete
return true, nil
}

// pipeCanFlowCancel checks whether the pipe passed as an argument is present in the list of flow-cancel enabled
// pipes. This method returns false for all values if flow canceling is disabled.
func (gov *ChainGovernor) pipeCanFlowCancel(pipe *pipe) bool {
if !gov.flowCancelEnabled {
return false
}
for _, configuredPipe := range gov.flowCancelPipes {
if pipe.equals(&configuredPipe) {
return true
}
}
return false
}

// IsGovernedMsg determines if the message applies to the governor. It grabs the lock.
func (gov *ChainGovernor) IsGovernedMsg(msg *common.MessagePublication) (msgIsGoverned bool, err error) {
gov.mutex.Lock()
Expand Down Expand Up @@ -854,33 +900,16 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
ce.transfers = append(ce.transfers, transfer)

gov.msgsSeen[pe.hash] = transferComplete
if gov.flowCancelEnabled {
_, err := gov.tryAddFlowCancelTransfer(&transfer)
if err != nil {
gov.logger.Error("Error when attempting to add a flow cancel transfer",
zap.Error(err),
)

// Add inverse transfer to destination chain entry if this asset can cancel flows.
key := tokenKey{chain: pe.token.token.chain, addr: pe.token.token.addr}
tokenEntry := gov.tokens[key]
if tokenEntry != nil {
// Mandatory check to ensure that the token should be able to reduce the Governor limit.
if tokenEntry.flowCancels {
if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok {

if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil {
gov.logger.Warn("could not add flow canceling transfer to destination chain",
zap.String("msgID", dbTransfer.MsgID),
zap.String("hash", pe.hash),
zap.Error(err),
)
// Process the next pending transfer
continue
}
} else {
gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist",
zap.String("msgID", dbTransfer.MsgID),
zap.String("hash", pe.hash), zap.Error(err),
zap.Stringer("target chain", payload.TargetChain),
)
}
}
}

} else {
delete(gov.msgsSeen, pe.hash)
}
Expand Down Expand Up @@ -924,6 +953,65 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) {
return value, nil
}

// tryAddFlowCancelTransfer adds inverse transfer to destination chain entry if this asset can cancel flows.
func (gov *ChainGovernor) tryAddFlowCancelTransfer(transfer *transfer) (added bool, err error) {
added = false

originChain := transfer.dbTransfer.OriginChain
originAddr := transfer.dbTransfer.OriginAddress
hash := transfer.dbTransfer.Hash
target := transfer.dbTransfer.TargetChain
emitter := transfer.dbTransfer.EmitterChain

pipe := &pipe{emitter, target}
if !gov.pipeCanFlowCancel(pipe) {
return false, nil
}

key := tokenKey{originChain, originAddr}
tokenEntry := gov.tokens[key]
if tokenEntry == nil {
// Weird but not critical
gov.logger.Warn("Not adding flow cancel transfer because token is not governed",
zap.String("OriginChain", originChain.String()),
zap.String("tokenEntry", originAddr.String()),
zap.String("msgID", transfer.dbTransfer.MsgID),
zap.String("hash", transfer.dbTransfer.Hash),
)
return false, nil
}
if !tokenEntry.flowCancels {
// Nothing to do in this case
return false, nil
}

// Ensure destination exists
destinationChainEntry, ok := gov.chains[transfer.dbTransfer.TargetChain]
if !ok {
// Weird that TargetChain does not exist but not relevant for the flow cancel feature. We just
// fail closed here: do not add the flow cancelling transfer.
gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist",
zap.String("msgID", transfer.dbTransfer.MsgID),
zap.String("hash", hash),
zap.Stringer("target chain", transfer.dbTransfer.TargetChain),
)
return false, nil
}

// Add the 'inverse' transfer to the destination chain entry.
if err := destinationChainEntry.addFlowCancelTransfer(transfer.inverse()); err != nil {
gov.logger.Warn("could not add flow canceling transfer to destination chain",
zap.String("msgID", transfer.dbTransfer.MsgID),
zap.String("hash", transfer.dbTransfer.Hash),
zap.Error(err),
)
// Process the next pending transfer
return false, err
}

return true, nil
}

// TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `chainEntry`. In effect, it represents a
// chain's "Governor Usage" for a given 24 hour period.
// This sum may be reduced by the sum of 'flow cancelling' transfers: that is, transfers of an allow-listed token
Expand Down
24 changes: 4 additions & 20 deletions node/pkg/governor/governor_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,26 +246,10 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error {
// Reload flow-cancel transfers for the TargetChain. This is important when the node restarts so that a corresponding,
// inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` and
// `CheckPending` loops but those functions do not capture flow-cancelling when the node is restarted.
tokenEntry := gov.tokens[tk]
if tokenEntry != nil {
// Mandatory check to ensure that the token should be able to reduce the Governor limit.
if tokenEntry.flowCancels {
if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok {
if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil {
gov.logger.Error("could not add flow canceling transfer to destination chain",
zap.String("msgID", xfer.MsgID),
zap.String("hash", xfer.Hash), zap.Error(err),
)
return err
}
} else {
gov.logger.Error("tried to cancel flow but chain entry for target chain does not exist",
zap.String("msgID", xfer.MsgID),
zap.Stringer("token chain", xfer.OriginChain),
zap.Stringer("token address", xfer.OriginAddress),
zap.Stringer("target chain", xfer.TargetChain),
)
}
if gov.flowCancelEnabled {
_, err := gov.tryAddFlowCancelTransfer(&transfer)
if err != nil {
return err
}
}
return nil
Expand Down
8 changes: 6 additions & 2 deletions node/pkg/governor/testnet_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)

func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry) {
func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfigEntry, []chainConfigEntry, []pipe) {
gov.logger.Info("setting up testnet config")

tokens := []tokenConfigEntry{
Expand All @@ -15,11 +15,15 @@ func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfig
}

flowCancelTokens := []tokenConfigEntry{}
flowCancelPipes := []pipe{}

if gov.flowCancelEnabled {
flowCancelTokens = []tokenConfigEntry{
{chain: 1, addr: "3b442cb3912157f13a933d0134282d032b5ffecd01a2dbf1b7790608df002ea7", symbol: "USDC", coinGeckoId: "usd-coin", decimals: 6, price: 1.001}, // Addr: 4zMMC9srt5Ri5X14GAgXhaHii3GnPAEERYPJgZJDncDU, Notional: 6780118.197035182
}
flowCancelPipes = []pipe{
{first: vaa.ChainIDEthereum, second: vaa.ChainIDSui},
}
}

chains := []chainConfigEntry{
Expand All @@ -28,5 +32,5 @@ func (gov *ChainGovernor) initTestnetConfig() ([]tokenConfigEntry, []tokenConfig
{emitterChainID: vaa.ChainIDFantom, dailyLimit: 1000000},
}

return tokens, flowCancelTokens, chains
return tokens, flowCancelTokens, chains, flowCancelPipes
}

0 comments on commit 40c4b1e

Please sign in to comment.