NPI-3689 reworked functions for iau2000 download
treefern committed Jan 21, 2025
1 parent 08f725b commit caad5c5
Showing 1 changed file with 165 additions and 48 deletions.
213 changes: 165 additions & 48 deletions gnssanalysis/gn_download.py
@@ -28,7 +28,7 @@
import ftplib as _ftplib
from ftplib import FTP_TLS as _FTP_TLS
from pathlib import Path as _Path
from typing import Any, Generator, List, Optional, Tuple, Union
from typing import Any, Generator, List, Literal, Optional, Tuple, Union
from urllib import request as _request
from urllib.error import HTTPError as _HTTPError

@@ -37,7 +37,7 @@
import pandas as _pd
from boto3.s3.transfer import TransferConfig

from .gn_datetime import GPSDate, dt2gpswk, gpswkD2dt
from .gn_datetime import GPSDate, gpswkD2dt
from .gn_utils import ensure_folders

MB = 1024 * 1024
@@ -150,7 +150,16 @@ def request_metadata(url: str, max_retries: int = 5, metadata_header: str = "x-a
return None


def download_url(url: str, destfile: Union[str, _os.PathLike], max_retries: int = 5) -> Optional[_Path]:
def download_url(
url: str, destfile: Union[str, _os.PathLike], max_retries: int = 5, raise_on_failure: bool = False
) -> Optional[_Path]:
"""
TODO finish docstring
:param bool raise_on_failure: Raise exceptions on errors, rather than returning None.
:raises Exception: On download failure. If original exception was an _HTTPError, raised Exception will wrap it.
:return Optional[_Path]: On success: local path to downloaded file. On errors: None, unless raise_on_failure is
set to True.
"""
logging.info(f'requesting "{url}"')
for retry in range(1, max_retries + 1):
try:
@@ -163,13 +172,19 @@ def download_url(url: str, destfile: Union[str, _os.PathLike], max_retries: int
except _HTTPError as err:
logging.error(f" HTTP Error {err.code}: {err.reason}")
if err.code == 404:
if raise_on_failure:
raise Exception(f"Download failed for URL '{url}'", err)
return None # File Not Found on the server so no point in retrying
t_seconds = 2**retry
logging.error(f"Retry No. {retry} in {t_seconds} seconds")
_time.sleep(t_seconds)
if retry >= max_retries:
logging.error(f"Maximum number of retries reached: {max_retries}. File not downloaded")
if raise_on_failure:
raise Exception(f"Download failed for URL '{url}' after {max_retries} retries.", err)
return None
if raise_on_failure:
raise Exception(f"Fell out of download block for URL '{url}' after {max_retries} retries.")
logging.error("Maximum retries exceeded in download_url with no clear outcome, returning None")
return None
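
A minimal usage sketch of the reworked download_url; the URL and destination path below are illustrative, not taken from this commit:

```python
from pathlib import Path

from gnssanalysis.gn_download import download_url

# Default behaviour: failures are logged and None is returned.
result = download_url("https://example.com/data.txt", Path("/tmp/data.txt"))
if result is None:
    print("Download failed (details were logged)")

# New behaviour: with raise_on_failure=True, failures surface as exceptions instead.
try:
    result = download_url(
        "https://example.com/data.txt",  # illustrative URL
        Path("/tmp/data.txt"),
        max_retries=3,
        raise_on_failure=True,
    )
    print(f"Downloaded to {result}")
except Exception as err:
    print(f"Download failed: {err}")
```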

@@ -490,7 +505,8 @@ def attempt_ftps_download(
:param str filename: Filename to assign for the downloaded file
:param str type_of_file: How to label the file for STDOUT messages, defaults to None
:param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user"
:return _Path or None: The pathlib.Path of the downloaded file if successful, otherwise returns None
:return _Path or None: The pathlib.Path of the downloaded file if successful, or None if the download was skipped
(based on if_file_present).
"""
""
logging.info(f"Attempting FTPS Download of {type_of_file} file - {filename} to {download_dir}")
@@ -506,7 +522,12 @@ def attempt_ftps_download(


def attempt_url_download(
download_dir: _Path, url: str, filename: str = None, type_of_file: str = None, if_file_present: str = "prompt_user"
download_dir: _Path,
url: str,
filename: Optional[str] = None,
type_of_file: Optional[str] = None,
if_file_present: str = "prompt_user",
raise_on_failure: bool = False,
) -> Union[_Path, None]:
"""Attempt download of file given URL (url) to chosen location (download_dir)
@@ -515,6 +536,7 @@ def attempt_url_download(
:param str filename: Filename to assign for the downloaded file, defaults to None
:param str type_of_file: How to label the file for STDOUT messages, defaults to None
:param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user"
:param bool raise_on_failure: Propagate exceptions raised during the download, rather than returning None, defaults to False
:return _Path or None: The pathlib.Path of the downloaded file if successful, otherwise returns None
"""
# If the download_filename is not provided, use the filename from the URL
@@ -526,7 +548,7 @@ def attempt_url_download(
filename=filename, download_dir=download_dir, if_file_present=if_file_present
)
if download_filepath:
download_filepath = download_url(url, download_filepath)
download_filepath = download_url(url, download_filepath, raise_on_failure=raise_on_failure)
return download_filepath
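
A short sketch of the updated attempt_url_download, showing the new raise_on_failure passthrough; the URL and directory are illustrative:

```python
from pathlib import Path

from gnssanalysis.gn_download import attempt_url_download

downloaded = attempt_url_download(
    download_dir=Path("/tmp"),
    url="https://example.com/products/somefile.txt",  # illustrative; filename defaults to "somefile.txt"
    if_file_present="dont_replace",  # skip (return None) if the file already exists locally
    raise_on_failure=True,  # propagate download errors instead of returning None
)
print(downloaded)  # _Path on success, None if the download was skipped
```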


@@ -729,13 +751,13 @@ def download_file_from_cddis(
:param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user"
:param str note_filetype: How to label the file for STDOUT messages, defaults to None
:raises e: Re-raise any ftplib error once the maximum retry count is reached
:return _Path or None: The pathlib.Path of the downloaded file if successful, otherwise returns None
:return _Path or None: The pathlib.Path of the downloaded file (or decompressed output of it). Returns None if the
file already existed and was skipped.
"""
with ftp_tls(CDDIS_FTP) as ftps:
ftps.cwd(ftp_folder)
retries = 0
download_done = False
while not download_done and retries <= max_retries:
while retries <= max_retries:
try:
download_filepath = attempt_ftps_download(
download_dir=output_folder,
Expand All @@ -744,25 +766,31 @@ def download_file_from_cddis(
type_of_file=note_filetype,
if_file_present=if_file_present,
)
if decompress and download_filepath:
download_filepath = decompress_file(
input_filepath=download_filepath, delete_after_decompression=True
)
download_done = True
if download_filepath:
logging.info(f"Downloaded {download_filepath.name}")
if not download_filepath: # File already existed and was skipped
return None
# File was downloaded
logging.info(f"Downloaded {download_filepath.name}")
if decompress: # Does it need unpacking?
# Decompress, and return the path of the resultant file
logging.info(f"Decompressing downloaded file {download_filepath.name}")
return decompress_file(input_filepath=download_filepath, delete_after_decompression=True)
# File doesn't need unpacking, return downloaded path
return download_filepath
except _ftplib.all_errors as e:
retries += 1
if retries > max_retries:
logging.error(f"Failed to download {filename} and reached maximum retry count ({max_retries}).")
logging.error(f"Failed to download {filename} and reached maximum retry count ({max_retries}).", e)
if (output_folder / filename).is_file():
(output_folder / filename).unlink()
raise e

logging.debug(f"Received an error ({e}) while try to download {filename}, retrying({retries}).")
# Add some backoff time (exponential random as it appears to be contention based?)
_time.sleep(_random.uniform(0.0, 2.0**retries))
return download_filepath

# Fell out of loop and context manager without returning a result or raising an exception.
# Shouldn't be possible, raise exception if it somehow happens.
raise Exception("Failed to download file or raise exception. Some logic is broken.")


def download_multiple_files_from_cddis(files: List[str], ftp_folder: str, output_folder: _Path) -> None:
@@ -903,59 +931,148 @@ def download_product_from_cddis(
return download_filepaths


def download_iau2000_file(
download_dir: _Path, start_epoch: _datetime, if_file_present: str = "prompt_user"
def download_iau2000_variant(
download_dir: _Path,
iau2000_file_variant: Literal["standard", "daily"],
if_file_present: str = "prompt_user",
) -> Union[_Path, None]:
"""Download relevant IAU2000 file from CDDIS or IERS based on start_epoch of data
"""
Downloads the IAU2000 file variant requested ("daily" or "standard" file).
Added in approximately version 0.0.58
:param _Path download_dir: Where to download files (local directory)
:param _datetime start_epoch: Start epoch of data in file
:param _Path download_dir: Where to download file (local directory).
:param Literal["standard", "daily"] iau2000_file_varient: name of file varient to download. Specifies whether to
download the recent "daily" file, or the historical "standard" file.
:param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user"
:return _Path or None: The pathlib.Path of the downloaded file if successful, otherwise returns None
:raises Exception: On download failures. In some cases the underlying exception will be wrapped in the one raised.
:return Union[_Path, None]: _Path to the downloaded file, or None if not downloaded (based on if_file_present
setting).
"""
ensure_folders([download_dir])
# Download most recent daily IAU2000 file if running for a session within the past week (data is within 3 months)
if _datetime.datetime.now() - start_epoch < _datetime.timedelta(weeks=1):
filetype = "EOP IAU2000"

if iau2000_file_variant == "daily":
# Daily (recent) file spanning the last three months. Updated daily (as the name suggests).
url_dir = "daily/"
iau2000_filename = "finals2000A.daily"
download_filename = "finals.daily.iau2000.txt"
logging.info("Attempting Download of finals2000A.daily file")
# Otherwise download the IAU2000 file dating back to 1992
else:
# logging.info("Attempting Download of finals2000A.daily file")
elif iau2000_file_varient == "standard":
# Standard (historic) IAU2000 file dating back to 1992:
url_dir = "standard/"
iau2000_filename = "finals2000A.data"
download_filename = "finals.data.iau2000.txt"
logging.info("Attempting Download of finals2000A.data file")
filetype = "EOP IAU2000"
# logging.info("Attempting Download of finals2000A.data file")
else:
raise ValueError(f"Unrecognised IAU2000 file varient requested: {iau2000_file_varient}")

# Can we skip this download, based on existing file and value of if_file_present?
if not check_whether_to_download(
filename=download_filename, download_dir=download_dir, if_file_present=if_file_present
):
return None
return None # No output path for this variant, as we didn't download it.

# Attempt download from the CDDIS website first, if that fails try IERS
# Eugene: should try IERS first and then CDDIS?
# Default source IERS, then fall back to CDDIS (based on Eugene's comment)
try:
logging.info("Downloading IAU2000 file from CDDIS")
download_filepath = download_file_from_cddis(
filename=iau2000_filename,
ftp_folder="products/iers/",
output_folder=download_dir,
decompress=False,
if_file_present=if_file_present,
note_filetype=filetype
)
download_filepath = download_filepath.rename(download_dir / download_filename)
except:
logging.info("Failed CDDIS download - Downloading IAU2000 file from IERS")
download_filepath = attempt_url_download(
logging.info("Downloading IAU2000 file from IERS")
# We set raise_on_failure, so a return of None here simply indicates the file did not need downloading.
return attempt_url_download(
download_dir=download_dir,
url="https://datacenter.iers.org/products/eop/rapid/" + url_dir + iau2000_filename,
filename=download_filename,
type_of_file=filetype,
if_file_present=if_file_present,
raise_on_failure=True,
)
return download_filepath
except Exception as ex:
logging.error("Failed to download IAU2000 file from IERS.", ex)
try:
logging.info("Downloading IAU2000 file from CDDIS")
download_filepath_or_none = download_file_from_cddis(
filename=iau2000_filename,
ftp_folder="products/iers/",
output_folder=download_dir,
decompress=False,
if_file_present=if_file_present,
note_filetype=filetype,
# Already raises on failure by default
)
if download_filepath_or_none is None: # File already existed and was not re-downloaded.
return None
# Download succeeded. Rename to standard filename, and return the path
return download_filepath_or_none.rename(download_dir / download_filename)
except Exception as ex:
logging.error(f"Failed to download IAU2000 file from CDDIS: {ex}")
raise # Both sources failed; raise, as promised in the docstring.


def get_iau2000_file_variants_for_dates(
start_epoch: Union[_datetime.datetime, None] = None,
end_epoch: Union[_datetime.datetime, None] = None,
) -> set[Literal["standard", "daily"]]:
"""
Works out which variant(s) of the IAU2000 files are needed, based on the given start and/or end epoch.
The returned variant(s) can be iterated over and passed to the download_iau2000_variant() function to fetch the
file(s).
Added in approximately version 0.0.58
Specifications for the file formats:
finals.data https://datacenter.iers.org/versionMetadata.php?filename=latestVersionMeta/10_FINALS.DATA_IAU2000_V2013_0110.txt
finals.daily https://datacenter.iers.org/versionMetadata.php?filename=latestVersionMeta/13_FINALS.DAILY_IAU2000_V2013_0113.txt
:param Union[_datetime.datetime, None] start_epoch: Start of date range. Optional if end_epoch is provided.
:param Union[_datetime.datetime, None] end_epoch: End of date range. Optional if start_epoch is provided.
:return set[Literal["standard", "daily"]]: Set of IAU2000 file variant names needed for your date / date range
"""
if not (start_epoch or end_epoch):
raise ValueError("start_epoch, end_epoch or both, must be provided")

needed_variants: set[Literal["standard", "daily"]] = set()
now = _datetime.datetime.now()
# TODO double check if they do it by calendar months, or days. If it's calendar months we need to be more conservative; 80 days perhaps.
three_months_ago = now - _datetime.timedelta(days=80)
# To be safe, use eight days rather than one week, until we confirm that data makes it into the 'standard'
# file on day 7.
one_week_ago = now - _datetime.timedelta(days=8)

# If the time range overlaps with less than a week ago, you need the daily file (the historical one won't have
# new enough data).
# Otherwise, the historical file will be more complete.
# TODO but do you actually benefit from the historical file if you're not going more than 3 months back?

# Keeping it simple:
# If your time range includes dates as recent as last week, you need the daily file.
# If your time range includes dates older than three months ago, you need the standard file.
# NOTE: you may need both!

if (start_epoch and start_epoch >= one_week_ago) or (end_epoch and end_epoch >= one_week_ago):
needed_varients.add("daily")

if (start_epoch and start_epoch < three_months_ago) or (end_epoch and end_epoch < three_months_ago):
needed_varients.add("standard")

return needed_variants
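
Putting the two new functions together, a sketch of the intended flow for a processing window; the dates and directory are illustrative, and a long window can require both variants:

```python
import datetime
from pathlib import Path

from gnssanalysis.gn_download import (
    download_iau2000_variant,
    get_iau2000_file_variants_for_dates,
)

start = datetime.datetime(2024, 1, 1)
end = datetime.datetime.now()

# A range reaching both further back than ~3 months and into the last week
# yields {"standard", "daily"}; download each variant that is needed.
for variant in get_iau2000_file_variants_for_dates(start_epoch=start, end_epoch=end):
    download_iau2000_variant(
        download_dir=Path("/tmp/eop"),  # illustrative directory
        iau2000_file_variant=variant,
        if_file_present="replace",
    )
```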


# TODO DEPRECATED
def download_iau2000_file(
download_dir: _Path,
start_epoch: _datetime.datetime,
if_file_present: str = "prompt_user",
) -> Union[_Path, None]:
"""
Compatibility wrapper around new functions
DEPRECATED since approximately version 0.0.58
"""
variants = get_iau2000_file_variants_for_dates(start_epoch=start_epoch)
if len(variants) != 1:
raise NotImplementedError(
"Legacy wrapper for IAU2000 file download failed. Exactly one file variant should be returned based on "
f"a single date ({start_epoch})."
)
variant = variants.pop()
return download_iau2000_variant(download_dir=download_dir, iau2000_file_variant=variant, if_file_present=if_file_present)
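
For callers of the deprecated wrapper, an equivalent migration sketch; the epoch and directory are illustrative:

```python
import datetime
from pathlib import Path

from gnssanalysis import gn_download

epoch = datetime.datetime.now() - datetime.timedelta(days=2)

# Deprecated single-epoch wrapper:
gn_download.download_iau2000_file(Path("/tmp/eop"), epoch, if_file_present="replace")

# Equivalent calls via the new API; a single recent epoch maps to the "daily" variant:
(variant,) = gn_download.get_iau2000_file_variants_for_dates(start_epoch=epoch)
gn_download.download_iau2000_variant(
    download_dir=Path("/tmp/eop"), iau2000_file_variant=variant, if_file_present="replace"
)
```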


def download_atx(
