Skip to content

Commit

Permalink
Filtering updates which are not needed
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Nov 5, 2024
1 parent 409de09 commit 2c2bb6c
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 12 deletions.
33 changes: 29 additions & 4 deletions bin/autobahn-router/src/edge_updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ pub fn spawn_updater_job(
register_mint_sender: async_channel::Sender<Pubkey>,
ready_sender: async_channel::Sender<()>,
mut slot_updates: broadcast::Receiver<u64>,
mut account_updates: broadcast::Receiver<(Pubkey, u64)>,
mut account_updates: broadcast::Receiver<(Pubkey, Pubkey, u64)>,
mut metadata_updates: broadcast::Receiver<FeedMetadata>,
mut price_updates: broadcast::Receiver<PriceUpdate>,
mut exit: broadcast::Receiver<()>,
Expand Down Expand Up @@ -311,9 +311,8 @@ impl EdgeUpdater {
}
}

fn invalidate_one(&mut self, res: Result<(Pubkey, u64), RecvError>) -> bool {
let state = &mut self.state;
let (pk, slot) = match res {
fn invalidate_one(&mut self, res: Result<(Pubkey, Pubkey, u64), RecvError>) -> bool {
let (pk, owner, slot) = match res {
Ok(v) => v,
Err(broadcast::error::RecvError::Closed) => {
error!("account update channel closed unexpectedly");
Expand All @@ -328,6 +327,11 @@ impl EdgeUpdater {
}
};

// check if we need the update
if !self.do_update(&pk, &owner) {
return true;
}
let state = &mut self.state;
if let Some(impacted_edges) = self.dex.edges_per_pk.get(&pk) {
for edge in impacted_edges {
state.dirty_edges.insert(edge.unique_id(), edge.clone());
Expand Down Expand Up @@ -372,6 +376,27 @@ impl EdgeUpdater {
}
}

// ignore update if current dex does not need it
fn do_update(&mut self, pk: &Pubkey, owner: &Pubkey) -> bool {
if self.dex.edges_per_pk.contains_key(pk) {
return true;
}
match &self.dex.subscription_mode {
DexSubscriptionMode::Accounts(accounts) => {
return accounts.contains(pk)
},
DexSubscriptionMode::Disabled => {
false
},
DexSubscriptionMode::Programs(programs) => {
programs.contains(pk) || programs.contains(owner)
},
DexSubscriptionMode::Mixed(m) => {
m.accounts.contains(pk) || m.token_accounts_for_owner.contains(pk) || m.programs.contains(pk) || m.programs.contains(owner)
}
}
}

fn refresh_some(&mut self) {
let state = &mut self.state;
if state.dirty_edges.is_empty() || !state.is_ready {
Expand Down
8 changes: 4 additions & 4 deletions bin/autobahn-router/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,7 +576,7 @@ fn start_chaindata_updating(
chain_data: ChainDataArcRw,
account_writes: async_channel::Receiver<AccountOrSnapshotUpdate>,
slot_updates: async_channel::Receiver<SlotUpdate>,
account_update_sender: broadcast::Sender<(Pubkey, u64)>,
account_update_sender: broadcast::Sender<(Pubkey, Pubkey, u64)>,
mut exit: broadcast::Receiver<()>,
) -> JoinHandle<()> {
use mango_feeds_connector::chain_data::SlotData;
Expand Down Expand Up @@ -643,7 +643,7 @@ fn handle_updated_account(
most_recent_seen_slot: &mut u64,
chain_data: &mut RwLockWriteGuard<ChainData>,
update: AccountOrSnapshotUpdate,
account_update_sender: &broadcast::Sender<(Pubkey, u64)>,
account_update_sender: &broadcast::Sender<(Pubkey, Pubkey, u64)>,
) {
use mango_feeds_connector::chain_data::AccountData;
use solana_sdk::account::WritableAccount;
Expand All @@ -652,7 +652,7 @@ fn handle_updated_account(
fn one_update(
most_recent_seen_slot: &mut u64,
chain_data: &mut RwLockWriteGuard<ChainData>,
account_update_sender: &broadcast::Sender<(Pubkey, u64)>,
account_update_sender: &broadcast::Sender<(Pubkey, Pubkey, u64)>,
account_write: AccountWrite,
) {
chain_data.update_account(
Expand Down Expand Up @@ -680,7 +680,7 @@ fn handle_updated_account(
}

// ignore failing sends when there are no receivers
let _err = account_update_sender.send((account_write.pubkey, account_write.slot));
let _err = account_update_sender.send((account_write.pubkey, account_write.owner, account_write.slot));
}

match update {
Expand Down
8 changes: 8 additions & 0 deletions bin/autobahn-router/src/routing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,14 @@ impl Routing {
warn!(valid_edge_count, skipped_bad_price_impact, "pruning");
}

// for mint_vec in out_edges_per_mint_index.iter() {
// for mint in mint_vec {
// let input_mint = mint_to_index.iter().filter(|(_, x)| **x==mint.source_node).map(|(pk,_)| *pk).collect_vec();
// let output_mint = mint_to_index.iter().filter(|(_, x)| **x==mint.target_node).map(|(pk,_)| *pk).collect_vec();
// info!("input_mint {:?} {:?}", input_mint, output_mint );
// }
// }

(valid_edge_count, out_edges_per_mint_index)
}

Expand Down
8 changes: 4 additions & 4 deletions fly.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
app = "router-1"
primary_region = "ams"
app = "router-2"
primary_region = "dfw"
kill_signal = "SIGTERM"
kill_timeout = "30s"

Expand All @@ -10,8 +10,8 @@ kill_timeout = "30s"
cmd = ["autobahn-router", "/usr/local/bin/template-config.toml"]

[[vm]]
size = "performance-4x"
memory = "16gb"
size = "performance-16x"
memory = "32gb"

[[restart]]
policy = "always"
Expand Down

0 comments on commit 2c2bb6c

Please sign in to comment.