diff --git a/bio2zarr/bed2zarr.py b/bio2zarr/bed2zarr.py index a2c9169..c5e77b0 100644 --- a/bio2zarr/bed2zarr.py +++ b/bio2zarr/bed2zarr.py @@ -1,6 +1,9 @@ import dataclasses +import json import logging import pathlib +import shutil +from enum import Enum import numcodecs import pandas as pd @@ -13,35 +16,220 @@ DEFAULT_ZARR_COMPRESSOR = numcodecs.Blosc(cname="zstd", clevel=7) +class BedType(Enum): + BED3 = 3 + BED4 = 4 + BED5 = 5 + BED6 = 6 + BED7 = 7 + BED8 = 8 + BED9 = 9 + BED12 = 12 + + +class BedColumns(Enum): + contig = 0 + start = 1 + end = 2 + name = 3 + score = 4 + strand = 5 + thickStart = 6 + thickEnd = 7 + itemRgb = 8 + blockCount = 9 + blockSizes = 10 + blockStarts = 11 + + @dataclasses.dataclass -class BedMetadata(core.JsonDataclass): +class ZarrArraySpec: + name: str + dtype: str + shape: list + chunks: list + dimensions: tuple + description: str + compressor: dict + filters: list + + def __post_init__(self): + self.shape = tuple(self.shape) + self.chunks = tuple(self.chunks) + self.dimensions = tuple(self.dimensions) + self.filters = tuple(self.filters) + + @staticmethod + def new(**kwargs): + spec = ZarrArraySpec( + **kwargs, compressor=DEFAULT_ZARR_COMPRESSOR.get_config(), filters=[] + ) + spec._choose_compressor_settings() + return spec + + def _choose_compressor_settings(self): + pass + + @staticmethod + def from_field( + bed_field, + *, + num_records, + records_chunk_size, + ): + shape = [num_records] + dimensions = ["records"] + chunks = [records_chunk_size] + return ZarrArraySpec.new( + name=bed_field.name, + dtype=bed_field.dtype, + shape=shape, + chunks=chunks, + dimensions=dimensions, + description=bed_field.description, + ) + + +ZARR_SCHEMA_FORMAT_VERSION = "0.1" + + +@dataclasses.dataclass +class BedZarrSchema(core.JsonDataclass): + format_version: str fields: list + bed_type: str + records_chunk_size: int + + def validate(self): + """ + Checks that the schema is well-formed and within required limits. 
+ """ + + @staticmethod + def fromdict(d): + if d["format_version"] != ZARR_SCHEMA_FORMAT_VERSION: + raise ValueError( + "BedZarrSchema format version mismatch: " + f"{d['format_version']} != {ZARR_SCHEMA_FORMAT_VERSION}" + ) + ret = BedZarrSchema(**d) + return ret + + @staticmethod + def fromjson(s): + return BedZarrSchema.fromdict(json.loads(s)) + + @staticmethod + def generate(num_records, bed_type, records_chunk_size=None): + if records_chunk_size is None: + records_chunk_size = 1000 + logger.info("Generating schema with chunks=%d", records_chunk_size) + + def spec_from_field(field): + return ZarrArraySpec.from_field( + field, + num_records=num_records, + records_chunk_size=records_chunk_size, + ) + + fields = mkfields(bed_type) + specs = [spec_from_field(field) for field in fields] + return BedZarrSchema( + format_version=ZARR_SCHEMA_FORMAT_VERSION, + fields=specs, + bed_type=bed_type.name, + records_chunk_size=records_chunk_size, + ) @dataclasses.dataclass class BedField: category: str name: str + alias: str + description: str + dtype: str + + +def guess_bed_file_type(path): + """Check the number of columns in a BED file and guess BED + type.""" + first = pd.read_table(path, header=None, nrows=1) + num_cols = first.shape[1] + if num_cols < 3: + raise ValueError(f"Expected at least 3 columns in BED file, got {num_cols}") + if num_cols > 12: + raise ValueError(f"Expected at most 12 columns in BED file, got {num_cols}") + if num_cols in (10, 11): + raise ValueError(f"BED10 and BED11 are prohibited, got {num_cols} columns") + return BedType(num_cols) + + +BZW_METADATA_FORMAT_VERSION = "0.1" + + +@dataclasses.dataclass +class BedZarrWriterMetadata(core.JsonDataclass): + format_version: str + fields: list + bed_type: str + schema: BedZarrSchema + + @staticmethod + def fromdict(d): + if d["format_version"] != BZW_METADATA_FORMAT_VERSION: + raise ValueError( + "BedZarrWriterMetadata format version mismatch: " + f"{d['format_version']} != {BZW_METADATA_FORMAT_VERSION}" + ) + ret = BedZarrWriterMetadata(**d) + ret.schema = BedZarrSchema.fromdict(d["schema"]) + return ret class BedZarrWriter: def __init__(self, path): self.path = pathlib.Path(path) + self.wip_path = self.path / "wip" self.metadata = None self.data = None + self.bed_path = None + self.bed_type = None + + @property + def schema(self): + return self.metadata.schema def init( self, - bed_path, + data, *, - show_progress=False, + bed_path, + bed_type, + schema, ): - self.read(bed_path) - fields = mandatory_bed_field_definitions() - # FIXME: add optional fields; depends on the number of columns - # in the bed file. BED3 is the minimum. - self.metadata = BedMetadata(fields) + self.data = data + self.bed_type = bed_type + self.bed_path = bed_path + + if self.path.exists(): + raise ValueError("Zarr path already exists") + schema.validate() + + fields = mkfields(self.bed_type) + self.metadata = BedZarrWriterMetadata( + format_version=BZW_METADATA_FORMAT_VERSION, + bed_type=self.bed_type.name, + fields=fields, + schema=schema, + ) self.path.mkdir() + self.wip_path.mkdir() + logger.info("Writing WIP metadata") + with open(self.wip_path / "metadata.json", "w") as f: + json.dump(self.metadata.asdict(), f, indent=4) + + def write(self): store = zarr.DirectoryStore(self.path) root = zarr.group(store=store) root.attrs.update( @@ -50,56 +238,158 @@ def init( "source": f"bio2zarr-{provenance.__version__}", } ) - self.encode_mandatory_fields(root) - - # FIXME: error checking, number of columns, etc. 
- def read(self, bed_path): - logger.info("Reading bed file %s", bed_path) - first = pd.read_table(bed_path, header=None, nrows=1) - num_cols = len(first.columns) - if num_cols < 3: - raise ValueError(f"Expected at least 3 columns in bed file, got {num_cols}") - - # FIXME: support chunked reading - self.data = pd.read_table(bed_path, header=None).rename( - columns={0: "chrom", 1: "chromStart", 2: "chromEnd"} - ) + d = {i: f.name for i, f in enumerate(self.schema.fields)} + self.data.rename(columns=d, inplace=True) + + # 1. Create contig name <-> contig id mapping - def encode_mandatory_fields(self, root): - for field, dtype in zip( - ["chrom", "chromStart", "chromEnd"], ["str", "int", "int"] - ): - # FIXME: Check schema for chunks + # 2. update self.data.contig based on mapping (-> also means + # schema needs modification since we change from string to + # int!) + + self.encode_fields(root) + + def finalise(self): + logger.debug("Removing %s", self.wip_path) + shutil.rmtree(self.wip_path) + + def load_metadata(self): + if self.metadata is None: + with open(self.wip_path / "metadata.json") as f: + self.metadata = BedZarrWriterMetadata.fromdict(json.load(f)) + + # FIXME: fields 9-12 are multi-dimensional + def encode_fields(self, root): + for field in self.schema.fields: + object_codec = None + if field.dtype == "O": + object_codec = numcodecs.VLenUTF8() array = root.array( - field, - self.data[field].values, - dtype=dtype, + field.name, + self.data[field.name].values, + shape=field.shape, + dtype=field.dtype, compressor=DEFAULT_ZARR_COMPRESSOR, - chunks=(1000,), + chunks=(self.schema.records_chunk_size,), + object_codec=object_codec, ) + array.attrs["_ARRAY_DIMENSIONS"] = ["records"] logger.debug("%s done", field) # See https://samtools.github.io/hts-specs/BEDv1.pdf def mandatory_bed_field_definitions(): - def make_field_def(name): + def make_field_def(name, alias, dtype, description=""): return BedField( category="mandatory", name=name, + alias=alias, + description=description, + dtype=dtype, ) fields = [ - make_field_def("chrom"), - make_field_def("chromStart"), - make_field_def("chromEnd"), + make_field_def("contig", "chrom", "O", "Chromosome name"), + make_field_def("start", "chromStart", "i8", "Feature start position"), + make_field_def("end", "chromEnd", "i8", "Feature end position"), ] return fields +def optional_bed_field_definitions(num_fields=0): + def make_field_def(name, alias, dtype=None, *, description=""): + return BedField( + category="optional", + name=name, + alias=alias, + description=description, + dtype=dtype, + ) + + fields = [ + make_field_def("name", "name", "O", description="Feature description"), + make_field_def("score", "score", "i2", description="A numerical value"), + make_field_def("strand", "strand", "O", description="Feature strand"), + make_field_def( + "thickStart", "thickStart", "i8", description="Thick start position" + ), + make_field_def("thickEnd", "thickEnd", "i8", description="Thick end position"), + make_field_def("itemRgb", "itemRgb", description="Display"), + make_field_def("blockCount", "blockCount", description="Number of blocks"), + make_field_def("blockSizes", "blockSizes", description="Block sizes"), + make_field_def( + "blockStarts", "blockStarts", description="Block start positions" + ), + ] + + return fields[:num_fields] + + +def mkschema(bed_path, out): + bed_type = guess_bed_file_type(bed_path) + data = pd.read_table(bed_path, header=None) + spec = BedZarrSchema.generate( + data.shape[0], + bed_type, + ) + out.write(spec.asjson()) + + 
+def mkfields(bed_type):
+    mandatory = mandatory_bed_field_definitions()
+    optional = optional_bed_field_definitions(bed_type.value - BedType.BED3.value)
+    return mandatory + optional
+
+
+def bed2zarr_init(
+    bed_path,
+    bed_type,
+    zarr_path,
+    *,
+    schema_path=None,
+    records_chunk_size=None,
+):
+    data = pd.read_table(bed_path, header=None)
+
+    if schema_path is None:
+        schema = BedZarrSchema.generate(
+            data.shape[0],
+            bed_type,
+            records_chunk_size=records_chunk_size,
+        )
+    else:
+        logger.info("Reading schema from %s", schema_path)
+        if records_chunk_size is not None:
+            raise ValueError(
+                "Cannot specify schema along with chunk sizes"
+            )  # NEEDS TEST
+        with open(schema_path) as f:
+            schema = BedZarrSchema.fromjson(f.read())
+
+    # init store
+    bedzw = BedZarrWriter(zarr_path)
+    bedzw.init(
+        data,
+        bed_path=bed_path,
+        bed_type=bed_type,
+        schema=schema,
+    )
+    return bedzw
+
+
 def bed2zarr(
     bed_path,
     zarr_path,
-    show_progress=False,
+    schema_path=None,
+    records_chunk_size=None,
 ):
-    writer = BedZarrWriter(zarr_path)
-    writer.init(bed_path, show_progress=show_progress)
+    bed_type = guess_bed_file_type(bed_path)
+    bedzw = bed2zarr_init(
+        bed_path,
+        bed_type,
+        zarr_path,
+        schema_path=schema_path,
+        records_chunk_size=records_chunk_size,
+    )
+    bedzw.write()
+    bedzw.finalise()
diff --git a/bio2zarr/cli.py b/bio2zarr/cli.py
index fe60971..9179a08 100644
--- a/bio2zarr/cli.py
+++ b/bio2zarr/cli.py
@@ -583,8 +583,7 @@ def plink2zarr():
 @new_zarr_path
 @verbose
 @force
-@progress
-def bed2zarr_main(bed_path, zarr_path, verbose, force, progress):
+def bed2zarr_main(bed_path, zarr_path, verbose, force):
     """
     Convert BED file to the Zarr format. The BED regions will be
     converted to binary-encoded arrays whose length is equal to the
@@ -599,7 +598,6 @@ def bed2zarr_main(bed_path, zarr_path, verbose, force):
     bed2zarr.bed2zarr(
         bed_path,
         zarr_path,
-        show_progress=progress,
     )
 
 
diff --git a/tests/test_bed.py b/tests/test_bed.py
index 72d8ad9..57df4e7 100644
--- a/tests/test_bed.py
+++ b/tests/test_bed.py
@@ -1,9 +1,113 @@
+import pandas as pd
 import pytest
-import zarr
+
 from bio2zarr import bed2zarr
 
 
+ALLOWED_BED_FORMATS = [3, 4, 5, 6, 7, 8, 9, 12]
+DISALLOWED_BED_FORMATS = [2, 10, 11, 13]
+ALL_BED_FORMATS = ALLOWED_BED_FORMATS + DISALLOWED_BED_FORMATS
+
+
+@pytest.fixture()
+def bed_data(request):
+    data = [
+        [
+            "chr22",
+            1000,
+            5000,
+            "cloneA",
+            960,
+            "+",
+            1000,
+            5000,
+            0,
+            2,
+            "567,488",
+            "0,3512",
+            "foo",
+        ],
+        [
+            "chr22",
+            2000,
+            6000,
+            "cloneB",
+            900,
+            "-",
+            2000,
+            6000,
+            0,
+            2,
+            "433,399",
+            "0,3601",
+            "bar",
+        ],
+    ]
+    return [row[0 : request.param] for row in data]
+
+
+@pytest.fixture()
+def bed_df(bed_data):
+    return pd.DataFrame(bed_data)
+
+
+@pytest.fixture()
+def bed_path(bed_data, tmp_path):
+    out = tmp_path / "sample_mask.bed"
+    with open(out, "w") as fh:
+        for row in bed_data:
+            fh.write("\t".join(map(str, row)) + "\n")
+    return out
+
+
+@pytest.fixture()
+def schema_path(bed_path, tmp_path_factory):
+    out = tmp_path_factory.mktemp("data") / "example.schema.json"
+    with open(out, "w") as f:
+        bed2zarr.mkschema(bed_path, f)
+    return out
+
+
+@pytest.fixture()
+def schema(schema_path):
+    with open(schema_path) as f:
+        return bed2zarr.BedZarrSchema.fromjson(f.read())
+
+
+class TestDefaultSchema:
+    @pytest.mark.parametrize("bed_data", ALLOWED_BED_FORMATS, indirect=True)
+    def test_format_version(self, schema):
+        assert schema.format_version == bed2zarr.ZARR_SCHEMA_FORMAT_VERSION
+
+
+class TestSchema:
+    @pytest.mark.parametrize("bed_data", ALLOWED_BED_FORMATS, indirect=True)
+    def test_generate_schema(self, bed_df, request):
+        bedspec = request.node.callspec.params["bed_data"]
+        bed_type = bed2zarr.BedType(bedspec)
+        schema = bed2zarr.BedZarrSchema.generate(bed_df.shape[0], bed_type)
+        assert schema.bed_type == bed_type.name
+        assert schema.records_chunk_size == 1000
+
+
+class TestBedData:
+    @pytest.mark.parametrize("bed_data", ALL_BED_FORMATS, indirect=True)
+    def test_guess_bed_type_from_path(self, bed_path, request):
+        bedspec = request.node.callspec.params["bed_data"]
+        if bedspec in DISALLOWED_BED_FORMATS:
+            with pytest.raises(ValueError):
+                bed2zarr.guess_bed_file_type(bed_path)
+        else:
+            bed_type = bed2zarr.guess_bed_file_type(bed_path)
+            assert bed_type.value == bedspec
+
+    @pytest.mark.parametrize("bed_data", ALLOWED_BED_FORMATS, indirect=True)
+    def test_bed_fields(self, bed_path, request):
+        bedspec = request.node.callspec.params["bed_data"]
+        fields = bed2zarr.mkfields(bed2zarr.BedType(bedspec))
+        assert len(fields) == bedspec
+
-class TestBed:
+class TestSampleMaskBed:
     bed_path = "tests/data/bed/sample_mask.bed.gz"
 
     @pytest.fixture()
@@ -12,7 +116,7 @@ def zarr(self, tmp_path):
         return out
 
     def test_bed2zarr(self, zarr):
-        bed2zarr.bed2zarr(bed_path=self.bed_path, zarr_path=zarr, show_progress=True)
+        bed2zarr.bed2zarr(bed_path=self.bed_path, zarr_path=zarr)
 
 
 class Test1kgBed:
@@ -24,4 +128,4 @@ def zarr(self, tmp_path):
         return out
 
     def test_bed2zarr(self, zarr):
-        bed2zarr.bed2zarr(bed_path=self.bed_path, zarr_path=zarr, show_progress=True)
+        bed2zarr.bed2zarr(bed_path=self.bed_path, zarr_path=zarr)
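
Usage sketch (not part of the patch): the two public entry points added in
bio2zarr/bed2zarr.py can be driven as below. The file names are hypothetical;
mkschema() and bed2zarr() are the functions defined in the diff above, and the
target Zarr path must not already exist.

    from bio2zarr import bed2zarr

    # One-step conversion: the BED type is guessed from the column count,
    # a default schema is generated, and the store is written and finalised.
    bed2zarr.bed2zarr("regions.bed", "regions.zarr")

    # Two-step conversion: dump the generated schema to JSON, optionally
    # edit it by hand, then convert using the edited schema.
    with open("regions.schema.json", "w") as f:
        bed2zarr.mkschema("regions.bed", f)
    bed2zarr.bed2zarr(
        "regions.bed", "regions2.zarr", schema_path="regions.schema.json"
    )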