diff --git a/Cargo.lock b/Cargo.lock index 8543c62..8b2feb1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1409,6 +1409,12 @@ dependencies = [ "generic-array 0.14.7", ] +[[package]] +name = "circular-buffer" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b67261db007b5f4cf8cba393c1a5c511a5cc072339ce16e12aeba1d7b9b77946" + [[package]] name = "clap" version = "2.34.0" @@ -5330,7 +5336,7 @@ dependencies = [ [[package]] name = "quic-geyser-client" version = "0.1.5" -source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#8efcc200c795b1236675b161c04e5e65e00ace48" +source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#594077ccebfde826920192c17b0e501bb185650e" dependencies = [ "anyhow", "bincode", @@ -5347,10 +5353,11 @@ dependencies = [ [[package]] name = "quic-geyser-common" version = "0.1.5" -source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#8efcc200c795b1236675b161c04e5e65e00ace48" +source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#594077ccebfde826920192c17b0e501bb185650e" dependencies = [ "anyhow", "bincode", + "circular-buffer", "itertools 0.10.5", "log 0.4.21", "lz4", diff --git a/bin/autobahn-router/src/edge.rs b/bin/autobahn-router/src/edge.rs index 1a67157..eebb229 100644 --- a/bin/autobahn-router/src/edge.rs +++ b/bin/autobahn-router/src/edge.rs @@ -272,7 +272,7 @@ impl EdgeState { } pub fn cached_price_exact_out_for(&self, out_amount: u64) -> Option<(f64, f64)> { - if !self.is_valid_out() { + if !self.is_valid() { return None; } @@ -304,22 +304,6 @@ impl EdgeState { true } - pub fn is_valid_out(&self) -> bool { - if !self.is_valid { - return false; - } - - if self.cooldown_until.is_some() { - // Do not check time here ! - // We will reset "cooldown until" on first account update coming after cooldown - // So if this is not reset yet, it means that we didn't change anything - // No reason to be working again - return false; - } - - true - } - pub fn reset_cooldown(&mut self) { self.cooldown_event += 0; self.cooldown_until = None; diff --git a/bin/autobahn-router/src/edge_updater.rs b/bin/autobahn-router/src/edge_updater.rs index f27e9d9..5d299a7 100644 --- a/bin/autobahn-router/src/edge_updater.rs +++ b/bin/autobahn-router/src/edge_updater.rs @@ -16,7 +16,7 @@ use std::time::{Duration, Instant}; use tokio::sync::broadcast; use tokio::sync::broadcast::error::RecvError; use tokio::task::JoinHandle; -use tracing::{debug, error, info, trace, warn}; +use tracing::{debug, error, info, warn}; #[derive(Clone)] pub struct Dex { @@ -242,14 +242,17 @@ impl EdgeUpdater { Some(since) => { if since.elapsed() > max_lag_duration { panic!( - "Lagging a lot {} for more than {}s, exiting..", + "Lagging a lot {} for more than {}s, for dex {}..", lag, - max_lag_duration.as_secs() + max_lag_duration.as_secs(), + self.dex.name, ); } } } return; + } else if state.slot_excessive_lagging_since.is_some() { + state.slot_excessive_lagging_since = None; } } } @@ -333,7 +336,9 @@ impl EdgeUpdater { }; state.received_account.insert(pk); - state.latest_slot_pending = slot; + if state.latest_slot_pending < slot { + state.latest_slot_pending = slot; + } self.check_readiness(); @@ -398,7 +403,7 @@ impl EdgeUpdater { state.latest_slot_processed = state.latest_slot_pending; if started_at.elapsed() > Duration::from_millis(100) { - info!( + debug!( "{} - refresh {} - took - {:?}", self.dex.name, refreshed_edges.len(), diff --git a/bin/autobahn-router/src/main.rs b/bin/autobahn-router/src/main.rs index 9fe7037..830dbe9 100644 --- a/bin/autobahn-router/src/main.rs +++ b/bin/autobahn-router/src/main.rs @@ -99,6 +99,14 @@ async fn main() -> anyhow::Result<()> { let config = Config::load(&args[1])?; let router_version = RouterVersion::OverestimateAmount; + if config.metrics.output_http { + let prom_bind_addr = config + .metrics + .prometheus_address + .clone() + .expect("prometheus_address must be set"); + PrometheusSync::sync(prom_bind_addr); + } let hot_mints = Arc::new(RwLock::new(HotMintsCache::new(&config.hot_mints))); let mango_data = match mango::mango_fetcher::fetch_mango_data().await { @@ -200,14 +208,6 @@ async fn main() -> anyhow::Result<()> { exit(-1); }; - if config.metrics.output_http { - let prom_bind_addr = config - .metrics - .prometheus_address - .clone() - .expect("prometheus_address must be set"); - let _prometheus = PrometheusSync::sync(prom_bind_addr); - } if config.metrics.output_stdout { warn!("metrics output to stdout is not supported yet"); } diff --git a/bin/autobahn-router/src/prometheus_sync.rs b/bin/autobahn-router/src/prometheus_sync.rs index adc8fe3..896e1c1 100644 --- a/bin/autobahn-router/src/prometheus_sync.rs +++ b/bin/autobahn-router/src/prometheus_sync.rs @@ -1,17 +1,15 @@ -use std::time::Duration; - +use axum::{routing, Router}; use prometheus::{Encoder, TextEncoder}; +use tokio::net::{TcpListener, ToSocketAddrs}; use tokio::task::JoinHandle; -use tokio::{ - io::AsyncWriteExt, - net::{TcpListener, TcpStream, ToSocketAddrs}, -}; -use tracing::error; +use tracing::{error, info}; + +use crate::server::errors::AppError; pub struct PrometheusSync; impl PrometheusSync { - fn create_response(payload: &str) -> String { + fn create_response(payload: String) -> String { format!( "HTTP/1.1 200 OK\r\nContent-Length: {}\r\n\r\n{}", payload.len(), @@ -19,7 +17,7 @@ impl PrometheusSync { ) } - async fn handle_stream(stream: &mut TcpStream) -> anyhow::Result<()> { + async fn get_prometheus_stream() -> Result { let mut metrics_buffer = Vec::new(); let encoder = TextEncoder::new(); @@ -29,29 +27,22 @@ impl PrometheusSync { .unwrap(); let metrics_buffer = String::from_utf8(metrics_buffer).unwrap(); - let response = Self::create_response(&metrics_buffer); - - stream.writable().await?; - stream.write_all(response.as_bytes()).await?; - - stream.flush().await?; - - Ok(()) + Ok(Self::create_response(metrics_buffer)) } pub fn sync(addr: impl ToSocketAddrs + Send + 'static) -> JoinHandle> { tokio::spawn(async move { let listener = TcpListener::bind(addr).await?; - loop { - let Ok((mut stream, _addr)) = listener.accept().await else { - error!("Error accepting prometheus stream"); - tokio::time::sleep(Duration::from_millis(1)).await; - continue; - }; + let mut router: Router<()> = Router::new(); + router = router.route("/metrics", routing::get(Self::get_prometheus_stream)); + + let handle = axum::serve(listener, router); + + info!("Prometheus Server started"); - let _ = Self::handle_stream(&mut stream).await; - } + handle.await.expect("Prometheus Server failed"); + Ok(()) }) } } diff --git a/bin/autobahn-router/src/server/mod.rs b/bin/autobahn-router/src/server/mod.rs index 6e41250..461bc1d 100644 --- a/bin/autobahn-router/src/server/mod.rs +++ b/bin/autobahn-router/src/server/mod.rs @@ -1,5 +1,5 @@ pub mod alt_provider; -mod errors; +pub mod errors; pub mod hash_provider; pub mod http_server; pub mod live_account_provider; diff --git a/bin/autobahn-router/template-config.toml b/bin/autobahn-router/template-config.toml index 40fdd0a..1a521c4 100644 --- a/bin/autobahn-router/template-config.toml +++ b/bin/autobahn-router/template-config.toml @@ -1,3 +1,5 @@ +snapshot_timeout_in_seconds = 900 + [infinity] enabled = true @@ -22,8 +24,8 @@ add_mango_tokens = false [raydium] enabled = true mints = [] -take_all_mints = false -add_mango_tokens = true +take_all_mints = true +add_mango_tokens = false [raydium_cp] enabled = true @@ -77,6 +79,7 @@ dedup_queue_size = 50000 rpc_http_url = "$RPC_HTTP_URL" rpc_support_compression = true re_snapshot_interval_secs = 1200 +request_timeout_in_seconds = 300 [[sources.grpc_sources]] name = "router-other" @@ -84,12 +87,40 @@ connection_string = "$RPC_HTTP_URL_WITHOUT_TOKEN" token = "$RPC_TOKEN" retry_connection_sleep_secs = 30 +[[sources.quic_sources]] +name = "quic-client" +connection_string = "$RPC_QUIC_URL" +retry_connection_sleep_secs = 1 +enable_gso = false + +[[sources]] +region = "dfw" +dedup_queue_size = 50000 +rpc_http_url = "$DFW_RPC_HTTP_URL" +rpc_support_compression = true +re_snapshot_interval_secs = 1200 +request_timeout_in_seconds = 300 + +[[sources.grpc_sources]] +name = "router-dfw" +connection_string = "$DFW_RPC_HTTP_URL_WITHOUT_TOKEN" +token = "$AMS_RPC_TOKEN" +retry_connection_sleep_secs = 30 + +[[sources.quic_sources]] +name = "quic-client-dfw" +connection_string = "$DFW_RPC_QUIC_URL" +retry_connection_sleep_secs = 1 +enable_gso = false + + [[sources]] region = "ams" dedup_queue_size = 50000 rpc_http_url = "$AMS_RPC_HTTP_URL" rpc_support_compression = true re_snapshot_interval_secs = 1200 +request_timeout_in_seconds = 300 [[sources.grpc_sources]] name = "router-ams" @@ -97,6 +128,12 @@ connection_string = "$AMS_RPC_HTTP_URL_WITHOUT_TOKEN" token = "$AMS_RPC_TOKEN" retry_connection_sleep_secs = 30 +[[sources.quic_sources]] +name = "quic-client-ams " +connection_string = "$AMS_RPC_QUIC_URL" +retry_connection_sleep_secs = 1 +enable_gso = false + [price_feed] birdeye_token = "$BIRDEYE_TOKEN" refresh_interval_secs = 1200 # every 20 min diff --git a/fly.toml b/fly.toml index 54eafe4..3ef1e97 100644 --- a/fly.toml +++ b/fly.toml @@ -10,8 +10,8 @@ kill_timeout = "30s" cmd = ["autobahn-router", "/usr/local/bin/template-config.toml"] [[vm]] - size = "shared-cpu-4x" - memory = "8gb" + size = "performance-4x" + memory = "16gb" [[restart]] policy = "always" diff --git a/lib/router-config-lib/src/lib.rs b/lib/router-config-lib/src/lib.rs index 5fde856..b8da04a 100644 --- a/lib/router-config-lib/src/lib.rs +++ b/lib/router-config-lib/src/lib.rs @@ -14,6 +14,7 @@ pub struct GrpcSourceConfig { #[derive(Clone, Debug, Default, serde_derive::Deserialize)] pub struct QuicSourceConfig { pub name: String, + #[serde(deserialize_with = "serde_string_or_env")] pub connection_string: String, pub retry_connection_sleep_secs: u64, pub enable_gso: Option,