diff --git a/hydro_deploy/core/src/hydroflow_crate/build.rs b/hydro_deploy/core/src/hydroflow_crate/build.rs index 9384c820f7fa..102f293c5175 100644 --- a/hydro_deploy/core/src/hydroflow_crate/build.rs +++ b/hydro_deploy/core/src/hydroflow_crate/build.rs @@ -132,8 +132,6 @@ pub async fn build_crate_memoized(params: BuildParams) -> Result<&'static BuildO command.env("CARGO_TARGET_DIR", target_dir); } - ProgressTracker::println(&format!("Command to be executed: {:?}", command)); - let mut spawned = command .current_dir(¶ms.src) .stdout(Stdio::piped()) diff --git a/hydro_deploy/core/src/hydroflow_crate/service.rs b/hydro_deploy/core/src/hydroflow_crate/service.rs index 544917cb02b3..bbaf286b8f0e 100644 --- a/hydro_deploy/core/src/hydroflow_crate/service.rs +++ b/hydro_deploy/core/src/hydroflow_crate/service.rs @@ -267,9 +267,6 @@ impl Service for HydroflowCrateService { *self.server_defns.try_write().unwrap() = serde_json::from_str(ready_line.trim_start_matches("ready: ")).unwrap(); } else { - ProgressTracker::println( - format!("Did not find ready. Instead found: {:?}", ready_line).as_str(), - ); bail!("expected ready"); } diff --git a/hydro_deploy/core/src/kubernetes/launched_binary.rs b/hydro_deploy/core/src/kubernetes/launched_binary.rs index 5e128883f254..0599304165a2 100644 --- a/hydro_deploy/core/src/kubernetes/launched_binary.rs +++ b/hydro_deploy/core/src/kubernetes/launched_binary.rs @@ -14,13 +14,6 @@ use tokio::sync::{mpsc, oneshot}; use crate::util::prioritized_broadcast; use crate::LaunchedBinary; -// pub struct LaunchedPodBinary { -// stdin_sender: Sender, -// stdout_cli_receivers: Arc>>>, -// stdout_receivers: Arc>>>, -// stderr_receivers: Arc>>>, -// } - pub struct LaunchedPodBinary { stdin_sender: mpsc::UnboundedSender, stdout_deploy_receivers: Arc>>>, @@ -33,8 +26,6 @@ pub struct LaunchedPodBinary { impl LaunchedPodBinary { pub fn new(mut launched_pod_binary: AttachedProcess, id: String, pod_name: String) -> Self { // Create streams for stdout and stdin for the running binary in the pod - // let launched_pod_binary_mut = &mut launched_pod_binary; - let launch_binary_out = tokio_util::io::ReaderStream::new(launched_pod_binary.stdout().unwrap()); let launch_binary_err = diff --git a/hydro_deploy/core/src/kubernetes/mod.rs b/hydro_deploy/core/src/kubernetes/mod.rs index a29d73bc6db7..543332144b86 100644 --- a/hydro_deploy/core/src/kubernetes/mod.rs +++ b/hydro_deploy/core/src/kubernetes/mod.rs @@ -80,7 +80,7 @@ impl Host for PodHost { async fn provision(&self, _resource_result: &Arc) -> Arc { if self.launched.get().is_none() { let client = Client::try_default().await.unwrap(); - let pod_id = nanoid!(10, &ALPHABET); // pod names can only contain alphanumeric characters + let pod_id = nanoid!(10, &ALPHABET); // randomly create a pod name, kubernetes restricts names to alphanumeric characters let pod_name = format!("hydro-{}", pod_id); // Blank template for a new pod @@ -123,7 +123,6 @@ impl Host for PodHost { } } if !found_existing_pod { - ProgressTracker::println(format!("Creating new pod {:?}", pod_name).as_str()); let res = pods.create(&PostParams::default(), &p).await; match res { Err(e) => ProgressTracker::println(format!("{:?}", e).as_str()), @@ -143,10 +142,6 @@ impl Host for PodHost { WatchEvent::Added(o) | WatchEvent::Modified(o) => { let s = o.status.as_ref().expect("status exists on pod"); if s.phase.clone().unwrap_or_default() == "Running" { - ProgressTracker::println(&format!( - "Ready to attach to {}", - o.name_any() - )); break; } } @@ -163,6 +158,7 @@ impl Host for PodHost { } } + // Use the internal IP for communication between pods let internal_ip = pods .get_status(pod_name.clone().as_str()) .await @@ -179,7 +175,8 @@ impl Host for PodHost { })) .unwrap(); - // Update apt-get in the pod + // We use lsof later in order to verify that the hydroflow binary has been written successfully to the pod. + // First, update apt-get in the pod let ap = AttachParams::default() .stdin(false) .stdout(false) @@ -194,6 +191,7 @@ impl Host for PodHost { .unwrap(); let update_apt_status = update_apt.take_status().unwrap(); + // Verify that no errors occurred with command match update_apt_status.await { None => { ProgressTracker::println("Warning: Command 'apt-get update' failed in pod"); @@ -220,7 +218,6 @@ impl Host for PodHost { } _ => {} } - ProgressTracker::println("Finished apt install"); } self.launched.get().unwrap().clone() @@ -266,7 +263,6 @@ impl Host for PodHost { false } } - // target_host.as_any().downcast_ref::() ClientStrategy::InternalTcpPort(_target_host) => true, /* TODO: if I'm on the same cluster, can just return true first */ ClientStrategy::ForwardedTcpPort(_) => true, } @@ -284,7 +280,7 @@ impl LaunchedHost for LaunchedPod { fn server_config(&self, bind_type: &ServerStrategy) -> ServerBindConfig { match bind_type { ServerStrategy::UnixSocket => ServerBindConfig::UnixSocket, - ServerStrategy::InternalTcpPort => ServerBindConfig::TcpPort(self.internal_ip.clone()), /* TODO: change to pod's internal port */ + ServerStrategy::InternalTcpPort => ServerBindConfig::TcpPort(self.internal_ip.clone()), ServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"), ServerStrategy::Demux(demux) => { let mut config_map = HashMap::new(); @@ -311,7 +307,6 @@ impl LaunchedHost for LaunchedPod { async fn copy_binary(&self, binary: &BuildOutput) -> Result<()> { // Create a new pod in the running kubernetes cluster (we assume the user already has one up) - ProgressTracker::println(&format!("Copying binary to pod: {:?}", binary.unique_id)); let client = Client::try_default().await?; let pods: Api = Api::default_namespaced(client); @@ -400,7 +395,6 @@ impl LaunchedHost for LaunchedPod { args: &[String], _perf: Option, ) -> Result> { - ProgressTracker::println("Launching binary in Pod"); let client = Client::try_default().await?; let pods: Api = Api::default_namespaced(client); @@ -416,8 +410,8 @@ impl LaunchedHost for LaunchedPod { .stdin(true) .stdout(true) .stderr(true); + // Execute binary inside the new pod - tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; let launch_binary = match pods.exec(pod_name, args_list, &ap).await { Ok(exec) => exec, Err(e) => { diff --git a/hydro_deploy/core/src/ssh.rs b/hydro_deploy/core/src/ssh.rs index d6e5aaf0ba2b..546188423b5c 100644 --- a/hydro_deploy/core/src/ssh.rs +++ b/hydro_deploy/core/src/ssh.rs @@ -404,8 +404,8 @@ impl LaunchedHost for T { }); let id_clone = id.clone(); - // Pull away the first stdout stream into a different "prioritized" channel, - // and send everything else to stdout + // Pull away the first stdout stream (which contains setup info for communication between binaries) into a different "prioritized" channel, + // and send everything else to normal stdout let (stdout_deploy_receivers, stdout_receivers) = prioritized_broadcast(FuturesBufReader::new(channel.stream(0)).lines(), move |s| { ProgressTracker::println(format!("[{id_clone}] {s}"));