Skip to content

Commit

Permalink
refactor(client): log a list of available protocols on startup (#509)
Browse files Browse the repository at this point in the history
  • Loading branch information
zizou0x authored Feb 7, 2025
2 parents 3e7836b + 851ada9 commit 20fbd06
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 11 deletions.
41 changes: 33 additions & 8 deletions tycho-client/src/cli.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use std::{str::FromStr, time::Duration};
use std::{collections::HashSet, str::FromStr, time::Duration};

use clap::Parser;
use tracing::{debug, info};
use tracing_appender::rolling;

use tycho_core::dto::{Chain, ExtractorIdentity};
use tycho_core::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};

use crate::{
deltas::DeltasClient,
feed::{
component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
BlockSynchronizer,
},
rpc::RPCClient,
HttpRPCClient, WsDeltasClient,
};

Expand Down Expand Up @@ -183,6 +184,9 @@ async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) {
};

let ws_client = WsDeltasClient::new(&tycho_ws_url, args.auth_key.as_deref()).unwrap();
let rpc_client = HttpRPCClient::new(&tycho_rpc_url, args.auth_key.as_deref()).unwrap();
let chain =
Chain::from_str(&args.chain).unwrap_or_else(|_| panic!("Unknown chain {}", &args.chain));
let ws_jh = ws_client
.connect()
.await
Expand All @@ -197,13 +201,34 @@ async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) {
block_sync.max_messages(*mm);
}

let available_protocols_set = rpc_client
.get_protocol_systems(&ProtocolSystemsRequestBody {
chain,
pagination: PaginationParams { page: 0, page_size: 100 },
})
.await
.unwrap()
.protocol_systems
.into_iter()
.collect::<HashSet<_>>();

let requested_protocol_set = exchanges
.iter()
.map(|(name, _)| name.clone())
.collect::<HashSet<_>>();

let not_requested_protocols = available_protocols_set
.difference(&requested_protocol_set)
.cloned()
.collect::<Vec<_>>();

if !not_requested_protocols.is_empty() {
tracing::info!("Other available protocols: {}", not_requested_protocols.join(", "));
}

for (name, address) in exchanges {
debug!("Registering exchange: {}", name);
let id = ExtractorIdentity {
chain: Chain::from_str(&args.chain)
.unwrap_or_else(|_| panic!("Unknown chain {}", &args.chain)),
name: name.clone(),
};
let id = ExtractorIdentity { chain, name: name.clone() };
let filter = if address.is_some() {
ComponentFilter::Ids(vec![address.unwrap()])
} else if let (Some(remove_tvl), Some(add_tvl)) =
Expand All @@ -219,7 +244,7 @@ async fn run(exchanges: Vec<(String, Option<String>)>, args: CliArgs) {
filter,
3,
!args.no_state,
HttpRPCClient::new(&tycho_rpc_url, args.auth_key.as_deref()).unwrap(),
rpc_client.clone(),
ws_client.clone(),
);
block_sync = block_sync.register_synchronizer(id, sync);
Expand Down
38 changes: 35 additions & 3 deletions tycho-client/src/stream.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
use std::{collections::HashMap, env, time::Duration};
use std::{
collections::{HashMap, HashSet},
env,
time::Duration,
};
use thiserror::Error;
use tokio::{sync::mpsc::Receiver, task::JoinHandle};
use tracing::info;

use tycho_core::dto::{Chain, ExtractorIdentity};
use tycho_core::dto::{Chain, ExtractorIdentity, PaginationParams, ProtocolSystemsRequestBody};

use crate::{
deltas::DeltasClient,
feed::{
component_tracker::ComponentFilter, synchronizer::ProtocolStateSynchronizer,
BlockSynchronizer, FeedMessage,
},
rpc::RPCClient,
HttpRPCClient, WsDeltasClient,
};

Expand Down Expand Up @@ -135,6 +140,7 @@ impl TychoStreamBuilder {

// Initialize the WebSocket client
let ws_client = WsDeltasClient::new(&tycho_ws_url, auth_key.as_deref()).unwrap();
let rpc_client = HttpRPCClient::new(&tycho_rpc_url, auth_key.as_deref()).unwrap();
let ws_jh = ws_client
.connect()
.await
Expand All @@ -146,6 +152,32 @@ impl TychoStreamBuilder {
Duration::from_secs(self.timeout),
);

let available_protocols_set = rpc_client
.get_protocol_systems(&ProtocolSystemsRequestBody {
chain: self.chain,
pagination: PaginationParams { page: 0, page_size: 100 },
})
.await
.unwrap()
.protocol_systems
.into_iter()
.collect::<HashSet<_>>();

let requested_protocol_set = self
.exchanges
.keys()
.cloned()
.collect::<HashSet<_>>();

let not_requested_protocols = available_protocols_set
.difference(&requested_protocol_set)
.cloned()
.collect::<Vec<_>>();

if !not_requested_protocols.is_empty() {
tracing::info!("Other available protocols: {}", not_requested_protocols.join(", "));
}

// Register each exchange with the BlockSynchronizer
for (name, filter) in self.exchanges {
info!("Registering exchange: {}", name);
Expand All @@ -156,7 +188,7 @@ impl TychoStreamBuilder {
filter,
3,
!self.no_state,
HttpRPCClient::new(&tycho_rpc_url, auth_key.as_deref()).unwrap(),
rpc_client.clone(),
ws_client.clone(),
);
block_sync = block_sync.register_synchronizer(id, sync);
Expand Down

0 comments on commit 20fbd06

Please sign in to comment.