Skip to content

Commit

Permalink
feat(topolotree): perf improvements and better deploy logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shadaj committed Sep 28, 2023
1 parent 70d53ac commit 5df849b
Show file tree
Hide file tree
Showing 13 changed files with 204 additions and 258 deletions.
5 changes: 3 additions & 2 deletions hydro_cli/src/core/custom_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ impl Service for CustomService {
}
}

/// Provisions the host this service is placed on and records the launched
/// host handle in `self.launched_host`.
///
/// Returns early without provisioning when `self.launched_host` is already
/// set.
// NOTE(review): this span is a rendered diff hunk with no +/- markers. In each
// adjacent near-duplicate pair below, the first line is the pre-change form
// (no return value) and the second is its replacement returning `Result<()>`.
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) {
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
// Already provisioned by an earlier call; nothing to do.
if self.launched_host.is_some() {
return;
return Ok(());
}

// `host_write` stays in scope until the end of the function, so write access
// to `self.on` is held while the provisioning future is awaited.
let mut host_write = self.on.write().await;
let launched = host_write.provision(resource_result);
self.launched_host = Some(launched.await);
Ok(())
}

async fn ready(&mut self) -> Result<()> {
Expand Down
34 changes: 19 additions & 15 deletions hydro_cli/src/core/deployment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::sync::{Arc, Weak};

use anyhow::Result;
use futures::{StreamExt, TryStreamExt};
use tokio::sync::RwLock;

use super::{progress, Host, ResourcePool, ResourceResult, Service};
Expand Down Expand Up @@ -62,22 +63,25 @@ impl Deployment {
.await;

progress::ProgressTracker::with_group("deploy", None, || {
let services_future =
self.services
.iter_mut()
.map(|service: &mut Weak<RwLock<dyn Service>>| async {
service
.upgrade()
.unwrap()
.write()
.await
.deploy(&result)
.await;
});

futures::future::join_all(services_future)
let services_future = self
.services
.iter_mut()
.map(|service: &mut Weak<RwLock<dyn Service>>| async {
service
.upgrade()
.unwrap()
.write()
.await
.deploy(&result)
.await
})
.collect::<Vec<_>>();

futures::stream::iter(services_future)
.buffer_unordered(8)
.try_fold((), |_, _| async { Ok(()) })
})
.await;
.await?;

progress::ProgressTracker::with_group("ready", None, || {
let all_services_ready =
Expand Down
7 changes: 4 additions & 3 deletions hydro_cli/src/core/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,11 @@ impl LaunchedSSHHost for LaunchedComputeEngine {
let mut config = SessionConfiguration::new();
config.set_compress(true);

let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config)).await?;

tokio::time::timeout(Duration::from_secs(15), async move {
let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config))
.await?;

session.handshake().await?;

session
Expand Down
25 changes: 17 additions & 8 deletions hydro_cli/src/core/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ pub struct HydroflowCrate {
/// Configuration for the ports that this service will listen on a port for.
port_to_bind: HashMap<String, ServerStrategy>,

built_binary: Option<JoinHandle<Result<BuiltCrate>>>,
building_binary: Option<JoinHandle<Result<BuiltCrate>>>,
built_binary: Option<BuiltCrate>,
launched_host: Option<Arc<dyn LaunchedHost>>,

/// A map of port names to config for how other services can connect to this one.
Expand Down Expand Up @@ -77,6 +78,7 @@ impl HydroflowCrate {
external_ports,
port_to_server: HashMap::new(),
port_to_bind: HashMap::new(),
building_binary: None,
built_binary: None,
launched_host: None,
server_defns: Arc::new(RwLock::new(HashMap::new())),
Expand Down Expand Up @@ -191,7 +193,7 @@ impl Service for HydroflowCrate {
}

let built = self.build();
self.built_binary = Some(built);
self.building_binary = Some(built);

let mut host = self
.on
Expand All @@ -208,9 +210,9 @@ impl Service for HydroflowCrate {
}
}

/// Provisions the host, waits for the in-flight crate build, copies the built
/// binary to the launched host, and records both results on `self`.
///
/// Returns `Ok(())` immediately when `self.launched_host` is already set.
/// In the post-change form, errors from the build task, the build itself, and
/// `copy_binary` are propagated to the caller with `?`.
// NOTE(review): this span is a rendered diff hunk with no +/- markers. The
// first `async fn deploy` line, the bare `return;`, the two-line
// `provision` / `Some(launched.await)` sequence, and the `.await;` line are
// the pre-change form; the lines that follow each of them are the replacement.
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) {
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()> {
// Already deployed by an earlier call; nothing to do.
if self.launched_host.is_some() {
return;
return Ok(());
}

ProgressTracker::with_group(
Expand All @@ -221,11 +223,18 @@ impl Service for HydroflowCrate {
None,
|| async {
// Write access to `self.on` is held for the rest of this async block.
let mut host_write = self.on.write().await;
let launched = host_write.provision(resource_result);
self.launched_host = Some(launched.await);
let launched = host_write.provision(resource_result).await;

// `building_binary` is an `Option<JoinHandle<Result<BuiltCrate>>>`: the
// first `?` surfaces a failure of the build task itself, the second the
// build's own error.
// NOTE(review): `take()` leaves `None` behind. If this line or
// `copy_binary` below fails, `launched_host` is still unset, so a second
// `deploy` call would pass the guard above and panic on `unwrap()` —
// confirm that callers never retry a failed deploy.
// NOTE(review): the `.clone()` here is followed by another `built.clone()`
// below; one of them looks redundant — confirm against `BuiltCrate`.
let built = self.building_binary.take().unwrap().await??.clone();

// Transfer the binary during deploy so that `ready` only needs the stored copy.
launched.copy_binary(built.clone()).await?;

// State is recorded only after the copy succeeded.
self.built_binary = Some(built);
self.launched_host = Some(launched);
Ok(())
},
)
.await;
.await
}

async fn ready(&mut self) -> Result<()> {
Expand All @@ -242,7 +251,7 @@ impl Service for HydroflowCrate {
|| async {
let launched_host = self.launched_host.as_ref().unwrap();

let built = self.built_binary.take().unwrap().await??.clone();
let built = self.built_binary.as_ref().unwrap().clone();
let args = self.args.as_ref().cloned().unwrap_or_default();

let binary = launched_host
Expand Down
18 changes: 18 additions & 0 deletions hydro_cli/src/core/localhost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,20 @@ pub fn create_broadcast<T: AsyncRead + Send + Unpin + 'static>(
break;
}
}

if let Some(cli_receivers) = weak_cli_receivers.upgrade() {
let cli_receivers = cli_receivers.write().await;
for r in cli_receivers.iter() {
r.close();
}
}

if let Some(receivers) = weak_receivers.upgrade() {
let receivers = receivers.write().await;
for r in receivers.iter() {
r.close();
}
}
});

(cli_receivers, receivers)
Expand Down Expand Up @@ -161,6 +175,10 @@ impl LaunchedHost for LaunchedLocalhost {
}
}

/// No-op implementation of `copy_binary` for the localhost host: the binary
/// argument is ignored and `Ok(())` is returned unconditionally.
// NOTE(review): presumably nothing needs transferring because the binary is
// already on the local machine — confirm against `launch_binary`.
async fn copy_binary(&self, _binary: Arc<(String, Vec<u8>, PathBuf)>) -> Result<()> {
Ok(())
}

async fn launch_binary(
&self,
id: String,
Expand Down
4 changes: 3 additions & 1 deletion hydro_cli/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,8 @@ pub trait LaunchedHost: Send + Sync {
/// to listen to network connections (such as the IP address to bind to).
fn server_config(&self, strategy: &ServerStrategy) -> ServerBindConfig;

async fn copy_binary(&self, binary: Arc<(String, Vec<u8>, PathBuf)>) -> Result<()>;

async fn launch_binary(
&self,
id: String,
Expand Down Expand Up @@ -186,7 +188,7 @@ pub trait Service: Send + Sync {
fn collect_resources(&mut self, resource_batch: &mut ResourceBatch);

/// Connects to the acquired resources and prepares the service to be launched.
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>);
async fn deploy(&mut self, resource_result: &Arc<ResourceResult>) -> Result<()>;

/// Launches the service, which should start listening for incoming network
/// connections. The service should not start computing at this point.
Expand Down
23 changes: 17 additions & 6 deletions hydro_cli/src/core/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,7 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
LaunchedSSHHost::server_config(self, bind_type)
}

async fn launch_binary(
&self,
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
async fn copy_binary(&self, binary: Arc<(String, Vec<u8>, PathBuf)>) -> Result<()> {
let session = self.open_ssh_session().await?;

let sftp = async_retry(
Expand Down Expand Up @@ -172,6 +167,22 @@ impl<T: LaunchedSSHHost> LaunchedHost for T {
}
drop(sftp);

Ok(())
}

async fn launch_binary(
&self,
id: String,
binary: Arc<(String, Vec<u8>, PathBuf)>,
args: &[String],
) -> Result<Arc<RwLock<dyn LaunchedBinary>>> {
let session = self.open_ssh_session().await?;

let unique_name = &binary.0;

let user = self.ssh_user();
let binary_path = PathBuf::from(format!("/home/{user}/hydro-{unique_name}"));

let channel = ProgressTracker::leaf(
format!("launching binary /home/{user}/hydro-{unique_name}"),
async {
Expand Down
4 changes: 3 additions & 1 deletion hydro_cli/src/core/terraform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ impl TerraformPool {
.current_dir(deployment_folder.path())
.arg("apply")
.arg("-auto-approve")
.arg("-no-color");
.arg("-no-color")
.arg("-parallelism=128");

#[cfg(unix)]
{
Expand Down Expand Up @@ -288,6 +289,7 @@ fn destroy_deployment(deployment_folder: &TempDir) {
.arg("destroy")
.arg("-auto-approve")
.arg("-no-color")
.arg("-parallelism=128")
.stdout(Stdio::piped());

#[cfg(unix)]
Expand Down
37 changes: 24 additions & 13 deletions topolotree/src/latency_measure.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::sync::atomic::AtomicU64;
use std::io::Write;
use std::sync::atomic::{AtomicBool, AtomicU64};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Instant;
Expand Down Expand Up @@ -34,10 +35,12 @@ async fn main() {

let atomic_counter = Arc::new(AtomicU64::new(0));
let atomic_borrow = atomic_counter.clone();
let atomic_keep_running = Arc::new(AtomicBool::new(true));
let atomic_keep_running_clone = atomic_keep_running.clone();
let (latency_sender, latency_receiver) = mpsc::channel::<u128>();
thread::spawn(move || {
let printer_thread = thread::spawn(move || {
let mut last_instant = Instant::now();
loop {
while atomic_keep_running_clone.load(std::sync::atomic::Ordering::Relaxed) {
thread::sleep(std::time::Duration::from_millis(100));
let now = Instant::now();
let counter = atomic_borrow.swap(0, std::sync::atomic::Ordering::Relaxed);
Expand All @@ -48,6 +51,8 @@ async fn main() {
while let Ok(latency) = latency_receiver.try_recv() {
println!("latency,{}", latency);
}

std::io::stdout().flush().unwrap()
}
});

Expand All @@ -68,19 +73,17 @@ async fn main() {
let inc_sender = inc_sender.clone();
let latency_sender = latency_sender.clone();
let atomic_counter = atomic_counter.clone();
let keep_running = atomic_keep_running.clone();
tokio::spawn(async move {
#[cfg(debug_assertions)]
let mut count_tracker = HashMap::new();

let mut next_base: u64 = 0;

loop {
let id = ((((next_base % keys_per_partition)
+ (partition_n * keys_per_partition))
/ (num_clients))
* num_clients)
+ i;
next_base += 1;
while keep_running.load(std::sync::atomic::Ordering::Relaxed) {
let id = (partition_n * keys_per_partition)
+ ((((next_base % keys_per_partition) / num_clients) * num_clients) + i);
next_base = next_base.wrapping_add(1);
let increment = rand::random::<bool>();
let change = if increment { 1 } else { -1 };
let start = Instant::now();
Expand All @@ -93,10 +96,12 @@ async fn main() {
{
let count = count_tracker.entry(id).or_insert(0);
*count += change;
assert!(*count == received);
assert_eq!(*count, received);
}

latency_sender.send(start.elapsed().as_micros()).unwrap();
if next_base % 100 == 0 {
latency_sender.send(start.elapsed().as_micros()).unwrap();
}

atomic_counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
Expand All @@ -113,7 +118,7 @@ async fn main() {
continue;
}

if queues[(updated.key % num_clients) as usize]
if queues[((updated.key % keys_per_partition) % num_clients) as usize]
.send(updated.value)
.is_err()
{
Expand All @@ -125,5 +130,11 @@ async fn main() {
let mut line = String::new();
std::io::stdin().read_line(&mut line).unwrap();
assert!(line.starts_with("stop"));

atomic_keep_running.store(false, std::sync::atomic::Ordering::Relaxed);
printer_thread.join().unwrap();

println!("end");

std::process::exit(0);
}
Loading

0 comments on commit 5df849b

Please sign in to comment.