Skip to content

Commit

Permalink
Merge pull request #31 from omnia-network/lorenzo
Browse files Browse the repository at this point in the history
Metrics improvement
  • Loading branch information
ilbertt authored Mar 29, 2024
2 parents f47a0a3 + af16b75 commit 4447c96
Show file tree
Hide file tree
Showing 10 changed files with 1,295 additions and 118 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ serde = "1.0.176"
reqwest = "0.11.7"
tokio = { version = "1.29.1", features = ["full"] }
tracing = "0.1.40"
metrics = "0.22.1"

canister-utils = { path = "src/canister-utils" }
ic-identity = { path = "src/ic-identity" }
1 change: 1 addition & 0 deletions src/gateway-state/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ dashmap = "5.5.3"
tokio = { workspace = true }
tracing = { workspace = true }

metrics = { workspace = true }
canister-utils = { workspace = true }
52 changes: 37 additions & 15 deletions src/gateway-state/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use canister_utils::{ClientKey, IcWsCanisterMessage};
use std::sync::Arc;

use dashmap::{mapref::entry::Entry, DashMap};
use ic_agent::export::Principal;
use std::sync::Arc;
use metrics::gauge;
use tokio::sync::mpsc::Sender;
use tracing::Span;
use tracing::{debug, Span};

use canister_utils::{ClientKey, IcWsCanisterMessage};

/// State of the WS Gateway that can be shared between threads
#[derive(Clone)]
Expand Down Expand Up @@ -59,6 +62,12 @@ impl GatewayState {
span: client_session_span,
},
);

// Increment the number of clients connected to the canister
let clients_connected = poller_state.len();
debug!("Clients connected: {}", clients_connected);
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);
// the poller shall not be started again
None
},
Expand All @@ -74,6 +83,12 @@ impl GatewayState {
},
);
entry.insert(Arc::clone(&poller_state));

// Increment the number of clients connected to the canister
let clients_connected = poller_state.len();
debug!("Clients connected: {}", clients_connected);
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);
// the poller shall be started
Some(poller_state)
},
Expand Down Expand Up @@ -104,6 +119,12 @@ impl GatewayState {
// if this is encountered it might indicate a race condition
unreachable!("Client key not found in poller state");
}

// Decrement the number of clients connected to the canister
let clients_connected = poller_state.len();
debug!("Clients connected: {}", clients_connected);
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);
// even if this is the last client session for the canister, do not remove the canister from the gateway state
// this will be done by the poller task
}
Expand All @@ -115,14 +136,6 @@ impl GatewayState {
// therefore there is no need to do anything else here
}

pub fn get_clients_count(&self, canister_id: CanisterPrincipal) -> usize {
if let Some(poller_state) = self.inner.data.get(&canister_id) {
poller_state.len()
} else {
0
}
}

pub fn get_active_pollers_count(&self) -> usize {
self.inner.data.len()
}
Expand Down Expand Up @@ -155,7 +168,15 @@ impl GatewayState {
// returns 'ClientRemovalResult::Removed' if the client was removed, 'ClientRemovalResult::Vacant' if there was no such client
return {
match poller_state.remove(&client_key) {
Some(_) => ClientRemovalResult::Removed(client_key),
Some(_) => {
// Decrement the number of clients connected to the canister
let clients_connected = poller_state.len();
debug!("Clients connected: {}", clients_connected);
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);

ClientRemovalResult::Removed(client_key)
},
None => ClientRemovalResult::Vacant,
}
};
Expand Down Expand Up @@ -262,14 +283,15 @@ pub type CanisterPrincipal = Principal;

#[cfg(test)]
mod tests {
use tokio::sync::mpsc::{self, Receiver};

use super::*;
use std::{
thread,
time::{Duration, Instant},
};

use tokio::sync::mpsc::{self, Receiver};

use super::*;

#[tokio::test]
async fn should_insert_new_client_channels_and_get_new_poller_state_once() {
let clients_count = 1000;
Expand Down
2 changes: 1 addition & 1 deletion src/ic-websocket-gateway/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ opentelemetry = { version = "0.21" }
opentelemetry-otlp = { version = "0.14.0" }
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] }
rand = "0.8"
metrics = "0.22.1"
metrics = { workspace = true }
metrics-exporter-prometheus = "0.13.1"
metrics-util = "0.16.2"

Expand Down
12 changes: 11 additions & 1 deletion src/ic-websocket-gateway/src/canister_poller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use gateway_state::{
CanisterPrincipal, CanisterRemovalResult, ClientSender, GatewayState, PollerState,
};
use ic_agent::{agent::RejectCode, Agent, AgentError};
use metrics::{gauge, histogram};
use std::{sync::Arc, time::Duration};
use tokio::{sync::mpsc::Sender, time::timeout};
use tracing::{error, span, trace, warn, Instrument, Level, Span};
Expand Down Expand Up @@ -77,6 +78,12 @@ impl CanisterPoller {
// this enables to crawl polling iterations in reverse chronological order
polling_iteration_span.follows_from(previous_polling_iteration_span.id());
}

