Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: store URIs for ingested VCFs #15

Merged
merged 17 commits into from
Nov 29, 2024
Merged
15 changes: 14 additions & 1 deletion .github/workflows/checks.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Checks
on: [push, pull_request]
jobs:
test:
python-test:
runs-on: ubuntu-latest
strategy:
matrix:
Expand Down Expand Up @@ -34,6 +34,19 @@ jobs:

- name: Check style
run: python3 -m ruff check . && ruff format --check .
rust-test:
runs-on: ubuntu-latest
defaults:
run:
working-directory: ./rust
steps:
- uses: actions/checkout@v4

- run: rustup update stable && rustup default stable

- run: cargo build --verbose

- run: cargo test --verbose
rust-lint:
runs-on: ubuntu-latest
defaults:
Expand Down
36 changes: 22 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,38 +1,39 @@
# vrsix: Indexing VRS-Annotated VCFs

Proof of concept for sqlite-based indexing of AnVIL-hosted VCFs annotated with VRS IDs and attributes.
## Overview

`vrsix` provides a file-based indexing strategy to support fast lookup of AnVIL-hosted VCFs using IDs and annotations drawn from the [GA4GH Variation Representation Specification](https://www.ga4gh.org/product/variation-representation/).

See the [vrsix Terra workflow](https://github.com/gks-anvil/vrsix-workflow) for a ready-made Terra implementation.

## Usage

From a VCF, ingest a VRS ID and the corresponding VCF-called location (i.e. sufficient inputs for a tabix lookup), and store them in a sqlite database.

```shell
% vrsix load chr1.vcf
vrsix load chr1.vcf
```

Given a VRS ID, retrieve VCF-associated data (output format TBD)
All instances of variations are stored with an associated file URI to support later retrieval. By default, this URI is simply the input VCF's location in the file system, but you may declare a custom URI instead as an optional argument:

```shell
% vrsix fetch-by-id --db-location=sqlite.db dwwiZdvVtfAmomu0OBsiHue1O-bw5SpG
ga4gh:VA.dwwiZdvVtfAmomu0OBsiHue1O-bw5SpG,1,783006
vrsix load chr1.vcf gs://my_stuff/chr1.vcf
```

Or fetch all rows within a coordinate range:
By default, all records are ingested into a sqlite file located at `~/.local/share/vrsix.db`. This can be overridden with either the environment variable `VRS_VCF_INDEX`, or with an optional flag to the CLI:

```shell
% vrsix fetch-by-range --db-location=sqlite.db 1 783000 783200
ga4gh:VA.dwwiZdvVtfAmomu0OBsiHue1O-bw5SpG,1,783006
ga4gh:VA.MiasxyXMXtOpsZgGelL3c4QgtflCNLHD,1,783006
ga4gh:VA.5cY2k53xdW7WeHw2WG1HA7jl50iH-r9p,1,783175
ga4gh:VA.jHaXepIvlbnapfPtH_62y-Qm81hCrBYn,1,783175
vrsix load --db-location=./vrsix.db input.vcf
```

## Set up for development
## Development

Ensure that a recent version of the [Rust toolchain](https://www.rust-lang.org/tools/install) is available.

Create a virtual environment and install developer dependencies:

```shell
python3 -m virtualenv venv
python3 -m venv venv
source venv/bin/activate
python3 -m pip install -e '.[dev,tests]'
```
Expand Down Expand Up @@ -62,7 +63,14 @@ cd rust/
cargo fmt
```

Run tests with `pytest`:
Run tests from the project root with `pytest`:
```shell
pytest
```

Some granular tests are written directly into the Rust backend as well:

```shell
cd rust/
cargo test
```
1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ sqlx = { version = "0.8.2", features = ["runtime-tokio", "sqlite"] }
tokio = { version = "1.40.0", features = ["full"] }
pyo3 = { version = "0.21.1", features = ["abi3-py310", "experimental-async"]}
log = "0.4.22"
tempfile = "3.14.0"
7 changes: 5 additions & 2 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@ use std::path::PathBuf;
use tokio::runtime::Runtime;

#[pyfunction]
pub fn vcf_to_sqlite(vcf_path: PathBuf, db_url: String) -> PyResult<()> {
#[pyo3(signature = (vcf_path, db_url, vcf_uri = None))]
pub fn vcf_to_sqlite(vcf_path: PathBuf, db_url: String, vcf_uri: Option<String>) -> PyResult<()> {
let uri_value =
vcf_uri.unwrap_or_else(|| format!("file://{}", vcf_path.to_string_lossy().into_owned()));
let rt = Runtime::new().unwrap();
rt.block_on(load::load_vcf(vcf_path, &db_url))?;
rt.block_on(load::load_vcf(vcf_path, &db_url, uri_value))?;
Ok(())
}

Expand Down
57 changes: 50 additions & 7 deletions rust/src/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ use tokio::{

async fn load_allele(db_row: DbRow, pool: &SqlitePool) -> Result<(), Box<dyn std::error::Error>> {
let mut conn = pool.acquire().await?;
let result = sqlx::query("INSERT INTO vrs_locations (vrs_id, chr, pos) VALUES (?, ?, ?)")
.bind(db_row.vrs_id)
.bind(db_row.chr)
.bind(db_row.pos)
.execute(&mut *conn)
.await;
let result =
sqlx::query("INSERT INTO vrs_locations (vrs_id, chr, pos, uri_id) VALUES (?, ?, ?, ?);")
.bind(db_row.vrs_id)
.bind(db_row.chr)
.bind(db_row.pos)
.bind(db_row.uri_id)
.execute(&mut *conn)
.await;
if let Err(err) = result {
if let Some(db_error) = err.as_database_error() {
if let Some(sqlite_error) = db_error.try_downcast_ref::<SqliteError>() {
Expand Down Expand Up @@ -93,7 +95,25 @@ async fn get_reader(
}
}

pub async fn load_vcf(vcf_path: PathBuf, db_url: &str) -> PyResult<()> {
/// Resolve the `file_uris` row ID for `uri`, inserting a new row first if the
/// URI has not been recorded yet.
///
/// Returns the integer primary key of the (possibly pre-existing) row.
async fn load_file_uri(uri: &str, pool: &SqlitePool) -> Result<i64, Box<dyn std::error::Error>> {
    let mut connection = pool.acquire().await?;

    // `INSERT OR IGNORE` is a no-op when the URI already exists (UNIQUE column),
    // so `rows_affected` tells us whether a fresh row was created.
    let inserted = sqlx::query("INSERT OR IGNORE INTO file_uris (uri) VALUES (?);")
        .bind(uri)
        .execute(&mut *connection)
        .await?;

    if inserted.rows_affected() == 0 {
        // Row already existed: look up its ID instead of using last_insert_rowid.
        let existing: (i64,) = sqlx::query_as("SELECT id FROM file_uris WHERE uri = ?;")
            .bind(uri)
            .fetch_one(&mut *connection)
            .await?;
        Ok(existing.0)
    } else {
        Ok(inserted.last_insert_rowid())
    }
}

pub async fn load_vcf(vcf_path: PathBuf, db_url: &str, uri: String) -> PyResult<()> {
let start = Instant::now();

if !vcf_path.exists() || !vcf_path.is_file() {
Expand All @@ -118,6 +138,10 @@ pub async fn load_vcf(vcf_path: PathBuf, db_url: &str) -> PyResult<()> {
VrsixDbError::new_err(format!("Failed database connection/call: {}", e))
})?;

let uri_id = load_file_uri(&uri, &db_pool)
.await
.map_err(|e| VrsixDbError::new_err(format!("Failed to insert file URI `{uri}`: {e}")))?;

while let Some(record) = records.try_next().await? {
let vrs_ids = get_vrs_ids(record.info(), &header)?;
let chrom = record.reference_sequence_name();
Expand All @@ -131,6 +155,7 @@ pub async fn load_vcf(vcf_path: PathBuf, db_url: &str) -> PyResult<()> {
.to_string(),
chr: chrom.strip_prefix("chr").unwrap_or(chrom).to_string(),
pos: pos.try_into().unwrap(),
uri_id,
};
load_allele(row, &db_pool).await.map_err(|e| {
error!("Failed to load row {:?}", e);
Expand All @@ -143,3 +168,21 @@ pub async fn load_vcf(vcf_path: PathBuf, db_url: &str) -> PyResult<()> {
info!("Time taken: {:?}", duration);
Ok(())
}

#[cfg(test)]
mod tests {
use super::*;
use tempfile::NamedTempFile;

#[tokio::test]
async fn test_load_file_uri() {
let temp_file = NamedTempFile::new().expect("Failed to create temp file");

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

help me out here, so what is this file testing? I'm unfamiliar with Rust so not sure how the db file is being created

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, good q. I think I just wanted to get a minimal test happening inside the Rust code, because failures that happen on the Rust side are often fairly opaque in the context of the Python integration tests (if it's not something you're explicitly looking for, it just looks like a panic without much detail). This test case literally just loads a single entry into the file_uris table and then verifies that it works and it gets the ID you expect for it. (the expect call on this specific line is sort of a red herring, the compiler is happy when you cover edge cases like a failure to create a tempfile when constructing an input for a test).

I'd originally just written end to end tests from the Python side to make sure things work the whole way through, but if/as this grows in complexity, it's probably smarter to focus more testing within the Rust code because it's much more granular/interpretable.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha and for my understanding, how is the sqlite:/// interpreted? I was expecting to see a file extension like .sqlite on the filename for example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's just there as a scheme declaration to indicate a sqlite file, I guess. I think it's an expectation imposed by the DB client library. Just checked and this test fails if you change it.

I don't know if there's really an accepted file extension for sqlite files (i just googled it and this stackoverflow answer is very unsatisfying). It's definitely not something we try to validate anywhere, although we could if it seems worthwhile.

let db_url = format!("sqlite://{}", temp_file.path().to_str().unwrap());
crate::sqlite::setup_db(&db_url).await.unwrap();
let db_pool = get_db_connection(&db_url).await.unwrap();
let uri_id = load_file_uri("file:///arbitrary/file/location.vcf", &db_pool)
.await
.unwrap();
assert!(uri_id == 1);
}
}
25 changes: 23 additions & 2 deletions rust/src/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,20 @@ pub async fn setup_db(db_url: &str) -> Result<(), Error> {
let db = get_db_connection(db_url).await?;
let result = sqlx::query(
"
CREATE TABLE IF NOT EXISTS file_uris (
id INTEGER PRIMARY KEY,
uri TEXT UNIQUE
);
CREATE TABLE IF NOT EXISTS vrs_locations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
vrs_id TEXT NOT NULL,
chr TEXT NOT NULL,
pos INTEGER NOT NULL,
UNIQUE(vrs_id,chr,pos)
);",
uri_id INTEGER NOT NULL,
FOREIGN KEY (uri_id) REFERENCES file_uris(id),
UNIQUE(vrs_id, chr, pos, uri_id)
);
",
)
.execute(&db)
.await?;
Expand All @@ -39,4 +46,18 @@ pub struct DbRow {
pub vrs_id: String,
pub chr: String,
pub pos: i64,
pub uri_id: i64,
}

#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::NamedTempFile;

    /// Smoke test: creating the schema against a fresh temp-file sqlite DB succeeds.
    #[tokio::test]
    async fn test_setup_db() {
        let db_file = NamedTempFile::new().expect("Failed to create temp file");
        // sqlx requires the `sqlite://` scheme prefix on the connection URL.
        let url = format!("sqlite://{}", db_file.path().to_str().unwrap());
        setup_db(&url).await.expect("Setup DB failed");
    }
}
27 changes: 18 additions & 9 deletions src/vrsix/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Provide CLI utility for interfacing with data load and fetch operations."""
"""Provide CLI utility for interfacing with data loading operations."""

import logging
from pathlib import Path
Expand Down Expand Up @@ -28,28 +28,37 @@ def cli() -> None:

@cli.command()
@click.argument(
"vcfs",
"vcf",
type=click.Path(
exists=True, file_okay=True, dir_okay=False, readable=True, path_type=Path
),
nargs=-1,
nargs=1,
)
@click.argument(
"uri",
default=None,
required=False,
)
@click.option(
"--db-location",
type=click.Path(
file_okay=True, dir_okay=True, readable=True, writable=True, path_type=Path
),
)
def load(vcfs: tuple[Path], db_location: Path | None) -> None:
def load(vcf: Path, uri: str, db_location: Path | None) -> None:
"""Index the VRS annotations in a VCF by loading it into the sqlite DB.

Optionally provide a custom file URI to describe how to retrieve VCF records after
index lookup:

% vrsix load input.vcf gs://my_storage/input.vcf

\f
:param vcf_path: path to VCF to ingest
"""
if db_location and db_location.is_dir():
db_location = db_location / "vrs_vcf_index.db"
for vcf in vcfs:
start = timer()
load_vcf.load_vcf(vcf, db_location)
end = timer()
_logger.info("Processed `%s` in %s seconds", vcf, end - start)
start = timer()
load_vcf.load_vcf(vcf, db_location, uri)
end = timer()
_logger.info("Processed `%s` in %s seconds", vcf, end - start)
8 changes: 6 additions & 2 deletions src/vrsix/load.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,20 @@
_logger = logging.getLogger(__name__)


def load_vcf(vcf_path: Path, db_location: Path | None = None) -> None:
def load_vcf(
vcf_path: Path, db_location: Path | None = None, vcf_uri: str | None = None
) -> None:
"""Load VRS-annotated VCF into sqlite database.

:param vcf_path: path to VCF (must exist) to ingest
:param db_location: path to sqlite DB
:param vcf_uri: optional URI for original input. Not validated or parsed in any way.
Used by fetching lib to re-acquire data.
"""
sqlite_uri = (
f"sqlite://{DEFAULT_SQLITE_LOCATION}"
if db_location is None
else f"sqlite://{db_location}"
)
_logger.debug("Using sqlite file located at %s", sqlite_uri)
vcf_to_sqlite(vcf_path, sqlite_uri)
vcf_to_sqlite(vcf_path, sqlite_uri, vcf_uri)
4 changes: 1 addition & 3 deletions src/vrsix/sqlite.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,5 @@
from pathlib import Path

DEFAULT_SQLITE_LOCATION = Path(
os.environ.get(
"VRS_VCF_INDEX", Path.home() / ".local" / "share" / "vrs_vcf_index.db"
)
os.environ.get("VRS_VCF_INDEX", Path.home() / ".local" / "share" / "vrsix.db")
)
49 changes: 39 additions & 10 deletions tests/test_load.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,49 @@ def test_load(fixture_dir: Path, temp_dir: Path, input_filename: str):
results = conn.execute("SELECT * FROM vrs_locations").fetchall()
assert len(results) == 10
assert results == [
(1, "dwwiZdvVtfAmomu0OBsiHue1O-bw5SpG", "1", 783006),
(2, "MiasxyXMXtOpsZgGelL3c4QgtflCNLHD", "1", 783006),
(3, "5cY2k53xdW7WeHw2WG1HA7jl50iH-r9p", "1", 783175),
(4, "jHaXepIvlbnapfPtH_62y-Qm81hCrBYn", "1", 783175),
(5, "-NGsjBEx0UbPF3uYjStZ_2r-m2LbUtUB", "1", 784860),
(6, "HLinVo6Q-i-PryQOiq8QAtOeC9oQ9Q3p", "1", 784860),
(7, "qdyeeiC3cLfXeT23zxT9-qlJNN64MKVB", "1", 785417),
(8, "cNWXR3OLq9D3L19vQFvbHw-aH0vlA5cN", "1", 785417),
(9, "DVMcfA37Llc9QUOA0XfLJbJ-agKyGpGo", "1", 797392),
(10, "OTiBHLE2WW93M4-4zGVrWSqP2GBj8-qM", "1", 797392),
(1, "dwwiZdvVtfAmomu0OBsiHue1O-bw5SpG", "1", 783006, 1),
(2, "MiasxyXMXtOpsZgGelL3c4QgtflCNLHD", "1", 783006, 1),
(3, "5cY2k53xdW7WeHw2WG1HA7jl50iH-r9p", "1", 783175, 1),
(4, "jHaXepIvlbnapfPtH_62y-Qm81hCrBYn", "1", 783175, 1),
(5, "-NGsjBEx0UbPF3uYjStZ_2r-m2LbUtUB", "1", 784860, 1),
(6, "HLinVo6Q-i-PryQOiq8QAtOeC9oQ9Q3p", "1", 784860, 1),
(7, "qdyeeiC3cLfXeT23zxT9-qlJNN64MKVB", "1", 785417, 1),
(8, "cNWXR3OLq9D3L19vQFvbHw-aH0vlA5cN", "1", 785417, 1),
(9, "DVMcfA37Llc9QUOA0XfLJbJ-agKyGpGo", "1", 797392, 1),
(10, "OTiBHLE2WW93M4-4zGVrWSqP2GBj8-qM", "1", 797392, 1),
]
conn.close()


def test_load_specify_uri(fixture_dir: Path, temp_dir: Path):
    """Loading with an explicit URI should store that URI and link every row to it."""
    input_file = fixture_dir / "input.vcf"
    temp_db = temp_dir / "tmp.db"
    input_uri = "gs://my/input/file.vcf"
    load.load_vcf(input_file, temp_db, input_uri)

    conn = sqlite3.connect(temp_db)

    # Every location row should carry uri_id == 1, pointing at the single URI.
    expected_locations = [
        (1, "dwwiZdvVtfAmomu0OBsiHue1O-bw5SpG", "1", 783006, 1),
        (2, "MiasxyXMXtOpsZgGelL3c4QgtflCNLHD", "1", 783006, 1),
        (3, "5cY2k53xdW7WeHw2WG1HA7jl50iH-r9p", "1", 783175, 1),
        (4, "jHaXepIvlbnapfPtH_62y-Qm81hCrBYn", "1", 783175, 1),
        (5, "-NGsjBEx0UbPF3uYjStZ_2r-m2LbUtUB", "1", 784860, 1),
        (6, "HLinVo6Q-i-PryQOiq8QAtOeC9oQ9Q3p", "1", 784860, 1),
        (7, "qdyeeiC3cLfXeT23zxT9-qlJNN64MKVB", "1", 785417, 1),
        (8, "cNWXR3OLq9D3L19vQFvbHw-aH0vlA5cN", "1", 785417, 1),
        (9, "DVMcfA37Llc9QUOA0XfLJbJ-agKyGpGo", "1", 797392, 1),
        (10, "OTiBHLE2WW93M4-4zGVrWSqP2GBj8-qM", "1", 797392, 1),
    ]
    location_rows = conn.execute("SELECT * FROM vrs_locations").fetchall()
    assert len(location_rows) == 10
    assert location_rows == expected_locations

    # The custom URI should be ingested exactly once.
    uri_rows = conn.execute("SELECT * FROM file_uris").fetchall()
    assert len(uri_rows) == 1
    assert uri_rows == [(1, input_uri)]

    conn.close()


def test_nonexistent_file(temp_dir: Path):
input_vcf = Path() / "input.vcf" # doesn't exist
temp_db = temp_dir / "tmp.db"
Expand Down
Loading