Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

deps: move libp2p-server and metrics example to use axum 0.7 #5246

Merged
merged 14 commits into from
Apr 11, 2024
Merged
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion examples/metrics/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ release = false

[dependencies]
futures = "0.3.30"
hyper = { version = "0.14", features = ["server", "tcp", "http1"] }
axum = "0.7"
libp2p = { path = "../../libp2p", features = ["tokio", "metrics", "ping", "noise", "identify", "tcp", "yamux", "macros"] }
opentelemetry = { version = "0.22.0", features = ["metrics"] }
opentelemetry-otlp = { version = "0.15.0", features = ["metrics"] }
Expand Down
105 changes: 38 additions & 67 deletions examples/metrics/src/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,45 +18,66 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use hyper::http::StatusCode;
use hyper::service::Service;
use hyper::{Body, Method, Request, Response, Server};
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::Response;
use axum::routing::get;
use axum::Router;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use std::future::Future;
use std::pin::Pin;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::net::TcpListener;

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";

pub(crate) async fn metrics_server(registry: Registry) -> Result<(), std::io::Error> {
// Serve on localhost.
let addr = ([127, 0, 0, 1], 0).into();

let server = Server::bind(&addr).serve(MakeMetricService::new(registry));
tracing::info!(metrics_server=%format!("http://{}/metrics", server.local_addr()));
if let Err(e) = server.await {
tracing::error!("server error: {}", e);
}
let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
let service = MetricService::new(registry);
let server = Router::new()
.route("/metrics", get(respond_with_metrics))
.fallback(respond_with_404_not_found)
.with_state(service);
let tcp_listener = TcpListener::bind(addr).await.unwrap();
let local_addr = tcp_listener.local_addr()?;
tracing::info!(metrics_server=%format!("http://{}/metrics", local_addr));
axum::serve(tcp_listener, server.into_make_service())
.await
.unwrap();
Ok(())
}

#[derive(Clone)]
pub(crate) struct MetricService {
reg: Arc<Mutex<Registry>>,
}

async fn respond_with_metrics(state: State<MetricService>) -> Response<String> {
state.respond_with_metrics()
}

async fn respond_with_404_not_found(state: State<MetricService>) -> Response<String> {
state.respond_with_404_not_found()
}

type SharedRegistry = Arc<Mutex<Registry>>;

impl MetricService {
fn get_reg(&mut self) -> SharedRegistry {
fn new(registry: Registry) -> Self {
Self {
reg: Arc::new(Mutex::new(registry)),
}
}

fn get_reg(&self) -> SharedRegistry {
Arc::clone(&self.reg)
}
fn respond_with_metrics(&mut self) -> Response<String> {
fn respond_with_metrics(&self) -> Response<String> {
let mut response: Response<String> = Response::default();

response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
axum::http::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE.try_into().unwrap(),
);

Expand All @@ -67,60 +88,10 @@ impl MetricService {

response
}
fn respond_with_404_not_found(&mut self) -> Response<String> {
fn respond_with_404_not_found(&self) -> Response<String> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body("Not found try localhost:[port]/metrics".to_string())
.unwrap()
}
}

impl Service<Request<Body>> for MetricService {
type Response = Response<String>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
let req_path = req.uri().path();
let req_method = req.method();
let resp = if (req_method == Method::GET) && (req_path == "/metrics") {
// Encode and serve metrics from registry.
self.respond_with_metrics()
} else {
self.respond_with_404_not_found()
};
Box::pin(async { Ok(resp) })
}
}

pub(crate) struct MakeMetricService {
reg: SharedRegistry,
}

impl MakeMetricService {
pub(crate) fn new(registry: Registry) -> MakeMetricService {
MakeMetricService {
reg: Arc::new(Mutex::new(registry)),
}
}
}

impl<T> Service<T> for MakeMetricService {
type Response = MetricService;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, _: T) -> Self::Future {
let reg = self.reg.clone();
let fut = async move { Ok(MetricService { reg }) };
Box::pin(fut)
}
}
2 changes: 1 addition & 1 deletion misc/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ base64 = "0.21"
clap = { version = "4.5.4", features = ["derive"] }
futures = "0.3"
futures-timer = "3"
hyper = { version = "0.14", features = ["server", "tcp", "http1"] }
axum = "0.7"
libp2p = { workspace = true, features = ["autonat", "dns", "tokio", "noise", "tcp", "yamux", "identify", "kad", "ping", "relay", "metrics", "rsa", "macros", "quic", "websocket"] }
prometheus-client = { workspace = true }
serde = "1.0.197"
Expand Down
110 changes: 41 additions & 69 deletions misc/server/src/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,47 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use hyper::http::StatusCode;
use hyper::service::Service;
use hyper::{Body, Method, Request, Response, Server};
use axum::extract::State;
use axum::http::StatusCode;
use axum::response::Response;
use axum::routing::get;
use axum::Router;
use prometheus_client::encoding::text::encode;
use prometheus_client::registry::Registry;
use std::future::Future;
use std::pin::Pin;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll};
use tokio::net::TcpListener;

