Skip to content

Commit

Permalink
It may be the infecting writes are not being drained.
Browse files Browse the repository at this point in the history
Bounded Queues
Fixed Node Type:m4.16xlarge
Attempting CPU affinity.
  • Loading branch information
rohitkulshreshtha committed Dec 3, 2024
1 parent 07e074c commit 71c8c77
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 28 deletions.
27 changes: 25 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datastores/gossip_kv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ license = "Apache-2.0"
publish = false

[dependencies]
affinity = "0.1.2"
clap = { version = "4.5.4", features = ["derive", "env"] }
config = "0.14.0"
governor = "0.7.0"
Expand Down
66 changes: 40 additions & 26 deletions datastores/gossip_kv/load_test_server/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ use std::convert::Infallible;
use std::num::{NonZeroU32, ParseFloatError};
use std::thread::sleep;
use std::time::Duration;

use affinity::set_thread_affinity;
use clap::Parser;
use gossip_kv::membership::{MemberDataBuilder, Protocol};
use gossip_kv::{ClientRequest, GossipMessage};
use gossip_kv::{ClientRequest, GossipMessage, Key};
use governor::{Quota, RateLimiter};
use lazy_static::lazy_static;
use hydroflow::util::{unbounded_channel, unsync_channel};
use hydroflow::util::{bounded_channel, unbounded_channel, unsync_channel};
use prometheus::{gather, register_int_counter, Encoder, IntCounter, TextEncoder};
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{Sender, UnboundedSender};
use tokio::sync::watch::Receiver;
use tokio::task;
use tracing::{error, info, trace};
use warp::Filter;
Expand All @@ -20,7 +21,7 @@ type LoadTestAddress = u64;
use gossip_kv::server::{server, SeedNode};
use hydroflow::futures::sink::drain;
use hydroflow::futures::stream;
use hydroflow::tokio_stream::wrappers::UnboundedReceiverStream;
use hydroflow::tokio_stream::wrappers::{ReceiverStream, UnboundedReceiverStream};
use hydroflow::tokio_stream::StreamExt;
use hydroflow::util::unsync::mpsc::bounded;
use lattices::cc_traits::Iter;
Expand Down Expand Up @@ -61,15 +62,19 @@ fn run_server(
seed_nodes: Vec<SeedNode<LoadTestAddress>>,
opts: Opts,
) {
let (client_input_tx, client_input_rx) = bounded_channel(1000);

std::thread::spawn(move || {
set_thread_affinity(0).unwrap();

let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let (gossip_output_tx, mut gossip_output_rx) = unsync_channel(None);

let (gossip_trigger_tx, gossip_trigger_rx) = unbounded_channel();
let (gossip_trigger_tx, gossip_trigger_rx) = unbounded_channel();

let member_data = MemberDataBuilder::new(server_name.clone())
.add_protocol(Protocol::new("gossip".into(), gossip_address))
Expand All @@ -78,30 +83,11 @@ fn run_server(
rt.block_on(async {
let local = task::LocalSet::new();

let (client_input_tx, client_input_rx) = bounded(1000);

let put_throughput = opts.max_set_throughput;
local.spawn_local(async move {
let rate_limiter = RateLimiter::direct(Quota::per_second(
NonZeroU32::new(put_throughput).unwrap(),
));
loop {
rate_limiter.until_ready().await;
let key = "/usr/table/key".parse().unwrap();
let request = ClientRequest::Set {
key,
value: "FOOBAR".to_string(),
};
client_input_tx.send((request, UNKNOWN_ADDRESS)).await.unwrap();
SETS_SENT.inc();
}
});

let gossip_frequency = opts.gossip_frequency;
local.spawn_local(async move {
loop {
tokio::time::sleep(gossip_frequency).await;
gossip_trigger_tx.send(()).unwrap();
// gossip_trigger_tx.send(()).unwrap();
}
});

Expand Down Expand Up @@ -134,6 +120,34 @@ fn run_server(
local.await
});
});

std::thread::spawn(move || {
set_thread_affinity(2).unwrap();
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();

let local = task::LocalSet::new();

let put_throughput = opts.max_set_throughput;
local.spawn_local(async move {
let rate_limiter = RateLimiter::direct(Quota::per_second(
NonZeroU32::new(put_throughput).unwrap(),
));
let key_master : Key = "/usr/table/key".parse().unwrap();
loop {
rate_limiter.until_ready().await;
let request = ClientRequest::Set {
key: key_master.clone(),
value: "FOOBAR".to_string(),
};
client_input_tx.send((request, UNKNOWN_ADDRESS)).await.unwrap();
SETS_SENT.inc();
}
});
});

}

struct Switchboard {
Expand Down
6 changes: 6 additions & 0 deletions hydroflow/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ pub fn unbounded_channel<T>() -> (
(send, recv)
}

pub fn bounded_channel<T>(buffer: usize) -> (tokio::sync::mpsc::Sender<T>, tokio_stream::wrappers::ReceiverStream<T>) {
let (send, recv) = tokio::sync::mpsc::channel(buffer);
let recv = tokio_stream::wrappers::ReceiverStream::new(recv);
(send, recv)
}

/// Returns an unsync channel as a (1) sender and (2) receiver `Stream` for use in Hydroflow.
pub fn unsync_channel<T>(
capacity: Option<NonZeroUsize>,
Expand Down

0 comments on commit 71c8c77

Please sign in to comment.