diff --git a/Cargo.lock b/Cargo.lock index e610ce9..c9c76c5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -80,19 +80,6 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" -[[package]] -name = "async-channel" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "336d835910fab747186c56586562cb46f42809c2843ef3a84f47509009522838" -dependencies = [ - "concurrent-queue", - "event-listener", - "event-listener-strategy", - "futures-core", - "pin-project-lite", -] - [[package]] name = "async-trait" version = "0.1.74" @@ -262,15 +249,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" -[[package]] -name = "concurrent-queue" -version = "2.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "core-foundation" version = "0.9.3" @@ -287,15 +265,6 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e496a50fda8aacccc86d7529e2c1e0892dbd0f898a6b5645b5561b89c3210efa" -[[package]] -name = "crossbeam-utils" -version = "0.8.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294" -dependencies = [ - "cfg-if", -] - [[package]] name = "dtoa" version = "1.0.9" @@ -311,27 +280,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "event-listener" -version = "3.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29e56284f00d94c1bc7fd3c77027b4623c88c1f53d8d2394c6199f2921dea325" -dependencies = [ - "concurrent-queue", - "parking", - "pin-project-lite", -] - -[[package]] -name = "event-listener-strategy" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d96b852f1345da36d551b9473fa1e2b1eb5c5195585c6c018118bc92a8d91160" -dependencies = [ - "event-listener", - "pin-project-lite", -] - [[package]] name = "fnv" version = "1.0.7" @@ -662,7 +610,6 @@ name = "openmetrics_udpserver" version = "0.1.0" dependencies = [ "anyhow", - "async-channel", "axum", "byteorder", "bytes", @@ -690,12 +637,6 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" -[[package]] -name = "parking" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" - [[package]] name = "parking_lot" version = "0.12.1" diff --git a/openmetrics_udpserver/Cargo.toml b/openmetrics_udpserver/Cargo.toml index 0ca89e8..9307463 100644 --- a/openmetrics_udpserver/Cargo.toml +++ b/openmetrics_udpserver/Cargo.toml @@ -10,10 +10,9 @@ clap = "4.4.*" bytes = "1.5.*" regex = "1.10.*" byteorder = "1.5.*" -async-channel = "2.0.*" prometheus-client = "0.21.*" hyper = { version = "0.14.*", features = ["http2", "server"] } -tokio = { version = "1.33.*", features = ["macros", "rt-multi-thread", "signal"] } +tokio = { version = "1.33.*", features = ["macros", "rt-multi-thread", "signal", "sync"] } axum = { version = "0.6.*", features = ["macros", "http1", "tokio"], default-features = false } openmetrics_udpserver_lib = { path = "../openmetrics_udpserver_lib" } diff --git a/openmetrics_udpserver/src/main.rs b/openmetrics_udpserver/src/main.rs index 66785bf..f781d46 100644 --- a/openmetrics_udpserver/src/main.rs +++ b/openmetrics_udpserver/src/main.rs @@ -10,11 +10,11 @@ use crate::processor::{InboundMetric, Processor}; use crate::serverdensity::aggregator::{ServerDensityAggregator, ServerDensityConfig}; use crate::udp_server::UdpServer; use anyhow::{anyhow, Context}; -use async_channel::unbounded; use clap::{Arg, ArgAction, Command}; use prometheus_client::registry::Registry; use std::process::exit; use std::sync::Arc; +use tokio::sync::broadcast::channel; use tokio::sync::RwLock; #[tokio::main] @@ -86,10 +86,10 @@ async fn main() -> anyhow::Result<(), anyhow::Error> { println!("http host: {}", &config.http_bind); let metric_registry = Arc::new(RwLock::new(Registry::default())); - let (sender, receiver) = unbounded::(); + let (sender, receiver) = channel::(100_000); let processor_config = config.clone(); - let processor_receiver = receiver.clone(); + let processor_receiver = sender.subscribe(); let processor_registry = metric_registry.clone(); let processor_handle = tokio::spawn(async move { let mut processor = Processor::new(processor_config, processor_registry); diff --git a/openmetrics_udpserver/src/processor.rs b/openmetrics_udpserver/src/processor.rs index 1659c19..570c982 100644 --- a/openmetrics_udpserver/src/processor.rs +++ b/openmetrics_udpserver/src/processor.rs @@ -3,14 +3,16 @@ use crate::metrics::resetting_counter::ResettingCounterMetric; use crate::metrics::resetting_value_metric::ResettingSingleValMetric; use crate::metrics::ModifyMetric; use anyhow::anyhow; -use async_channel::Receiver; use openmetrics_udpserver_lib::MetricType; use prometheus_client::registry::{Metric, Registry}; use regex::Regex; use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; +use tokio::sync::broadcast::error::TryRecvError; +use tokio::sync::broadcast::Receiver; use tokio::sync::RwLock; +use tokio::task::yield_now; #[derive(Debug, Clone)] pub struct InboundMetric { @@ -34,11 +36,11 @@ impl Processor { } } - pub async fn run(&mut self, receiver: Receiver) { + pub async fn run(&mut self, mut receiver: Receiver) { let regex_allowed_chars = Regex::new(r"^[^a-zA-Z_:]|[^a-zA-Z0-9_:]") .expect("Unable to compile metrics naming regex, should not happen"); loop { - match receiver.recv().await { + match receiver.try_recv() { Ok(inbound_metric) => { let metric_name = regex_allowed_chars .replace_all(&inbound_metric.name.replace('.', "_"), "") @@ -73,7 +75,12 @@ impl Processor { } } } - Err(_) => panic!("All metric senders were dropped, should not happen"), + Err(TryRecvError::Empty | TryRecvError::Lagged(_)) => { + yield_now().await; + } + Err(TryRecvError::Closed) => { + panic!("All metric senders were dropped, should not happen") + } } } } diff --git a/openmetrics_udpserver/src/serverdensity/aggregator.rs b/openmetrics_udpserver/src/serverdensity/aggregator.rs index 3abf829..e7016fb 100644 --- a/openmetrics_udpserver/src/serverdensity/aggregator.rs +++ b/openmetrics_udpserver/src/serverdensity/aggregator.rs @@ -1,6 +1,5 @@ use crate::processor::InboundMetric; use crate::serverdensity::{AverageHandler, MinHandler, PeakHandler, SumHandler}; -use async_channel::{Receiver, TryRecvError}; use clap::ArgMatches; use openmetrics_udpserver_lib::MetricType; use regex::Regex; @@ -9,6 +8,8 @@ use std::collections::HashMap; use std::fs::File; use std::io::{BufReader, Read}; use std::time::{Duration, SystemTime}; +use tokio::sync::broadcast::error::TryRecvError; +use tokio::sync::broadcast::Receiver; #[derive(Clone)] pub struct ServerDensityConfig { @@ -114,7 +115,7 @@ impl ServerDensityAggregator { } } - pub async fn run(&self, receiver: Receiver) { + pub async fn run(&self, mut receiver: Receiver) { let regex = Regex::new(r"[^0-9a-zA-ZäöüÄÖÜß\-()._]*").expect("failed to compile regex"); let mut metricmap = HashMap::new(); @@ -163,12 +164,12 @@ impl ServerDensityAggregator { ); } } - Err(TryRecvError::Empty) => { - break; - } Err(TryRecvError::Closed) => { panic!("channel disconnected, should never happen."); } + Err(TryRecvError::Empty | TryRecvError::Lagged(_)) => { + break; + } }; } diff --git a/openmetrics_udpserver/src/udp_server.rs b/openmetrics_udpserver/src/udp_server.rs index 734d046..d8dc974 100644 --- a/openmetrics_udpserver/src/udp_server.rs +++ b/openmetrics_udpserver/src/udp_server.rs @@ -1,10 +1,10 @@ use crate::config::Config; use crate::processor::InboundMetric; -use async_channel::Sender; use byteorder::BigEndian; use byteorder::ByteOrder; use openmetrics_udpserver_lib::MetricType; -use std::net::UdpSocket; +use tokio::net::UdpSocket; +use tokio::sync::broadcast::Sender; pub struct UdpServer { config: Config, @@ -20,41 +20,36 @@ impl UdpServer { } pub async fn run(&self) { - let mut udp_socket = - UdpSocket::bind(&self.config.udp_bind).expect("Unable to bind UDP Server"); + let udp_socket = UdpSocket::bind(&self.config.udp_bind) + .await + .expect("Unable to bind UDP Server"); loop { - match self.read(&mut udp_socket) { - Ok(metric) => { - if let Err(err) = self.metric_sender.send(metric).await { - eprintln!("Unable to process inbound metric: {}", err); + if udp_socket.readable().await.is_ok() { + let mut buf = [0; 300]; + if let Ok(read_bytes) = udp_socket.try_recv(&mut buf) { + match self.decode_buffer(&buf, read_bytes) { + Ok(inbound_metric) => { + if let Err(err) = self.metric_sender.send(inbound_metric) { + eprintln!("Unable to process inbound metric: {}", err); + } + } + Err(err) => { + eprintln!("could not decode message from socket: {}", err); + } } } - Err(err) => { - eprintln!("could not read message from socket: {}", err); - } } } } - fn read(&self, socket: &mut UdpSocket) -> Result { - let mut buf = [0; 300]; - let (amt, _) = socket - .recv_from(&mut buf) - .map_err(|_| "Couldn't recv from socket".to_string())?; - - if amt <= 6 { - return Err("UDP Package size is too small".to_string()); - } - - let metric_type = match MetricType::from_u16(BigEndian::read_u16(&buf[0..2])) { + fn decode_buffer(&self, data: &[u8], read_bytes: usize) -> Result { + let metric_type = match MetricType::from_u16(BigEndian::read_u16(&data[0..2])) { Some(m) => m, - None => { - return Err("Got unsupported metric type".to_string()); - } + None => return Err("Got unsupported metric type".to_string()), }; - let count = BigEndian::read_i32(&buf[2..6]); - let name = String::from_utf8_lossy(&buf[6..amt]) + let count = BigEndian::read_i32(&data[2..6]); + let name = String::from_utf8_lossy(&data[6..read_bytes]) .to_string() .replace('"', "");