Skip to content

Commit

Permalink
Change schema of requests and responses to support id
Browse files Browse the repository at this point in the history
  • Loading branch information
m30m committed Feb 15, 2024
1 parent fc6b871 commit 105f9a6
Showing 1 changed file with 71 additions and 55 deletions.
126 changes: 71 additions & 55 deletions auction-server/src/api/ws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,32 +58,47 @@ pub struct WsState {
}

#[derive(Deserialize, Debug, Clone)]
#[serde(tag = "type")]
#[serde(tag = "method", content = "params")]
enum ClientMessage {
#[serde(rename = "subscribe")]
Subscribe { chain_ids: Vec<ChainId> },
#[serde(rename = "unsubscribe")]
Unsubscribe { chain_ids: Vec<ChainId> },
}

#[derive(Deserialize, Debug, Clone)]
struct ClientRequest {
id: String,
#[serde(flatten)]
msg: ClientMessage,
}

/// This enum is used to send an update to the client for any subscriptions made
#[derive(Serialize, Clone)]
#[serde(tag = "type")]
enum ServerMessage {
#[serde(rename = "response")]
Response(ServerResponseMessage),
enum ServerUpdateResponse {
#[serde(rename = "new_opportunity")]
NewOpportunity {
opportunity: OpportunityParamsWithMetadata,
},
}

#[derive(Serialize, Debug, Clone)]
#[serde(tag = "status")]
enum ServerResponseMessage {
#[serde(tag = "status", content = "result")]
enum ServerResultMessage {
#[serde(rename = "success")]
Success,
#[serde(rename = "error")]
Err { error: String },
Err(String),
}

/// This enum is used to send the result for a specific client request with the same id
/// id is only None when the client message is invalid
#[derive(Serialize, Debug, Clone)]
struct ServerResultResponse {
id: Option<String>,
#[serde(flatten)]
result: ServerResultMessage,
}

pub async fn ws_route_handler(
Expand Down Expand Up @@ -145,12 +160,11 @@ impl Subscriber {
sender,
chain_ids: HashSet::new(),
ping_interval: tokio::time::interval(PING_INTERVAL_DURATION),
exit_check_interval: tokio::time::interval(Duration::from_secs(5)),
exit_check_interval: tokio::time::interval(EXIT_CHECK_INTERVAL),
responded_to_ping: true, // We start with true so we don't close the connection immediately
}
}

#[tracing::instrument(skip(self))]
pub async fn run(&mut self) {
while !self.closed {
if let Err(e) = self.handle_next().await {
Expand Down Expand Up @@ -200,33 +214,30 @@ impl Subscriber {
return Ok(());
}
let message =
serde_json::to_string(&ServerMessage::NewOpportunity { opportunity })?;
serde_json::to_string(&ServerUpdateResponse::NewOpportunity { opportunity })?;
self.sender.send(message.into()).await?;
}
}

Ok(())
}

#[tracing::instrument(skip(self, message))]
async fn handle_client_message(&mut self, message: Message) -> Result<()> {
let maybe_client_message = match message {
Message::Close(_) => {
// Closing the connection. We don't remove it from the subscribers
// list, instead when the Subscriber struct is dropped the channel
// to subscribers list will be closed and it will eventually get
// removed.
tracing::trace!(id = self.id, "Subscriber Closed Connection.");

// Send the close message to gracefully shut down the connection
// Otherwise the client might get an abnormal Websocket closure
// error.
self.sender.close().await?;
self.closed = true;
return Ok(());
}
Message::Text(text) => serde_json::from_str::<ClientMessage>(&text),
Message::Binary(data) => serde_json::from_slice::<ClientMessage>(&data),
Message::Text(text) => serde_json::from_str::<ClientRequest>(&text),
Message::Binary(data) => serde_json::from_slice::<ClientRequest>(&data),
Message::Ping(_) => {
// Axum will send Pong automatically
return Ok(());
Expand All @@ -237,64 +248,69 @@ impl Subscriber {
}
};

match maybe_client_message {
let request_id = match maybe_client_message {
Err(e) => {
self.sender
.send(
serde_json::to_string(&ServerMessage::Response(
ServerResponseMessage::Err {
error: e.to_string(),
},
))?
serde_json::to_string(&ServerResultResponse {
id: None,
result: ServerResultMessage::Err(e.to_string()),
})?
.into(),
)
.await?;
return Ok(());
}

Ok(ClientMessage::Subscribe { chain_ids }) => {
let available_chain_ids: Vec<&ChainId> = self.store.chains.keys().collect();
Ok(ClientRequest { msg, id }) => {
match msg {
ClientMessage::Subscribe { chain_ids } => {
let available_chain_ids: Vec<&ChainId> = self.store.chains.keys().collect();

let not_found_chain_ids: Vec<&ChainId> = chain_ids
.iter()
.filter(|chain_id| !available_chain_ids.contains(chain_id))
.collect();
let not_found_chain_ids: Vec<&ChainId> = chain_ids
.iter()
.filter(|chain_id| !available_chain_ids.contains(chain_id))
.collect();

// If there is a single chain id that is not found, we don't subscribe to any of the
// asked correct chain ids and return an error to be more explicit and clear.
if !not_found_chain_ids.is_empty() {
self.sender
.send(
serde_json::to_string(&ServerMessage::Response(
ServerResponseMessage::Err {
error: format!(
"Chain id(s) with id(s) {:?} not found",
not_found_chain_ids
),
},
))?
.into(),
)
.await?;
return Ok(());
} else {
self.chain_ids.extend(chain_ids.into_iter());
// If there is a single chain id that is not found, we don't subscribe to any of the
// asked correct chain ids and return an error to be more explicit and clear.
if !not_found_chain_ids.is_empty() {
self.sender
.send(
serde_json::to_string(&ServerResultResponse {
id: Some(id),
result: ServerResultMessage::Err(format!(
"Chain id(s) with id(s) {:?} not found",
not_found_chain_ids
)),
})?
.into(),
)
.await?;
return Ok(());
} else {
self.chain_ids.extend(chain_ids.into_iter());
}
}
ClientMessage::Unsubscribe { chain_ids } => {
self.chain_ids
.retain(|chain_id| !chain_ids.contains(chain_id));
}
}
id
}
Ok(ClientMessage::Unsubscribe { chain_ids }) => {
self.chain_ids
.retain(|chain_id| !chain_ids.contains(chain_id));
}
}


};
self.sender
.send(
serde_json::to_string(&ServerMessage::Response(ServerResponseMessage::Success))?
.into(),
serde_json::to_string(&ServerResultResponse {
id: Some(request_id),
result: ServerResultMessage::Success,
})?
.into(),
)
.await?;


Ok(())
}
}
Expand Down

0 comments on commit 105f9a6

Please sign in to comment.