From 105f9a61c44b7f49b3ec9aec6b07e379f02617d6 Mon Sep 17 00:00:00 2001 From: Amin Moghaddam Date: Thu, 15 Feb 2024 18:49:55 +0100 Subject: [PATCH] Change schema of requests and responses to support id --- auction-server/src/api/ws.rs | 126 ++++++++++++++++++++--------------- 1 file changed, 71 insertions(+), 55 deletions(-) diff --git a/auction-server/src/api/ws.rs b/auction-server/src/api/ws.rs index 7029a75c..334a4b14 100644 --- a/auction-server/src/api/ws.rs +++ b/auction-server/src/api/ws.rs @@ -58,7 +58,7 @@ pub struct WsState { } #[derive(Deserialize, Debug, Clone)] -#[serde(tag = "type")] +#[serde(tag = "method", content = "params")] enum ClientMessage { #[serde(rename = "subscribe")] Subscribe { chain_ids: Vec }, @@ -66,11 +66,17 @@ enum ClientMessage { Unsubscribe { chain_ids: Vec }, } +#[derive(Deserialize, Debug, Clone)] +struct ClientRequest { + id: String, + #[serde(flatten)] + msg: ClientMessage, +} + +/// This enum is used to send an update to the client for any subscriptions made #[derive(Serialize, Clone)] #[serde(tag = "type")] -enum ServerMessage { - #[serde(rename = "response")] - Response(ServerResponseMessage), +enum ServerUpdateResponse { #[serde(rename = "new_opportunity")] NewOpportunity { opportunity: OpportunityParamsWithMetadata, @@ -78,12 +84,21 @@ enum ServerMessage { } #[derive(Serialize, Debug, Clone)] -#[serde(tag = "status")] -enum ServerResponseMessage { +#[serde(tag = "status", content = "result")] +enum ServerResultMessage { #[serde(rename = "success")] Success, #[serde(rename = "error")] - Err { error: String }, + Err(String), +} + +/// This enum is used to send the result for a specific client request with the same id +/// id is only None when the client message is invalid +#[derive(Serialize, Debug, Clone)] +struct ServerResultResponse { + id: Option, + #[serde(flatten)] + result: ServerResultMessage, } pub async fn ws_route_handler( @@ -145,12 +160,11 @@ impl Subscriber { sender, chain_ids: HashSet::new(), ping_interval: tokio::time::interval(PING_INTERVAL_DURATION), - exit_check_interval: tokio::time::interval(Duration::from_secs(5)), + exit_check_interval: tokio::time::interval(EXIT_CHECK_INTERVAL), responded_to_ping: true, // We start with true so we don't close the connection immediately } } - #[tracing::instrument(skip(self))] pub async fn run(&mut self) { while !self.closed { if let Err(e) = self.handle_next().await { @@ -200,7 +214,7 @@ impl Subscriber { return Ok(()); } let message = - serde_json::to_string(&ServerMessage::NewOpportunity { opportunity })?; + serde_json::to_string(&ServerUpdateResponse::NewOpportunity { opportunity })?; self.sender.send(message.into()).await?; } } @@ -208,7 +222,6 @@ impl Subscriber { Ok(()) } - #[tracing::instrument(skip(self, message))] async fn handle_client_message(&mut self, message: Message) -> Result<()> { let maybe_client_message = match message { Message::Close(_) => { @@ -216,8 +229,6 @@ impl Subscriber { // list, instead when the Subscriber struct is dropped the channel // to subscribers list will be closed and it will eventually get // removed. - tracing::trace!(id = self.id, "Subscriber Closed Connection."); - // Send the close message to gracefully shut down the connection // Otherwise the client might get an abnormal Websocket closure // error. @@ -225,8 +236,8 @@ impl Subscriber { self.closed = true; return Ok(()); } - Message::Text(text) => serde_json::from_str::(&text), - Message::Binary(data) => serde_json::from_slice::(&data), + Message::Text(text) => serde_json::from_str::(&text), + Message::Binary(data) => serde_json::from_slice::(&data), Message::Ping(_) => { // Axum will send Pong automatically return Ok(()); @@ -237,64 +248,69 @@ impl Subscriber { } }; - match maybe_client_message { + let request_id = match maybe_client_message { Err(e) => { self.sender .send( - serde_json::to_string(&ServerMessage::Response( - ServerResponseMessage::Err { - error: e.to_string(), - }, - ))? + serde_json::to_string(&ServerResultResponse { + id: None, + result: ServerResultMessage::Err(e.to_string()), + })? .into(), ) .await?; return Ok(()); } - Ok(ClientMessage::Subscribe { chain_ids }) => { - let available_chain_ids: Vec<&ChainId> = self.store.chains.keys().collect(); + Ok(ClientRequest { msg, id }) => { + match msg { + ClientMessage::Subscribe { chain_ids } => { + let available_chain_ids: Vec<&ChainId> = self.store.chains.keys().collect(); - let not_found_chain_ids: Vec<&ChainId> = chain_ids - .iter() - .filter(|chain_id| !available_chain_ids.contains(chain_id)) - .collect(); + let not_found_chain_ids: Vec<&ChainId> = chain_ids + .iter() + .filter(|chain_id| !available_chain_ids.contains(chain_id)) + .collect(); - // If there is a single chain id that is not found, we don't subscribe to any of the - // asked correct chain ids and return an error to be more explicit and clear. - if !not_found_chain_ids.is_empty() { - self.sender - .send( - serde_json::to_string(&ServerMessage::Response( - ServerResponseMessage::Err { - error: format!( - "Chain id(s) with id(s) {:?} not found", - not_found_chain_ids - ), - }, - ))? - .into(), - ) - .await?; - return Ok(()); - } else { - self.chain_ids.extend(chain_ids.into_iter()); + // If there is a single chain id that is not found, we don't subscribe to any of the + // asked correct chain ids and return an error to be more explicit and clear. + if !not_found_chain_ids.is_empty() { + self.sender + .send( + serde_json::to_string(&ServerResultResponse { + id: Some(id), + result: ServerResultMessage::Err(format!( + "Chain id(s) with id(s) {:?} not found", + not_found_chain_ids + )), + })? + .into(), + ) + .await?; + return Ok(()); + } else { + self.chain_ids.extend(chain_ids.into_iter()); + } + } + ClientMessage::Unsubscribe { chain_ids } => { + self.chain_ids + .retain(|chain_id| !chain_ids.contains(chain_id)); + } } + id } - Ok(ClientMessage::Unsubscribe { chain_ids }) => { - self.chain_ids - .retain(|chain_id| !chain_ids.contains(chain_id)); - } - } - - + }; self.sender .send( - serde_json::to_string(&ServerMessage::Response(ServerResponseMessage::Success))? - .into(), + serde_json::to_string(&ServerResultResponse { + id: Some(request_id), + result: ServerResultMessage::Success, + })? + .into(), ) .await?; + Ok(()) } }