Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor search transaction optimizations to use SQLX migrations #101

Merged
merged 6 commits into from
Feb 11, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions src/commands/search_tx_optimizations.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use anyhow::{bail, Result};
use clap::Args;
use sqlx::{PgPool, Row};
use sqlx::{migrate::Migrator, PgPool};

static MIGRATOR: Migrator = sqlx::migrate!("sql/migrations");

#[derive(Debug, Args)]
#[command(about = "Command to apply or drop search transaction optimizations in the archive database.")]
Expand Down Expand Up @@ -52,11 +54,9 @@
}

async fn apply_optimizations(&self, pool: &PgPool) -> Result<()> {
println!("Applying search transaction optimizations on Archive Database (this may take few minutes)...");
println!("Applying search transaction optimizations on Archive Database (this may take a few minutes)...");

// Load and execute the SQL from the file
let sql = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/sql/migrations/apply_search_tx_optimizations.sql"));
self.execute_sql_file(pool, sql, "-- NEXT --").await?;
MIGRATOR.run(pool).await?;

println!("Optimizations applied successfully.");
Ok(())
Expand All @@ -65,29 +65,31 @@
async fn drop_optimizations(&self, pool: &PgPool) -> Result<()> {
println!("Dropping search transaction optimizations from Archive Database...");

// Load and execute the SQL from the file
let sql = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/sql/migrations/drop_search_tx_optimizations.sql"));
self.execute_sql_file(pool, sql, ";").await?;
MIGRATOR.undo(pool, 0).await?;

println!("Optimizations dropped successfully.");
Ok(())
}

async fn check_if_optimizations_applied(&self, pool: &PgPool) -> Result<bool> {
// select if table exists
let result = sqlx::query(
"SELECT EXISTS (SELECT 1 FROM information_schema.tables WHERE table_name = 'user_commands_aggregated')",
)
.fetch_one(pool)
.await?;
Ok(result.get(0))
}
// Check if the migrations table exists
let table_exists: Option<String> =
sqlx::query_scalar("SELECT to_regclass('_sqlx_migrations')::text").fetch_one(pool).await?;

Check warning on line 77 in src/commands/search_tx_optimizations.rs

View workflow job for this annotation

GitHub Actions / cspell

Unknown word (regclass)

async fn execute_sql_file(&self, pool: &PgPool, file_content: &str, split_by: &str) -> Result<()> {
let statements: Vec<&str> = file_content.split(split_by).filter(|stmt| !stmt.trim().is_empty()).collect();
for stmt in statements {
sqlx::query(stmt).execute(pool).await?;
if table_exists.is_none() {
// The table doesn't exist so the optimizations have not been applied
return Ok(false);
}
Ok(())

// select the latest migration version in the DB
let result: Option<i64> = sqlx::query_scalar("SELECT MAX(version) FROM _sqlx_migrations").fetch_one(pool).await?;
let db_latest_version = result.unwrap_or(0);

// get the latest migration from MIGRATOR
let latest_version = MIGRATOR.iter().fold(0, |acc, m| acc.max(m.version));

// check if the latest migration version is the same as the latest version in
// the MIGRATOR
Ok(latest_version == db_latest_version)
}
}
Loading