Skip to content

Commit

Permalink
Add a journal tracing all actor messages
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
  • Loading branch information
didier-wenzek committed Jan 29, 2025
1 parent 2f75b67 commit dbea35d
Show file tree
Hide file tree
Showing 6 changed files with 56 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 17 additions & 0 deletions crates/common/tedge_config/src/system_services/log_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,25 @@ pub fn log_init(
.with_filter(filter_fn(|metadata| metadata.target() == "Audit"))
});

// Actor traces
let trace_appender = RollingFileAppender::builder()
.rotation(Rotation::DAILY)
.filename_prefix("tedge.actors.log")
.max_log_files(2);
let trace_layer = trace_appender
.build("/var/log/tedge")
.ok()
.map(|trace_appender| {
tracing_subscriber::fmt::layer()
.with_writer(trace_appender)
.with_timer(tracing_subscriber::fmt::time::UtcTime::rfc_3339())
.with_filter(LevelFilter::DEBUG)
.with_filter(filter_fn(|metadata| metadata.target() == "Actors"))
});

tracing_subscriber::registry()
.with(audit_layer)
.with(trace_layer)
.with(log_layer)
.init();

Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_actors/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ test-helpers = []
[dependencies]
async-trait = { workspace = true }
futures = { workspace = true }
log = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, default_features = false, features = [
"sync",
"rt",
"macros",
"time",
] }
tracing = { workspace = true }

