diff --git a/Cargo.lock b/Cargo.lock index d4d39f542cae..563cff2740a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -34,6 +34,18 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "affinity" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "763e484feceb7dd021b21c5c6f81aee06b1594a743455ec7efbf72e6355e447b" +dependencies = [ + "cfg-if", + "errno", + "libc", + "num_cpus", +] + [[package]] name = "ahash" version = "0.8.11" @@ -1185,6 +1197,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" name = "gossip_kv" version = "0.1.0" dependencies = [ + "affinity", "clap", "config", "governor", @@ -1929,9 +1942,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.158" +version = "0.2.167" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +checksum = "09d6582e104315a817dff97f75133544b2e094ee22447d2acf4a74e189ba06fc" [[package]] name = "libm" @@ -2244,6 +2257,16 @@ dependencies = [ "libm", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.9", + "libc", +] + [[package]] name = "number_prefix" version = "0.4.0" diff --git a/datastores/gossip_kv/Cargo.toml b/datastores/gossip_kv/Cargo.toml index 37943072a7d1..e404de1b141e 100644 --- a/datastores/gossip_kv/Cargo.toml +++ b/datastores/gossip_kv/Cargo.toml @@ -7,6 +7,7 @@ license = "Apache-2.0" publish = false [dependencies] +affinity = "0.1.2" clap = { version = "4.5.4", features = ["derive", "env"] } config = "0.14.0" governor = "0.7.0" diff --git a/datastores/gossip_kv/load_test_server/server.rs b/datastores/gossip_kv/load_test_server/server.rs index 671a5b80b4e7..fc9f759240d1 100644 --- a/datastores/gossip_kv/load_test_server/server.rs +++ b/datastores/gossip_kv/load_test_server/server.rs @@ -2,15 +2,16 @@ use std::convert::Infallible; use std::num::{NonZeroU32, ParseFloatError}; use std::thread::sleep; use std::time::Duration; - +use affinity::set_thread_affinity; use clap::Parser; use gossip_kv::membership::{MemberDataBuilder, Protocol}; -use gossip_kv::{ClientRequest, GossipMessage}; +use gossip_kv::{ClientRequest, GossipMessage, Key}; use governor::{Quota, RateLimiter}; use lazy_static::lazy_static; -use hydroflow::util::{unbounded_channel, unsync_channel}; +use hydroflow::util::{bounded_channel, unbounded_channel, unsync_channel}; use prometheus::{gather, register_int_counter, Encoder, IntCounter, TextEncoder}; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::{Sender, UnboundedSender}; +use tokio::sync::watch::Receiver; use tokio::task; use tracing::{error, info, trace}; use warp::Filter; @@ -20,7 +21,7 @@ type LoadTestAddress = u64; use gossip_kv::server::{server, SeedNode}; use hydroflow::futures::sink::drain; use hydroflow::futures::stream; -use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream; +use hydroflow::tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream}; use hydroflow::tokio_stream::StreamExt; use hydroflow::util::unsync::mpsc::bounded; use lattices::cc_traits::Iter; @@ -61,7 +62,11 @@ fn run_server( seed_nodes: Vec>, opts: Opts, ) { + let (client_input_tx, client_input_rx) = bounded_channel(1000); + std::thread::spawn(move || { + set_thread_affinity(0).unwrap(); + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() @@ -69,7 +74,7 @@ fn run_server( let (gossip_output_tx, mut gossip_output_rx) = unsync_channel(None); - let (gossip_trigger_tx, gossip_trigger_rx) = unbounded_channel(); + let (gossip_trigger_tx, gossip_trigger_rx) = unbounded_channel(); let member_data = MemberDataBuilder::new(server_name.clone()) .add_protocol(Protocol::new("gossip".into(), gossip_address)) @@ -78,30 +83,11 @@ fn run_server( rt.block_on(async { let local = task::LocalSet::new(); - let (client_input_tx, client_input_rx) = bounded(1000); - - let put_throughput = opts.max_set_throughput; - local.spawn_local(async move { - let rate_limiter = RateLimiter::direct(Quota::per_second( - NonZeroU32::new(put_throughput).unwrap(), - )); - loop { - rate_limiter.until_ready().await; - let key = "/usr/table/key".parse().unwrap(); - let request = ClientRequest::Set { - key, - value: "FOOBAR".to_string(), - }; - client_input_tx.send((request, UNKNOWN_ADDRESS)).await.unwrap(); - SETS_SENT.inc(); - } - }); - let gossip_frequency = opts.gossip_frequency; local.spawn_local(async move { loop { tokio::time::sleep(gossip_frequency).await; - gossip_trigger_tx.send(()).unwrap(); + // gossip_trigger_tx.send(()).unwrap(); } }); @@ -134,6 +120,34 @@ fn run_server( local.await }); }); + + std::thread::spawn(move || { + set_thread_affinity(2).unwrap(); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let local = task::LocalSet::new(); + + let put_throughput = opts.max_set_throughput; + local.spawn_local(async move { + let rate_limiter = RateLimiter::direct(Quota::per_second( + NonZeroU32::new(put_throughput).unwrap(), + )); + let key_master : Key = "/usr/table/key".parse().unwrap(); + loop { + rate_limiter.until_ready().await; + let request = ClientRequest::Set { + key: key_master.clone(), + value: "FOOBAR".to_string(), + }; + client_input_tx.send((request, UNKNOWN_ADDRESS)).await.unwrap(); + SETS_SENT.inc(); + } + }); + }); + } struct Switchboard { diff --git a/hydroflow/src/util/mod.rs b/hydroflow/src/util/mod.rs index 4dd2332d4f36..2b744222bb3b 100644 --- a/hydroflow/src/util/mod.rs +++ b/hydroflow/src/util/mod.rs @@ -66,6 +66,12 @@ pub fn unbounded_channel() -> ( (send, recv) } +pub fn bounded_channel(buffer: usize) -> (tokio::sync::mpsc::Sender, tokio_stream::wrappers::ReceiverStream) { + let (send, recv) = tokio::sync::mpsc::channel(buffer); + let recv = tokio_stream::wrappers::ReceiverStream::new(recv); + (send, recv) +} + /// Returns an unsync channel as a (1) sender and (2) receiver `Stream` for use in Hydroflow. pub fn unsync_channel( capacity: Option,