Skip to content

Commit

Permalink
working autoconnector
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jan 20, 2025
1 parent c293eba commit 9a8751d
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 33 deletions.
67 changes: 64 additions & 3 deletions examples/subscribe_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ use solana_sdk::clock::UnixTimestamp;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::env;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::{Receiver, Sender};

use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message};
Expand Down Expand Up @@ -53,13 +54,15 @@ pub async fn main() {
let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10);
let (_exit_tx, exit_rx) = tokio::sync::broadcast::channel::<()>(1);

let _all_accounts = create_geyser_autoconnection_task_with_mpsc(
let (_jh, client_subscribe_tx) = create_geyser_autoconnection_task_with_mpsc(
config.clone(),
all_accounts(),
jito2_account(),
autoconnect_tx.clone(),
exit_rx.resubscribe(),
);

spawn_subscribe_filter_updater(client_subscribe_tx.clone());

let current_processed_slot = AtomicSlot::default();
start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone());

Expand Down Expand Up @@ -102,6 +105,20 @@ fn start_tracking_account_consumer(
});

Check warning on line 105 in examples/subscribe_accounts.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/subscribe_accounts.rs
}


fn spawn_subscribe_filter_updater(client_subscribe_tx: Sender<SubscribeRequest>) {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(5)).await;
info!("updating filters");
client_subscribe_tx
.send(jito1_account())
.await
.expect("send");
}
});
}

fn get_epoch_sec() -> UnixTimestamp {
SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand Down Expand Up @@ -177,6 +194,50 @@ pub fn all_accounts() -> SubscribeRequest {
}
}

Check warning on line 195 in examples/subscribe_accounts.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/subscribe_accounts.rs

pub fn jito1_account() -> SubscribeRequest {

// Jito1
let account = Pubkey::from_str("CXPeim1wQMkcTvEHx9QdhgKREYYJD8bnaCCqPRwJ1to1").unwrap();

let mut accounts_subs = HashMap::new();
accounts_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![account.to_string()],
owner: vec![],
filters: vec![],
},
);

SubscribeRequest {
accounts: accounts_subs,
..Default::default()
}

Check warning on line 215 in examples/subscribe_accounts.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/subscribe_accounts.rs
}


pub fn jito2_account() -> SubscribeRequest {

// Jito2
let account = Pubkey::from_str("A4hyMd3FyvUJSRafDUSwtLLaQcxRP4r1BRC9w2AJ1to2").unwrap();

let mut accounts_subs = HashMap::new();
accounts_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![account.to_string()],
owner: vec![],
filters: vec![],
},
);

SubscribeRequest {
accounts: accounts_subs,
..Default::default()

Check warning on line 236 in examples/subscribe_accounts.rs

View workflow job for this annotation

GitHub Actions / test

Diff in /home/runner/work/geyser-grpc-connector/geyser-grpc-connector/examples/subscribe_accounts.rs
}
}


pub fn slots() -> SubscribeRequest {
let mut slots_subs = HashMap::new();
slots_subs.insert(
Expand Down
56 changes: 26 additions & 30 deletions src/grpc_subscription_autoreconnect_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,33 +259,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
let receive_timeout = grpc_source.timeouts.as_ref().map(|t| t.receive_timeout);
'recv_loop: loop {

if started_at.elapsed() > Duration::from_secs(10) {
warn!("EXPERIMENTAL: reconnect with different filter");

let mut sub_accounts = HashMap::new();
sub_accounts.insert("all_accounts".to_string(),
SubscribeRequestFilterAccounts {
account: vec![],
owner: vec![],
filters: vec![],
});

let updated_filter = SubscribeRequest {
slots: Default::default(),
// ALL
accounts: sub_accounts,
..Default::default()
};
subscribe_tx.send(updated_filter).await.unwrap();

started_at = Instant::now();
break 'recv_loop ConnectionState::Ready(geyser_stream, subscribe_tx);
}

// let fut_stream = timeout(
// receive_timeout.unwrap_or(Duration::MAX),
// geyser_stream.next(),
// );

select! {
exit_res = exit_notify.recv() => {
Expand All @@ -302,6 +275,32 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
}
break 'recv_loop ConnectionState::GracefulShutdown;
},
client_subscribe_update = client_subscribe_rx.recv() => {
match client_subscribe_update {
Some(subscribe_request) => {
debug!("> client subscribe update {:?}", subscribe_request);
let fut_send = subscribe_tx.send(subscribe_request);
let MaybeExit::Continue(filter_subscribe_result) =
await_or_exit(fut_send, exit_notify.recv()).await
else {
break 'recv_loop ConnectionState::GracefulShutdown;
};

// let MaybeExit::Continue(()) =
// await_or_exit(fut_send, exit_notify.recv()).await
// else {
// break 'recv_loop ConnectionState::GracefulShutdown;
// };
}
None => {
warn!("client subscribe channel closed - aborting");
break 'recv_loop ConnectionState::FatalError(
0,
FatalErrorReason::DownstreamChannelClosed,
);
}
}
},
geyser_stream_res = timeout(
receive_timeout.unwrap_or(Duration::MAX),
geyser_stream.next(),
Expand Down Expand Up @@ -404,9 +403,6 @@ pub fn create_geyser_autoconnection_task_with_mpsc(
}; // -- END match

},
update = client_subscribe_rx.recv() => {

}
}


Expand Down

0 comments on commit 9a8751d

Please sign in to comment.