diff --git a/iris/_tests/test_data.py b/iris/_tests/test_data.py
index 342a872..4b11721 100644
--- a/iris/_tests/test_data.py
+++ b/iris/_tests/test_data.py
@@ -1,4 +1,5 @@
 from __future__ import annotations
+from typing import Sequence
 import pytest
 import pathlib
 import urlpath
@@ -6,10 +7,13 @@
 import iris
 
 
+_obsid_b2 = 3893012099
+
+
 @pytest.mark.parametrize("time_start", [None])
 @pytest.mark.parametrize("time_stop", [None])
 @pytest.mark.parametrize("description", [""])
-@pytest.mark.parametrize("obs_id", [None, 3882010194])
+@pytest.mark.parametrize("obs_id", [None, _obsid_b2])
 @pytest.mark.parametrize("limit", [5])
 def test_query_hek(
     time_start: None | astropy.time.Time,
@@ -31,7 +35,7 @@ def test_query_hek(
 @pytest.mark.parametrize("time_start", [None])
 @pytest.mark.parametrize("time_stop", [None])
 @pytest.mark.parametrize("description", [""])
-@pytest.mark.parametrize("obs_id", [None, 3882010194])
+@pytest.mark.parametrize("obs_id", [None, _obsid_b2])
 @pytest.mark.parametrize("limit", [5])
 @pytest.mark.parametrize("spectrograph", [True])
 @pytest.mark.parametrize("sji", [True])
@@ -66,7 +70,7 @@ def test_urls_hek(
     argnames="urls",
     argvalues=[
         iris.data.urls_hek(
-            obs_id=3882010194,
+            obs_id=_obsid_b2,
             limit=1,
             sji=False,
         ),
@@ -88,3 +92,33 @@ def test_download(
     assert len(urls) == 1
     for file in result:
         assert file.exists()
+
+
+@pytest.mark.parametrize(
+    argnames="archives",
+    argvalues=[
+        iris.data.download(
+            urls=iris.data.urls_hek(
+                obs_id=_obsid_b2,
+                limit=1,
+                sji=False,
+            )
+        )
+    ],
+)
+@pytest.mark.parametrize("directory", [None])
+@pytest.mark.parametrize("overwrite", [False])
+def test_decompress(
+    archives: Sequence[pathlib.Path],
+    directory: None | pathlib.Path,
+    overwrite: bool,
+):
+    result = iris.data.decompress(
+        archives=archives,
+        directory=directory,
+        overwrite=overwrite,
+    )
+    assert isinstance(result, list)
+    for file in result:
+        assert file.exists()
+        assert file.suffix == ".fits"
diff --git a/iris/data.py b/iris/data.py
index f280194..ad93a8b 100644
--- a/iris/data.py
+++ b/iris/data.py
@@ -3,7 +3,9 @@
 """
 
 from __future__ import annotations
+from typing import Sequence
 import pathlib
+import shutil
 import requests
 import urlpath
 import astropy.time
@@ -12,6 +14,7 @@
     "query_hek",
     "urls_hek",
     "download",
+    "decompress",
 ]
 
 
@@ -201,7 +204,7 @@ def download(
 
     Examples
     --------
-    Download the most recent "A1: QS monitoring" SJI files
+    Download the most recent "A1: QS monitoring" spectrograph files
 
     .. jupyter-execute::
 
@@ -210,7 +213,7 @@ def download(
         urls = iris.data.urls_hek(
             description="A1: QS monitoring",
             limit=1,
-            spectrograph=False,
+            sji=False,
         )
 
         iris.data.download(urls)
@@ -233,3 +236,71 @@ def download(
         result.append(file)
 
     return result
+
+
+def decompress(
+    archives: Sequence[pathlib.Path],
+    directory: None | pathlib.Path = None,
+    overwrite: bool = False,
+) -> list[pathlib.Path]:
+    """
+    Decompress a list of ``.tar.gz`` files.
+
+    Each ``.tar.gz`` file is extracted into its own subdirectory, named after
+    the archive with the ``.tar.gz`` suffix removed, and the ``.fits`` files
+    found within that subdirectory are appended to the returned list.
+
+    Parameters
+    ----------
+    archives
+        A list of ``.tar.gz`` files to decompress.
+    directory
+        A filesystem directory to place the decompressed results.
+        If :obj:`None`, each archive is extracted into the directory
+        containing that ``.tar.gz`` archive.
+    overwrite
+        If :obj:`True`, extract the archive even if its destination
+        directory already exists.
+        If :obj:`False`, an existing destination directory is reused as-is.
+
+    Examples
+    --------
+    Download the most recent "A1: QS monitoring" spectrograph files and
+    decompress into a list of ``.fits`` files.
+
+    .. jupyter-execute::
+
+        import iris
+
+        # Find the URL of the .tar.gz archive
+        urls = iris.data.urls_hek(
+            description="A1: QS monitoring",
+            limit=1,
+            sji=False,
+        )
+
+        # Download the .tar.gz archive
+        archives = iris.data.download(urls)
+
+        # Decompress the .tar.gz archive into a list of fits files
+        iris.data.decompress(archives)
+    """
+
+    result = []
+
+    for archive in archives:
+
+        # Resolve the output directory separately for every archive.
+        # Rebinding `directory` here would make every later archive
+        # reuse the parent of the first one.
+        parent = archive.parent if directory is None else directory
+
+        # Strip both suffixes: "name.tar.gz" -> "name.tar" -> "name"
+        destination = parent / pathlib.Path(archive.stem).stem
+
+        if overwrite or not destination.exists():
+            shutil.unpack_archive(archive, extract_dir=destination)
+
+        result.extend(sorted(destination.rglob("*.fits")))
+
+    return result