From 2298d1f779bc409b4ce9b1e0e1384a57ca4a7cfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?19=E5=B9=B4=E6=A2=A6=E9=86=92?= <3949379+getong@users.noreply.github.com> Date: Tue, 26 Dec 2023 08:47:46 +0800 Subject: [PATCH] feat: update metric example to use hyper 1.0 version --- Cargo.lock | 93 ++++++++++++++++------------ examples/metrics/Cargo.toml | 14 +++-- examples/metrics/src/http_service.rs | 81 ++++++++++++++---------- examples/metrics/src/main.rs | 14 ++--- 4 files changed, 113 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 79059727317e..712c524fc41c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -516,7 +516,7 @@ dependencies = [ "http 1.0.0", "http-body 1.0.0", "http-body-util", - "hyper 1.0.1", + "hyper 1.1.0", "hyper-util", "itoa", "matchit", @@ -2173,9 +2173,9 @@ dependencies = [ [[package]] name = "hyper" -version = "1.0.1" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "403f9214f3e703236b221f1a9cd88ec8b4adfa5296de01ab96216361f4692f56" +checksum = "fb5aa53871fc917b1a9ed87b683a5d86db645e23acb32c2e0785a353e522fb75" dependencies = [ "bytes", "futures-channel", @@ -2188,6 +2188,7 @@ dependencies = [ "itoa", "pin-project-lite", "tokio", + "want", ] [[package]] @@ -2232,16 +2233,16 @@ dependencies = [ [[package]] name = "hyper-util" -version = "0.1.1" +version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9ca339002caeb0d159cc6e023dff48e199f081e42fa039895c7c6f38b37f2e9d" +checksum = "bdea9aac0dbe5a9240d68cfd9501e2db94222c6dc06843e06640b9e07f0fdc67" dependencies = [ "bytes", "futures-channel", "futures-util", "http 1.0.0", "http-body 1.0.0", - "hyper 1.0.1", + "hyper 1.1.0", "pin-project-lite", "socket2 0.5.5", "tokio", @@ -3689,12 +3690,16 @@ dependencies = [ name = "metrics-example" version = "0.1.0" dependencies = [ + "bytes", "futures", - "hyper 0.14.27", + "http-body-util", + "hyper 1.1.0", + "hyper-util", "libp2p", "opentelemetry", "opentelemetry-otlp", "opentelemetry_api", + "opentelemetry_sdk", "prometheus-client", "tokio", "tracing", @@ -4072,26 +4077,32 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" dependencies = [ - "opentelemetry_api", - "opentelemetry_sdk", + "futures-core", + "futures-sink", + "indexmap 2.0.0", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", ] [[package]] name = "opentelemetry-otlp" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e5e5a5c4135864099f3faafbe939eb4d7f9b80ebf68a8448da961b32a7c1275" +checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930" dependencies = [ "async-trait", "futures-core", "http 0.2.9", + "opentelemetry", "opentelemetry-proto", "opentelemetry-semantic-conventions", - "opentelemetry_api", "opentelemetry_sdk", "prost", "thiserror", @@ -4101,11 +4112,11 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb" +checksum = "a2e155ce5cc812ea3d1dffbd1539aed653de4bf4882d60e6e04dcf0901d674e1" dependencies = [ - "opentelemetry_api", + "opentelemetry", "opentelemetry_sdk", "prost", "tonic", @@ -4113,9 +4124,9 @@ dependencies = [ [[package]] name = "opentelemetry-semantic-conventions" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73c9f9340ad135068800e7f1b24e9e09ed9e7143f5bf8518ded3d3ec69789269" +checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" dependencies = [ "opentelemetry", ] @@ -4138,22 +4149,21 @@ dependencies = [ [[package]] name = "opentelemetry_sdk" -version = "0.20.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026" +checksum = "968ba3f2ca03e90e5187f5e4f46c791ef7f2c163ae87789c8ce5f5ca3b7b7de5" dependencies = [ "async-trait", "crossbeam-channel", "futures-channel", "futures-executor", "futures-util", + "glob", "once_cell", - "opentelemetry_api", + "opentelemetry", "ordered-float", "percent-encoding", "rand 0.8.5", - "regex", - "serde_json", "thiserror", "tokio", "tokio-stream", @@ -4161,9 +4171,9 @@ dependencies = [ [[package]] name = "ordered-float" -version = "3.9.2" +version = "4.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc" +checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" dependencies = [ "num-traits", ] @@ -6048,17 +6058,6 @@ dependencies = [ "valuable", ] -[[package]] -name = "tracing-log" -version = "0.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" -dependencies = [ - "lazy_static", - "log", - "tracing-core", -] - [[package]] name = "tracing-log" version = "0.2.0" @@ -6072,18 +6071,20 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75327c6b667828ddc28f5e3f169036cb793c3f588d83bf0f262a7f062ffed3c8" +checksum = "c67ac25c5407e7b961fafc6f7e9aa5958fd297aada2d20fa2ae1737357e55596" dependencies = [ + "js-sys", "once_cell", "opentelemetry", "opentelemetry_sdk", "smallvec", "tracing", "tracing-core", - "tracing-log 0.1.3", + "tracing-log", "tracing-subscriber", + "web-time", ] [[package]] @@ -6101,7 +6102,7 @@ dependencies = [ "thread_local", "tracing", "tracing-core", - "tracing-log 0.2.0", + "tracing-log", ] [[package]] @@ -6483,6 +6484,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa30049b1c872b72c89866d458eae9f20380ab280ffd1b1e18df2d3e2d98cfe0" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webdriver" version = "0.46.0" diff --git a/examples/metrics/Cargo.toml b/examples/metrics/Cargo.toml index c8f74a17ebda..e3dfae16fa6f 100644 --- a/examples/metrics/Cargo.toml +++ b/examples/metrics/Cargo.toml @@ -10,15 +10,19 @@ release = false [dependencies] futures = "0.3.29" -hyper = { version = "0.14", features = ["server", "tcp", "http1"] } +bytes = "1.5.0" +hyper = { version = "1.1", features = ["full"]} +http-body-util = "0.1.0" +hyper-util = { version = "0.1.2", features = ["full"] } libp2p = { path = "../../libp2p", features = ["tokio", "metrics", "ping", "noise", "identify", "tcp", "yamux", "macros"] } -opentelemetry = { version = "0.20.0", features = ["rt-tokio", "metrics"] } -opentelemetry-otlp = { version = "0.13.0", features = ["metrics"]} +opentelemetry = { version = "0.21.0", features = ["metrics"] } +opentelemetry-otlp = { version = "0.14.0", features = ["metrics"]} opentelemetry_api = "0.20.0" prometheus-client = { workspace = true } +opentelemetry_sdk = {version = "0.21.1", features = ["rt-tokio", "metrics"]} tokio = { version = "1", features = ["full"] } -tracing = "0.1.37" -tracing-opentelemetry = "0.21.0" +tracing = "0.1.40" +tracing-opentelemetry = "0.22.0" tracing-subscriber = { version = "0.3", features = ["env-filter"] } [lints] diff --git a/examples/metrics/src/http_service.rs b/examples/metrics/src/http_service.rs index 8c77d724ea30..ad98551774a6 100644 --- a/examples/metrics/src/http_service.rs +++ b/examples/metrics/src/http_service.rs @@ -18,42 +18,57 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. +use bytes::Bytes; +use http_body_util::Full; use hyper::http::StatusCode; +use hyper::server::conn::http1; use hyper::service::Service; -use hyper::{Body, Method, Request, Response, Server}; +use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; +use hyper_util::rt::TokioIo; use prometheus_client::encoding::text::encode; use prometheus_client::registry::Registry; use std::future::Future; +use std::net::SocketAddr; use std::pin::Pin; use std::sync::{Arc, Mutex}; -use std::task::{Context, Poll}; +use tokio::net::TcpListener; const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0"; pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> { // Serve on localhost. - let addr = ([127, 0, 0, 1], 0).into(); - - let server = Server::bind(&addr).serve(MakeMetricService::new(registry)); - tracing::info!(metrics_server=%format!("http://{}/metrics", server.local_addr())); - if let Err(e) = server.await { - tracing::error!("server error: {}", e); + let addr: SocketAddr = ([127, 0, 0, 1], 0).into(); + tracing::info!(metrics_server=%format!("http://{:?}/metrics", addr)); + let make_metrics_service = MakeMetricService::new(registry); + let listener = TcpListener::bind(addr).await?; + loop { + let (stream, _) = listener.accept().await?; + let io = TokioIo::new(stream); + let make_metrics_service_clone = make_metrics_service.clone(); + tokio::task::spawn(async move { + if let Err(err) = http1::Builder::new() + .serve_connection(io, make_metrics_service_clone) + .await + { + tracing::error!("server error: {}", err); + } + }); } - Ok(()) } +type SharedRegistry = Arc>; + +#[derive(Debug, Clone)] pub(crate) struct MetricService { - reg: Arc>, + reg: SharedRegistry, } -type SharedRegistry = Arc>; - impl MetricService { fn get_reg(&mut self) -> SharedRegistry { Arc::clone(&self.reg) } - fn respond_with_metrics(&mut self) -> Response { - let mut response: Response = Response::default(); + fn respond_with_metrics(&mut self) -> Response> { + let mut response: Response> = Response::default(); response.headers_mut().insert( hyper::header::CONTENT_TYPE, @@ -61,42 +76,44 @@ impl MetricService { ); let reg = self.get_reg(); - encode(&mut response.body_mut(), ®.lock().unwrap()).unwrap(); + let mut inner_str = String::new(); + encode(&mut inner_str, ®.lock().unwrap()).unwrap(); + *response.body_mut() = Full::new(Bytes::from(inner_str)); *response.status_mut() = StatusCode::OK; response } - fn respond_with_404_not_found(&mut self) -> Response { + + fn respond_with_404_not_found(&mut self) -> Response> { Response::builder() .status(StatusCode::NOT_FOUND) - .body("Not found try localhost:[port]/metrics".to_string()) + .body(Full::new(Bytes::from( + "Not found try localhost:[port]/metrics".to_string(), + ))) .unwrap() } } -impl Service> for MetricService { - type Response = Response; +impl Service> for MetricService { + type Response = Response>; type Error = hyper::Error; type Future = Pin> + Send>>; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, req: Request) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { let req_path = req.uri().path(); let req_method = req.method(); let resp = if (req_method == Method::GET) && (req_path == "/metrics") { // Encode and serve metrics from registry. - self.respond_with_metrics() + self.clone().respond_with_metrics() } else { - self.respond_with_404_not_found() + self.clone().respond_with_404_not_found() }; Box::pin(async { Ok(resp) }) } } +#[derive(Debug, Clone)] pub(crate) struct MakeMetricService { reg: SharedRegistry, } @@ -109,18 +126,14 @@ impl MakeMetricService { } } -impl Service for MakeMetricService { - type Response = MetricService; +impl Service> for MakeMetricService { + type Response = Response>; type Error = hyper::Error; type Future = Pin> + Send>>; - fn poll_ready(&mut self, _: &mut Context) -> Poll> { - Poll::Ready(Ok(())) - } - - fn call(&mut self, _: T) -> Self::Future { + fn call(&self, req: Request) -> Self::Future { let reg = self.reg.clone(); - let fut = async move { Ok(MetricService { reg }) }; + let fut = async move { Ok(MetricService { reg }.call(req).await.unwrap()) }; Box::pin(fut) } } diff --git a/examples/metrics/src/main.rs b/examples/metrics/src/main.rs index 3ab6815cb32d..6b7ad9cedfb2 100644 --- a/examples/metrics/src/main.rs +++ b/examples/metrics/src/main.rs @@ -25,8 +25,7 @@ use libp2p::core::Multiaddr; use libp2p::metrics::{Metrics, Recorder}; use libp2p::swarm::{NetworkBehaviour, SwarmEvent}; use libp2p::{identify, identity, noise, ping, tcp, yamux}; -use opentelemetry::sdk; -use opentelemetry_api::KeyValue; +use opentelemetry::KeyValue; use prometheus_client::registry::Registry; use std::error::Error; use std::time::Duration; @@ -87,13 +86,10 @@ fn setup_tracing() -> Result<(), Box> { let tracer = opentelemetry_otlp::new_pipeline() .tracing() .with_exporter(opentelemetry_otlp::new_exporter().tonic()) - .with_trace_config( - sdk::trace::Config::default().with_resource(sdk::Resource::new(vec![KeyValue::new( - "service.name", - "libp2p", - )])), - ) - .install_batch(opentelemetry::runtime::Tokio)?; + .with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource( + opentelemetry_sdk::Resource::new(vec![KeyValue::new("service.name", "libp2p")]), + )) + .install_batch(opentelemetry_sdk::runtime::Tokio)?; tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer().with_filter(EnvFilter::from_default_env()))