Skip to content

Commit

Permalink
implement graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
bharath-123 committed Apr 3, 2024
1 parent f472a22 commit 708e35f
Show file tree
Hide file tree
Showing 7 changed files with 409 additions and 51 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/astria-composer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,15 @@ tokio = { workspace = true, features = [
"rt-multi-thread",
"sync",
"time",
"signal",
] }
tokio-util = { workspace = true, features = ["rt"] }
tracing = { workspace = true, features = ["attributes"] }
tryhard = { workspace = true }
tonic = { workspace = true }
tokio-stream = { workspace = true, features = ["net"] }
tonic-health = "0.10.2"
itertools = "0.11.0"

[dependencies.sequencer-client]
package = "astria-sequencer-client"
Expand Down
122 changes: 90 additions & 32 deletions crates/astria-composer/src/collectors/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,25 @@ use astria_core::sequencer::v1::{
};
use astria_eyre::eyre::{
self,
eyre,
WrapErr as _,
};
use ethers::providers::{
Provider,
ProviderError,
Ws,
};
use tokio::sync::{
mpsc::error::SendTimeoutError,
watch,
use tokio::{
select,
sync::{
mpsc::error::SendTimeoutError,
watch,
},
};
use tokio_util::sync::CancellationToken;
use tracing::{
debug,
error,
info,
instrument,
warn,
Expand All @@ -59,6 +65,8 @@ pub(crate) struct Geth {
status: watch::Sender<Status>,
/// Rollup URL
url: String,
// Token to signal the geth collector to stop upon shutdown.
shutdown_token: CancellationToken,
}

#[derive(Debug)]
Expand All @@ -80,7 +88,12 @@ impl Status {

impl Geth {
/// Initializes a new collector instance
pub(crate) fn new(chain_name: String, url: String, executor_handle: executor::Handle) -> Self {
pub(crate) fn new(
chain_name: String,
url: String,
executor_handle: executor::Handle,
shutdown_token: CancellationToken,
) -> Self {
let (status, _) = watch::channel(Status::new());
let rollup_id = RollupId::from_unhashed_bytes(&chain_name);
info!(
Expand All @@ -94,6 +107,7 @@ impl Geth {
executor_handle,
status,
url,
shutdown_token,
}
}

Expand All @@ -116,6 +130,7 @@ impl Geth {
executor_handle,
status,
url,
shutdown_token,
..
} = self;

Expand Down Expand Up @@ -153,39 +168,82 @@ impl Geth {
.await
.wrap_err("failed to subscribe eth client to full pending transactions")?;

let send_timeout_duration = Duration::from_millis(500);

status.send_modify(|status| status.is_connected = true);

while let Some(tx) = tx_stream.next().await {
let tx_hash = tx.hash;
debug!(transaction.hash = %tx_hash, "collected transaction from rollup");
let data = tx.rlp().to_vec();
let seq_action = SequenceAction {
rollup_id,
data,
fee_asset_id: default_native_asset_id(),
};

match executor_handle
.send_timeout(seq_action, Duration::from_millis(500))
.await
{
Ok(()) => {}
Err(SendTimeoutError::Timeout(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"timed out sending new transaction to executor after 500ms; dropping tx"
);
let reason = loop {
select! {
biased;
() = shutdown_token.cancelled() => {
info!("shutdown signal received");
break Ok("shutdown signal received");
},
tx_res = tx_stream.next() => {
if let Some(tx) = tx_res {
let tx_hash = tx.hash;
debug!(transaction.hash = %tx_hash, "collected transaction from rollup");
let data = tx.rlp().to_vec();
let seq_action = SequenceAction {
rollup_id,
data,
fee_asset_id: default_native_asset_id(),
};

match executor_handle
.send_timeout(seq_action, send_timeout_duration)
.await
{
Ok(()) => {}
Err(SendTimeoutError::Timeout(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
timeout_ms = send_timeout_duration.as_millis(),
"timed out sending new transaction to executor; dropping tx",
);
}
Err(SendTimeoutError::Closed(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"executor channel closed while sending transaction; dropping transaction \
and exiting event loop"
);
break Err(eyre!("executor channel closed while sending transaction"));
}
}
} else {
break Err(eyre!("geth tx stream ended"));
}
}
Err(SendTimeoutError::Closed(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"executor channel closed while sending transaction; dropping transaction \
and exiting event loop"
);
break;
}
};

match &reason {
Ok(reason) => {
info!(reason, "starting shutdown");
}
Err(reason) => {
error!(%reason, "starting shutdown");
}
};

debug!("sending connection status to false");
status.send_modify(|status| status.is_connected = false);

debug!("unsubscribing from geth tx stream");
match tx_stream.unsubscribe().await {
Ok(res) => {
if res {
info!("unsubscribed from geth tx stream");
} else {
warn!("failed to unsubscribe from geth tx stream");
}
}
Err(err) => {
error!(%err, "errored out while unsubscribing from geth tx stream");
}
}
Ok(())

reason.map(|_| ())
}
}
Loading

0 comments on commit 708e35f

Please sign in to comment.