[dev-dependencies]
tokio = { workspace = true, default_features = false, features = [
Expand Down
27 changes: 16 additions & 11 deletions crates/core/tedge_actors/src/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ use crate::RuntimeRequest;
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::StreamExt;
use log::debug;
use std::fmt::Debug;

#[async_trait]
Expand Down Expand Up @@ -160,22 +159,22 @@ impl<Input: Debug> LoggingReceiver<Input> {
}

#[async_trait]
impl<Input: Send + Debug> MessageReceiver<Input> for LoggingReceiver<Input> {
impl<Input: Send + Debug + Sync> MessageReceiver<Input> for LoggingReceiver<Input> {
async fn try_recv(&mut self) -> Result<Option<Input>, RuntimeRequest> {
let message = self.receiver.try_recv().await;
debug!(target: &self.name, "recv {:?}", message);
log_message_received(&self.name, &message);
message
}

async fn recv(&mut self) -> Option<Input> {
let message = self.receiver.recv().await;
debug!(target: &self.name, "recv {:?}", message);
log_message_received(&self.name, &message);
message
}

async fn recv_signal(&mut self) -> Option<RuntimeRequest> {
let message = self.receiver.recv_signal().await;
debug!(target: &self.name, "recv {:?}", message);
log_message_received(&self.name, &message);
message
}
}
Expand Down Expand Up @@ -208,8 +207,14 @@ impl<Output: Message> Sender<Output> for LoggingSender<Output> {
}
}

pub fn log_message_sent<I: Debug>(target: &str, message: I) {
debug!(target: target, "send {message:?}");
/// Trace a message received by an actor.
///
/// Emits a `DEBUG` event under the fixed `"Actors"` target, with two
/// structured fields: `actor` (the receiving actor's name) and `recv`
/// (the message, rendered with its `Debug` implementation).
#[inline]
pub fn log_message_received<I>(actor: &str, message: &I)
where
    I: Debug,
{
    tracing::debug!(target: "Actors", actor = actor, recv = ?message);
}

/// Trace a message sent by an actor.
///
/// Emits a `DEBUG` event under the fixed `"Actors"` target, with two
/// structured fields: `actor` (the sending actor's name) and `send`
/// (the message, rendered with its `Debug` implementation).
#[inline]
pub fn log_message_sent<I>(actor: &str, message: &I)
where
    I: Debug,
{
    tracing::debug!(target: "Actors", actor = actor, send = ?message);
}

/// An unbounded receiver
Expand Down Expand Up @@ -251,10 +256,10 @@ impl<Input: Debug> UnboundedLoggingReceiver<Input> {
}

#[async_trait]
impl<Input: Send + Debug> MessageReceiver<Input> for UnboundedLoggingReceiver<Input> {
impl<Input: Send + Debug + Sync> MessageReceiver<Input> for UnboundedLoggingReceiver<Input> {
async fn try_recv(&mut self) -> Result<Option<Input>, RuntimeRequest> {
let message = self.next_message().await;
debug!(target: &self.name, "recv {:?}", message);
log_message_received(&self.name, &message);
message
}

Expand All @@ -263,13 +268,13 @@ impl<Input: Send + Debug> MessageReceiver<Input> for UnboundedLoggingReceiver<In
Ok(Some(message)) => Some(message),
_ => None,
};
debug!(target: &self.name, "recv {:?}", message);
log_message_received(&self.name, &message);
message
}

async fn recv_signal(&mut self) -> Option<RuntimeRequest> {
let message = self.signal_receiver.next().await;
debug!(target: &self.name, "recv {:?}", message);
log_message_received(&self.name, &message);
message
}
}
Expand Down
38 changes: 19 additions & 19 deletions crates/core/tedge_actors/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@ use crate::RuntimeRequestSink;
use futures::channel::mpsc;
use futures::prelude::*;
use futures::stream::FuturesUnordered;
use log::debug;
use log::error;
use log::info;
use std::collections::HashMap;
use std::panic;
use std::time::Duration;
use tokio::task::JoinError;
use tokio::task::JoinHandle;
use tracing::debug;
use tracing::error;
use tracing::info;

/// Actions sent by actors to the runtime
#[derive(Debug)]
Expand Down Expand Up @@ -95,7 +95,7 @@ impl Runtime {
/// and all the running tasks have reached completion (successfully or not).
pub async fn run_to_completion(self) -> Result<(), RuntimeError> {
if let Err(err) = Runtime::wait_for_completion(self.bg_task).await {
error!("Aborted due to {err}");
error!(target: "Actors", "Aborted due to {err}");
std::process::exit(1)
}

Expand Down Expand Up @@ -138,7 +138,7 @@ impl RuntimeHandle {

/// Send an action to the runtime
async fn send(&mut self, action: RuntimeAction) -> Result<(), ChannelError> {
debug!(target: "Runtime", "schedule {:?}", action);
debug!(target: "Actors", "schedule {:?}", action);
self.actions_sender.send(action).await?;
Ok(())
}
Expand Down Expand Up @@ -175,7 +175,7 @@ impl RuntimeActor {
}

async fn run(mut self) -> Result<(), RuntimeError> {
info!(target: "Runtime", "Started");
info!(target: "Actors", "Started");
let mut aborting_error = None;
let mut actors_count: usize = 0;
loop {
Expand All @@ -186,7 +186,7 @@ impl RuntimeActor {
match action {
RuntimeAction::Spawn(actor) => {
let running_name = format!("{}-{}", actor.name(), actors_count);
info!(target: "Runtime", "Running {running_name}");
info!(target: "Actors", "Running {running_name}");
self.send_event(RuntimeEvent::Started {
task: running_name.clone(),
})
Expand All @@ -196,22 +196,22 @@ impl RuntimeActor {
actors_count += 1;
}
RuntimeAction::Shutdown => {
info!(target: "Runtime", "Shutting down");
info!(target: "Actors", "Shutting down");
shutdown_actors(&mut self.running_actors).await;
break;
}
}
}
None => {
info!(target: "Runtime", "Runtime actions channel closed, runtime stopping");
info!(target: "Actors", "Runtime actions channel closed, runtime stopping");
shutdown_actors(&mut self.running_actors).await;
break;
}
}
},
Some(finished_actor) = self.futures.next() => {
if let Err(error) = self.handle_actor_finishing(finished_actor).await {
info!(target: "Runtime", "Shutting down on error: {error}");
info!(target: "Actors", "Shutting down on error: {error}");
aborting_error = Some(error);
shutdown_actors(&mut self.running_actors).await;
break
Expand All @@ -222,12 +222,12 @@ impl RuntimeActor {

tokio::select! {
_ = tokio::time::sleep(self.cleanup_duration) => {
error!(target: "Runtime", "Timeout waiting for all actors to shutdown");
error!(target: "Actors", "Timeout waiting for all actors to shutdown");
for still_running in self.running_actors.keys() {
error!(target: "Runtime", "Failed to shutdown: {still_running}")
error!(target: "Actors", "Failed to shutdown: {still_running}")
}
}
_ = self.wait_for_actors_to_finish() => info!(target: "Runtime", "All actors have finished")
_ = self.wait_for_actors_to_finish() => info!(target: "Actors", "All actors have finished")
}

match aborting_error {
Expand All @@ -248,18 +248,18 @@ impl RuntimeActor {
) -> Result<(), RuntimeError> {
match finished_actor {
Err(e) => {
error!(target: "Runtime", "Failed to execute actor: {e}");
error!(target: "Actors", "Failed to execute actor: {e}");
Err(RuntimeError::JoinError(e))
}
Ok(Ok(actor)) => {
self.running_actors.remove(&actor);
info!(target: "Runtime", "Actor has finished: {actor}");
info!(target: "Actors", "Actor has finished: {actor}");
self.send_event(RuntimeEvent::Stopped { task: actor }).await;
Ok(())
}
Ok(Err((actor, error))) => {
self.running_actors.remove(&actor);
error!(target: "Runtime", "Actor {actor} has finished unsuccessfully: {error:?}");
error!(target: "Actors", "Actor {actor} has finished unsuccessfully: {error:?}");
self.send_event(RuntimeEvent::Aborted {
task: actor.clone(),
error: format!("{error}"),
Expand All @@ -273,7 +273,7 @@ impl RuntimeActor {
async fn send_event(&mut self, event: RuntimeEvent) {
if let Some(events) = &mut self.events {
if let Err(e) = events.send(event).await {
error!(target: "Runtime", "Failed to send RuntimeEvent: {e}");
error!(target: "Actors", "Failed to send RuntimeEvent: {e}");
}
}
}
Expand All @@ -286,10 +286,10 @@ where
for (running_as, sender) in a {
match sender.send(RuntimeRequest::Shutdown).await {
Ok(()) => {
debug!(target: "Runtime", "Successfully sent shutdown request to {running_as}")
debug!(target: "Actors", "Successfully sent shutdown request to {running_as}")
}
Err(e) => {
error!(target: "Runtime", "Failed to send shutdown request to {running_as}: {e:?}")
error!(target: "Actors", "Failed to send shutdown request to {running_as}: {e:?}")
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_actors/src/servers/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl<Request: Message, Response: Message> ConcurrentServerMessageBox<Request, Re
}
Some(result) = self.running_request_handlers.next() => {
if let Err(err) = result {
log::error!("Fail to run a request to completion: {err}");
tracing::error!(target: "Actors", "Fail to run a request to completion: {err}");
}
}
else => {
Expand All @@ -73,7 +73,7 @@ impl<Request: Message, Response: Message> ConcurrentServerMessageBox<Request, Re
tokio::select! {
Some(result) = self.running_request_handlers.next() => {
if let Err(err) = result {
log::error!("Fail to run a request to completion: {err}");
tracing::error!(target: "Actors", "Fail to run a request to completion: {err}");
}
ControlFlow::Continue(())
},
Expand Down

0 comments on commit dbea35d

Please sign in to comment.