// register the number of active clients
let clients_connected = self.poller_state.len();
gauge!("clients_connected", "canister_id" => self.canister_id.to_string())
.set(clients_connected as f64);

if let Err(e) = self
.poll_and_relay()
.instrument(polling_iteration_span.clone())
Expand Down Expand Up @@ -140,7 +147,10 @@ impl CanisterPoller {
PollingStatus::NoMessagesPolled => (),
}

// compute the amout of time to sleep for before polling again
// record the time it took to poll the canister
let delta = start_polling_instant.elapsed();
histogram!("poller_duration", "canister_id" => self.canister_id.to_string()).record(delta);

let effective_polling_interval =
self.compute_effective_polling_interval(start_polling_instant);
// if no messages are returned or if the queue is fully drained, sleep for 'effective_polling_interval' before polling again
Expand Down
21 changes: 2 additions & 19 deletions src/ic-websocket-gateway/src/client_session_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use canister_utils::{ws_close, CanisterWsCloseArguments, ClientKey, IcWsCanister
use futures_util::StreamExt;
use gateway_state::{CanisterPrincipal, ClientRemovalResult, GatewayState, PollerState};
use ic_agent::Agent;
use metrics::{gauge, histogram};
use metrics::{counter, gauge, histogram};
use std::sync::Arc;
use std::time::Instant;
use tokio::{
Expand Down Expand Up @@ -189,14 +189,9 @@ impl ClientSessionHandler {
client_session_span.in_scope(|| {
debug!("Client session opened");

let canister_id = self.get_canister_id(&client_session);
let client_key = self.get_client_key(&client_session);

// Clients connection metrics
let clients_connected = self.gateway_state.get_clients_count(canister_id);
debug!("Clients connected: {}", clients_connected.to_string());
gauge!("clients_connected", "canister_id" => canister_id.to_string()).set(clients_connected as f64);

counter!("client_connected_count", "client_key" => client_key.to_string()).absolute(1);
// Calculate the time it took to open the connection and record it using the timer started in ws_listener.rs
let delta = self.start_connection_time.elapsed();
histogram!("connection_opening_time", "client_key" => client_key.to_string()).record(delta);
Expand All @@ -215,12 +210,6 @@ impl ClientSessionHandler {
.remove_client(canister_id, client_key.clone());
debug!("Client removed from gateway state");

// Clients connection metrics
let clients_connected = self.gateway_state.get_clients_count(canister_id);
debug!("Clients connected: {}", clients_connected.to_string());
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);

let delta = client_start_session_time.elapsed();
histogram!("connection_duration", "client_key" => client_key.to_string())
.record(delta);
Expand Down Expand Up @@ -256,12 +245,6 @@ impl ClientSessionHandler {
{
debug!("Client removed from gateway state");

// Clients connection metrics
let clients_connected = self.gateway_state.get_clients_count(canister_id);
debug!("Clients connected: {}", clients_connected.to_string());
gauge!("clients_connected", "canister_id" => canister_id.to_string())
.set(clients_connected as f64);

let delta = client_start_session_time.elapsed();
histogram!("connection_duration", "client_key" => client_key.to_string())
.record(delta);
Expand Down
11 changes: 8 additions & 3 deletions src/ic-websocket-gateway/src/gateway_metrics.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use metrics::{describe_gauge, describe_histogram, gauge};
use metrics::{describe_counter, describe_gauge, describe_histogram, gauge};
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::MetricKindMask;
use std::error::Error;
Expand All @@ -9,12 +9,16 @@ use std::time::Duration;
pub fn init_metrics(address: &str) -> Result<(), Box<dyn Error>> {
let builder = PrometheusBuilder::new().with_http_listener(SocketAddr::from_str(address)?);

// Set the idle timeout for counters and histograms to 30 seconds then the metrics are removed from the registry
// Set the idle timeout for counters and histograms to 10 seconds then the metrics are removed from the registry
builder
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(30)))
.idle_timeout(MetricKindMask::ALL, Some(Duration::from_secs(10)))
.install()
.expect("failed to install Prometheus recorder");

describe_counter!(
"client_connected_count",
"Each time that a client connects it emits a point"
);
describe_gauge!(
"clients_connected",
"The number of clients currently connected"
Expand All @@ -32,6 +36,7 @@ pub fn init_metrics(address: &str) -> Result<(), Box<dyn Error>> {
"connection_opening_time",
"The time it takes to open a connection"
);
describe_histogram!("poller_duration", "The time it takes to poll the canister");

gauge!("active_pollers").set(0.0);

Expand Down
Loading

0 comments on commit 4447c96

Please sign in to comment.