From 520d6a74b33437fdd73da9962ca6b0d0280ee45e Mon Sep 17 00:00:00 2001 From: galactus <96341601+godmodegalactus@users.noreply.github.com> Date: Tue, 1 Oct 2024 10:15:23 +0200 Subject: [PATCH] Adding quic geyser source and making it work with all raydium pools (#5) Adding quic geyser source and making it work with all raydium pools --- Cargo.lock | 68 +- Cargo.toml | 3 + bin/autobahn-router/Cargo.toml | 4 + .../examples/grpc_source_tester.rs | 6 +- bin/autobahn-router/src/edge_updater.rs | 3 +- bin/autobahn-router/src/main.rs | 41 +- bin/autobahn-router/src/metrics.rs | 17 + bin/autobahn-router/src/source/geyser.rs | 41 +- .../src/source/grpc_plugin_source.rs | 12 +- .../src/source/mint_accounts_source.rs | 1 - bin/autobahn-router/src/source/mod.rs | 1 + .../src/source/quic_plugin_source.rs | 605 ++++++++++++++++++ lib/dex-raydium-cp/src/edge.rs | 8 +- lib/router-config-lib/src/lib.rs | 16 +- lib/router-feed-lib/src/grpc_tx_watcher.rs | 6 +- .../tests/cases/test_swap_from_dump.rs | 9 +- 16 files changed, 773 insertions(+), 68 deletions(-) create mode 100644 bin/autobahn-router/src/source/quic_plugin_source.rs diff --git a/Cargo.lock b/Cargo.lock index b9a5fae..f188675 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -740,6 +740,8 @@ dependencies = [ "ordered-float", "priority-queue", "prometheus", + "quic-geyser-client", + "quic-geyser-common", "rand 0.7.3", "regex", "router-config-lib", @@ -3264,7 +3266,7 @@ dependencies = [ "futures-util", "http 0.2.12", "hyper 0.14.28", - "rustls 0.21.10", + "rustls 0.21.12", "tokio", "tokio-rustls 0.24.1", ] @@ -5303,6 +5305,40 @@ dependencies = [ "syn 2.0.58", ] +[[package]] +name = "quic-geyser-client" +version = "0.1.5" +source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#8efcc200c795b1236675b161c04e5e65e00ace48" +dependencies = [ + "anyhow", + "bincode", + "log 0.4.21", + "pkcs8", + "quic-geyser-common", + "quinn", + "rcgen", + "rustls 0.21.12", + "solana-sdk", + "tokio", +] + +[[package]] +name = "quic-geyser-common" +version = "0.1.5" +source = "git+https://github.com/blockworks-foundation/quic_geyser_plugin.git?branch=router_v1.17.29#8efcc200c795b1236675b161c04e5e65e00ace48" +dependencies = [ + "anyhow", + "bincode", + "itertools 0.10.5", + "log 0.4.21", + "lz4", + "serde", + "solana-program", + "solana-sdk", + "solana-transaction-status", + "thiserror", +] + [[package]] name = "quick-protobuf" version = "0.8.0" @@ -5323,7 +5359,7 @@ dependencies = [ "quinn-proto", "quinn-udp", "rustc-hash", - "rustls 0.21.10", + "rustls 0.21.12", "thiserror", "tokio", "tracing", @@ -5339,7 +5375,7 @@ dependencies = [ "rand 0.8.5", "ring 0.16.20", "rustc-hash", - "rustls 0.21.10", + "rustls 0.21.12", "rustls-native-certs", "slab", "thiserror", @@ -5762,7 +5798,7 @@ dependencies = [ "once_cell", "percent-encoding 2.3.1", "pin-project-lite", - "rustls 0.21.10", + "rustls 0.21.12", "rustls-pemfile", "serde", "serde_json", @@ -6049,9 +6085,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.10" +version = "0.21.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log 0.4.21", "ring 0.17.8", @@ -6117,8 +6153,8 @@ dependencies = [ "sanctum-token-ratio", "solana-program", "solana-readonly-account", - "spl-associated-token-account 2.3.0", - "spl-token 4.0.0", + "spl-associated-token-account 1.1.3", + "spl-token 3.5.0", "spl-token-metadata-interface", "static_assertions", ] @@ -6231,7 +6267,7 @@ source = "git+https://github.com/igneous-labs/sanctum-solana-utils.git?rev=2d171 dependencies = [ "solana-program", "solana-readonly-account", - "spl-associated-token-account 2.3.0", + "spl-associated-token-account 1.1.3", ] [[package]] @@ -6284,7 +6320,7 @@ source = "git+https://github.com/igneous-labs/sanctum-solana-utils.git?rev=2d171 dependencies = [ "solana-program", "solana-readonly-account", - "spl-token-2022 1.0.0", + "spl-token-2022 0.6.1", ] [[package]] @@ -7408,7 +7444,7 @@ dependencies = [ "quinn", "quinn-proto", "rcgen", - "rustls 0.21.10", + "rustls 0.21.12", "solana-connection-cache", "solana-measure", "solana-metrics", @@ -7725,7 +7761,7 @@ dependencies = [ "quinn-proto", "rand 0.8.5", "rcgen", - "rustls 0.21.10", + "rustls 0.21.12", "solana-metrics", "solana-perf", "solana-sdk", @@ -9084,7 +9120,7 @@ version = "0.24.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" dependencies = [ - "rustls 0.21.10", + "rustls 0.21.12", "tokio", ] @@ -9158,7 +9194,7 @@ checksum = "212d5dcb2a1ce06d81107c3d0ffa3121fe974b73f068c8282cb1c32328113b6c" dependencies = [ "futures-util", "log 0.4.21", - "rustls 0.21.10", + "rustls 0.21.12", "tokio", "tokio-rustls 0.24.1", "tungstenite 0.20.1", @@ -9264,7 +9300,7 @@ dependencies = [ "percent-encoding 2.3.1", "pin-project", "prost 0.12.4", - "rustls 0.21.10", + "rustls 0.21.12", "rustls-native-certs", "rustls-pemfile", "tokio", @@ -9480,7 +9516,7 @@ dependencies = [ "httparse", "log 0.4.21", "rand 0.8.5", - "rustls 0.21.10", + "rustls 0.21.12", "sha1", "thiserror", "url 2.5.0", diff --git a/Cargo.toml b/Cargo.toml index f7a6e72..55b3e82 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ solana-rpc-client-api = { version = "1.17" } mango-feeds-connector = { git = "https://github.com/blockworks-foundation/mango-feeds.git", tag = "connector-v0.4.8" } yellowstone-grpc-client = { version = "1.15.0", git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", tag = "v1.15.0+solana.1.17" } yellowstone-grpc-proto = { version = "1.14.0", git = "https://github.com/blockworks-foundation/yellowstone-grpc.git", tag = "v1.15.0+solana.1.17" } + reqwest = { version = "0.11.27", features = ["json"] } whirlpools-client = { git = "https://github.com/blockworks-foundation/whirlpools-client/", features = ["no-entrypoint"] } openbook-v2 = { git = "https://github.com/openbook-dex/openbook-v2", tag = "v0.2.7", features = ["no-entrypoint", "client"] } @@ -25,6 +26,8 @@ stable-swap = { version = "1.8.1", features = ["no-entrypoint", "client"] } stable-swap-client = { version = "1.8.1" } stable-swap-math = { version = "1.8.1" } uint = { version = "0.9.1" } +quic-geyser-client = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "router_v1.17.29" } +quic-geyser-common = { git = "https://github.com/blockworks-foundation/quic_geyser_plugin.git", branch = "router_v1.17.29" } [profile.release] overflow-checks = true diff --git a/bin/autobahn-router/Cargo.toml b/bin/autobahn-router/Cargo.toml index a2631e5..4aa087c 100644 --- a/bin/autobahn-router/Cargo.toml +++ b/bin/autobahn-router/Cargo.toml @@ -87,6 +87,10 @@ yellowstone-grpc-client = { workspace = true } yellowstone-grpc-proto = { workspace = true } tonic = { version = "0.10.2", features = ["gzip"] } +# quic +quic-geyser-client = { workspace = true } +quic-geyser-common = { workspace = true } + # compressed snapshots lz4 = "1.24.0" diff --git a/bin/autobahn-router/examples/grpc_source_tester.rs b/bin/autobahn-router/examples/grpc_source_tester.rs index ebc5687..fe9ae2b 100644 --- a/bin/autobahn-router/examples/grpc_source_tester.rs +++ b/bin/autobahn-router/examples/grpc_source_tester.rs @@ -22,13 +22,13 @@ pub async fn main() { let rpc_http_addr = env::var("RPC_HTTP_ADDR").expect("need rpc http url"); let snapshot_config = AccountDataSourceConfig { region: None, - use_quic: None, - quic_address: None, + quic_sources: None, rpc_http_url: rpc_http_addr.clone(), rpc_support_compression: Some(false), /* no compression */ re_snapshot_interval_secs: None, - grpc_sources: vec![], + grpc_sources: Some(vec![]), dedup_queue_size: 0, + request_timeout_in_seconds: None, }; // Raydium diff --git a/bin/autobahn-router/src/edge_updater.rs b/bin/autobahn-router/src/edge_updater.rs index 870bcdc..b3571ec 100644 --- a/bin/autobahn-router/src/edge_updater.rs +++ b/bin/autobahn-router/src/edge_updater.rs @@ -138,7 +138,8 @@ pub fn spawn_updater_job( ), }; - let snapshot_timeout = Instant::now() + Duration::from_secs(60 * 5); + let snapshot_timeout_in_seconds = config.snapshot_timeout_in_seconds.unwrap_or(60 * 5); + let snapshot_timeout = Instant::now() + Duration::from_secs(snapshot_timeout_in_seconds); let listener_job = tokio_spawn(format!("edge_updater_{}", dex.name).as_str(), async move { let mut updater = EdgeUpdater { dex, diff --git a/bin/autobahn-router/src/main.rs b/bin/autobahn-router/src/main.rs index cc4840b..1d8b40e 100644 --- a/bin/autobahn-router/src/main.rs +++ b/bin/autobahn-router/src/main.rs @@ -175,19 +175,28 @@ async fn main() -> anyhow::Result<()> { } }); - if source_config.grpc_sources.len() > 1 { - error!("only one grpc source is supported ATM"); - exit(-1); + if let Some(quic_sources) = &source_config.quic_sources { + info!( + "quic sources: {}", + quic_sources + .iter() + .map(|c| c.connection_string.clone()) + .collect::() + ); } - - info!( - "grpc sources: {}", - source_config - .grpc_sources - .iter() - .map(|c| c.connection_string.clone()) - .collect::() - ); + if let Some(grpc_sources) = source_config.grpc_sources.clone() { + info!( + "grpc sources: {}", + grpc_sources + .iter() + .map(|c| c.connection_string.clone()) + .collect::() + ); + } else { + // current grpc source is needed for transaction watcher even if there is quic + error!("No grpc geyser sources specified"); + exit(-1); + }; if config.metrics.output_http { let prom_bind_addr = config @@ -443,8 +452,8 @@ async fn main() -> anyhow::Result<()> { let ef = exit_sender.subscribe(); let sc = source_config.clone(); let account_update_job = tokio_spawn("geyser", async move { - if sc.use_quic.unwrap_or(false) { - error!("not supported yet"); + if sc.grpc_sources.is_none() && sc.quic_sources.is_none() { + error!("No quic or grpc plugin setup"); } else { geyser::spawn_geyser_source( &sc, @@ -528,7 +537,7 @@ fn build_price_feed( fn build_rpc(source_config: &AccountDataSourceConfig) -> RpcClient { RpcClient::new_with_timeouts_and_commitment( string_or_env(source_config.rpc_http_url.clone()), - Duration::from_secs(60), // request timeout + Duration::from_secs(source_config.request_timeout_in_seconds.unwrap_or(60)), // request timeout CommitmentConfig::confirmed(), Duration::from_secs(60), // confirmation timeout ) @@ -537,7 +546,7 @@ fn build_rpc(source_config: &AccountDataSourceConfig) -> RpcClient { fn build_blocking_rpc(source_config: &AccountDataSourceConfig) -> BlockingRpcClient { BlockingRpcClient::new_with_timeouts_and_commitment( string_or_env(source_config.rpc_http_url.clone()), - Duration::from_secs(60), // request timeout + Duration::from_secs(source_config.request_timeout_in_seconds.unwrap_or(60)), // request timeout CommitmentConfig::confirmed(), Duration::from_secs(60), // confirmation timeout ) diff --git a/bin/autobahn-router/src/metrics.rs b/bin/autobahn-router/src/metrics.rs index 860d22d..294a26c 100644 --- a/bin/autobahn-router/src/metrics.rs +++ b/bin/autobahn-router/src/metrics.rs @@ -27,6 +27,23 @@ lazy_static::lazy_static! { pub static ref GRPC_TO_EDGE_SLOT_LAG: IntGaugeVec = register_int_gauge_vec!(opts!("router_grpc_to_edge_slot_lag", "RPC Slot vs last slot used to update edges"), &["dex_name"]).unwrap(); + pub static ref QUIC_ACCOUNT_WRITES: IntCounter = + register_int_counter!("quic_account_writes", "Number of account updates via Geyser gRPC").unwrap(); + pub static ref QUIC_ACCOUNT_WRITE_QUEUE: IntGauge = + register_int_gauge!("quic_account_write_queue", "Items in account write queue via Geyser gPRC").unwrap(); + pub static ref QUIC_DEDUP_QUEUE: GenericGauge = + register_int_gauge!("quic_dedup_queue", "Items in dedup queue via Geyser gPRC").unwrap(); + pub static ref QUIC_SLOT_UPDATE_QUEUE: GenericGauge = + register_int_gauge!("quic_slot_update_queue", "Items in slot update queue via Geyser gPRC").unwrap(); + pub static ref QUIC_SLOT_UPDATES: IntCounter = + register_int_counter!("quic_slot_updates", "Number of slot updates via Geyser gPRC").unwrap(); + pub static ref QUIC_SNAPSHOT_ACCOUNT_WRITES: IntCounter = + register_int_counter!("quic_snapshot_account_writes", "Number of account writes from snapshot").unwrap(); + pub static ref QUIC_SOURCE_CONNECTION_RETRIES: IntCounterVec = + register_int_counter_vec!(opts!("quic_source_connection_retries", "gRPC source connection retries"), &["source_name"]).unwrap(); + pub static ref QUIC_NO_MESSAGE_FOR_DURATION_MS: IntGauge = + register_int_gauge!("quic_no_update_for_duration_ms", "Did not get any message from Geyser gPRC for this duration").unwrap(); + pub static ref HTTP_REQUEST_TIMING: HistogramVec = register_histogram_vec!( histogram_opts!("router_http_request_timing", "Endpoint timing in seconds", diff --git a/bin/autobahn-router/src/source/geyser.rs b/bin/autobahn-router/src/source/geyser.rs index 30adfe9..b5fdfa8 100644 --- a/bin/autobahn-router/src/source/geyser.rs +++ b/bin/autobahn-router/src/source/geyser.rs @@ -9,6 +9,8 @@ use router_feed_lib::get_program_account::FeedMetadata; use crate::source::grpc_plugin_source; +use super::quic_plugin_source; + pub async fn spawn_geyser_source( config: &AccountDataSourceConfig, exit_receiver: tokio::sync::broadcast::Receiver<()>, @@ -20,16 +22,31 @@ pub async fn spawn_geyser_source( subscribed_token_accounts: &HashSet, filters: &HashSet, ) { - grpc_plugin_source::process_events( - config.clone(), - subscribed_accounts.clone(), - subscribed_programs.clone(), - subscribed_token_accounts.clone(), - filters.clone(), - account_write_sender, - Some(metadata_write_sender), - slot_sender, - exit_receiver, - ) - .await; + if config.quic_sources.is_some() { + quic_plugin_source::process_events( + config.clone(), + subscribed_accounts.clone(), + subscribed_programs.clone(), + subscribed_token_accounts.clone(), + filters.clone(), + account_write_sender, + Some(metadata_write_sender), + slot_sender, + exit_receiver, + ) + .await; + } else if config.grpc_sources.is_some() { + grpc_plugin_source::process_events( + config.clone(), + subscribed_accounts.clone(), + subscribed_programs.clone(), + subscribed_token_accounts.clone(), + filters.clone(), + account_write_sender, + Some(metadata_write_sender), + slot_sender, + exit_receiver, + ) + .await; + } } diff --git a/bin/autobahn-router/src/source/grpc_plugin_source.rs b/bin/autobahn-router/src/source/grpc_plugin_source.rs index 73e9a31..6684b96 100644 --- a/bin/autobahn-router/src/source/grpc_plugin_source.rs +++ b/bin/autobahn-router/src/source/grpc_plugin_source.rs @@ -515,13 +515,13 @@ pub async fn process_events( async_channel::bounded::(config.dedup_queue_size); let mut source_jobs = vec![]; + let Some(grpc_sources) = config.grpc_sources.clone() else { + return; + }; + // note: caller in main.rs ensures this - assert_eq!( - config.grpc_sources.len(), - 1, - "only one grpc source supported" - ); - for grpc_source in config.grpc_sources.clone() { + assert_eq!(grpc_sources.len(), 1, "only one grpc source supported"); + for grpc_source in grpc_sources.clone() { let msg_sender = msg_sender.clone(); let sub_accounts = subscription_accounts.clone(); let sub_programs = subscription_programs.clone(); diff --git a/bin/autobahn-router/src/source/mint_accounts_source.rs b/bin/autobahn-router/src/source/mint_accounts_source.rs index 5cb4d9f..ec3a942 100644 --- a/bin/autobahn-router/src/source/mint_accounts_source.rs +++ b/bin/autobahn-router/src/source/mint_accounts_source.rs @@ -88,7 +88,6 @@ pub async fn request_mint_metadata( count.fetch_add(1, Ordering::Relaxed); } } - mint_accounts }); threads.push(jh_thread); diff --git a/bin/autobahn-router/src/source/mod.rs b/bin/autobahn-router/src/source/mod.rs index 7149dd7..f6a2374 100644 --- a/bin/autobahn-router/src/source/mod.rs +++ b/bin/autobahn-router/src/source/mod.rs @@ -1,3 +1,4 @@ pub mod geyser; pub mod grpc_plugin_source; pub mod mint_accounts_source; +pub mod quic_plugin_source; diff --git a/bin/autobahn-router/src/source/quic_plugin_source.rs b/bin/autobahn-router/src/source/quic_plugin_source.rs new file mode 100644 index 0000000..f2df0ef --- /dev/null +++ b/bin/autobahn-router/src/source/quic_plugin_source.rs @@ -0,0 +1,605 @@ +use itertools::Itertools; +use jsonrpc_core::futures::StreamExt; + +use quic_geyser_common::filters::MemcmpFilter; +use quic_geyser_common::types::connections_parameters::ConnectionParameters; +use solana_sdk::pubkey::Pubkey; + +use anchor_spl::token::spl_token; +use async_channel::{Receiver, Sender}; +use std::collections::HashSet; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Instant; +use std::{collections::HashMap, env, time::Duration}; +use tracing::*; + +use crate::metrics; +use mango_feeds_connector::{chain_data::SlotStatus, SlotUpdate}; +use quic_geyser_common::message::Message; +use router_config_lib::{AccountDataSourceConfig, QuicSourceConfig}; +use router_feed_lib::account_write::{AccountOrSnapshotUpdate, AccountWrite}; +use router_feed_lib::get_program_account::{ + get_snapshot_gma, get_snapshot_gpa, get_snapshot_gta, CustomSnapshotProgramAccounts, + FeedMetadata, +}; +use solana_program::clock::Slot; +use tokio::sync::Semaphore; + +const MAX_GMA_ACCOUNTS: usize = 100; + +// limit number of concurrent gMA/gPA requests +const MAX_PARALLEL_HEAVY_RPC_REQUESTS: usize = 4; + +#[allow(clippy::large_enum_variant)] +pub enum SourceMessage { + QuicMessage(Message), + Snapshot(CustomSnapshotProgramAccounts), +} + +pub async fn feed_data_geyser( + quic_source_config: &QuicSourceConfig, + snapshot_config: AccountDataSourceConfig, + subscribed_accounts: &HashSet, + subscribed_programs: &HashSet, + subscribed_token_accounts: &HashSet, + sender: async_channel::Sender, +) -> anyhow::Result<()> { + let use_compression = snapshot_config.rpc_support_compression.unwrap_or(false); + + let snapshot_rpc_http_url = match &snapshot_config.rpc_http_url.chars().next().unwrap() { + '$' => env::var(&snapshot_config.rpc_http_url[1..]) + .expect("reading connection string from env"), + _ => snapshot_config.rpc_http_url.clone(), + }; + info!("connecting to quic source {:?}", quic_source_config); + + let (quic_client, mut stream, _jh) = quic_geyser_client::non_blocking::client::Client::new( + quic_source_config.connection_string.clone(), + ConnectionParameters { + enable_gso: quic_source_config.enable_gso.unwrap_or(true), + ..Default::default() + }, + ) + .await?; + + let mut subscriptions = vec![]; + + let subscribed_program_filter = subscribed_programs.iter().map(|x| { + quic_geyser_common::filters::Filter::Account(quic_geyser_common::filters::AccountFilter { + owner: Some(*x), + accounts: None, + filters: None, + }) + }); + subscriptions.extend(subscribed_program_filter); + + let subscribed_token_accounts_filter = subscribed_programs.iter().map(|x| { + quic_geyser_common::filters::Filter::Account(quic_geyser_common::filters::AccountFilter { + owner: Some(Pubkey::from_str("TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA").unwrap()), + accounts: None, + filters: Some(vec![ + quic_geyser_common::filters::AccountFilterType::Datasize(165), + quic_geyser_common::filters::AccountFilterType::Memcmp(MemcmpFilter { + offset: 32, + data: quic_geyser_common::filters::MemcmpFilterData::Bytes( + x.to_bytes().to_vec(), + ), + }), + ]), + }) + }); + subscriptions.extend(subscribed_token_accounts_filter); + + subscriptions.push(quic_geyser_common::filters::Filter::Account( + quic_geyser_common::filters::AccountFilter { + accounts: Some(subscribed_accounts.clone()), + owner: None, + filters: None, + }, + )); + + subscriptions.push(quic_geyser_common::filters::Filter::Slot); + quic_client.subscribe(subscriptions).await?; + + // We can't get a snapshot immediately since the finalized snapshot would be for a + // slot in the past and we'd be missing intermediate updates. + // + // Delay the request until the first slot we received all writes for becomes rooted + // to avoid that problem - partially. The rooted slot will still be larger than the + // finalized slot, so add a number of slots as a buffer. + // + // If that buffer isn't sufficient, there'll be a retry. + + // The first slot that we will receive _all_ account writes for + let mut first_full_slot: u64 = u64::MAX; + + // If a snapshot should be performed when ready. + let mut snapshot_needed = true; + + // The highest "rooted" slot that has been seen. + let mut max_finalized_slot = 0; + + // Data for slots will arrive out of order. This value defines how many + // slots after a slot was marked "rooted" we assume it'll not receive + // any more account write information. + // + // This is important for the write_version mapping (to know when slots can + // be dropped). + let max_out_of_order_slots = 40; + + // Number of slots that we expect "finalized" commitment to lag + // behind "rooted". This matters for getProgramAccounts based snapshots, + // which will have "finalized" commitment. + let mut rooted_to_finalized_slots = 30; + + let (snapshot_gma_sender, mut snapshot_gma_receiver) = tokio::sync::mpsc::unbounded_channel(); + // TODO log buffer size + + // The plugin sends a ping every 5s or so + let fatal_idle_timeout = Duration::from_secs(15); + let mut re_snapshot_interval = tokio::time::interval(Duration::from_secs( + snapshot_config + .re_snapshot_interval_secs + .unwrap_or(60 * 60 * 12), + )); + re_snapshot_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + re_snapshot_interval.tick().await; + + // Highest slot that an account write came in for. + let mut newest_write_slot: u64 = 0; + + let mut last_message_received_at = Instant::now(); + + loop { + tokio::select! { + update = stream.recv() => { + let Some(mut message) = update + else { + anyhow::bail!("geyser plugin has closed the stream"); + }; + // use account and slot updates to trigger snapshot loading + match &mut message { + Message::SlotMsg(slot_update) => { + trace!("received slot update for slot {}", slot_update.slot); + let commitment_config = slot_update.commitment_config; + + debug!( + "slot_update: {} ({:?})", + slot_update.slot, + commitment_config + ); + + if commitment_config.is_finalized() { + if first_full_slot == u64::MAX { + // TODO: is this equivalent to before? what was highesy_write_slot? + first_full_slot = slot_update.slot + 1; + } + // TODO rename rooted to finalized + if slot_update.slot > max_finalized_slot { + max_finalized_slot = slot_update.slot; + } + + let waiting_for_snapshot_slot = max_finalized_slot <= first_full_slot + rooted_to_finalized_slots; + + if waiting_for_snapshot_slot { + debug!("waiting for snapshot slot: rooted={}, first_full={}, slot={}", max_finalized_slot, first_full_slot, slot_update.slot); + } + + if snapshot_needed && !waiting_for_snapshot_slot { + snapshot_needed = false; + + debug!("snapshot slot reached - setting up snapshot tasks"); + + let permits_parallel_rpc_requests = Arc::new(Semaphore::new(MAX_PARALLEL_HEAVY_RPC_REQUESTS)); + + info!("Requesting snapshot from gMA for {} filter accounts", subscribed_accounts.len()); + for pubkey_chunk in subscribed_accounts.iter().chunks(MAX_GMA_ACCOUNTS).into_iter() { + let rpc_http_url = snapshot_rpc_http_url.clone(); + let account_ids = pubkey_chunk.map(|x| *x).collect_vec(); + let sender = snapshot_gma_sender.clone(); + let permits = permits_parallel_rpc_requests.clone(); + tokio::spawn(async move { + let _permit = permits.acquire().await.unwrap(); + let snapshot = get_snapshot_gma(&rpc_http_url, &account_ids).await; + match sender.send(snapshot) { + Ok(_) => {} + Err(_) => { + warn!("Could not send snapshot, quic has probably reconnected"); + } + } + }); + } + + info!("Requesting snapshot from gPA for {} program filter accounts", subscribed_programs.len()); + for program_id in subscribed_programs { + let rpc_http_url = snapshot_rpc_http_url.clone(); + let program_id = *program_id; + let sender = snapshot_gma_sender.clone(); + let permits = permits_parallel_rpc_requests.clone(); + tokio::spawn(async move { + let _permit = permits.acquire().await.unwrap(); + let snapshot = get_snapshot_gpa(&rpc_http_url, &program_id, use_compression).await; + match sender.send(snapshot) { + Ok(_) => {} + Err(_) => { + warn!("Could not send snapshot, quic has probably reconnected"); + } + } + }); + } + + info!("Requesting snapshot from gTA for {} owners filter accounts", subscribed_token_accounts.len()); + for owner_id in subscribed_token_accounts { + let rpc_http_url = snapshot_rpc_http_url.clone(); + let owner_id = owner_id.clone(); + let sender = snapshot_gma_sender.clone(); + let permits = permits_parallel_rpc_requests.clone(); + tokio::spawn(async move { + let _permit = permits.acquire().await.unwrap(); + let snapshot = get_snapshot_gta(&rpc_http_url, &owner_id).await; + match sender.send(snapshot) { + Ok(_) => {} + Err(_) => { + warn!("Could not send snapshot, quic has probably reconnected"); + } + } + }); + } + } + } + }, + Message::AccountMsg(info) => { + let slot = info.slot_identifier.slot; + trace!("received account update for slot {}", slot); + if slot < first_full_slot { + // Don't try to process data for slots where we may have missed writes: + // We could not map the write_version correctly for them. + continue; + } + + if slot > newest_write_slot { + newest_write_slot = slot; + debug!( + "newest_write_slot: {}", + newest_write_slot + ); + } else if max_finalized_slot > 0 && info.slot_identifier.slot < max_finalized_slot - max_out_of_order_slots { + anyhow::bail!("received write {} slots back from max rooted slot {}", max_finalized_slot - slot, max_finalized_slot); + } + }, + _ => { + // ignore all other quic update types + } + } + + let elapsed = last_message_received_at.elapsed().as_millis(); + metrics::QUIC_NO_MESSAGE_FOR_DURATION_MS.set(elapsed as i64); + last_message_received_at = Instant::now(); + + // send the incremental updates to the channel + sender.send(SourceMessage::QuicMessage(message)).await.expect("send success"); + }, + snapshot_message = snapshot_gma_receiver.recv() => { + let Some(snapshot_result) = snapshot_message + else { + anyhow::bail!("snapshot channel closed"); + }; + let snapshot = snapshot_result?; + debug!("snapshot (program={}, m_accounts={}) is for slot {}, first full slot was {}", + snapshot.program_id.map(|x| x.to_string()).unwrap_or("none".to_string()), + snapshot.accounts.len(), + snapshot.slot, + first_full_slot); + + if snapshot.slot < first_full_slot { + warn!( + "snapshot is too old: has slot {}, expected {} minimum - request another one but also use this snapshot", + snapshot.slot, + first_full_slot + ); + // try again in another 25 slots + snapshot_needed = true; + rooted_to_finalized_slots += 25; + } + + // New - Don't care if the snapshot is old, we want startup to work anyway + // If an edge is not working properly, it will be disabled when swapping it + sender + .send(SourceMessage::Snapshot(snapshot)) + .await + .expect("send success"); + + }, + _ = tokio::time::sleep(fatal_idle_timeout) => { + anyhow::bail!("geyser plugin hasn't sent a message in too long"); + } + _ = re_snapshot_interval.tick() => { + info!("Re-snapshot hack"); + snapshot_needed = true; + } + } + } +} + +pub async fn process_events( + config: AccountDataSourceConfig, + subscription_accounts: HashSet, + subscription_programs: HashSet, + subscription_token_accounts: HashSet, + filters: HashSet, + account_write_queue_sender: async_channel::Sender, + metdata_write_queue_sender: Option>, + slot_queue_sender: async_channel::Sender, + mut exit: tokio::sync::broadcast::Receiver<()>, +) { + // Subscribe to geyser + let (msg_sender, msg_receiver) = + async_channel::bounded::(config.dedup_queue_size); + let mut source_jobs = vec![]; + + let Some(quic_sources) = config.quic_sources.clone() else { + return; + }; + + // note: caller in main.rs ensures this + assert_eq!(quic_sources.len(), 1, "only one quic source supported"); + for quic_source in quic_sources.clone() { + let msg_sender = msg_sender.clone(); + let sub_accounts = subscription_accounts.clone(); + let sub_programs = subscription_programs.clone(); + let sub_token_accounts = subscription_token_accounts.clone(); + + let cfg = config.clone(); + + source_jobs.push(tokio::spawn(async move { + let mut error_count = 0; + let mut last_error = Instant::now(); + + // Continuously reconnect on failure + loop { + let out = feed_data_geyser( + &quic_source, + cfg.clone(), + &sub_accounts, + &sub_programs, + &sub_token_accounts, + msg_sender.clone(), + ); + if last_error.elapsed() > Duration::from_secs(60 * 10) { + error_count = 0; + } + else if error_count > 10 { + error!("error during communication with the geyser plugin - retried too many time, exiting.."); + break; + } + + match out.await { + // happy case! + Err(err) => { + warn!( + "error during communication with the geyser plugin - retrying: {:?}", + err + ); + last_error = Instant::now(); + error_count += 1; + } + // this should never happen + Ok(_) => { + error!("feed_data must return an error, not OK - continue"); + last_error = Instant::now(); + error_count += 1; + } + } + + metrics::QUIC_SOURCE_CONNECTION_RETRIES + .with_label_values(&[&quic_source.name]) + .inc(); + + tokio::time::sleep(std::time::Duration::from_secs( + quic_source.retry_connection_sleep_secs, + )) + .await; + } + })); + } + + // slot -> (pubkey -> write_version) + // + // To avoid unnecessarily sending requests to SQL, we track the latest write_version + // for each (slot, pubkey). If an already-seen write_version comes in, it can be safely + // discarded. + let mut latest_write = HashMap::>::new(); + + // Number of slots to retain in latest_write + let latest_write_retention = 50; + + let mut source_jobs: futures::stream::FuturesUnordered<_> = source_jobs.into_iter().collect(); + + loop { + tokio::select! { + _ = source_jobs.next() => { + warn!("shutting down quic_plugin_source because subtask failed..."); + break; + }, + _ = exit.recv() => { + warn!("shutting down quic_plugin_source..."); + break; + } + msg = msg_receiver.recv() => { + match msg { + Ok(msg) => { + process_account_updated_from_sources(&account_write_queue_sender, + &slot_queue_sender, + &msg_receiver, + msg, + &mut latest_write, + latest_write_retention, + &metdata_write_queue_sender, + &filters, + ).await ; + } + Err(e) => { + warn!("failed to process quic event: {:?}", e); + break; + } + }; + }, + }; + } + + // close all channels to notify downstream CSPs of error + account_write_queue_sender.close(); + metdata_write_queue_sender.map(|s| s.close()); + slot_queue_sender.close(); +} + +// consume channel with snapshot and update data +async fn process_account_updated_from_sources( + account_write_queue_sender: &Sender, + slot_queue_sender: &Sender, + msg_receiver: &Receiver, + msg: SourceMessage, + latest_write: &mut HashMap>, + // in slots + latest_write_retention: u64, + // metric_account_writes: &mut MetricU64, + // metric_account_queue: &mut MetricU64, + // metric_dedup_queue: &mut MetricU64, + // metric_slot_queue: &mut MetricU64, + // metric_slot_updates: &mut MetricU64, + // metric_snapshots: &mut MetricU64, + // metric_snapshot_account_writes: &mut MetricU64, + metdata_write_queue_sender: &Option>, + filters: &HashSet, +) { + let metadata_sender = |msg| { + if let Some(sender) = &metdata_write_queue_sender { + sender.send_blocking(msg) + } else { + Ok(()) + } + }; + + metrics::QUIC_DEDUP_QUEUE.set(msg_receiver.len() as i64); + match msg { + SourceMessage::QuicMessage(message) => { + match message { + Message::AccountMsg(account_message) => { + metrics::QUIC_ACCOUNT_WRITES.inc(); + metrics::QUIC_ACCOUNT_WRITE_QUEUE.set(account_write_queue_sender.len() as i64); + let solana_account = account_message.solana_account(); + + // Skip writes that a different server has already sent + let pubkey_writes = latest_write + .entry(account_message.slot_identifier.slot) + .or_default(); + if !filters.contains(&account_message.pubkey) { + return; + } + + let writes = pubkey_writes.entry(account_message.pubkey).or_insert(0); + if account_message.write_version <= *writes { + return; + } + *writes = account_message.write_version; + latest_write.retain(|&k, _| { + k >= account_message.slot_identifier.slot - latest_write_retention + }); + + account_write_queue_sender + .send(AccountOrSnapshotUpdate::AccountUpdate(AccountWrite { + pubkey: account_message.pubkey, + slot: account_message.slot_identifier.slot, + write_version: account_message.write_version, + lamports: account_message.lamports, + owner: account_message.owner, + executable: account_message.executable, + rent_epoch: account_message.rent_epoch, + data: solana_account.data, + })) + .await + .expect("send success"); + } + Message::SlotMsg(slot_message) => { + metrics::QUIC_SLOT_UPDATES.inc(); + metrics::QUIC_SLOT_UPDATE_QUEUE.set(slot_queue_sender.len() as i64); + + let status = if slot_message.commitment_config.is_processed() { + SlotStatus::Processed + } else if slot_message.commitment_config.is_confirmed() { + SlotStatus::Confirmed + } else { + SlotStatus::Rooted + }; + + let slot_update = SlotUpdate { + slot: slot_message.slot, + parent: Some(slot_message.parent), + status, + }; + + slot_queue_sender + .send(slot_update) + .await + .expect("send success"); + } + _ => { + // ignore update + } + } + } + SourceMessage::Snapshot(update) => { + let label = if let Some(prg) = update.program_id { + if prg == spl_token::ID { + "gpa(tokens)" + } else { + "gpa" + } + } else { + "gma" + }; + metrics::ACCOUNT_SNAPSHOTS + .with_label_values(&[&label]) + .inc(); + debug!( + "processing snapshot for program_id {} -> size={} & missing size={}...", + update + .program_id + .map(|x| x.to_string()) + .unwrap_or("".to_string()), + update.accounts.len(), + update.missing_accounts.len() + ); + if let Err(e) = metadata_sender(FeedMetadata::SnapshotStart(update.program_id)) { + warn!("failed to send feed matadata event: {}", e); + } + + let mut updated_accounts = vec![]; + for account in update.accounts { + metrics::QUIC_SNAPSHOT_ACCOUNT_WRITES.inc(); + metrics::QUIC_ACCOUNT_WRITE_QUEUE.set(account_write_queue_sender.len() as i64); + + if !filters.contains(&account.pubkey) { + continue; + } + + updated_accounts.push(account); + } + account_write_queue_sender + .send(AccountOrSnapshotUpdate::SnapshotUpdate(updated_accounts)) + .await + .expect("send success"); + + for account in update.missing_accounts { + if let Err(e) = metadata_sender(FeedMetadata::InvalidAccount(account)) { + warn!("failed to send feed matadata event: {}", e); + } + } + debug!("processing snapshot done"); + if let Err(e) = metadata_sender(FeedMetadata::SnapshotEnd(update.program_id)) { + warn!("failed to send feed matadata event: {}", e); + } + } + } +} diff --git a/lib/dex-raydium-cp/src/edge.rs b/lib/dex-raydium-cp/src/edge.rs index f27f297..04664c8 100644 --- a/lib/dex-raydium-cp/src/edge.rs +++ b/lib/dex-raydium-cp/src/edge.rs @@ -1,5 +1,3 @@ -use std::any::Any; -use std::panic; use anchor_lang::Id; use anchor_spl::token::Token; use anchor_spl::token_2022::spl_token_2022::extension::transfer_fee::TransferFeeConfig; @@ -14,6 +12,8 @@ use solana_program::clock::Clock; use solana_program::pubkey::Pubkey; use solana_program::sysvar::Sysvar; use solana_sdk::account::ReadableAccount; +use std::any::Any; +use std::panic; use router_lib::dex::{DexEdge, DexEdgeIdentifier}; @@ -189,8 +189,7 @@ pub fn swap_base_output( output_mint: &Option, amount_out: u64, ) -> anyhow::Result<(u64, u64, u64)> { - let res = panic::catch_unwind(|| - { + let res = panic::catch_unwind(|| { let pool_state = pool; let block_timestamp = pool_state.open_time + 1; // TODO FAS his is suppose to be the clock if !pool_state.get_status_by_bit(PoolStatusBitIndex::Swap) @@ -269,7 +268,6 @@ pub fn swap_base_output( } else { anyhow::bail!("Something went wrong in raydium cp") } - } pub fn get_transfer_fee( diff --git a/lib/router-config-lib/src/lib.rs b/lib/router-config-lib/src/lib.rs index 55c2eec..cabe96c 100644 --- a/lib/router-config-lib/src/lib.rs +++ b/lib/router-config-lib/src/lib.rs @@ -11,6 +11,14 @@ pub struct GrpcSourceConfig { pub tls: Option, } +#[derive(Clone, Debug, Default, serde_derive::Deserialize)] +pub struct QuicSourceConfig { + pub name: String, + pub connection_string: String, + pub retry_connection_sleep_secs: u64, + pub enable_gso: Option, +} + #[derive(Clone, Debug, Default, serde_derive::Deserialize)] pub struct TlsConfig { pub ca_cert_path: String, @@ -36,6 +44,7 @@ pub struct Config { pub safety_checks: Option, pub hot_mints: Option, pub debug_config: Option, + pub snapshot_timeout_in_seconds: Option, } impl Config { @@ -78,16 +87,15 @@ pub struct InfinityConfig { #[derive(Clone, Debug, Default, serde_derive::Deserialize)] pub struct AccountDataSourceConfig { pub region: Option, - pub use_quic: Option, - #[serde(deserialize_with = "serde_opt_string_or_env", default)] - pub quic_address: Option, + pub quic_sources: Option>, #[serde(deserialize_with = "serde_string_or_env")] pub rpc_http_url: String, // does RPC node support getProgramAccountsCompressed pub rpc_support_compression: Option, pub re_snapshot_interval_secs: Option, - pub grpc_sources: Vec, + pub grpc_sources: Option>, pub dedup_queue_size: usize, + pub request_timeout_in_seconds: Option, } #[derive(Clone, Debug, serde_derive::Deserialize)] diff --git a/lib/router-feed-lib/src/grpc_tx_watcher.rs b/lib/router-feed-lib/src/grpc_tx_watcher.rs index c9ee286..37e7183 100644 --- a/lib/router-feed-lib/src/grpc_tx_watcher.rs +++ b/lib/router-feed-lib/src/grpc_tx_watcher.rs @@ -185,7 +185,11 @@ pub async fn process_tx_events( let (msg_sender, msg_receiver) = async_channel::bounded::(config.dedup_queue_size); let mut source_jobs = vec![]; - for grpc_source in config.grpc_sources.clone() { + let Some(grpc_sources) = config.grpc_sources.clone() else { + panic!("There should be atleast one grpc source specified for grpc tx watcher"); + }; + + for grpc_source in grpc_sources.clone() { let msg_sender = msg_sender.clone(); // Make TLS config if configured diff --git a/programs/simulator/tests/cases/test_swap_from_dump.rs b/programs/simulator/tests/cases/test_swap_from_dump.rs index 0e1ca6c..32c148e 100644 --- a/programs/simulator/tests/cases/test_swap_from_dump.rs +++ b/programs/simulator/tests/cases/test_swap_from_dump.rs @@ -208,8 +208,7 @@ async fn run_all_swap_from_dump(dump_name: &str) -> Result, Er quote.output_amount != received_out_amount }; - if unexpected_in_amount || unexpected_out_amount - { + if unexpected_in_amount || unexpected_out_amount { debug_print_ix( &mut success, &mut index, @@ -270,7 +269,11 @@ async fn debug_print_ix( error!( "Faulty swapping #{} quote{}: \r\n{} -> {} ({} -> {})\r\n (successfully run {} swap)", index, - if quote.is_exact_out { " (ExactOut)" } else { "" }, + if quote.is_exact_out { + " (ExactOut)" + } else { + "" + }, quote.input_mint, quote.output_mint, quote.input_amount,