Skip to content

Commit

Permalink
feat(topolotree): new implementation and Hydro Deploy setup
Browse files Browse the repository at this point in the history
--


Co-authored-by: zzlk <2418897+zzlk@users.noreply.github.com>
Co-authored-by: Saikrishna Achalla <saikrishnaachalla@Saikrishnas-MacBook-Pro.local>
  • Loading branch information
2 people authored and shadaj committed Sep 18, 2023
1 parent d254e2d commit 71b932b
Show file tree
Hide file tree
Showing 32 changed files with 1,351 additions and 1,003 deletions.
25 changes: 21 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"multiplatform_test",
"pusherator",
"relalg",
"topolotree",
"variadics",
"website_playground",
]
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ nanoid = "0.4.0"
ctrlc = "3.2.5"
nix = "0.26.2"
hydroflow_cli_integration = { path = "../hydroflow_cli_integration", version = "^0.3.0" }
indicatif = "0.17.3"
indicatif = "0.17.6"
cargo_metadata = "0.15.4"

[dev-dependencies]
2 changes: 1 addition & 1 deletion hydro_cli/hydro/_core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Deployment(object):

def CustomService(self, on: "Host", external_ports: List[int]) -> "CustomService": ...

def HydroflowCrate(self, src: str, on: "Host", example: Optional[str] = None, profile: Optional[str] = None, features: Optional[List[str]] = None, args: Optional[List[str]] = None, display_id: Optional[str] = None, external_ports: Optional[List[int]] = None) -> "HydroflowCrate": ...
def HydroflowCrate(self, src: str, on: "Host", bin: Optional[str] = None, example: Optional[str] = None, profile: Optional[str] = None, features: Optional[List[str]] = None, args: Optional[List[str]] = None, display_id: Optional[str] = None, external_ports: Optional[List[int]] = None) -> "HydroflowCrate": ...

async def deploy(self): ...

Expand Down
4 changes: 3 additions & 1 deletion hydro_cli/src/core/custom_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ impl Service for CustomService {
Ok(())
}

async fn start(&mut self) {}
async fn start(&mut self) -> Result<()> {
Ok(())
}

