Skip to content

Commit

Permalink
allow subscribe filter update via channel
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jan 21, 2025
1 parent ee3ac1d commit 6ca8126
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 134 deletions.
104 changes: 101 additions & 3 deletions examples/subscribe_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ use solana_sdk::clock::UnixTimestamp;
use solana_sdk::pubkey::Pubkey;
use std::collections::HashMap;
use std::env;
use std::str::FromStr;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc::Receiver;
use tokio::sync::mpsc::{Receiver, Sender};

use geyser_grpc_connector::grpc_subscription_autoreconnect_tasks::create_geyser_autoconnection_task_with_mpsc;
use geyser_grpc_connector::{GrpcConnectionTimeouts, GrpcSourceConfig, Message};
Expand Down Expand Up @@ -52,14 +53,26 @@ pub async fn main() {

let (autoconnect_tx, geyser_messages_rx) = tokio::sync::mpsc::channel(10);
let (_exit_tx, exit_rx) = tokio::sync::broadcast::channel::<()>(1);
let (subscribe_filter_update_tx, mut _subscribe_filter_update_rx) =
tokio::sync::mpsc::channel::<SubscribeRequest>(1);

let _all_accounts = create_geyser_autoconnection_task_with_mpsc(
let _jh = create_geyser_autoconnection_task_with_mpsc(
config.clone(),
all_accounts(),
jito2_account(),
autoconnect_tx.clone(),
exit_rx.resubscribe(),
);

// testcase 1
// test if the autoconnector continues to work even if the channel drops
// drop(subscribe_filter_update_tx);

// testcase 2
spawn_subscribe_filter_updater(subscribe_filter_update_tx.clone());

// testcase 3
// spawn_subscribe_broken_filter_updater(subscribe_filter_update_tx.clone());

let current_processed_slot = AtomicSlot::default();
start_tracking_account_consumer(geyser_messages_rx, current_processed_slot.clone());

Expand Down Expand Up @@ -102,6 +115,34 @@ fn start_tracking_account_consumer(
});
}

#[allow(dead_code)]
fn spawn_subscribe_filter_updater(subscribe_filter_update_tx: Sender<SubscribeRequest>) {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(5)).await;
info!("updating filters");
subscribe_filter_update_tx
.send(jito1_account())
.await
.expect("send");
}
});
}

#[allow(dead_code)]
fn spawn_subscribe_broken_filter_updater(subscribe_filter_update_tx: Sender<SubscribeRequest>) {
tokio::spawn(async move {
loop {
sleep(Duration::from_secs(5)).await;
info!("updating filters");
subscribe_filter_update_tx
.send(broken_subscription())
.await
.expect("send");
}
});
}

fn get_epoch_sec() -> UnixTimestamp {
SystemTime::now()
.duration_since(UNIX_EPOCH)
Expand Down Expand Up @@ -177,6 +218,63 @@ pub fn all_accounts() -> SubscribeRequest {
}
}

pub fn jito1_account() -> SubscribeRequest {
// Jito1
let account = Pubkey::from_str("CXPeim1wQMkcTvEHx9QdhgKREYYJD8bnaCCqPRwJ1to1").unwrap();

let mut accounts_subs = HashMap::new();
accounts_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![account.to_string()],
owner: vec![],
filters: vec![],
},
);

SubscribeRequest {
accounts: accounts_subs,
..Default::default()
}
}

pub fn jito2_account() -> SubscribeRequest {
// Jito2
let account = Pubkey::from_str("A4hyMd3FyvUJSRafDUSwtLLaQcxRP4r1BRC9w2AJ1to2").unwrap();

let mut accounts_subs = HashMap::new();
accounts_subs.insert(
"client".to_string(),
SubscribeRequestFilterAccounts {
account: vec![account.to_string()],
owner: vec![],
filters: vec![],
},
);

SubscribeRequest {
accounts: accounts_subs,
..Default::default()
}
}

pub fn broken_subscription() -> SubscribeRequest {
let mut accounts_subs = HashMap::new();
accounts_subs.insert(
"broken_subscription".to_string(),
SubscribeRequestFilterAccounts {
account: vec!["nota_pubkey".to_string()],
owner: vec![],
filters: vec![],
},
);

SubscribeRequest {
accounts: accounts_subs,
..Default::default()
}
}

pub fn slots() -> SubscribeRequest {
let mut slots_subs = HashMap::new();
slots_subs.insert(
Expand Down
1 change: 1 addition & 0 deletions src/grpc_subscription_autoreconnect_streams.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/// NOT MAINTAINED - please use the `grpc_subscription_autoreconnect_streams` module instead
use std::time::Duration;

use async_stream::stream;
Expand Down
Loading

0 comments on commit 6ca8126

Please sign in to comment.