Skip to content

Commit

Permalink
addressing first set of review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bharath-123 committed Apr 2, 2024
1 parent 8f95f90 commit f07c5db
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 19 deletions.
23 changes: 13 additions & 10 deletions crates/astria-composer/src/collectors/geth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,15 @@ impl Geth {
.await
.wrap_err("failed to subscribe eth client to full pending transactions")?;

let send_timeout_duration = Duration::from_millis(500);

status.send_modify(|status| status.is_connected = true);

loop {
select! {
biased;
() = shutdown_token.cancelled() => {
tx_stream.unsubscribe().await?;
status.send_modify(|status| status.is_connected = false);
break;
},
tx_res = tx_stream.next() => {
Expand All @@ -188,33 +189,35 @@ impl Geth {
};

match executor_handle
.send_timeout(seq_action, Duration::from_millis(500))
.send_timeout(seq_action, send_timeout_duration)
.await
{
Ok(()) => {}
Err(SendTimeoutError::Timeout(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"timed out sending new transaction to executor after 500ms; dropping tx"
);
transaction.hash = %tx_hash,
timeout_ms = send_timeout_duration.as_millis(),
"timed out sending new transaction to executor; dropping tx",
);
}
Err(SendTimeoutError::Closed(_seq_action)) => {
warn!(
transaction.hash = %tx_hash,
"executor channel closed while sending transaction; dropping transaction \
and exiting event loop"
);
transaction.hash = %tx_hash,
"executor channel closed while sending transaction; dropping transaction \
and exiting event loop"
);
break;
}
}
} else {
status.send_modify(|status| status.is_connected = false);
break;
}
}
}
}

status.send_modify(|status| status.is_connected = false);

Ok(())
}
}
13 changes: 6 additions & 7 deletions crates/astria-composer/src/composer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,6 @@ impl Composer {
biased;
_ = sigterm.recv() => {
info!("received SIGTERM; shutting down");
shutdown_token.cancel();
break;
},
o = &mut api_task => {
Expand Down Expand Up @@ -323,17 +322,17 @@ async fn shutdown(
.map(flatten)
{
Ok(Ok(())) => info!("executor shut down"),
Ok(Err(error)) => error!(%error, "executor failed to shut down"),
Err(error) => error!(%error, "executor panciked"),
Ok(Err(error)) => error!(%error, "executor shutdown with error"),
Err(error) => error!(%error, "executor failed to shutdown in time"),
}
// We give the grpc server 5 seconds to shutdown.
match tokio::time::timeout(std::time::Duration::from_secs(5), grpc_server_task_handle)
.await
.map(flatten)
{
Ok(Ok(())) => info!("grpc server shut down"),
Ok(Err(error)) => error!(%error, "grpc server failed to shut down"),
Err(error) => error!(%error, "grpc server failed to shut down"),
Ok(Err(error)) => error!(%error, "grpc server shutdown with error"),
Err(error) => error!(%error, "grpc server failed to shut down in time"),
}

let shutdown_loop = async {
Expand Down Expand Up @@ -369,8 +368,8 @@ async fn shutdown(
.map(flatten)
{
Ok(Ok(())) => info!("api server shut down"),
Ok(Err(error)) => error!(%error, "api server failed to shut down"),
Err(error) => error!(%error, "api server panicked"),
Ok(Err(error)) => error!(%error, "api server shutdown with error"),
Err(error) => error!(%error, "api server failed to shutdown in time"),
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions crates/astria-composer/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,8 +240,6 @@ impl Executor {
// do not accept any new txs during shutdown.
self.serialized_rollup_transactions.close();

self.status.send_modify(|status| status.is_connected = false);

break Ok("received shutdown signal");
}
// process submission result and update nonce
Expand Down Expand Up @@ -292,6 +290,8 @@ impl Executor {
}
};

self.status.send_modify(|status| status.is_connected = false);

match reason {
Ok(reason) => {
info!(%reason, "draining remaining bundles");
Expand Down

0 comments on commit f07c5db

Please sign in to comment.