Skip to content

Commit

Permalink
refactor: remove dead code
Browse files Browse the repository at this point in the history
  • Loading branch information
nickjiang2378 committed Feb 5, 2025
1 parent e708fec commit 98cf758
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 29 deletions.
2 changes: 0 additions & 2 deletions hydro_deploy/core/src/hydroflow_crate/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ pub async fn build_crate_memoized(params: BuildParams) -> Result<&'static BuildO
command.env("CARGO_TARGET_DIR", target_dir);
}

ProgressTracker::println(&format!("Command to be executed: {:?}", command));

let mut spawned = command
.current_dir(&params.src)
.stdout(Stdio::piped())
Expand Down
3 changes: 0 additions & 3 deletions hydro_deploy/core/src/hydroflow_crate/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,6 @@ impl Service for HydroflowCrateService {
*self.server_defns.try_write().unwrap() =
serde_json::from_str(ready_line.trim_start_matches("ready: ")).unwrap();
} else {
ProgressTracker::println(
format!("Did not find ready. Instead found: {:?}", ready_line).as_str(),
);
bail!("expected ready");
}

Expand Down
9 changes: 0 additions & 9 deletions hydro_deploy/core/src/kubernetes/launched_binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,6 @@ use tokio::sync::{mpsc, oneshot};
use crate::util::prioritized_broadcast;
use crate::LaunchedBinary;

// pub struct LaunchedPodBinary {
// stdin_sender: Sender<String>,
// stdout_cli_receivers: Arc<RwLock<Option<tokio::sync::oneshot::Sender<String>>>>,
// stdout_receivers: Arc<RwLock<Vec<Sender<String>>>>,
// stderr_receivers: Arc<RwLock<Vec<Sender<String>>>>,
// }

