Skip to content

Commit

Permalink
implement graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
bharath-123 committed Mar 31, 2024
1 parent e7d3da8 commit ae4bca2
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 575 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/astria-composer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"sync",
"time",
"signal",
] }
tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true, features = ["attributes"] }
tryhard = { workspace = true }
tonic = { workspace = true }
tokio-stream = { workspace = true, features = ["net"] }
tonic-health = "0.10.2"
itertools = "0.11.0"

[dependencies.sequencer-client]
package = "astria-sequencer-client"
Expand Down
90 changes: 59 additions & 31 deletions crates/astria-composer/src/collectors/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ use ethers::providers::{
ProviderError,
Ws,
};
use tokio::sync::{
mpsc::error::SendTimeoutError,
watch,
use tokio::{
select,
sync::{
mpsc::error::SendTimeoutError,
watch,
},
};
use tokio_util::sync::CancellationToken;
use tracing::{
debug,
info,
Expand Down Expand Up @@ -59,6 +63,7 @@ pub(crate) struct Geth {
status: watch::Sender<Status>,
/// Rollup URL
url: String,
shutdown_token: CancellationToken,
}

#[derive(Debug)]
Expand All @@ -80,7 +85,12 @@ impl Status {

impl Geth {
/// Initializes a new collector instance
pub(crate) fn new(chain_name: String, url: String, executor_handle: executor::Handle) -> Self {
pub(crate) fn new(
chain_name: String,
url: String,
executor_handle: executor::Handle,
shutdown_token: CancellationToken,
) -> Self {
let (status, _) = watch::channel(Status::new());
let rollup_id = RollupId::from_unhashed_bytes(&chain_name);
info!(
Expand All @@ -94,6 +104,7 @@ impl Geth {
executor_handle,
status,
url,
shutdown_token,
}
}

Expand All @@ -116,6 +127,7 @@ impl Geth {
executor_handle,
status,
url,
shutdown_token,
..
} = self;

Expand Down Expand Up @@ -155,37 +167,53 @@ impl Geth {

status.send_modify(|status| status.is_connected = true);

while let Some(tx) = tx_stream.next().await {
let tx_hash = tx.hash;
debug!(transaction.hash = %tx_hash, "collected transaction from rollup");
let data = tx.rlp().to_vec();
let seq_action = SequenceAction {
rollup_id,
data,
fee_asset_id: default_native_asset_id(),
};

match executor_handle
.send_timeout(seq_action, Duration::from_millis(500))
.await
{
Ok(()) => {}
Err(SendTimeoutError::Timeout(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"timed out sending new transaction to executor after 500ms; dropping tx"
);
}
Err(SendTimeoutError::Closed(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"executor channel closed while sending transaction; dropping transaction \
and exiting event loop"
);
loop {
select! {
biased;
() = shutdown_token.cancelled() => {
tx_stream.unsubscribe().await?;
status.send_modify(|status| status.is_connected = false);
break;
},
tx_res = tx_stream.next() => {
if let Some(tx) = tx_res {
let tx_hash = tx.hash;
debug!(transaction.hash = %tx_hash, "collected transaction from rollup");
let data = tx.rlp().to_vec();
let seq_action = SequenceAction {
rollup_id,
data,
fee_asset_id: default_native_asset_id(),
};

match executor_handle
.send_timeout(seq_action, Duration::from_millis(500))
.await
{
Ok(()) => {}
Err(SendTimeoutError::Timeout(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"timed out sending new transaction to executor after 500ms; dropping tx"
);
}
Err(SendTimeoutError::Closed(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"executor channel closed while sending transaction; dropping transaction \
and exiting event loop"
);
break;
}
}
} else {
status.send_modify(|status| status.is_connected = false);
break;
}
}
}
}

Ok(())
}
}
Loading

0 comments on commit ae4bca2

Please sign in to comment.