Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: store URIs for ingested VCFs #15

Merged
merged 17 commits into from
Nov 29, 2024
Merged
15 changes: 14 additions & 1 deletion .github/workflows/checks.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Checks
on: [push, pull_request]
jobs:
test:
python-test:
runs-on: ubuntu-latest
strategy:
matrix:
Expand Down Expand Up @@ -34,6 +34,19 @@ jobs:

- name: Check style
run: python3 -m ruff check . && ruff format --check .
rust-test:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./rust
steps:
- uses: actions/checkout@v4

- run: rustup update stable && rustup default stable

- run: cargo build --verbose

- run: cargo test --verbose
rust-lint:
runs-on: ubuntu-latest
defaults:
Expand Down
15 changes: 14 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@ From a VCF, ingest a VRS ID and the corresponding VCF-called location (i.e. suff
% vrsix load chr1.vcf
```

By default, this uses the input file's location in the local file system as a URI, but a custom URI can be declared:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

possibly add it as more declarative:

... If you want to use a custom URI instead, specify it as a second argument

What was the reasoning behind adding this as an argument vs. a flag? I guess it's trivial in terms of functionality, but I'm just curious.

Copy link
Contributor Author

@jsstevenson jsstevenson Nov 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I didn't feel super strongly about it... I think it felt a little bit more ergonomic to do it this way, but now that you mention it I'm kind of torn

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since there's not much else needed in terms of flags, I'm good with passing it in as an optional argument for now.


```shell
% vrsix load chr1.vcf gs://my_stuff/chr1.vcf
```

Given a VRS ID, retrieve VCF-associated data (output format TBD)

```shell
Expand Down Expand Up @@ -62,7 +68,14 @@ cd rust/
cargo fmt
```

Run tests with `pytest`:
Run tests from the project root with `pytest`:
```shell
pytest
```

Some granular tests are written directly into the Rust backend as well:

```shell
cd rust/
cargo test
```
1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ sqlx = { version = "0.8.2", features = ["runtime-tokio", "sqlite"] }
tokio = { version = "1.40.0", features = ["full"] }
pyo3 = { version = "0.21.1", features = ["abi3-py310", "experimental-async"]}
log = "0.4.22"
tempfile = "3.14.0"
7 changes: 5 additions & 2 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ use std::path::PathBuf;
use tokio::runtime::Runtime;

#[pyfunction]
pub fn vcf_to_sqlite(vcf_path: PathBuf, db_url: String) -> PyResult<()> {
#[pyo3(signature = (vcf_path, db_url, vcf_uri = None))]
pub fn vcf_to_sqlite(vcf_path: PathBuf, db_url: String, vcf_uri: Option<String>) -> PyResult<()> {
let uri_value =
vcf_uri.unwrap_or_else(|| format!("file://{}", vcf_path.to_string_lossy().into_owned()));
let rt = Runtime::new().unwrap();
rt.block_on(load::load_vcf(vcf_path, &db_url))?;
rt.block_on(load::load_vcf(vcf_path, &db_url, uri_value))?;
Ok(())
}

Expand Down
55 changes: 48 additions & 7 deletions rust/src/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ use tokio::{

async fn load_allele(db_row: DbRow, pool: &SqlitePool) -> Result<(), Box<dyn std::error::Error>> {
let mut conn = pool.acquire().await?;
let result = sqlx::query("INSERT INTO vrs_locations (vrs_id, chr, pos) VALUES (?, ?, ?)")
.bind(db_row.vrs_id)
.bind(db_row.chr)
.bind(db_row.pos)
.execute(&mut *conn)
.await;
let result =
sqlx::query("INSERT INTO vrs_locations (vrs_id, chr, pos, uri_id) VALUES (?, ?, ?, ?);")
.bind(db_row.vrs_id)
.bind(db_row.chr)
.bind(db_row.pos)
.bind(db_row.uri_id)
.execute(&mut *conn)
.await;
if let Err(err) = result {
if let Some(db_error) = err.as_database_error() {
if let Some(sqlite_error) = db_error.try_downcast_ref::<SqliteError>() {
Expand Down Expand Up @@ -93,7 +95,25 @@ async fn get_reader(
}
}

pub async fn load_vcf(vcf_path: PathBuf, db_url: &str) -> PyResult<()> {
/// Record `uri` in the `file_uris` table and return the ID of its row.
///
/// The insert is issued as `INSERT OR IGNORE`. If it writes a row, that row's
/// ID is returned directly; if it writes nothing (e.g. the URI is already
/// stored), the ID of the existing row is looked up instead.
///
/// # Errors
///
/// Returns an error if a connection cannot be acquired from `pool` or if
/// either query fails.
async fn load_file_uri(uri: &str, pool: &SqlitePool) -> Result<i64, Box<dyn std::error::Error>> {
    let mut conn = pool.acquire().await?;

    let inserted = sqlx::query("INSERT OR IGNORE INTO file_uris (uri) VALUES (?);")
        .bind(uri)
        .execute(&mut *conn)
        .await?;

    if inserted.rows_affected() == 0 {
        // The insert was ignored, so fetch the ID of the row that already holds this URI.
        let (existing_id,): (i64,) = sqlx::query_as("SELECT id FROM file_uris WHERE uri = ?;")
            .bind(uri)
            .fetch_one(&mut *conn)
            .await?;
        return Ok(existing_id);
    }

    Ok(inserted.last_insert_rowid())
}

pub async fn load_vcf(vcf_path: PathBuf, db_url: &str, uri: String) -> PyResult<()> {
let start = Instant::now();

if !vcf_path.exists() || !vcf_path.is_file() {
Expand All @@ -118,6 +138,10 @@ pub async fn load_vcf(vcf_path: PathBuf, db_url: &str) -> PyResult<()> {
VrsixDbError::new_err(format!("Failed database connection/call: {}", e))
})?;

let uri_id = load_file_uri(&uri, &db_pool)
.await
.map_err(|e| VrsixDbError::new_err(format!("Failed to insert file URI `{uri}`: {e}")))?;

while let Some(record) = records.try_next().await? {
let vrs_ids = get_vrs_ids(record.info(), &header)?;
let chrom = record.reference_sequence_name();
Expand All @@ -131,6 +155,7 @@ pub async fn load_vcf(vcf_path: PathBuf, db_url: &str) -> PyResult<()> {
.to_string(),
chr: chrom.strip_prefix("chr").unwrap_or(chrom).to_string(),
pos: pos.try_into().unwrap(),
uri_id,
};
load_allele(row, &db_pool).await.map_err(|e| {
error!("Failed to load row {:?}", e);
Expand All @@ -143,3 +168,19 @@ pub async fn load_vcf(vcf_path: PathBuf, db_url: &str) -> PyResult<()> {
info!("Time taken: {:?}", duration);
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use tempfile::NamedTempFile;

#[tokio::test]
async fn test_load_file_uri() {
let temp_file = NamedTempFile::new().expect("Failed to create temp file");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

help me out here, so what is this file testing? I'm unfamiliar with Rust so not sure how the db file is being created

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good q. I think I just wanted to get a minimal test happening inside the Rust code, because failures that happen on the Rust side are often fairly opaque in the context of the Python integration tests (if it's not something you're explicitly looking for, it just looks like a panic without much detail). This test case literally just loads a single entry into the file_uris table and then verifies that it works and it gets the ID you expect for it. (the expect call on this specific line is sort of a red herring, the compiler is happy when you cover edge cases like a failure to create a tempfile when constructing an input for a test).

I'd originally just written end to end tests from the Python side to make sure things work the whole way through, but if/as this grows in complexity, it's probably smarter to focus more testing within the Rust code because it's much more granular/interpretable.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. And for my understanding, how is the sqlite:/// interpreted? I was expecting to see a file extension like .sqlite on the filename, for example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just there as a scheme declaration to indicate a sqlite file, I guess. I think it's an expectation imposed by the DB client library. Just checked and this test fails if you change it.

I don't know if there's really an accepted file extension for sqlite files (I just googled it and this Stack Overflow answer is very unsatisfying). It's definitely not something we try to validate anywhere, although we could if it seems worthwhile.

let db_url = format!("sqlite://{}", temp_file.path().to_str().unwrap());
crate::sqlite::setup_db(&db_url).await.unwrap();
let db_pool = get_db_connection(&db_url).await.unwrap();
let uri_id = load_file_uri("file:///sdlfkjd", &db_pool).await.unwrap();
assert!(uri_id == 1);
}
}
25 changes: 23 additions & 2 deletions rust/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ pub async fn setup_db(db_url: &str) -> Result<(), Error> {
let db = get_db_connection(db_url).await?;
let result = sqlx::query(
"
CREATE TABLE IF NOT EXISTS file_uris (
id INTEGER PRIMARY KEY,
uri TEXT UNIQUE
);
CREATE TABLE IF NOT EXISTS vrs_locations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
vrs_id TEXT NOT NULL,
chr TEXT NOT NULL,
pos INTEGER NOT NULL,
UNIQUE(vrs_id,chr,pos)
);",
uri_id INTEGER NOT NULL,
FOREIGN KEY (uri_id) REFERENCES file_uris(id),
UNIQUE(vrs_id, chr, pos, uri_id)
);
",
)
.execute(&db)
.await?;
Expand All @@ -39,4 +46,18 @@ pub struct DbRow {
pub vrs_id: String,
pub chr: String,
pub pos: i64,
pub uri_id: i64,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// `setup_db` should succeed when pointed at a newly created temporary file.
    #[tokio::test]
    async fn test_setup_db() {
        let db_file = NamedTempFile::new().expect("Failed to create temp file");
        let db_path = db_file.path().to_str().unwrap();
        let db_url = format!("sqlite://{db_path}");
        setup_db(&db_url).await.expect("Setup DB failed");
    }
}
25 changes: 17 additions & 8 deletions src/vrsix/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,40 @@ def cli() -> None:

@cli.command()
@click.argument(
"vcfs",
"vcf",
type=click.Path(
exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path
),
nargs=-1,
nargs=1,
)
@click.argument(
"uri",
default=None,
required=False,
)
@click.option(
"--db-location",
type=click.Path(
file_okay=True, dir_okay=True, readable=True, writable=True, path_type=Path
),
)
def load(vcfs: tuple[Path], db_location: Path | None) -> None:
def load(vcf: Path, uri: str, db_location: Path | None) -> None:
"""Index the VRS annotations in a VCF by loading it into the sqlite DB.

Optionally provide a custom file URI to describe how to retrieve VCF records after
index lookup:

% vrsix load input.vcf gs://my_storage/input.vcf

\f
:param vcf_path: path to VCF to ingest
"""
if db_location and db_location.is_dir():
db_location = db_location / "vrs_vcf_index.db"
for vcf in vcfs:
start = timer()
load_vcf.load_vcf(vcf, db_location)
end = timer()
_logger.info("Processed `%s` in %s seconds", vcf, end - start)
start = timer()
load_vcf.load_vcf(vcf, db_location, uri)
end = timer()
_logger.info("Processed `%s` in %s seconds", vcf, end - start)


@cli.command()
Expand Down
8 changes: 6 additions & 2 deletions src/vrsix/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@
_logger = logging.getLogger(__name__)


def load_vcf(vcf_path: Path, db_location: Path | None = None) -> None:
def load_vcf(
vcf_path: Path, db_location: Path | None = None, vcf_uri: str | None = None
) -> None:
"""Load VRS-annotated VCF into sqlite database.

:param vcf_path: path to VCF (must exist) to ingest
:param db_location: path to sqlite DB
:param vcf_uri: optional URI for original input. Not validated or parsed in any way.
Used by fetching lib to re-acquire data.
"""
sqlite_uri = (
f"sqlite://{DEFAULT_SQLITE_LOCATION}"
if db_location is None
else f"sqlite://{db_location}"
)
_logger.debug("Using sqlite file located at %s", sqlite_uri)
vcf_to_sqlite(vcf_path, sqlite_uri)
vcf_to_sqlite(vcf_path, sqlite_uri, vcf_uri)
43 changes: 33 additions & 10 deletions tests/test_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,39 @@ def test_load(fixture_dir: Path, temp_dir: Path, input_filename: str):
results = conn.execute("SELECT * FROM vrs_locations").fetchall()
assert len(results) == 10
assert results == [
(1, "dwwiZdvVtfAmomu0OBsiHue1O-bw5SpG", "1", 783006),
(2, "MiasxyXMXtOpsZgGelL3c4QgtflCNLHD", "1", 783006),
(3, "5cY2k53xdW7WeHw2WG1HA7jl50iH-r9p", "1", 783175),
(4, "jHaXepIvlbnapfPtH_62y-Qm81hCrBYn", "1", 783175),
(5, "-NGsjBEx0UbPF3uYjStZ_2r-m2LbUtUB", "1", 784860),
(6, "HLinVo6Q-i-PryQOiq8QAtOeC9oQ9Q3p", "1", 784860),
(7, "qdyeeiC3cLfXeT23zxT9-qlJNN64MKVB", "1", 785417),
(8, "cNWXR3OLq9D3L19vQFvbHw-aH0vlA5cN", "1", 785417),
(9, "DVMcfA37Llc9QUOA0XfLJbJ-agKyGpGo", "1", 797392),
(10, "OTiBHLE2WW93M4-4zGVrWSqP2GBj8-qM", "1", 797392),
(1, "dwwiZdvVtfAmomu0OBsiHue1O-bw5SpG", "1", 783006, 1),
(2, "MiasxyXMXtOpsZgGelL3c4QgtflCNLHD", "1", 783006, 1),
(3, "5cY2k53xdW7WeHw2WG1HA7jl50iH-r9p", "1", 783175, 1),
(4, "jHaXepIvlbnapfPtH_62y-Qm81hCrBYn", "1", 783175, 1),
(5, "-NGsjBEx0UbPF3uYjStZ_2r-m2LbUtUB", "1", 784860, 1),
(6, "HLinVo6Q-i-PryQOiq8QAtOeC9oQ9Q3p", "1", 784860, 1),
(7, "qdyeeiC3cLfXeT23zxT9-qlJNN64MKVB", "1", 785417, 1),
(8, "cNWXR3OLq9D3L19vQFvbHw-aH0vlA5cN", "1", 785417, 1),
(9, "DVMcfA37Llc9QUOA0XfLJbJ-agKyGpGo", "1", 797392, 1),
(10, "OTiBHLE2WW93M4-4zGVrWSqP2GBj8-qM", "1", 797392, 1),
]
conn.close()


def test_load_specify_uri(fixture_dir: Path, temp_dir: Path):
    """Check that a caller-supplied URI is stored and linked to the ingested rows.

    :param fixture_dir: directory containing the ``input.vcf`` test fixture
    :param temp_dir: scratch directory in which the sqlite DB file is created
    """
    input_file = fixture_dir / "input.vcf"
    temp_db = temp_dir / "tmp.db"
    custom_uri = "gs://my_storage/input.vcf"
    # Pass the URI explicitly -- without it, this test only repeats the default
    # (local file path) behavior and never exercises the custom-URI code path.
    load.load_vcf(input_file, temp_db, custom_uri)

    conn = sqlite3.connect(temp_db)

    # The custom URI must be stored verbatim as the only known file URI.
    uris = conn.execute("SELECT id, uri FROM file_uris").fetchall()
    assert uris == [(1, custom_uri)]

    # Every ingested location should reference that URI's row (last column).
    results = conn.execute("SELECT * FROM vrs_locations").fetchall()
    assert len(results) == 10
    assert results == [
        (1, "dwwiZdvVtfAmomu0OBsiHue1O-bw5SpG", "1", 783006, 1),
        (2, "MiasxyXMXtOpsZgGelL3c4QgtflCNLHD", "1", 783006, 1),
        (3, "5cY2k53xdW7WeHw2WG1HA7jl50iH-r9p", "1", 783175, 1),
        (4, "jHaXepIvlbnapfPtH_62y-Qm81hCrBYn", "1", 783175, 1),
        (5, "-NGsjBEx0UbPF3uYjStZ_2r-m2LbUtUB", "1", 784860, 1),
        (6, "HLinVo6Q-i-PryQOiq8QAtOeC9oQ9Q3p", "1", 784860, 1),
        (7, "qdyeeiC3cLfXeT23zxT9-qlJNN64MKVB", "1", 785417, 1),
        (8, "cNWXR3OLq9D3L19vQFvbHw-aH0vlA5cN", "1", 785417, 1),
        (9, "DVMcfA37Llc9QUOA0XfLJbJ-agKyGpGo", "1", 797392, 1),
        (10, "OTiBHLE2WW93M4-4zGVrWSqP2GBj8-qM", "1", 797392, 1),
    ]
    conn.close()

Expand Down