async fn stop(&mut self) -> Result<()> {
Ok(())
Expand Down
18 changes: 10 additions & 8 deletions hydro_cli/src/core/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct Deployment {

impl Deployment {
pub async fn deploy(&mut self) -> Result<()> {
progress::ProgressTracker::with_group("deploy", || async {
progress::ProgressTracker::with_group("deploy", None, || async {
let mut resource_batch = super::ResourceBatch::new();
let active_services = self
.services
Expand All @@ -41,7 +41,7 @@ impl Deployment {
}

let result = Arc::new(
progress::ProgressTracker::with_group("provision", || async {
progress::ProgressTracker::with_group("provision", None, || async {
resource_batch
.provision(&mut self.resource_pool, self.last_resource_result.clone())
.await
Expand All @@ -50,7 +50,7 @@ impl Deployment {
);
self.last_resource_result = Some(result.clone());

progress::ProgressTracker::with_group("provision", || {
progress::ProgressTracker::with_group("provision", None, || {
let hosts_provisioned =
self.hosts
.iter_mut()
Expand All @@ -61,7 +61,7 @@ impl Deployment {
})
.await;

progress::ProgressTracker::with_group("deploy", || {
progress::ProgressTracker::with_group("deploy", None, || {
let services_future =
self.services
.iter_mut()
Expand All @@ -79,7 +79,7 @@ impl Deployment {
})
.await;

progress::ProgressTracker::with_group("ready", || {
progress::ProgressTracker::with_group("ready", None, || {
let all_services_ready =
self.services
.iter()
Expand All @@ -97,7 +97,7 @@ impl Deployment {
.await
}

pub async fn start(&mut self) {
pub async fn start(&mut self) -> Result<()> {
let active_services = self
.services
.iter()
Expand All @@ -110,10 +110,12 @@ impl Deployment {
self.services
.iter()
.map(|service: &Weak<RwLock<dyn Service>>| async {
service.upgrade().unwrap().write().await.start().await;
service.upgrade().unwrap().write().await.start().await?;
Ok(()) as Result<()>
});

futures::future::join_all(all_services_start).await;
futures::future::try_join_all(all_services_start).await?;
Ok(())
}

pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(
Expand Down
67 changes: 38 additions & 29 deletions hydro_cli/src/core/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,35 +92,44 @@ impl LaunchedSSHHost for LaunchedComputeEngine {
22,
);

let res = ProgressTracker::leaf(
format!(
"connecting to host @ {}",
self.external_ip.as_ref().unwrap()
),
async_retry(
&|| async {
let mut config = SessionConfiguration::new();
config.set_compress(true);

let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config)).await?;

session.handshake().await?;

session
.userauth_pubkey_file(
self.user.as_str(),
None,
self.ssh_key_path().as_path(),
None,
)
.await?;

Ok(session)
},
10,
Duration::from_secs(1),
),
let mut attempt_count = 0;

let res = async_retry(
|| {
attempt_count += 1;
ProgressTracker::leaf(
format!(
"connecting to host @ {} (attempt: {})",
self.external_ip.as_ref().unwrap(),
attempt_count
),
async {
let mut config = SessionConfiguration::new();
config.set_compress(true);

let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config)).await?;

tokio::time::timeout(Duration::from_secs(15), async move {
session.handshake().await?;

session
.userauth_pubkey_file(
self.user.as_str(),
None,
self.ssh_key_path().as_path(),
None,
)
.await?;

Ok(session)
})
.await?
},
)
},
10,
Duration::from_secs(1),
)
.await?;

Expand Down
7 changes: 7 additions & 0 deletions hydro_cli/src/core/hydroflow_crate/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type CacheKey = (
PathBuf,
Option<String>,
Option<String>,
Option<String>,
HostTargetType,
Option<Vec<String>>,
);
Expand All @@ -27,13 +28,15 @@ static BUILDS: Lazy<Mutex<HashMap<CacheKey, Arc<OnceCell<BuiltCrate>>>>> =

pub async fn build_crate(
src: PathBuf,
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
target_type: HostTargetType,
features: Option<Vec<String>>,
) -> Result<BuiltCrate> {
let key = (
src.clone(),
bin.clone(),
example.clone(),
profile.clone(),
target_type,
Expand All @@ -56,6 +59,10 @@ pub async fn build_crate(
profile.unwrap_or("release".to_string()),
]);

if let Some(bin) = bin.as_ref() {
command.args(["--bin", bin]);
}

if let Some(example) = example.as_ref() {
command.args(["--example", example]);
}
Expand Down
32 changes: 29 additions & 3 deletions hydro_cli/src/core/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub struct HydroflowCrate {
id: usize,
src: PathBuf,
on: Arc<RwLock<dyn Host>>,
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
features: Option<Vec<String>>,
Expand Down Expand Up @@ -55,6 +56,7 @@ impl HydroflowCrate {
id: usize,
src: PathBuf,
on: Arc<RwLock<dyn Host>>,
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
features: Option<Vec<String>>,
Expand All @@ -65,6 +67,7 @@ impl HydroflowCrate {
Self {
id,
src,
bin,
on,
example,
profile,
Expand Down Expand Up @@ -162,6 +165,7 @@ impl HydroflowCrate {

fn build(&mut self) -> JoinHandle<Result<BuiltCrate>> {
let src_cloned = self.src.canonicalize().unwrap();
let bin_cloned = self.bin.clone();
let example_cloned = self.example.clone();
let features_cloned = self.features.clone();
let host = self.on.clone();
Expand All @@ -170,6 +174,7 @@ impl HydroflowCrate {

tokio::task::spawn(build_crate(
src_cloned,
bin_cloned,
example_cloned,
profile_cloned,
target_type,
Expand Down Expand Up @@ -213,6 +218,7 @@ impl Service for HydroflowCrate {
.display_id
.clone()
.unwrap_or_else(|| format!("service/{}", self.id)),
None,
|| async {
let mut host_write = self.on.write().await;
let launched = host_write.provision(resource_result);
Expand All @@ -232,6 +238,7 @@ impl Service for HydroflowCrate {
.display_id
.clone()
.unwrap_or_else(|| format!("service/{}", self.id)),
None,
|| async {
let launched_host = self.launched_host.as_ref().unwrap();

Expand All @@ -256,7 +263,7 @@ impl Service for HydroflowCrate {
let formatted_bind_config = serde_json::to_string(&bind_config).unwrap();

// request stdout before sending config so we don't miss the "ready" response
let stdout_receiver = binary.write().await.stdout().await;
let stdout_receiver = binary.write().await.cli_stdout().await;

binary
.write()
Expand Down Expand Up @@ -286,9 +293,9 @@ impl Service for HydroflowCrate {
.await
}

async fn start(&mut self) {
async fn start(&mut self) -> Result<()> {
if self.started {
return;
return Ok(());
}

let mut sink_ports = HashMap::new();
Expand All @@ -298,6 +305,15 @@ impl Service for HydroflowCrate {

let formatted_defns = serde_json::to_string(&sink_ports).unwrap();

let stdout_receiver = self
.launched_binary
.as_mut()
.unwrap()
.write()
.await
.cli_stdout()
.await;

self.launched_binary
.as_mut()
.unwrap()
Expand All @@ -309,7 +325,17 @@ impl Service for HydroflowCrate {
.await
.unwrap();

let start_ack_line = ProgressTracker::leaf(
"waiting for ack start".to_string(),
tokio::time::timeout(Duration::from_secs(60), stdout_receiver.recv()),
)
.await??;
if !start_ack_line.starts_with("ack start") {
bail!("expected ack start");
}

self.started = true;
Ok(())
}

async fn stop(&mut self) -> Result<()> {
Expand Down
1 change: 0 additions & 1 deletion hydro_cli/src/core/hydroflow_crate/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,6 @@ impl ServerConfig {

ServerConfig::TaggedUnwrap(underlying) => {
let loaded = underlying.load_instantiated(select).await;
dbg!(&loaded);
if let ServerPort::Tagged(underlying, _) = loaded {
*underlying
} else {
Expand Down
Loading

0 comments on commit 71b932b

Please sign in to comment.