From 55151e6eada78a10445b53237fe4730aa1bb68b2 Mon Sep 17 00:00:00 2001 From: zajko Date: Wed, 29 Jan 2025 17:03:41 +0100 Subject: [PATCH] Aligning timeouts to reflect real-world scenarios (#399) * Changing error messages in case of node disconnection; * cleaning up unused properties * removing request_limit and the logic attached to that since we don't actually handle multiple in-flight requests to binary-port * Removing the possibility to define "infinite" as a valid retry amount in node client connector since it can lead to deadlocks. That allowed removal of RpcServerConfigTarget, NodeClientConfigTarget, ExponentialBackoffConfigTarget and MaxAttemptsTarget since we don't need custom code for deserialization of the config file. * Added some metrics to track unwanted events (timeouts on connection/sending/receiving data from binary port, detecting response id mismatch) * Changed buckets definitions in RESPONSE_TIME_MS_BUCKETS constant * Added MAX_COMPONENT_STARTUP_TIMEOUT_SECS guard in case one of the components hangs on startup * Making keepalive loop use the standard mechanism of sending messages to gain retries and id-checks * Aligning message_timeout_secs --- Cargo.lock | 24 +- README.md | 4 +- metrics/src/rpc.rs | 39 ++- .../example_configs/EXAMPLE_NCTL_CONFIG.toml | 6 +- .../EXAMPLE_NCTL_POSTGRES_CONFIG.toml | 6 +- .../example_configs/EXAMPLE_NODE_CONFIG.toml | 6 +- .../default_debian_config.toml | 8 +- .../default_rpc_only_config.toml | 4 +- .../default_sse_only_config.toml | 2 +- rpc_sidecar/src/config.rs | 234 +----------------- rpc_sidecar/src/lib.rs | 5 +- rpc_sidecar/src/node_client.rs | 191 +++++++------- sidecar/src/config.rs | 13 +- sidecar/src/run.rs | 27 +- 14 files changed, 194 insertions(+), 375 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7c517db9..f1971d1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -493,7 +493,7 @@ dependencies = [ [[package]] name = "casper-binary-port" version = "1.0.0" -source = 
"git+https://github.com/casper-network/casper-node.git?branch=dev#b1540891bdbb25d586a209afa4e2f687d397cc66" +source = "git+https://github.com/casper-network/casper-node.git?branch=dev#83ab415edd779746477f77c5d6daf1ec9525f965" dependencies = [ "bincode", "bytes", @@ -699,7 +699,7 @@ dependencies = [ [[package]] name = "casper-types" version = "5.0.0" -source = "git+https://github.com/casper-network/casper-node.git?branch=dev#b1540891bdbb25d586a209afa4e2f687d397cc66" +source = "git+https://github.com/casper-network/casper-node.git?branch=dev#83ab415edd779746477f77c5d6daf1ec9525f965" dependencies = [ "base16", "base64 0.13.1", @@ -865,9 +865,9 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" [[package]] name = "cpufeatures" -version = "0.2.16" +version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16b80225097f2e5ae4e7179dd2266824648f3e2f49d9134d584b76389d31c4c3" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" dependencies = [ "libc", ] @@ -3048,9 +3048,9 @@ dependencies = [ [[package]] name = "native-tls" -version = "0.2.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8614eb2c83d59d1c8cc974dd3f920198647674a0a035e1af1fa58707e317466" +checksum = "0dab59f8e050d5df8e4dd87d9206fb6f65a483e20ac9fda365ade4fab353196c" dependencies = [ "libc", "log", @@ -3240,9 +3240,9 @@ checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" [[package]] name = "openssl" -version = "0.10.68" +version = "0.10.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6174bc48f102d208783c2c84bf931bb75927a617866870de8a4ea85597f871f5" +checksum = "f5e534d133a060a3c19daec1eb3e98ec6f4685978834f2dbadfe2ec215bab64e" dependencies = [ "bitflags 2.8.0", "cfg-if", @@ -3266,9 +3266,9 @@ dependencies = [ [[package]] name = "openssl-probe" -version = "0.1.5" +version = "0.1.6" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" [[package]] name = "openssl-sys" @@ -5325,9 +5325,9 @@ checksum = "7eec5d1121208364f6793f7d2e222bf75a915c19557537745b195b253dd64217" [[package]] name = "unicode-ident" -version = "1.0.14" +version = "1.0.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adb9e6ca4f869e1180728b7950e35922a7fc6397f7b641499e8f3ef06e50dc83" +checksum = "11cd88e12b17c6494200a9c1b683a04fcac9573ed74cd1b62aeb2727c5592243" [[package]] name = "unicode-normalization" diff --git a/README.md b/README.md index 3dff2da9..95f7cb32 100644 --- a/README.md +++ b/README.md @@ -208,8 +208,8 @@ address = '0.0.0.0:28101' max_message_size_bytes = 4_194_304 request_limit = 3 request_buffer_size = 16 -message_timeout_secs = 30 -client_access_timeout_secs = 2 +message_timeout_secs = 10 +client_access_timeout_secs = 10 [rpc_server.speculative_exec_server] enable_server = true diff --git a/metrics/src/rpc.rs b/metrics/src/rpc.rs index f2334e38..df3690b0 100644 --- a/metrics/src/rpc.rs +++ b/metrics/src/rpc.rs @@ -8,8 +8,8 @@ const RESPONSE_SIZE_BUCKETS: &[f64; 8] = &[ 5e+2_f64, 1e+3_f64, 2e+3_f64, 5e+3_f64, 5e+4_f64, 5e+5_f64, 5e+6_f64, 5e+7_f64, ]; -const RESPONSE_TIME_MS_BUCKETS: &[f64; 8] = &[ - 1_f64, 5_f64, 10_f64, 30_f64, 50_f64, 100_f64, 200_f64, 300_f64, +const RESPONSE_TIME_MS_BUCKETS: &[f64; 9] = &[ + 1_f64, 5_f64, 10_f64, 30_f64, 50_f64, 100_f64, 300_f64, 1000_f64, 3000_f64, ]; static ENDPOINT_CALLS: Lazy = Lazy::new(|| { @@ -24,6 +24,21 @@ static ENDPOINT_CALLS: Lazy = Lazy::new(|| { counter }); +static TIMEOUT_COUNTERS: Lazy = Lazy::new(|| { + let counter = IntCounterVec::new( + Opts::new( + "rpc_server_timeout_counts", + "Counters for how many of the requests failed due to internal timeout", + ), + &["timer"], + ) + .unwrap(); + REGISTRY + 
.register(Box::new(counter.clone())) + .expect("cannot register metric"); + counter +}); + static RESPONSE_TIMES_MS: Lazy = Lazy::new(|| { let histogram = HistogramVec::new( HistogramOpts { @@ -56,6 +71,18 @@ static RECONNECT_TIMES_MS: Lazy = Lazy::new(|| { histogram }); +static MISMATCHED_IDS: Lazy = Lazy::new(|| { + let counter = IntGauge::new( + "rpc_server_mismatched_ids", + "Number of mismatched ID events observed in responses from binary port", + ) + .expect("rpc_server_mismatched_ids metric can't be created"); + REGISTRY + .register(Box::new(counter.clone())) + .expect("cannot register metric"); + counter +}); + static DISCONNECT_EVENTS: Lazy = Lazy::new(|| { let counter = IntGauge::new( "rpc_server_disconnects", @@ -108,3 +135,11 @@ pub fn register_request_size(method: &str, payload_size: f64) { .with_label_values(&[method]) .observe(payload_size); } + +pub fn register_timeout(timer_name: &str) { + TIMEOUT_COUNTERS.with_label_values(&[timer_name]).inc(); +} + +pub fn register_mismatched_id() { + MISMATCHED_IDS.inc(); +} diff --git a/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml b/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml index 1d7400c8..5797319c 100644 --- a/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml +++ b/resources/example_configs/EXAMPLE_NCTL_CONFIG.toml @@ -18,10 +18,8 @@ cors_origin = "" ip_address = "0.0.0.0" port = 28102 max_message_size_bytes = 4194304 -request_limit = 3 -request_buffer_size = 16 -message_timeout_secs = 30 -client_access_timeout_secs = 2 +message_timeout_secs = 10 +client_access_timeout_secs = 10 keepalive_timeout_ms = 10_000 [rpc_server.node_client.exponential_backoff] diff --git a/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml b/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml index f8413f01..2c55c06c 100644 --- a/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml +++ b/resources/example_configs/EXAMPLE_NCTL_POSTGRES_CONFIG.toml @@ -18,10 +18,8 @@ cors_origin = "" ip_address = 
"0.0.0.0" port = 28102 max_message_size_bytes = 4194304 -request_limit = 3 -request_buffer_size = 16 -message_timeout_secs = 30 -client_access_timeout_secs = 2 +message_timeout_secs = 10 +client_access_timeout_secs = 10 keepalive_timeout_ms = 10_000 [rpc_server.node_client.exponential_backoff] diff --git a/resources/example_configs/EXAMPLE_NODE_CONFIG.toml b/resources/example_configs/EXAMPLE_NODE_CONFIG.toml index 5637c0b3..a1801798 100644 --- a/resources/example_configs/EXAMPLE_NODE_CONFIG.toml +++ b/resources/example_configs/EXAMPLE_NODE_CONFIG.toml @@ -18,10 +18,8 @@ cors_origin = "" ip_address = "3.20.57.210" port = 7777 max_message_size_bytes = 4194304 -request_limit = 10 -request_buffer_size = 50 -message_timeout_secs = 60 -client_access_timeout_secs = 60 +message_timeout_secs = 10 +client_access_timeout_secs = 10 keepalive_timeout_ms = 10_000 [rpc_server.node_client.exponential_backoff] diff --git a/resources/example_configs/default_debian_config.toml b/resources/example_configs/default_debian_config.toml index c0639b98..5270aab4 100644 --- a/resources/example_configs/default_debian_config.toml +++ b/resources/example_configs/default_debian_config.toml @@ -71,14 +71,10 @@ ip_address = '127.0.0.1' port = 7779 # Maximum size of a message in bytes. max_message_size_bytes = 4_194_304 -# Maximum number of in-flight node requests. -request_limit = 3 -# Number of node requests that can be buffered. -request_buffer_size = 16 # Timeout for a node request in seconds. -message_timeout_secs = 30 +message_timeout_secs = 10 # Timeout specifying how long to wait for binary port client to be available. -client_access_timeout_secs = 2 +client_access_timeout_secs = 10 # The amount of time in milliseconds to wait between sending keepalive requests. 
keepalive_timeout_ms = 10_000 diff --git a/resources/example_configs/default_rpc_only_config.toml b/resources/example_configs/default_rpc_only_config.toml index 63e53aa0..f33add55 100644 --- a/resources/example_configs/default_rpc_only_config.toml +++ b/resources/example_configs/default_rpc_only_config.toml @@ -76,9 +76,9 @@ request_limit = 3 # Number of node requests that can be buffered. request_buffer_size = 16 # Timeout for a node request in seconds. -message_timeout_secs = 30 +message_timeout_secs = 10 # Timeout specifying how long to wait for binary port client to be available. -client_access_timeout_secs = 2 +client_access_timeout_secs = 10 # The amount of time in milliseconds to wait between sending keepalive requests. keepalive_timeout_ms = 10_000 diff --git a/resources/example_configs/default_sse_only_config.toml b/resources/example_configs/default_sse_only_config.toml index b477133a..5ab0b882 100644 --- a/resources/example_configs/default_sse_only_config.toml +++ b/resources/example_configs/default_sse_only_config.toml @@ -32,7 +32,7 @@ port = 18888 max_concurrent_requests = 50 max_requests_per_second = 50 -[admin_server] +[admin_api_server] enable_server = true port = 18887 max_concurrent_requests = 1 diff --git a/rpc_sidecar/src/config.rs b/rpc_sidecar/src/config.rs index bda13d46..7ac5f730 100644 --- a/rpc_sidecar/src/config.rs +++ b/rpc_sidecar/src/config.rs @@ -1,7 +1,4 @@ -use std::{ - convert::{TryFrom, TryInto}, - net::{IpAddr, Ipv4Addr}, -}; +use std::net::{IpAddr, Ipv4Addr}; use datasize::DataSize; use serde::Deserialize; @@ -22,32 +19,6 @@ const DEFAULT_MAX_BODY_BYTES: u32 = 2_621_440; /// Default CORS origin. const DEFAULT_CORS_ORIGIN: &str = ""; -#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] -// Disallow unknown fields to ensure config files and command-line overrides contain valid keys. 
-#[serde(deny_unknown_fields)] -pub struct RpcServerConfigTarget { - pub main_server: RpcConfig, - pub speculative_exec_server: Option, - pub node_client: NodeClientConfigTarget, -} - -impl TryFrom for RpcServerConfig { - type Error = FieldParseError; - fn try_from(value: RpcServerConfigTarget) -> Result { - let node_client = value.node_client.try_into().map_err(|e: FieldParseError| { - FieldParseError::ParseError { - field_name: "node_client", - error: e.to_string(), - } - })?; - Ok(RpcServerConfig { - main_server: value.main_server, - speculative_exec_server: value.speculative_exec_server, - node_client, - }) - } -} - #[derive(Error, Debug)] pub enum FieldParseError { #[error("failed to parse field {} with error: {}", .field_name, .error)] @@ -116,10 +87,6 @@ const DEFAULT_MAX_PAYLOAD_SIZE: u32 = 4 * 1024 * 1024; const DEFAULT_MESSAGE_TIMEOUT_SECS: u64 = 30; /// Default timeout for client access. const DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS: u64 = 10; -/// Default request limit. -const DEFAULT_NODE_REQUEST_LIMIT: u16 = 3; -/// Default request buffer size. -const DEFAULT_REQUEST_BUFFER_SIZE: usize = 16; /// Default exponential backoff base delay. const DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS: u64 = 1000; /// Default exponential backoff maximum delay. @@ -128,6 +95,8 @@ const DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS: u64 = 64_000; const DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT: u64 = 2; /// Default keep alive timeout milliseconds. const DEFAULT_KEEPALIVE_TIMEOUT_MS: u64 = 1_000; +/// Default max attempts +const DEFAULT_EXPONENTIAL_BACKOFF_MAX_ATTEMPTS: u32 = 3; /// Node client configuration. #[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] @@ -145,10 +114,6 @@ pub struct NodeClientConfig { /// Timeout specifying how long to wait for binary port client to be available. // Access to the client is synchronized. pub client_access_timeout_secs: u64, - /// Maximum number of in-flight node requests. 
- pub request_limit: u16, - /// Number of node requests that can be buffered. - pub request_buffer_size: usize, /// The amount of ms to wait between sending keepalive requests. pub keepalive_timeout_ms: u64, /// Configuration for exponential backoff to be used for re-connects. @@ -161,9 +126,7 @@ impl NodeClientConfig { NodeClientConfig { ip_address: DEFAULT_NODE_CONNECT_IP_ADDRESS, port: DEFAULT_NODE_CONNECT_PORT, - request_limit: DEFAULT_NODE_REQUEST_LIMIT, max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, - request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE, message_timeout_secs: DEFAULT_MESSAGE_TIMEOUT_SECS, client_access_timeout_secs: DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS, keepalive_timeout_ms: DEFAULT_KEEPALIVE_TIMEOUT_MS, @@ -171,7 +134,7 @@ impl NodeClientConfig { initial_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS, max_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS, coefficient: DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT, - max_attempts: MaxAttempts::Infinite, + max_attempts: DEFAULT_EXPONENTIAL_BACKOFF_MAX_ATTEMPTS, }, } } @@ -183,9 +146,7 @@ impl NodeClientConfig { NodeClientConfig { ip_address: localhost, port, - request_limit: DEFAULT_NODE_REQUEST_LIMIT, max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, - request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE, message_timeout_secs: DEFAULT_MESSAGE_TIMEOUT_SECS, client_access_timeout_secs: DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS, keepalive_timeout_ms: DEFAULT_KEEPALIVE_TIMEOUT_MS, @@ -193,7 +154,7 @@ impl NodeClientConfig { initial_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_BASE_MS, max_delay_ms: DEFAULT_EXPONENTIAL_BACKOFF_MAX_MS, coefficient: DEFAULT_EXPONENTIAL_BACKOFF_COEFFICIENT, - max_attempts: MaxAttempts::Infinite, + max_attempts: DEFAULT_EXPONENTIAL_BACKOFF_MAX_ATTEMPTS, }, } } @@ -201,14 +162,12 @@ impl NodeClientConfig { /// Creates an instance of `NodeClientConfig` with specified listening port and maximum number /// of reconnection retries. 
#[cfg(any(feature = "testing", test))] - pub fn new_with_port_and_retries(port: u16, num_of_retries: usize) -> Self { + pub fn new_with_port_and_retries(port: u16, num_of_retries: u32) -> Self { let localhost = IpAddr::V4(Ipv4Addr::LOCALHOST); NodeClientConfig { ip_address: localhost, port, - request_limit: DEFAULT_NODE_REQUEST_LIMIT, max_message_size_bytes: DEFAULT_MAX_PAYLOAD_SIZE, - request_buffer_size: DEFAULT_REQUEST_BUFFER_SIZE, message_timeout_secs: DEFAULT_MESSAGE_TIMEOUT_SECS, client_access_timeout_secs: DEFAULT_CLIENT_ACCESS_TIMEOUT_SECS, keepalive_timeout_ms: DEFAULT_KEEPALIVE_TIMEOUT_MS, @@ -216,7 +175,7 @@ impl NodeClientConfig { initial_delay_ms: 500, max_delay_ms: 3000, coefficient: 3, - max_attempts: MaxAttempts::Finite(num_of_retries), + max_attempts: num_of_retries, }, } } @@ -228,57 +187,6 @@ impl Default for NodeClientConfig { } } -/// Node client configuration. -#[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] -// Disallow unknown fields to ensure config files and command-line overrides contain valid keys. -#[serde(deny_unknown_fields)] -pub struct NodeClientConfigTarget { - /// IP address of the node. - pub ip_address: IpAddr, - /// TCP port of the node - pub port: u16, - /// Maximum size of a message in bytes. - pub max_message_size_bytes: u32, - /// Message transfer timeout in seconds. - pub message_timeout_secs: u64, - /// Timeout specifying how long to wait for binary port client to be available. - // Access to the client is synchronized. - pub client_access_timeout_secs: u64, - /// Maximum number of in-flight node requests. - pub request_limit: u16, - /// Number of node requests that can be buffered. - pub request_buffer_size: usize, - /// The amount of ms to wait between sending keepalive requests. - pub keepalive_timeout_ms: u64, - /// Configuration for exponential backoff to be used for re-connects. 
- pub exponential_backoff: ExponentialBackoffConfigTarget, -} - -impl TryFrom for NodeClientConfig { - type Error = FieldParseError; - fn try_from(value: NodeClientConfigTarget) -> Result { - let exponential_backoff = - value - .exponential_backoff - .try_into() - .map_err(|e: FieldParseError| FieldParseError::ParseError { - field_name: "exponential_backoff", - error: e.to_string(), - })?; - Ok(NodeClientConfig { - ip_address: value.ip_address, - port: value.port, - request_limit: value.request_limit, - max_message_size_bytes: value.max_message_size_bytes, - request_buffer_size: value.request_buffer_size, - client_access_timeout_secs: value.client_access_timeout_secs, - message_timeout_secs: value.message_timeout_secs, - keepalive_timeout_ms: value.keepalive_timeout_ms, - exponential_backoff, - }) - } -} - /// Exponential backoff configuration for re-connects. #[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] // Disallow unknown fields to ensure config files and command-line overrides contain valid keys. @@ -291,131 +199,5 @@ pub struct ExponentialBackoffConfig { /// The multiplier to apply to the previous delay to get the next delay. pub coefficient: u64, /// Maximum number of connection attempts. - pub max_attempts: MaxAttempts, -} - -/// Exponential backoff configuration for re-connects. -#[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] -// Disallow unknown fields to ensure config files and command-line overrides contain valid keys. -#[serde(deny_unknown_fields)] -pub struct ExponentialBackoffConfigTarget { - /// Initial wait time before the first re-connect attempt. - pub initial_delay_ms: u64, - /// Maximum wait time between re-connect attempts. - pub max_delay_ms: u64, - /// The multiplier to apply to the previous delay to get the next delay. - pub coefficient: u64, - /// Maximum number of re-connect attempts. 
- pub max_attempts: MaxAttemptsTarget, -} - -impl TryFrom for ExponentialBackoffConfig { - type Error = FieldParseError; - fn try_from(value: ExponentialBackoffConfigTarget) -> Result { - let max_attempts = value - .max_attempts - .try_into() - .map_err(|e: MaxAttemptsError| FieldParseError::ParseError { - field_name: "max_attempts", - error: e.to_string(), - })?; - Ok(ExponentialBackoffConfig { - initial_delay_ms: value.initial_delay_ms, - max_delay_ms: value.max_delay_ms, - coefficient: value.coefficient, - max_attempts, - }) - } -} - -#[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] -pub enum MaxAttempts { - Infinite, - Finite(usize), -} - -impl MaxAttempts { - pub fn can_attempt(&self, current_attempt: usize) -> bool { - match self { - MaxAttempts::Infinite => true, - MaxAttempts::Finite(max_attempts) => *max_attempts >= current_attempt, - } - } -} - -#[derive(Clone, DataSize, Debug, Deserialize, PartialEq, Eq)] -#[serde(untagged)] -pub enum MaxAttemptsTarget { - StringBased(String), - UsizeBased(usize), -} - -impl TryFrom for MaxAttempts { - type Error = MaxAttemptsError; - fn try_from(value: MaxAttemptsTarget) -> Result { - match value { - MaxAttemptsTarget::StringBased(s) => { - if s == "infinite" { - Ok(MaxAttempts::Infinite) - } else { - Err(MaxAttemptsError::UnexpectedValue(s)) - } - } - MaxAttemptsTarget::UsizeBased(u) => { - if u == 0 { - Err(MaxAttemptsError::UnexpectedValue(u.to_string())) - } else { - Ok(MaxAttempts::Finite(u)) - } - } - } - } -} - -#[derive(Error, Debug)] -pub enum MaxAttemptsError { - #[error("Max attempts must be either 'infinite' or a integer > 0. 
Got: {}", .0)] - UnexpectedValue(String), -} - -#[cfg(test)] -mod tests { - use super::*; - #[test] - fn test_should_deserialize_infinite() { - let json = r#""infinite""#.to_string(); - let deserialized: MaxAttempts = serde_json::from_str::(&json) - .unwrap() - .try_into() - .unwrap(); - assert_eq!(deserialized, MaxAttempts::Infinite); - } - - #[test] - fn test_should_deserialize_finite() { - let json = r#"125"#.to_string(); - let deserialized: MaxAttempts = serde_json::from_str::(&json) - .unwrap() - .try_into() - .unwrap(); - assert_eq!(deserialized, MaxAttempts::Finite(125)); - } - - #[test] - fn test_should_fail_on_other_inputs() { - assert_failing_deserialization(r#""x""#); - assert_failing_deserialization(r#""infiniteee""#); - assert_failing_deserialization(r#""infinite ""#); - assert_failing_deserialization(r#"" infinite""#); - let deserialized = serde_json::from_str::(r#"-1"#); - assert!(deserialized.is_err()); - } - - fn assert_failing_deserialization(input: &str) { - let deserialized: Result = - serde_json::from_str::(input) - .unwrap() - .try_into(); - assert!(deserialized.is_err(), "input = {}", input); - } + pub max_attempts: u32, } diff --git a/rpc_sidecar/src/lib.rs b/rpc_sidecar/src/lib.rs index 3149cad5..2909f3e4 100644 --- a/rpc_sidecar/src/lib.rs +++ b/rpc_sidecar/src/lib.rs @@ -11,8 +11,7 @@ use anyhow::Error; use casper_binary_port::{BinaryRequest, BinaryRequestHeader}; use casper_types::bytesrepr::ToBytes; use casper_types::{bytesrepr, ProtocolVersion}; -pub use config::{FieldParseError, RpcServerConfig, RpcServerConfigTarget}; -pub use config::{NodeClientConfig, RpcConfig}; +pub use config::{FieldParseError, NodeClientConfig, RpcConfig, RpcServerConfig}; use futures::future::BoxFuture; use futures::FutureExt; pub use http_server::run as run_rpc_server; @@ -40,7 +39,7 @@ pub async fn build_rpc_server<'a>( ) -> MaybeRpcServerReturn<'a> { let (node_client, reconnect_loop, keepalive_loop) = FramedNodeClient::new(config.node_client.clone(), 
maybe_network_name).await?; - let node_client: Arc = Arc::new(node_client); + let node_client: Arc = node_client; let mut futures = Vec::new(); let main_server_config = config.main_server; if main_server_config.enable_server { diff --git a/rpc_sidecar/src/node_client.rs b/rpc_sidecar/src/node_client.rs index 29c067ef..a8d81e76 100644 --- a/rpc_sidecar/src/node_client.rs +++ b/rpc_sidecar/src/node_client.rs @@ -1,12 +1,16 @@ -use crate::{config::MaxAttempts, encode_request, NodeClientConfig, SUPPORTED_PROTOCOL_VERSION}; +use crate::{ + config::ExponentialBackoffConfig, encode_request, NodeClientConfig, SUPPORTED_PROTOCOL_VERSION, +}; use anyhow::Error as AnyhowError; use async_trait::async_trait; use futures::{Future, SinkExt, StreamExt}; -use metrics::rpc::{inc_disconnect, observe_reconnect_time}; +use metrics::rpc::{ + inc_disconnect, observe_reconnect_time, register_mismatched_id, register_timeout, +}; use serde::de::DeserializeOwned; use std::{ convert::{TryFrom, TryInto}, - net::SocketAddr, + net::{IpAddr, SocketAddr}, sync::{ atomic::{AtomicU16, Ordering}, Arc, @@ -39,7 +43,7 @@ use std::{ }; use tokio::{ net::TcpStream, - sync::{futures::Notified, RwLock, RwLockWriteGuard, Semaphore}, + sync::{futures::Notified, RwLock, RwLockWriteGuard}, }; use tracing::{error, field, info, warn}; @@ -697,8 +701,8 @@ pub enum Error { TooManyMismatchedResponses { max: u8 }, #[error("failed to deserialize the original request provided with the response: {0}")] OriginalRequestDeserialization(String), - #[error("failed to deserialize the envelope of a response: {0}")] - EnvelopeDeserialization(String), + #[error("failed to deserialize the envelope of a response")] + EnvelopeDeserialization, #[error("failed to deserialize a response: {0}")] Deserialization(String), #[error("failed to serialize a request: {0}")] @@ -765,6 +769,8 @@ pub enum Error { GasPriceToleranceTooLow, #[error("received v1 transaction for speculative execution")] ReceivedV1Transaction, + #[error("connection to 
node lost")] + ConnectionLost, } impl Error { @@ -924,7 +930,6 @@ pub struct FramedNodeClient { reconnect: Arc>, shutdown: Arc>, config: NodeClientConfig, - request_limit: Semaphore, current_request_id: Arc, } @@ -934,14 +939,20 @@ impl FramedNodeClient { maybe_network_name: Option, ) -> Result< ( - Self, + Arc, impl Future>, impl Future>, ), AnyhowError, > { let stream = Arc::new(RwLock::new( - Self::connect_with_retries(&config, None).await?, + Self::connect_with_retries( + &config.ip_address, + config.port, + &config.exponential_backoff, + config.max_message_size_bytes, + ) + .await?, )); let shutdown = Notify::::new(); @@ -953,22 +964,15 @@ impl FramedNodeClient { Arc::clone(&shutdown), Arc::clone(&reconnect), ); - - let current_request_id = Arc::new(AtomicU16::new(INITIAL_REQUEST_ID)); - let keepalive_loop = Self::keepalive_loop( - config.clone(), - Arc::clone(&stream), - Arc::clone(&current_request_id), - ); - - let node_client = Self { + let keepalive_timeout = Duration::from_millis(config.keepalive_timeout_ms); + let node_client = Arc::new(Self { client: Arc::clone(&stream), - request_limit: Semaphore::new(config.request_limit as usize), reconnect, shutdown, config, current_request_id: AtomicU16::new(INITIAL_REQUEST_ID).into(), - }; + }); + let keepalive_loop = Self::keepalive_loop(node_client.clone(), keepalive_timeout); // validate network name if let Some(network_name) = maybe_network_name { @@ -1009,39 +1013,12 @@ impl FramedNodeClient { } async fn keepalive_loop( - config: NodeClientConfig, - client: Arc>>, - current_request_id: Arc, + client: Arc, + keepalive_timeout: Duration, ) -> Result<(), AnyhowError> { - let keepalive_timeout = Duration::from_millis(config.keepalive_timeout_ms); - loop { tokio::time::sleep(keepalive_timeout).await; - - let mut client = client.write().await; - - let next_id = current_request_id.fetch_add(1, Ordering::Relaxed); - let payload = BinaryMessage::new( - encode_request(&BinaryRequest::KeepAliveRequest, next_id) - 
.expect("should always serialize a request"), - ); - - if tokio::time::timeout( - Duration::from_secs(config.message_timeout_secs), - client.send(payload), - ) - .await - .is_err() - { - continue; - } - - tokio::time::timeout( - Duration::from_secs(config.message_timeout_secs), - client.next(), - ) - .await - .ok(); + client.send_request(BinaryRequest::KeepAliveRequest).await?; } } @@ -1057,8 +1034,10 @@ impl FramedNodeClient { client.send(payload), ) .await - .map_err(|_| Error::RequestFailed("timeout".to_owned()))? - { + .map_err(|_| { + register_timeout("sending_payload"); + Error::RequestFailed("timeout".to_owned()) + })? { return Err(Error::RequestFailed(err.to_string())); }; @@ -1069,7 +1048,8 @@ impl FramedNodeClient { ) .await else { - return Err(Error::RequestFailed("timeout".to_owned())); + register_timeout("receiving_response"); + return Err(Error::ConnectionLost); }; if let Some(response) = maybe_response { @@ -1078,7 +1058,13 @@ impl FramedNodeClient { .map_err(|err| Error::RequestFailed(err.to_string()))? .payload(), ) - .map_err(|err| Error::EnvelopeDeserialization(err.to_string()))?; + .map_err(|err| { + error!( + "Error when deserializing binary port envelope: {}", + err.to_string() + ); + Error::EnvelopeDeserialization + })?; match validate_response(resp, request_id, &self.shutdown) { Ok(response) => return Ok(response), Err(err) if matches!(err, Error::RequestResponseIdMismatch { expected, got } if expected > got) => @@ -1086,12 +1072,16 @@ impl FramedNodeClient { // If our expected ID is greater than the one we received, it means we can // try to recover from the situation by reading more responses from the stream. 
warn!(%err, "received a response with an outdated id, trying another response"); + register_mismatched_id(); continue; } - Err(err) => return Err(err), + Err(err) => { + register_mismatched_id(); + return Err(err); + } } } else { - return Err(Error::RequestFailed("disconnected".to_owned())); + return Err(Error::ConnectionLost); } } @@ -1111,28 +1101,26 @@ impl FramedNodeClient { } async fn connect_with_retries( - config: &NodeClientConfig, - maybe_max_attempts_override: Option<&MaxAttempts>, + ip_address: &IpAddr, + port: u16, + backoff_config: &ExponentialBackoffConfig, + max_message_size_bytes: u32, ) -> Result, AnyhowError> { - let mut wait = config.exponential_backoff.initial_delay_ms; - let max_attempts = if let Some(attempts) = maybe_max_attempts_override { - attempts - } else { - &config.exponential_backoff.max_attempts - }; - let tcp_socket = SocketAddr::new(config.ip_address, config.port); + let mut wait = backoff_config.initial_delay_ms; + let max_attempts = &backoff_config.max_attempts; + let tcp_socket = SocketAddr::new(*ip_address, port); let mut current_attempt = 1; loop { match TcpStream::connect(tcp_socket).await { Ok(stream) => { return Ok(Framed::new( stream, - BinaryMessageCodec::new(config.max_message_size_bytes), + BinaryMessageCodec::new(max_message_size_bytes), )) } Err(err) => { current_attempt += 1; - if !max_attempts.can_attempt(current_attempt) { + if *max_attempts < current_attempt { anyhow::bail!( "Couldn't connect to node {} after {} attempts", tcp_socket, @@ -1141,8 +1129,7 @@ impl FramedNodeClient { } warn!(%err, "failed to connect to node {tcp_socket}, waiting {wait}ms before retrying"); tokio::time::sleep(Duration::from_millis(wait)).await; - wait = (wait * config.exponential_backoff.coefficient) - .min(config.exponential_backoff.max_delay_ms); + wait = (wait * backoff_config.coefficient).min(backoff_config.max_delay_ms); } }; } @@ -1150,12 +1137,17 @@ impl FramedNodeClient { async fn reconnect_internal( config: &NodeClientConfig, 
- maybe_max_attempts_override: Option<&MaxAttempts>, ) -> Result, AnyhowError> { let disconnected_start = Instant::now(); inc_disconnect(); error!("node connection closed, will attempt to reconnect"); - let stream = Self::connect_with_retries(config, maybe_max_attempts_override).await?; + let stream = Self::connect_with_retries( + &config.ip_address, + config.port, + &config.exponential_backoff, + config.max_message_size_bytes, + ) + .await?; info!("connection with the node has been re-established"); observe_reconnect_time(disconnected_start.elapsed()); Ok(stream) @@ -1164,27 +1156,13 @@ impl FramedNodeClient { async fn reconnect( config: &NodeClientConfig, ) -> Result, AnyhowError> { - Self::reconnect_internal(config, None).await - } - - async fn reconnect_without_retries( - config: &NodeClientConfig, - ) -> Result, AnyhowError> { - Self::reconnect_internal(config, Some(&MaxAttempts::Finite(1))).await + Self::reconnect_internal(config).await } } #[async_trait] impl NodeClient for FramedNodeClient { async fn send_request(&self, req: BinaryRequest) -> Result { - let _permit = self - .request_limit - .acquire() - .await - .map_err(|err| Error::RequestFailed(err.to_string()))?; - - // TODO: Use queue instead of individual timeouts. Currently it is possible to go pass the - // semaphore and the immediately wait for the client to become available. 
let mut client = match tokio::time::timeout( Duration::from_secs(self.config.client_access_timeout_secs), self.client.write(), @@ -1192,7 +1170,10 @@ impl NodeClient for FramedNodeClient { .await { Ok(client) => client, - Err(err) => return Err(Error::RequestFailed(err.to_string())), + Err(_) => { + register_timeout("acquiring_client"); + return Err(Error::ConnectionLost); + } }; let result = self.send_request_internal(&req, &mut client).await; @@ -1202,14 +1183,26 @@ impl NodeClient for FramedNodeClient { err = display_error(&err), "binary port client handler error" ); - // attempt to reconnect once in case the node was restarted and connection broke + // attempt to reconnect in case the node was restarted and connection broke client.close().await.ok(); - match Self::reconnect_without_retries(&self.config).await { - Ok(new_client) => { + let ip_address = &self.config.ip_address; + + match tokio::time::timeout( + Duration::from_secs(self.config.client_access_timeout_secs), + Self::connect_with_retries( + ip_address, + self.config.port, + &self.config.exponential_backoff, + self.config.max_message_size_bytes, + ), + ) + .await + { + Ok(Ok(new_client)) => { *client = new_client; return self.send_request_internal(&req, &mut client).await; } - Err(err) => { + Ok(Err(err)) => { warn!( %err, addr = %self.config.ip_address, @@ -1218,7 +1211,16 @@ impl NodeClient for FramedNodeClient { // schedule standard reconnect process with multiple retries // and return a response self.reconnect.notify_one(); - return Err(Error::RequestFailed("disconnected".to_owned())); + return Err(Error::ConnectionLost); + } + Err(_) => { + warn!( + %err, + addr = %self.config.ip_address, + "failed to reestablish connection in timely fashion" + ); + register_timeout("reacquiring_connection"); + return Err(Error::ConnectionLost); } } } @@ -1585,10 +1587,7 @@ mod tests { let err = query_global_state_for_string_value(&mut rng, &c) .await .unwrap_err(); - assert!(matches!( - err, - 
Error::RequestFailed(e) if e == "disconnected" - )); + assert!(matches!(err, Error::ConnectionLost)); // restart node let mock_server_handle = start_mock_binary_port_responding_with_stored_value( diff --git a/sidecar/src/config.rs b/sidecar/src/config.rs index 4af83a9f..fe958de7 100644 --- a/sidecar/src/config.rs +++ b/sidecar/src/config.rs @@ -3,7 +3,7 @@ use casper_event_sidecar::{ AdminApiServerConfig, DatabaseConfigError, RestApiServerConfig, SseEventServerConfig, StorageConfig, StorageConfigSerdeTarget, }; -use casper_rpc_sidecar::{FieldParseError, RpcServerConfig, RpcServerConfigTarget}; +use casper_rpc_sidecar::{FieldParseError, RpcServerConfig}; use serde::Deserialize; use thiserror::Error; @@ -16,7 +16,7 @@ pub struct SidecarConfigTarget { rest_api_server: Option, admin_api_server: Option, sse_server: Option, - rpc_server: Option, + rpc_server: Option, } #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] @@ -114,15 +114,12 @@ impl TryFrom for SidecarConfig { .map(|target| target.try_into().map(Some)) .unwrap_or(Ok(None)); let storage_config = storage_config_res?; - let rpc_server_config_res: Option> = - value.rpc_server.map(|target| target.try_into()); - let rpc_server_config = invert(rpc_server_config_res)?; Ok(SidecarConfig { max_thread_count: value.max_thread_count, max_blocking_thread_count: value.max_blocking_thread_count, network_name: value.network_name, sse_server: sse_server_config, - rpc_server: rpc_server_config, + rpc_server: value.rpc_server, storage: storage_config, rest_api_server: value.rest_api_server, admin_api_server: value.admin_api_server, @@ -130,10 +127,6 @@ impl TryFrom for SidecarConfig { } } -fn invert(x: Option>) -> Result, E> { - x.map_or(Ok(None), |v| v.map(Some)) -} - #[derive(Error, Debug)] pub enum ConfigReadError { #[error("failed to read sidecar configuration. 
Underlying reason: {}", .error)] diff --git a/sidecar/src/run.rs b/sidecar/src/run.rs index c44ac177..46063edd 100644 --- a/sidecar/src/run.rs +++ b/sidecar/src/run.rs @@ -2,10 +2,15 @@ use crate::component::*; use crate::config::SidecarConfig; use anyhow::{anyhow, Context, Error}; use casper_event_sidecar::LazyDatabaseWrapper; -use std::process::ExitCode; -use tokio::signal::unix::{signal, SignalKind}; +use std::{process::ExitCode, time::Duration}; +use tokio::{ + signal::unix::{signal, SignalKind}, + time::timeout, +}; use tracing::{error, info}; +const MAX_COMPONENT_STARTUP_TIMEOUT_SECS: u64 = 30; + pub async fn run(config: SidecarConfig) -> Result { let maybe_database = config .storage @@ -50,8 +55,24 @@ async fn do_run( components: Vec>, ) -> Result { let mut component_futures = Vec::new(); + let max_startup_duration = Duration::from_secs(MAX_COMPONENT_STARTUP_TIMEOUT_SECS); for component in components.iter() { - let maybe_future = component.prepare_component_task(&config).await?; + let component_name = component.name(); + let component_startup_res = timeout( + max_startup_duration, + component.prepare_component_task(&config), + ) + .await; + if component_startup_res.is_err() { + return Err(ComponentError::Initialization { + component_name: component_name.clone(), + internal_error: anyhow!( + "Failed to start component {component_name} in {MAX_COMPONENT_STARTUP_TIMEOUT_SECS} [s]" + ), + }); + } + + let maybe_future = component_startup_res.unwrap()?; if let Some(future) = maybe_future { component_futures.push(future); }