pub struct LaunchedPodBinary {
stdin_sender: mpsc::UnboundedSender<String>,
stdout_deploy_receivers: Arc<Mutex<Option<oneshot::Sender<String>>>>,
Expand All @@ -33,8 +26,6 @@ pub struct LaunchedPodBinary {
impl LaunchedPodBinary {
pub fn new(mut launched_pod_binary: AttachedProcess, id: String, pod_name: String) -> Self {
// Create streams for stdout and stdin for the running binary in the pod
// let launched_pod_binary_mut = &mut launched_pod_binary;

let launch_binary_out =
tokio_util::io::ReaderStream::new(launched_pod_binary.stdout().unwrap());
let launch_binary_err =
Expand Down
20 changes: 7 additions & 13 deletions hydro_deploy/core/src/kubernetes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl Host for PodHost {
async fn provision(&self, _resource_result: &Arc<ResourceResult>) -> Arc<dyn LaunchedHost> {
if self.launched.get().is_none() {
let client = Client::try_default().await.unwrap();
let pod_id = nanoid!(10, &ALPHABET); // pod names can only contain alphanumeric characters
let pod_id = nanoid!(10, &ALPHABET); // randomly create a pod name, kubernetes restricts names to alphanumeric characters
let pod_name = format!("hydro-{}", pod_id);

// Blank template for a new pod
Expand Down Expand Up @@ -123,7 +123,6 @@ impl Host for PodHost {
}
}
if !found_existing_pod {
ProgressTracker::println(format!("Creating new pod {:?}", pod_name).as_str());
let res = pods.create(&PostParams::default(), &p).await;
match res {
Err(e) => ProgressTracker::println(format!("{:?}", e).as_str()),
Expand All @@ -143,10 +142,6 @@ impl Host for PodHost {
WatchEvent::Added(o) | WatchEvent::Modified(o) => {
let s = o.status.as_ref().expect("status exists on pod");
if s.phase.clone().unwrap_or_default() == "Running" {
ProgressTracker::println(&format!(
"Ready to attach to {}",
o.name_any()
));
break;
}
}
Expand All @@ -163,6 +158,7 @@ impl Host for PodHost {
}
}

// Use the internal IP for communication between pods
let internal_ip = pods
.get_status(pod_name.clone().as_str())
.await
Expand All @@ -179,7 +175,8 @@ impl Host for PodHost {
}))
.unwrap();

// Update apt-get in the pod
// We use lsof later in order to verify that the hydroflow binary has been written successfully to the pod.
// First, update apt-get in the pod
let ap = AttachParams::default()
.stdin(false)
.stdout(false)
Expand All @@ -194,6 +191,7 @@ impl Host for PodHost {
.unwrap();
let update_apt_status = update_apt.take_status().unwrap();

// Verify that no errors occurred with command
match update_apt_status.await {
None => {
ProgressTracker::println("Warning: Command 'apt-get update' failed in pod");
Expand All @@ -220,7 +218,6 @@ impl Host for PodHost {
}
_ => {}
}
ProgressTracker::println("Finished apt install");
}

self.launched.get().unwrap().clone()
Expand Down Expand Up @@ -266,7 +263,6 @@ impl Host for PodHost {
false
}
}
// target_host.as_any().downcast_ref::<PodHost>()
ClientStrategy::InternalTcpPort(_target_host) => true, /* TODO: if I'm on the same cluster, can just return true first */
ClientStrategy::ForwardedTcpPort(_) => true,
}
Expand All @@ -284,7 +280,7 @@ impl LaunchedHost for LaunchedPod {
fn server_config(&self, bind_type: &ServerStrategy) -> ServerBindConfig {
match bind_type {
ServerStrategy::UnixSocket => ServerBindConfig::UnixSocket,
ServerStrategy::InternalTcpPort => ServerBindConfig::TcpPort(self.internal_ip.clone()), /* TODO: change to pod's internal port */
ServerStrategy::InternalTcpPort => ServerBindConfig::TcpPort(self.internal_ip.clone()),
ServerStrategy::ExternalTcpPort(_) => panic!("Cannot bind to external port"),
ServerStrategy::Demux(demux) => {
let mut config_map = HashMap::new();
Expand All @@ -311,7 +307,6 @@ impl LaunchedHost for LaunchedPod {

async fn copy_binary(&self, binary: &BuildOutput) -> Result<()> {
// Create a new pod in the running kubernetes cluster (we assume the user already has one up)
ProgressTracker::println(&format!("Copying binary to pod: {:?}", binary.unique_id));
let client = Client::try_default().await?;
let pods: Api<Pod> = Api::default_namespaced(client);

Expand Down Expand Up @@ -400,7 +395,6 @@ impl LaunchedHost for LaunchedPod {
args: &[String],
_perf: Option<TracingOptions>,
) -> Result<Box<dyn LaunchedBinary>> {
ProgressTracker::println("Launching binary in Pod");

let client = Client::try_default().await?;
let pods: Api<Pod> = Api::default_namespaced(client);
Expand All @@ -416,8 +410,8 @@ impl LaunchedHost for LaunchedPod {
.stdin(true)
.stdout(true)
.stderr(true);

// Execute binary inside the new pod
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
let launch_binary = match pods.exec(pod_name, args_list, &ap).await {
Ok(exec) => exec,
Err(e) => {
Expand Down
4 changes: 2 additions & 2 deletions hydro_deploy/core/src/ssh.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,8 @@ impl<T: LaunchedSshHost> LaunchedHost for T {
});

let id_clone = id.clone();
// Pull away the first stdout stream into a different "prioritized" channel,
// and send everything else to stdout
// Pull away the first stdout stream (which contains setup info for communication between binaries) into a different "prioritized" channel,
// and send everything else to normal stdout
let (stdout_deploy_receivers, stdout_receivers) =
prioritized_broadcast(FuturesBufReader::new(channel.stream(0)).lines(), move |s| {
ProgressTracker::println(format!("[{id_clone}] {s}"));
Expand Down

0 comments on commit 98cf758

Please sign in to comment.