Skip to content

Commit

Permalink
[OSS-26] Queues improvements (#32)
Browse files Browse the repository at this point in the history
  • Loading branch information
stefandanaita authored Jul 30, 2024
1 parent 552763f commit 4084a1f
Show file tree
Hide file tree
Showing 11 changed files with 479 additions and 54 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ chrono = { version = "0.4", features = ["serde"] }
http = "1"
reqwest = { version = "0.12", features = ["json"] }
reqwest-middleware = { version = "0.3.1", features = ["json"] }
rust_decimal = "1.35"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
thiserror = "1"
Expand Down
2 changes: 2 additions & 0 deletions src/api/_generic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use http::StatusCode;
use reqwest::Response;
use serde::de::DeserializeOwned;

#[tracing::instrument(skip(response))]
pub async fn handle_response<T>(response: Response) -> Result<T, RabbitMqClientError>
where
T: DeserializeOwned,
Expand All @@ -24,6 +25,7 @@ where
}
}

#[tracing::instrument(skip(response))]
pub async fn handle_empty_response(response: Response) -> Result<(), RabbitMqClientError> {
let status = response.status();

Expand Down
55 changes: 2 additions & 53 deletions src/api/exchange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use crate::errors::RabbitMqClientError;
use crate::RabbitMqClient;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[async_trait]
pub trait ExchangeApi {
Expand Down Expand Up @@ -39,13 +38,6 @@ pub trait ExchangeApi {
exchange: String,
) -> Result<(), RabbitMqClientError>;

async fn publish_message(
&self,
vhost: String,
exchange: String,
request: RabbitMqExchangeMessagePublishRequest,
) -> Result<RabbitMqExchangeMessagePublishResponse, RabbitMqClientError>;

async fn list_source_bindings(
&self,
vhost: String,
Expand Down Expand Up @@ -151,28 +143,6 @@ impl ExchangeApi for RabbitMqClient {
handle_empty_response(response).await
}

async fn publish_message(
&self,
vhost: String,
exchange: String,
request: RabbitMqExchangeMessagePublishRequest,
) -> Result<RabbitMqExchangeMessagePublishResponse, RabbitMqClientError> {
let response = self
.client
.request(
reqwest::Method::POST,
format!(
"{}/api/exchanges/{}/{}/publish",
self.api_url, vhost, exchange
),
)
.json(&request)
.send()
.await?;

handle_response(response).await
}

async fn list_source_bindings(
&self,
vhost: String,
Expand Down Expand Up @@ -229,8 +199,8 @@ pub struct RabbitMqExchange {

#[derive(Debug, Deserialize)]
pub struct RabbitMqExchangeMessageStats {
pub publish_in: i64,
pub publish_out: i64,
pub publish_in: Option<i64>,
pub publish_out: Option<i64>,
}

#[derive(Debug, Serialize)]
Expand All @@ -241,24 +211,3 @@ pub struct RabbitMqExchangeRequest {
pub durable: bool,
pub internal: bool,
}

#[derive(Debug, Serialize)]
pub struct RabbitMqExchangeMessagePublishRequest {
pub properties: HashMap<String, String>,
pub routing_key: String,
pub payload: String,
pub payload_encoding: RabbitMqMessageEncoding,
}

#[derive(Debug, Serialize)]
pub enum RabbitMqMessageEncoding {
#[serde(rename = "string")]
String,
#[serde(rename = "base64")]
Base64,
}

#[derive(Debug, Deserialize)]
pub struct RabbitMqExchangeMessagePublishResponse {
pub routed: bool,
}
127 changes: 127 additions & 0 deletions src/api/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use crate::api::_generic::handle_response;
use crate::errors::RabbitMqClientError;
use crate::RabbitMqClient;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[async_trait]
pub trait MessageApi {
async fn publish_message(
&self,
vhost: String,
exchange: String,
request: RabbitMqPublishMessageRequest,
) -> Result<RabbitMqPublishMessageResponse, RabbitMqClientError>;

async fn get_messages(
&self,
vhost: String,
queue: String,
options: RabbitMqGetMessagesOptions,
) -> Result<Vec<RabbitMqMessage>, RabbitMqClientError>;
}

#[async_trait]
impl MessageApi for RabbitMqClient {
async fn publish_message(
&self,
vhost: String,
exchange: String,
request: RabbitMqPublishMessageRequest,
) -> Result<RabbitMqPublishMessageResponse, RabbitMqClientError> {
let response = self
.client
.request(
reqwest::Method::POST,
format!(
"{}/api/exchanges/{}/{}/publish",
self.api_url, vhost, exchange
),
)
.json(&request)
.send()
.await?;

handle_response(response).await
}

#[tracing::instrument(skip(self))]
async fn get_messages(
&self,
vhost: String,
queue: String,
options: RabbitMqGetMessagesOptions,
) -> Result<Vec<RabbitMqMessage>, RabbitMqClientError> {
let response = self
.client
.request(
reqwest::Method::POST,
format!("{}/api/queues/{}/{}/get", self.api_url, vhost, queue),
)
.json(&options)
.send()
.await?;

handle_response(response).await
}
}

#[derive(Debug, Serialize)]
pub struct RabbitMqPublishMessageRequest {
pub properties: HashMap<String, String>,
pub routing_key: String,
pub payload: String,
pub payload_encoding: RabbitMqMessageEncoding,
}

#[derive(Debug, Deserialize, Serialize)]
pub enum RabbitMqMessageEncoding {
#[serde(rename = "string")]
String,
#[serde(rename = "base64")]
Base64,
}

#[derive(Debug, Deserialize)]
pub struct RabbitMqPublishMessageResponse {
pub routed: bool,
}

#[derive(Debug, Serialize)]
pub struct RabbitMqGetMessagesOptions {
pub count: u32,
#[serde(rename = "ackmode")]
pub ack_mode: RabbitMqGetMessagesAckMode,
pub encoding: RabbitMqGetMessagesEncoding,
#[serde(skip_serializing_if = "Option::is_none")]
pub truncate: Option<u64>,
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum RabbitMqGetMessagesAckMode {
AckRequeueTrue,
AckRequeueFalse,
RejectRequeueTrue,
RejectRequeueFalse,
}

#[derive(Debug, Serialize)]
pub enum RabbitMqGetMessagesEncoding {
#[serde(rename = "auto")]
Auto,
#[serde(rename = "base64")]
Base64,
}

#[derive(Debug, Deserialize)]
pub struct RabbitMqMessage {
pub payload_bytes: u64,
pub redelivered: bool,
pub exchange: String,
pub routing_key: String,
pub message_count: u64,
pub payload: String,
pub payload_encoding: RabbitMqMessageEncoding,
}
1 change: 1 addition & 0 deletions src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod _generic;
pub mod binding;
pub mod connection;
pub mod exchange;
pub mod message;
pub mod node;
pub mod overview;
pub mod permission;
Expand Down
18 changes: 17 additions & 1 deletion src/api/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::api::binding::RabbitMqBinding;
use crate::errors::RabbitMqClientError;
use crate::RabbitMqClient;
use async_trait::async_trait;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

Expand Down Expand Up @@ -53,6 +54,7 @@ pub trait QueueApi {

#[async_trait]
impl QueueApi for RabbitMqClient {
#[tracing::instrument(skip(self))]
async fn list_queues(
&self,
vhost: Option<String>,
Expand All @@ -69,6 +71,7 @@ impl QueueApi for RabbitMqClient {
handle_response(response).await
}

#[tracing::instrument(skip(self))]
async fn get_queue(
&self,
vhost: String,
Expand All @@ -86,6 +89,7 @@ impl QueueApi for RabbitMqClient {
handle_response(response).await
}

#[tracing::instrument(skip(self))]
async fn get_queue_bindings(
&self,
vhost: String,
Expand All @@ -103,6 +107,7 @@ impl QueueApi for RabbitMqClient {
handle_response(response).await
}

#[tracing::instrument(skip(self))]
async fn create_queue(
&self,
vhost: String,
Expand All @@ -121,6 +126,7 @@ impl QueueApi for RabbitMqClient {
}
}

#[tracing::instrument(skip(self))]
async fn update_queue(
&self,
vhost: String,
Expand All @@ -140,6 +146,7 @@ impl QueueApi for RabbitMqClient {
handle_empty_response(response).await
}

#[tracing::instrument(skip(self))]
async fn delete_queue(&self, vhost: String, name: String) -> Result<(), RabbitMqClientError> {
let response = self
.client
Expand All @@ -153,6 +160,7 @@ impl QueueApi for RabbitMqClient {
handle_empty_response(response).await
}

#[tracing::instrument(skip(self))]
async fn purge_queue(&self, vhost: String, name: String) -> Result<(), RabbitMqClientError> {
let response = self
.client
Expand All @@ -166,6 +174,7 @@ impl QueueApi for RabbitMqClient {
handle_empty_response(response).await
}

#[tracing::instrument(skip(self))]
async fn set_queue_actions(
&self,
vhost: String,
Expand All @@ -190,7 +199,7 @@ impl QueueApi for RabbitMqClient {
pub struct RabbitMqQueue {
pub name: String,
pub node: String,
pub arguments: HashMap<String, String>,
pub arguments: HashMap<String, RabbitMqArgument>,
pub state: String,
#[serde(rename = "type")]
pub kind: String,
Expand All @@ -208,6 +217,13 @@ pub struct RabbitMqQueue {
pub message_stats: Option<RabbitMqQueueMessageStats>,
}

#[derive(Debug, Deserialize)]
#[serde(untagged)]
pub enum RabbitMqArgument {
String(String),
Decimal(Decimal),
}

#[derive(Debug, Deserialize)]
pub struct RabbitMqQueueMessageStats {
#[serde(default)]
Expand Down
6 changes: 6 additions & 0 deletions src/api/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub trait UserApi {

#[async_trait]
impl UserApi for RabbitMqClient {
#[tracing::instrument(skip(self))]
async fn who_am_i(&self) -> Result<RabbitMqWhoAmI, RabbitMqClientError> {
let response = self
.client
Expand All @@ -43,6 +44,7 @@ impl UserApi for RabbitMqClient {
handle_response(response).await
}

#[tracing::instrument(skip(self))]
async fn list_users(&self) -> Result<Vec<RabbitMqUser>, RabbitMqClientError> {
let response = self
.client
Expand All @@ -53,6 +55,7 @@ impl UserApi for RabbitMqClient {
handle_response(response).await
}

#[tracing::instrument(skip(self))]
async fn list_users_without_permissions(
&self,
) -> Result<Vec<RabbitMqUser>, RabbitMqClientError> {
Expand All @@ -68,6 +71,7 @@ impl UserApi for RabbitMqClient {
handle_response(response).await
}

#[tracing::instrument(skip(self))]
async fn bulk_delete_users(
&self,
users: RabbitMqUsersBulkDeleteRequest,
Expand All @@ -85,6 +89,7 @@ impl UserApi for RabbitMqClient {
handle_empty_response(response).await
}

#[tracing::instrument(skip(self))]
async fn list_user_permissions(
&self,
user: String,
Expand All @@ -101,6 +106,7 @@ impl UserApi for RabbitMqClient {
handle_response(response).await
}

#[tracing::instrument(skip(self))]
async fn list_user_topic_permissions(
&self,
user: String,
Expand Down
Loading

0 comments on commit 4084a1f

Please sign in to comment.