From e973ba82064ac41626788789ba77f6c947c1ab4f Mon Sep 17 00:00:00 2001 From: Eran Ifrah Date: Sun, 6 Oct 2024 12:11:59 +0300 Subject: [PATCH 1/4] Added new routing option: `SingleNodeRoutingInfo::RandomPrimary` Signed-off-by: Eran Ifrah --- redis/Cargo.toml | 2 +- redis/src/aio/tokio.rs | 1 + redis/src/cluster.rs | 18 +++++++------- redis/src/cluster_async/mod.rs | 6 +++++ redis/src/cluster_routing.rs | 43 ++++++++++++++++++++++++++++++++-- 5 files changed, 58 insertions(+), 12 deletions(-) diff --git a/redis/Cargo.toml b/redis/Cargo.toml index fd79ff079..dc4322c82 100644 --- a/redis/Cargo.toml +++ b/redis/Cargo.toml @@ -108,7 +108,7 @@ arcstr = "1.1.5" uuid = { version = "1.6.1", optional = true } [features] -default = ["acl", "streams", "geospatial", "script", "keep-alive"] +default = ["acl", "streams", "geospatial", "script", "keep-alive", "cluster-async", "tls-rustls", "cluster", "tokio-comp", "tokio-rustls-comp"] acl = [] aio = ["bytes", "pin-project-lite", "futures-util", "futures-util/alloc", "futures-util/sink", "tokio/io-util", "tokio-util", "tokio-util/codec", "combine/tokio", "async-trait", "fast-math", "dispose"] geospatial = [] diff --git a/redis/src/aio/tokio.rs b/redis/src/aio/tokio.rs index cb6663225..7c04f6264 100644 --- a/redis/src/aio/tokio.rs +++ b/redis/src/aio/tokio.rs @@ -1,5 +1,6 @@ use super::{AsyncStream, RedisResult, RedisRuntime, SocketAddr}; use async_trait::async_trait; +#[allow(unused_imports)] // silence warning in multiple configuration builds use std::{ future::Future, io, diff --git a/redis/src/cluster.rs b/redis/src/cluster.rs index 5c0702d85..f9c76f516 100644 --- a/redis/src/cluster.rs +++ b/redis/src/cluster.rs @@ -41,14 +41,14 @@ use std::str::FromStr; use std::thread; use std::time::Duration; -use rand::{seq::IteratorRandom, thread_rng, Rng}; +use rand::{seq::IteratorRandom, thread_rng}; use crate::cluster_pipeline::UNROUTABLE_ERROR; use crate::cluster_routing::{ - MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, SlotAddr, + MultipleNodeRoutingInfo, ResponsePolicy, Routable, SingleNodeRoutingInfo, }; use crate::cluster_slotmap::SlotMap; -use crate::cluster_topology::{parse_and_count_slots, SLOT_SIZE}; +use crate::cluster_topology::parse_and_count_slots; use crate::cmd::{cmd, Cmd}; use crate::connection::{ connect, Connection, ConnectionAddr, ConnectionInfo, ConnectionLike, RedisConnectionInfo, @@ -459,12 +459,9 @@ where }; match RoutingInfo::for_routable(cmd) { - Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) => { - let mut rng = thread_rng(); - Ok(addr_for_slot(Route::new( - rng.gen_range(0..SLOT_SIZE), - SlotAddr::Master, - ))?) + Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + | Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::RandomPrimary)) => { + Ok(addr_for_slot(Route::new_random_primary())?) } Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::SpecificNode(route))) => { Ok(addr_for_slot(route)?) @@ -730,6 +727,9 @@ where SingleNodeRoutingInfo::SpecificNode(route) => { self.get_connection(&mut connections, route)? } + SingleNodeRoutingInfo::RandomPrimary => { + self.get_connection(&mut connections, &Route::new_random_primary())? + } SingleNodeRoutingInfo::ByAddress { host, port } => { let address = format!("{host}:{port}"); let conn = self.get_connection_by_addr(&mut connections, &address)?; diff --git a/redis/src/cluster_async/mod.rs b/redis/src/cluster_async/mod.rs index 965a05cf8..be7beb79b 100644 --- a/redis/src/cluster_async/mod.rs +++ b/redis/src/cluster_async/mod.rs @@ -588,6 +588,9 @@ impl From for InternalSingleNodeRouting { SingleNodeRoutingInfo::SpecificNode(route) => { InternalSingleNodeRouting::SpecificNode(route) } + SingleNodeRoutingInfo::RandomPrimary => { + InternalSingleNodeRouting::SpecificNode(Route::new_random_primary()) + } SingleNodeRoutingInfo::ByAddress { host, port } => { InternalSingleNodeRouting::ByAddress(format!("{host}:{port}")) } @@ -620,6 +623,9 @@ fn route_for_pipeline(pipeline: &crate::Pipeline) -> RedisResult> Some(cluster_routing::RoutingInfo::SingleNode( SingleNodeRoutingInfo::SpecificNode(route), )) => Some(route), + Some(cluster_routing::RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary, + )) => Some(Route::new_random_primary()), Some(cluster_routing::RoutingInfo::MultiNode(_)) => None, Some(cluster_routing::RoutingInfo::SingleNode(SingleNodeRoutingInfo::ByAddress { .. diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index 848d4d750..b03e9e3a9 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -66,6 +66,8 @@ pub enum RoutingInfo { pub enum SingleNodeRoutingInfo { /// Route to any node at random Random, + /// Route to any *primary* node + RandomPrimary, /// Route to the node that matches the [Route] SpecificNode(Route), /// Route to the node with the given address. @@ -610,7 +612,13 @@ impl RoutingInfo { .and_then(|x| std::str::from_utf8(x).ok()) .and_then(|x| x.parse::().ok())?; if key_count == 0 { - Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + if is_readonly_cmd(cmd) { + Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + } else { + Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary, + )) + } } else { r.arg_idx(3).map(|key| RoutingInfo::for_key(cmd, key)) } @@ -949,6 +957,18 @@ impl Route { pub fn slot_addr(&self) -> SlotAddr { self.1 } + + /// Returns a new Route for a random primary node + pub fn new_random_primary() -> Self { + Self::new(random_slot(), SlotAddr::Master) + } +} + +/// Choose a random slot from `0..SLOT_SIZE` (excluding) +fn random_slot() -> u16 { + use rand::Rng; + let mut rng = rand::thread_rng(); + rng.gen_range(0..crate::cluster_topology::SLOT_SIZE) } #[cfg(test)] @@ -1096,12 +1116,31 @@ mod tests { cmd("EVAL").arg(r#"redis.call("PING");"#).arg(0), cmd("EVALSHA").arg(r#"redis.call("PING");"#).arg(0), ] { + // EVAL / EVALSHA are expected to be routed to a RandomPrimary assert_eq!( RoutingInfo::for_routable(cmd), - Some(RoutingInfo::SingleNode(SingleNodeRoutingInfo::Random)) + Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary + )) ); } + // FCALL (with 0 keys) is expected to be routed to a random primary node + assert_eq!( + RoutingInfo::for_routable(cmd("FCALL").arg("foo").arg(0)), + Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::RandomPrimary + )) + ); + + // While FCALL with N keys is expected to be routed to a specific node + assert_eq!( + RoutingInfo::for_routable(cmd("FCALL").arg("foo").arg(1).arg("mykey")), + Some(RoutingInfo::SingleNode( + SingleNodeRoutingInfo::SpecificNode(Route::new(slot(b"mykey"), SlotAddr::Master)) + )) + ); + for (cmd, expected) in [ ( cmd("EVAL") From b8ac9dfd28bb495a5a52be0411b3a4dba81a1609 Mon Sep 17 00:00:00 2001 From: Eran Ifrah Date: Sun, 6 Oct 2024 19:51:44 +0300 Subject: [PATCH 2/4] Undo changes to Cargo.toml file Signed-off-by: Eran Ifrah --- redis/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/Cargo.toml b/redis/Cargo.toml index dc4322c82..fd79ff079 100644 --- a/redis/Cargo.toml +++ b/redis/Cargo.toml @@ -108,7 +108,7 @@ arcstr = "1.1.5" uuid = { version = "1.6.1", optional = true } [features] -default = ["acl", "streams", "geospatial", "script", "keep-alive", "cluster-async", "tls-rustls", "cluster", "tokio-comp", "tokio-rustls-comp"] +default = ["acl", "streams", "geospatial", "script", "keep-alive"] acl = [] aio = ["bytes", "pin-project-lite", "futures-util", "futures-util/alloc", "futures-util/sink", "tokio/io-util", "tokio-util", "tokio-util/codec", "combine/tokio", "async-trait", "fast-math", "dispose"] geospatial = [] From 9ffba5aa95c372edb59d419538504f791f34e063 Mon Sep 17 00:00:00 2001 From: Eran Ifrah Date: Mon, 7 Oct 2024 12:30:36 +0300 Subject: [PATCH 3/4] Removed un-needed call to "unused_import" Signed-off-by: Eran Ifrah --- redis/src/aio/tokio.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/redis/src/aio/tokio.rs b/redis/src/aio/tokio.rs index 7c04f6264..cb6663225 100644 --- a/redis/src/aio/tokio.rs +++ b/redis/src/aio/tokio.rs @@ -1,6 +1,5 @@ use super::{AsyncStream, RedisResult, RedisRuntime, SocketAddr}; use async_trait::async_trait; -#[allow(unused_imports)] // silence warning in multiple configuration builds use std::{ future::Future, io, From 21fd6d53a587d7118cda0ad2d36df62966b9abda Mon Sep 17 00:00:00 2001 From: Eran Ifrah Date: Mon, 7 Oct 2024 12:47:03 +0300 Subject: [PATCH 4/4] Moved "use" statement to the top Signed-off-by: Eran Ifrah --- redis/src/cluster_routing.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/redis/src/cluster_routing.rs b/redis/src/cluster_routing.rs index b03e9e3a9..ded53bc35 100644 --- a/redis/src/cluster_routing.rs +++ b/redis/src/cluster_routing.rs @@ -1,3 +1,4 @@ +use rand::Rng; use std::cmp::min; use std::collections::HashMap; @@ -966,7 +967,6 @@ impl Route { /// Choose a random slot from `0..SLOT_SIZE` (excluding) fn random_slot() -> u16 { - use rand::Rng; let mut rng = rand::thread_rng(); rng.gen_range(0..crate::cluster_topology::SLOT_SIZE) }