Skip to content
This repository has been archived by the owner on Oct 23, 2022. It is now read-only.

Commit

Permalink
OPT: use Redis pool instead of a single client
Browse files Browse the repository at this point in the history
  • Loading branch information
eigenein committed Jul 27, 2022
1 parent 072b31e commit 1044182
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 40 deletions.
38 changes: 14 additions & 24 deletions rusty-shared-redis/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@ use std::time;

use anyhow::{Context, Result};
use async_std::future::timeout;
use fred::pool::RedisPool;
use fred::prelude::*;
use fred::types::{MultipleKeys, MultipleValues, PerformanceConfig, RedisKey};
use tracing::{debug, instrument};

pub struct Redis {
pub client: RedisClient,
pub pool: RedisPool,
script_hashes: ScriptHashes,
}

Expand All @@ -24,8 +25,8 @@ impl Redis {
/// Thus, I put a short timeout on each `EVALSHA` call.
const EVALSHA_TIMEOUT: time::Duration = time::Duration::from_secs(5);

#[instrument(skip_all, fields(url = url))]
pub async fn connect(url: &str) -> Result<Self> {
#[instrument(skip_all, fields(url = url, client_name = client_name))]
pub async fn connect(url: &str, client_name: &str) -> Result<Self> {
let config = {
let mut config = RedisConfig::from_url(url)?;
config.blocking = Blocking::Error;
Expand All @@ -37,28 +38,17 @@ impl Redis {
config
};

let client = RedisClient::new(config);
connect(&client).await?;
// TODO: client.client_setname("fred").await?; // FIXME: use bin crate name.
let script_hashes = load_scripts(&client).await?;
let pool = RedisPool::new(config, 2)?;
connect(&pool).await?;
pool.client_setname(client_name).await?;
let script_hashes = load_scripts(&pool).await?;

Ok(Self {
client,
pool,
script_hashes,
})
}

#[instrument(skip_all)]
pub async fn clone(&self) -> Result<Self> {
let client = self.client.clone_new();
connect(&client).await?;
let this = Self {
client,
script_hashes: self.script_hashes.clone(),
};
Ok(this)
}

#[instrument(skip_all, fields(key = ?key, group_name = group_name))]
pub async fn create_consumer_group<K: Into<RedisKey> + Debug>(
&self,
Expand All @@ -67,7 +57,7 @@ impl Redis {
) -> Result<bool> {
timeout(
Self::EVALSHA_TIMEOUT,
self.client
self.pool
.evalsha(&self.script_hashes.create_consumer_group, key, group_name),
)
.await
Expand All @@ -85,7 +75,7 @@ impl Redis {
{
timeout(
Self::EVALSHA_TIMEOUT,
self.client
self.pool
.evalsha(&self.script_hashes.set_if_greater, key, value),
)
.await
Expand All @@ -103,7 +93,7 @@ impl Redis {
{
timeout(
Self::EVALSHA_TIMEOUT,
self.client
self.pool
.evalsha(&self.script_hashes.set_if_not_equal, key, value),
)
.await
Expand All @@ -113,7 +103,7 @@ impl Redis {
}

#[instrument(skip_all)]
async fn connect(client: &RedisClient) -> Result<()> {
async fn connect(client: &RedisPool) -> Result<()> {
client.connect(None);
debug!("awaiting connection…");
client
Expand All @@ -125,7 +115,7 @@ async fn connect(client: &RedisClient) -> Result<()> {
}

#[instrument(skip_all)]
async fn load_scripts(client: &RedisClient) -> Result<ScriptHashes> {
async fn load_scripts(client: &RedisPool) -> Result<ScriptHashes> {
let set_if_greater = client.script_load(SET_IF_GREATER_SCRIPT).await?;
let create_consumer_group = client.script_load(CREATE_CONSUMER_GROUP).await?;
let set_if_not_equal = client.script_load(SET_IF_NOT_EQUAL_SCRIPT).await?;
Expand Down
10 changes: 5 additions & 5 deletions rusty-tractive-telegram-bot/src/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ impl Listener {
#[allow(clippy::mutable_key_type)]
let response: XReadResponse<RedisKey, String, String, String> = self
.redis
.client
.pool
.xreadgroup_map(
&self.group_name,
&self.consumer_name,
Expand Down Expand Up @@ -158,7 +158,7 @@ impl Listener {

match self
.redis
.client
.pool
.get::<Option<i64>, _>(&self.keys.live_location_message_id)
.await?
{
Expand All @@ -182,7 +182,7 @@ impl Listener {
debug!(message_id, "updating the live location message ID…");
if self
.redis
.client
.pool
.set::<Option<()>, _, _>(
&self.keys.live_location_message_id,
message_id,
Expand All @@ -200,7 +200,7 @@ impl Listener {
.await?;
self.delete_old_messages().await?;
self.redis
.client
.pool
.rpush(&self.keys.pinned_message_ids, message_id)
.await?;
} else {
Expand All @@ -219,7 +219,7 @@ impl Listener {
async fn delete_old_messages(&self) -> Result<()> {
while let Some(message_id) = self
.redis
.client
.pool
.lpop::<Option<i64>, _>(&self.keys.pinned_message_ids, None)
.await?
{
Expand Down
6 changes: 4 additions & 2 deletions rusty-tractive-telegram-bot/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@ mod middleware;
mod opts;
mod prelude;

static BIN_NAME: &str = env!("CARGO_BIN_NAME");

#[async_std::main]
async fn main() -> Result<()> {
let opts: Opts = Opts::parse();
let _guard = rusty_shared_tracing::init(opts.sentry, env!("CARGO_BIN_NAME"))?;
let _guard = rusty_shared_tracing::init(opts.sentry, BIN_NAME)?;

let bot_api = BotApi::new(opts.service.bot_token, Duration::from_secs(5))?;
let me = methods::GetMe.call(&bot_api).await?;
let redis = rusty_shared_redis::Redis::connect(&opts.redis.redis_url).await?;
let redis = rusty_shared_redis::Redis::connect(&opts.redis.redis_url, BIN_NAME).await?;

let listener = {
let bot_api = bot_api.clone();
Expand Down
6 changes: 4 additions & 2 deletions rusty-tractive/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ mod models;
mod opts;
mod service;

static BIN_NAME: &str = env!("CARGO_BIN_NAME");

#[async_std::main]
async fn main() -> Result<()> {
let opts: Opts = Opts::parse();
let _guard = rusty_shared_tracing::init(opts.sentry, env!("CARGO_BIN_NAME"))?;
let _guard = rusty_shared_tracing::init(opts.sentry, BIN_NAME)?;

let service = Service {
api: Api::new()?,
redis: rusty_shared_redis::Redis::connect(&opts.redis.redis_url).await?,
redis: rusty_shared_redis::Redis::connect(&opts.redis.redis_url, BIN_NAME).await?,
heartbeat: opts.heartbeat.get_heartbeat()?,
opts: opts.service,
};
Expand Down
17 changes: 10 additions & 7 deletions rusty-tractive/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Service {
#[tracing::instrument(skip_all, fields(self.email = ?self.opts.email))]
async fn get_authentication(&self) -> Result<(String, String)> {
let key = format!("rusty:tractive:{}:authentication", self.opts.email);
let authentication: HashMap<String, String> = self.redis.client.hgetall(&key).await?;
let authentication: HashMap<String, String> = self.redis.pool.hgetall(&key).await?;
let result = match (authentication.get("user_id"), authentication.get("access_token")) {
(Some(user_id), Some(access_token)) => {
debug!("using the cached token");
Expand All @@ -76,13 +76,13 @@ impl Service {
Ok(result)
}

#[instrument(skip_all, fields(user_id = ?token.user_id))]
#[instrument(skip_all, fields(key = key, user_id = ?token.user_id))]
async fn store_access_token(&self, key: &str, token: &Token) -> Result<()> {
let values = vec![
("user_id", &token.user_id),
("access_token", &token.access_token),
];
let transaction = self.redis.client.multi(true).await?;
let transaction = self.redis.pool.multi(true).await?;
transaction.hset(key, values).await?;
transaction
.expire_at(key, token.expires_at.timestamp())
Expand All @@ -100,6 +100,7 @@ impl Service {
if let Some(position) = payload.position {
self.on_position_update(&tracker_id, position).await?;
}
info!("👍 completed");
Ok(())
}

Expand All @@ -120,7 +121,7 @@ impl Service {
}
info!("⌚ pushing new entry…");
self.redis
.client
.pool
.xadd(
hardware_stream_key(tracker_id),
false,
Expand All @@ -129,7 +130,8 @@ impl Service {
hardware.into_vec(),
)
.await
.context("failed to push the hardware stream entry")
.context("failed to push the hardware stream entry")?;
Ok(())
}

#[instrument(skip_all)]
Expand Down Expand Up @@ -157,7 +159,7 @@ impl Service {
}
info!("🎯 pushing new entry…");
self.redis
.client
.pool
.xadd(
position_stream_key(tracker_id),
false,
Expand All @@ -166,6 +168,7 @@ impl Service {
PositionEntry::from(position).into_vec(),
)
.await
.context("failed to push the position stream entry")
.context("failed to push the position stream entry")?;
Ok(())
}
}

0 comments on commit 1044182

Please sign in to comment.