Skip to content

Commit

Permalink
print dropped packets together with duration
Browse files Browse the repository at this point in the history
  • Loading branch information
mielverkerken committed Oct 16, 2024
1 parent 49e0aca commit cd64db4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 15 deletions.
33 changes: 21 additions & 12 deletions rustiflow/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use flows::{
basic_flow::BasicFlow, cidds_flow::CiddsFlow, custom_flow::CustomFlow, flow::Flow,
nf_flow::NfFlow,
};
use log::{debug, error};
use log::{debug, error, info};
use output::OutputWriter;
use std::time::Instant;
use tokio::sync::mpsc;
Expand Down Expand Up @@ -121,7 +121,7 @@ async fn run_with_config(config: Config) {

debug!("Starting realtime processing...");
let start = Instant::now();
if let Err(err) = handle_realtime::<$flow_ty>(
let result = handle_realtime::<$flow_ty>(
&interface,
sender,
config.config.threads.unwrap_or(num_cpus::get() as u8),
Expand All @@ -131,21 +131,30 @@ async fn run_with_config(config: Config) {
config.config.expiration_check_interval,
ingress_only,
)
.await
{
error!("Error: {:?}", err);
}
.await;

// Wait for the output task to finish
output_task.await.unwrap_or_else(|e| {
// Wait for the output task to finish (flush and close the writer)
if let Err(e) = output_task.await {
error!("Error waiting for output task: {:?}", e);
});
}

let end = Instant::now();
debug!(
"Duration: {:?} milliseconds",
end.duration_since(start).as_millis()
info!(
"Duration: {:?} seconds",
end.duration_since(start).as_secs_f64()
);

// Now process the result and print the dropped packets
match result {
Ok(dropped_packets) => {
// If successful, log dropped packets count after writer is flushed
info!("Total dropped packets: {}", dropped_packets);
}
Err(err) => {
// Handle errors and log them
error!("Error during realtime processing: {:?}", err);
}
}
}};
}

Expand Down
7 changes: 4 additions & 3 deletions rustiflow/src/realtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use common::{EbpfEventIpv4, EbpfEventIpv6};
use log::{error, info};
use tokio::{io::unix::AsyncFd, signal, sync::mpsc::{self, Sender}, task::JoinSet};

/// Starts the realtime processing of packets on the given interface.
/// The function will return the number of packets dropped by the eBPF program.
pub async fn handle_realtime<T>(
interface: &str,
output_channel: Sender<T>,
Expand All @@ -23,7 +25,7 @@ pub async fn handle_realtime<T>(
early_export: Option<u64>,
expiration_check_interval: u64,
ingress_only: bool,
) -> Result<(), anyhow::Error>
) -> Result<u64, anyhow::Error>
where
T: Flow,
{
Expand Down Expand Up @@ -156,7 +158,6 @@ where
total_dropped += *cpu_val;
}
}
info!("Total dropped packets: {}", total_dropped);

// Cancel the tasks reading ebpf events
handle_set.abort_all();
Expand All @@ -179,7 +180,7 @@ where
}
}

Ok(())
Ok(total_dropped)
}

fn compute_shard_index(flow_key: &str, num_shards: u8) -> usize {
Expand Down

0 comments on commit cd64db4

Please sign in to comment.