diff --git a/Cargo.lock b/Cargo.lock index f188675..3274134 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1407,6 +1407,12 @@ dependencies = [ "generic-array 0.14.7", ] +[[package]] +name = "circular-buffer" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b67261db007b5f4cf8cba393c1a5c511a5cc072339ce16e12aeba1d7b9b77946" + [[package]] name = "clap" version = "2.34.0" @@ -5308,7 +5314,7 @@ dependencies = [ [[package]] name = "quic-geyser-client" version = "0.1.5" -source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#8efcc200c795b1236675b161c04e5e65e00ace48" +source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_new_streaming#b0fa0a2f50240ab6c1e7a59b928ea74da9a6785b" dependencies = [ "anyhow", "bincode", @@ -5325,10 +5331,11 @@ dependencies = [ [[package]] name = "quic-geyser-common" version = "0.1.5" -source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#8efcc200c795b1236675b161c04e5e65e00ace48" +source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_new_streaming#b0fa0a2f50240ab6c1e7a59b928ea74da9a6785b" dependencies = [ "anyhow", "bincode", + "circular-buffer", "itertools 0.10.5", "log 0.4.21", "lz4", diff --git a/Cargo.toml b/Cargo.toml index 55b3e82..acbaf25 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,8 +26,8 @@ stable-swap = { version = "1.8.1", features = ["no-entrypoint", "client"] } stable-swap-client = { version = "1.8.1" } stable-swap-math = { version = "1.8.1" } uint = { version = "0.9.1" } -quic-geyser-client = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "router_v1.17.29" } -quic-geyser-common = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "router_v1.17.29" } +quic-geyser-client = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "router_new_streaming" } +quic-geyser-common = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "router_new_streaming" } [profile.release] overflow-checks = true diff --git a/bin/autobahn-router/src/edge.rs b/bin/autobahn-router/src/edge.rs index 1a67157..eebb229 100644 --- a/bin/autobahn-router/src/edge.rs +++ b/bin/autobahn-router/src/edge.rs @@ -272,7 +272,7 @@ impl EdgeState { } pub fn cached_price_exact_out_for(&self, out_amount: u64) -> Option<(f64, f64)> { - if !self.is_valid_out() { + if !self.is_valid() { return None; } @@ -304,22 +304,6 @@ impl EdgeState { true } - pub fn is_valid_out(&self) -> bool { - if !self.is_valid { - return false; - } - - if self.cooldown_until.is_some() { - // Do not check time here ! - // We will reset "cooldown until" on first account update coming after cooldown - // So if this is not reset yet, it means that we didn't change anything - // No reason to be working again - return false; - } - - true - } - pub fn reset_cooldown(&mut self) { self.cooldown_event += 0; self.cooldown_until = None; diff --git a/bin/autobahn-router/src/edge_updater.rs b/bin/autobahn-router/src/edge_updater.rs index f27e9d9..4c7db89 100644 --- a/bin/autobahn-router/src/edge_updater.rs +++ b/bin/autobahn-router/src/edge_updater.rs @@ -16,7 +16,7 @@ use std::time::{Duration, Instant}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::task::JoinHandle; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, warn}; #[derive(Clone)] pub struct Dex { @@ -241,15 +241,18 @@ impl EdgeUpdater { None => state.slot_excessive_lagging_since = Some(Instant::now()), Some(since) => { if since.elapsed() > max_lag_duration { - panic!( - "Lagging a lot {} for more than {}s, exiting..", + error!( + "Lagging a lot {} for more than {}s, for dex {}..", lag, - max_lag_duration.as_secs() + max_lag_duration.as_secs(), + self.dex.name, ); } } } return; + } else if state.slot_excessive_lagging_since.is_some() { + state.slot_excessive_lagging_since = None; } } } @@ -333,7 +336,9 @@ impl EdgeUpdater { }; state.received_account.insert(pk); - state.latest_slot_pending = slot; + if state.latest_slot_pending < slot { + state.latest_slot_pending = slot; + } self.check_readiness(); diff --git a/bin/autobahn-router/template-config.toml b/bin/autobahn-router/template-config.toml index 40fdd0a..4e7a179 100644 --- a/bin/autobahn-router/template-config.toml +++ b/bin/autobahn-router/template-config.toml @@ -1,3 +1,5 @@ +snapshot_timeout_in_seconds = 900 + [infinity] enabled = true @@ -22,8 +24,8 @@ add_mango_tokens = false [raydium] enabled = true mints = [] -take_all_mints = false -add_mango_tokens = true +take_all_mints = true +add_mango_tokens = false [raydium_cp] enabled = true @@ -77,6 +79,7 @@ dedup_queue_size = 50000 rpc_http_url = "$RPC_HTTP_URL" rpc_support_compression = true re_snapshot_interval_secs = 1200 +request_timeout_in_seconds = 300 [[sources.grpc_sources]] name = "router-other" @@ -90,6 +93,7 @@ dedup_queue_size = 50000 rpc_http_url = "$AMS_RPC_HTTP_URL" rpc_support_compression = true re_snapshot_interval_secs = 1200 +request_timeout_in_seconds = 300 [[sources.grpc_sources]] name = "router-ams" @@ -97,6 +101,12 @@ connection_string = "$AMS_RPC_HTTP_URL_WITHOUT_TOKEN" token = "$AMS_RPC_TOKEN" retry_connection_sleep_secs = 30 +[[sources.quic_sources]] +name = "quic-client" +connection_string = "$AMS_RPC_QUIC_URL" +retry_connection_sleep_secs = 1 +enable_gso = false + [price_feed] birdeye_token = "$BIRDEYE_TOKEN" refresh_interval_secs = 1200 # every 20 min diff --git a/fly.toml b/fly.toml index 54eafe4..3ef1e97 100644 --- a/fly.toml +++ b/fly.toml @@ -10,8 +10,8 @@ kill_timeout = "30s" cmd = ["autobahn-router", "/usr/local/bin/template-config.toml"] [[vm]] - size = "shared-cpu-4x" - memory = "8gb" + size = "performance-4x" + memory = "16gb" [[restart]] policy = "always" diff --git a/lib/router-config-lib/src/lib.rs b/lib/router-config-lib/src/lib.rs index 5fde856..b8da04a 100644 --- a/lib/router-config-lib/src/lib.rs +++ b/lib/router-config-lib/src/lib.rs @@ -14,6 +14,7 @@ pub struct GrpcSourceConfig { #[derive(Clone, Debug, Default, serde_derive::Deserialize)] pub struct QuicSourceConfig { pub name: String, + #[serde(deserialize_with = "serde_string_or_env")] pub connection_string: String, pub retry_connection_sleep_secs: u64, pub enable_gso: Option,