diff --git a/examples/subscribe_accounts.rs b/examples/subscribe_accounts.rs index c1bf93e..3003bd1 100644 --- a/examples/subscribe_accounts.rs +++ b/examples/subscribe_accounts.rs @@ -10,10 +10,11 @@ use solana_sdk::clock::UnixTimestamp; use solana_sdk::pubkey::Pubkey; use std::collections::HashMap; use std::env; +use std::str::FromStr; use std::sync::atomic::AtomicU64; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use tokio::sync::mpsc::Receiver; +use tokio::sync::mpsc::{Receiver, Sender}; use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc; use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message}; @@ -53,13 +54,15 @@ pub async fn main() { let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10); let (_exit_tx, exit_rx) = tokio::sync::broadcast::channel::<()>(1); - let _all_accounts = create_geyser_autoconnection_task_with_mpsc( + let (_jh, client_subscribe_tx) = create_geyser_autoconnection_task_with_mpsc( config.clone(), - all_accounts(), + jito2_account(), autoconnect_tx.clone(), exit_rx.resubscribe(), ); + spawn_subscribe_filter_updater(client_subscribe_tx.clone()); + let current_processed_slot = AtomicSlot::default(); start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone()); @@ -102,6 +105,20 @@ fn start_tracking_account_consumer( }); } + +fn spawn_subscribe_filter_updater(client_subscribe_tx: Sender) { + tokio::spawn(async move { + loop { + sleep(Duration::from_secs(5)).await; + info!("updating filters"); + client_subscribe_tx + .send(jito1_account()) + .await + .expect("send"); + } + }); +} + fn get_epoch_sec() -> UnixTimestamp { SystemTime::now() .duration_since(UNIX_EPOCH) @@ -177,6 +194,50 @@ pub fn all_accounts() -> SubscribeRequest { } } +pub fn jito1_account() -> SubscribeRequest { + + // Jito1 + let account = Pubkey::from_str("CXPeim1wQMkcTvEHx9QdhgKREYYJD8bnaCCqPRwJ1to1").unwrap(); + + let mut accounts_subs = HashMap::new(); + accounts_subs.insert( + "client".to_string(), + SubscribeRequestFilterAccounts { + account: vec![account.to_string()], + owner: vec![], + filters: vec![], + }, + ); + + SubscribeRequest { + accounts: accounts_subs, + ..Default::default() + } +} + + +pub fn jito2_account() -> SubscribeRequest { + + // Jito2 + let account = Pubkey::from_str("A4hyMd3FyvUJSRafDUSwtLLaQcxRP4r1BRC9w2AJ1to2").unwrap(); + + let mut accounts_subs = HashMap::new(); + accounts_subs.insert( + "client".to_string(), + SubscribeRequestFilterAccounts { + account: vec![account.to_string()], + owner: vec![], + filters: vec![], + }, + ); + + SubscribeRequest { + accounts: accounts_subs, + ..Default::default() + } +} + + pub fn slots() -> SubscribeRequest { let mut slots_subs = HashMap::new(); slots_subs.insert( diff --git a/src/grpc_subscription_autoreconnect_tasks.rs b/src/grpc_subscription_autoreconnect_tasks.rs index 3d389ad..e8e936c 100644 --- a/src/grpc_subscription_autoreconnect_tasks.rs +++ b/src/grpc_subscription_autoreconnect_tasks.rs @@ -259,33 +259,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc( let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout); 'recv_loop: loop { - if started_at.elapsed() > Duration::from_secs(10) { - warn!("EXPERIMENTAL: reconnect with different filter"); - - let mut sub_accounts = HashMap::new(); - sub_accounts.insert("all_accounts".to_string(), - SubscribeRequestFilterAccounts { - account: vec![], - owner: vec![], - filters: vec![], - }); - - let updated_filter = SubscribeRequest { - slots: Default::default(), - // ALL - accounts: sub_accounts, - ..Default::default() - }; - subscribe_tx.send(updated_filter).await.unwrap(); - - started_at = Instant::now(); - break 'recv_loop ConnectionState::Ready(geyser_stream, subscribe_tx); - } - - // let fut_stream = timeout( - // receive_timeout.unwrap_or(Duration::MAX), - // geyser_stream.next(), - // ); select! { exit_res = exit_notify.recv() => { @@ -302,6 +275,32 @@ pub fn create_geyser_autoconnection_task_with_mpsc( } break 'recv_loop ConnectionState::GracefulShutdown; }, + client_subscribe_update = client_subscribe_rx.recv() => { + match client_subscribe_update { + Some(subscribe_request) => { + debug!("> client subscribe update {:?}", subscribe_request); + let fut_send = subscribe_tx.send(subscribe_request); + let MaybeExit::Continue(filter_subscribe_result) = + await_or_exit(fut_send, exit_notify.recv()).await + else { + break 'recv_loop ConnectionState::GracefulShutdown; + }; + + // let MaybeExit::Continue(()) = + // await_or_exit(fut_send, exit_notify.recv()).await + // else { + // break 'recv_loop ConnectionState::GracefulShutdown; + // }; + } + None => { + warn!("client subscribe channel closed - aborting"); + break 'recv_loop ConnectionState::FatalError( + 0, + FatalErrorReason::DownstreamChannelClosed, + ); + } + } + }, geyser_stream_res = timeout( receive_timeout.unwrap_or(Duration::MAX), geyser_stream.next(), @@ -404,9 +403,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc( }; // -- END match }, - update = client_subscribe_rx.recv() => { - - } }