From 7c514d044f0fb7540c7332c7e8f7e5ae408912fe Mon Sep 17 00:00:00 2001 From: Conrad Irwin Date: Fri, 23 Feb 2024 12:23:15 -0700 Subject: [PATCH] Fix collab (#8298) Co-Authored-By: Marshall We broke it by deploying two servers simultaneously. Release Notes: - N/A Co-authored-by: Marshall --- crates/collab/k8s/collab.template.yml | 2 +- crates/collab/src/api.rs | 8 +++-- crates/collab/src/main.rs | 49 +++++++++++++++++---------- 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/crates/collab/k8s/collab.template.yml b/crates/collab/k8s/collab.template.yml index aea416e47df8e9..624ae0df3a6829 100644 --- a/crates/collab/k8s/collab.template.yml +++ b/crates/collab/k8s/collab.template.yml @@ -58,7 +58,7 @@ spec: - name: ${ZED_SERVICE_NAME} image: "${ZED_IMAGE_ID}" args: - - serve + - serve ${ZED_SERVICE_NAME} ports: - containerPort: 8080 protocol: TCP diff --git a/crates/collab/src/api.rs b/crates/collab/src/api.rs index d3e36a92e4dd88..336db95e5e395b 100644 --- a/crates/collab/src/api.rs +++ b/crates/collab/src/api.rs @@ -25,7 +25,7 @@ use tracing::instrument; pub use extensions::fetch_extensions_from_blob_store_periodically; -pub fn routes(rpc_server: Arc, state: Arc) -> Router { +pub fn routes(rpc_server: Option>, state: Arc) -> Router { Router::new() .route("/user", get(get_authenticated_user)) .route("/users/:id/access_tokens", post(create_access_token)) @@ -136,8 +136,12 @@ async fn trace_panic(panic: Json) -> Result<()> { } async fn get_rpc_server_snapshot( - Extension(rpc_server): Extension>, + Extension(rpc_server): Extension>>, ) -> Result { + let Some(rpc_server) = rpc_server else { + return Err(Error::Internal(anyhow!("rpc server is not available"))); + }; + Ok(ErasedJson::pretty(rpc_server.snapshot().await)) } diff --git a/crates/collab/src/main.rs b/crates/collab/src/main.rs index b3e93bf94d8fd0..45de396165dc94 100644 --- a/crates/collab/src/main.rs +++ b/crates/collab/src/main.rs @@ -28,7 +28,8 @@ async fn main() -> Result<()> { ); } - match args().skip(1).next().as_deref() { + let mut args = args().skip(1); + match args.next().as_deref() { Some("version") => { println!("collab v{} ({})", VERSION, REVISION.unwrap_or("unknown")); } @@ -36,6 +37,8 @@ async fn main() -> Result<()> { run_migrations().await?; } Some("serve") => { + let is_api_only = args.next().is_some_and(|arg| arg == "api"); + let config = envy::from_env::().expect("error loading config"); init_tracing(&config); @@ -46,24 +49,33 @@ async fn main() -> Result<()> { let listener = TcpListener::bind(&format!("0.0.0.0:{}", state.config.http_port)) .expect("failed to bind TCP listener"); - let epoch = state - .db - .create_server(&state.config.zed_environment) - .await?; - let rpc_server = collab::rpc::Server::new(epoch, state.clone(), Executor::Production); - rpc_server.start().await?; + let rpc_server = if !is_api_only { + let epoch = state + .db + .create_server(&state.config.zed_environment) + .await?; + let rpc_server = + collab::rpc::Server::new(epoch, state.clone(), Executor::Production); + rpc_server.start().await?; + + Some(rpc_server) + } else { + None + }; fetch_extensions_from_blob_store_periodically(state.clone(), Executor::Production); - let app = collab::api::routes(rpc_server.clone(), state.clone()) - .merge(collab::rpc::routes(rpc_server.clone())) - .merge( - Router::new() - .route("/", get(handle_root)) - .route("/healthz", get(handle_liveness_probe)) - .merge(collab::api::events::router()) - .layer(Extension(state.clone())), - ); + let mut app = collab::api::routes(rpc_server.clone(), state.clone()); + if let Some(rpc_server) = rpc_server.clone() { + app = app.merge(collab::rpc::routes(rpc_server)) + } + app = app.merge( + Router::new() + .route("/", get(handle_root)) + .route("/healthz", get(handle_liveness_probe)) + .merge(collab::api::events::router()) + .layer(Extension(state.clone())), + ); axum::Server::from_tcp(listener)? .serve(app.into_make_service_with_connect_info::()) @@ -77,7 +89,10 @@ async fn main() -> Result<()> { futures::pin_mut!(sigterm, sigint); futures::future::select(sigterm, sigint).await; tracing::info!("Received interrupt signal"); - rpc_server.teardown(); + + if let Some(rpc_server) = rpc_server { + rpc_server.teardown(); + } }) .await?; }