const METRICS_CONTENT_TYPE: &str = "application/openmetrics-text;charset=utf-8;version=1.0.0";
pub(crate) async fn metrics_server(
registry: Registry,
metrics_path: String,
) -> Result<(), hyper::Error> {
) -> Result<(), std::io::Error> {
// Serve on localhost.
let addr = ([0, 0, 0, 0], 8888).into();

let server = Server::bind(&addr).serve(MakeMetricService::new(registry, metrics_path.clone()));
tracing::info!(metrics_server=%format!("http://{}{}", server.local_addr(), metrics_path));
server.await?;
let addr: SocketAddr = ([0, 0, 0, 0], 8888).into();
let service = MetricService::new(registry, metrics_path.clone());
let server = Router::new()
.route(&metrics_path, get(respond_with_metrics))
.fallback(respond_with_404_not_found)
.with_state(service);
let tcp_listener = TcpListener::bind(addr).await?;
let local_addr = tcp_listener.local_addr()?;
tracing::info!(metrics_server=%format!("http://{}{}", local_addr, metrics_path));
axum::serve(tcp_listener, server.into_make_service())
.await
.unwrap();
Ok(())
}

async fn respond_with_metrics(state: State<MetricService>) -> Response<String> {
state.respond_with_metrics()
}

async fn respond_with_404_not_found(state: State<MetricService>) -> Response<String> {
state.respond_with_404_not_found()
}

#[derive(Clone)]
pub(crate) struct MetricService {
reg: Arc<Mutex<Registry>>,
metrics_path: String,
Expand All @@ -49,14 +67,21 @@ pub(crate) struct MetricService {
type SharedRegistry = Arc<Mutex<Registry>>;

impl MetricService {
fn get_reg(&mut self) -> SharedRegistry {
fn new(reg: Registry, metrics_path: String) -> Self {
Self {
reg: Arc::new(Mutex::new(reg)),
metrics_path,
}
}

fn get_reg(&self) -> SharedRegistry {
Arc::clone(&self.reg)
}
fn respond_with_metrics(&mut self) -> Response<String> {
fn respond_with_metrics(&self) -> Response<String> {
let mut response: Response<String> = Response::default();

response.headers_mut().insert(
hyper::header::CONTENT_TYPE,
axum::http::header::CONTENT_TYPE,
METRICS_CONTENT_TYPE.try_into().unwrap(),
);

Expand All @@ -67,7 +92,7 @@ impl MetricService {

response
}
fn respond_with_404_not_found(&mut self) -> Response<String> {
fn respond_with_404_not_found(&self) -> Response<String> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(format!(
Expand All @@ -77,56 +102,3 @@ impl MetricService {
.unwrap()
}
}

impl Service<Request<Body>> for MetricService {
type Response = Response<String>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, req: Request<Body>) -> Self::Future {
let req_path = req.uri().path();
let req_method = req.method();
let resp = if (req_method == Method::GET) && (req_path == self.metrics_path) {
// Encode and serve metrics from registry.
self.respond_with_metrics()
} else {
self.respond_with_404_not_found()
};
Box::pin(async { Ok(resp) })
}
}

pub(crate) struct MakeMetricService {
reg: SharedRegistry,
metrics_path: String,
}

impl MakeMetricService {
pub(crate) fn new(registry: Registry, metrics_path: String) -> MakeMetricService {
MakeMetricService {
reg: Arc::new(Mutex::new(registry)),
metrics_path,
}
}
}

impl<T> Service<T> for MakeMetricService {
type Response = MetricService;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn poll_ready(&mut self, _: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

fn call(&mut self, _: T) -> Self::Future {
let reg = self.reg.clone();
let metrics_path = self.metrics_path.clone();
let fut = async move { Ok(MetricService { reg, metrics_path }) };
Box::pin(fut)
}
}
Loading