diff --git a/gnssanalysis/gn_download.py b/gnssanalysis/gn_download.py index d44ce13..37c5acd 100644 --- a/gnssanalysis/gn_download.py +++ b/gnssanalysis/gn_download.py @@ -12,6 +12,7 @@ from itertools import repeat as _repeat import logging import os as _os +from copy import deepcopy as _deepcopy import random as _random import shutil import click as _click @@ -24,7 +25,7 @@ import ftplib as _ftplib from ftplib import FTP_TLS as _FTP_TLS from pathlib import Path as _Path -from typing import Optional as _Optional, Union as _Union, Tuple as _Tuple +from typing import Optional as _Optional, Tuple as _Tuple, List as _List from urllib import request as _request from urllib.error import HTTPError as _HTTPError @@ -34,10 +35,14 @@ from boto3.s3.transfer import TransferConfig from .gn_datetime import GPSDate, dt2gpswk, gpswkD2dt +from .gn_utils import ensure_folders MB = 1024 * 1024 CDDIS_FTP = "gdc.cddis.eosdis.nasa.gov" +PRODUCT_BASE_URL = "https://peanpod.s3.ap-southeast-2.amazonaws.com/aux/products/" +IGS_FILES_URL = "https://files.igs.org/pub/" +BERN_URL = "http://ftp.aiub.unibe.ch/" # s3client = boto3.client('s3', region_name='eu-central-1') @@ -142,7 +147,7 @@ def request_metadata(url: str, max_retries: int = 5, metadata_header: str = "x-a return None -def download_url(url: str, destfile: _Union[str, _os.PathLike], max_retries: int = 5) -> _Optional[_Path]: +def download_url(url: str, destfile: str | _os.PathLike, max_retries: int = 5) -> _Optional[_Path]: logging.info(f'requesting "{url}"') for retry in range(1, max_retries + 1): try: @@ -180,38 +185,6 @@ def gen_uncomp_filename(comp_filename: str) -> str: return comp_filename -def gen_prod_filename(dt, pref, suff, f_type, wkly_file=False, repro3=False): - """ - Generate a product filename based on the inputs - """ - gpswk, gpswkD = dt2gpswk(dt, both=True) - - if repro3: - if f_type == "erp": - f = f'{pref.upper()}0R03FIN_{dt.year}{dt.strftime("%j")}0000_01D_01D_{f_type.upper()}.{f_type.upper()}.gz' - elif f_type == "clk": - f = f'{pref.upper()}0R03FIN_{dt.year}{dt.strftime("%j")}0000_01D_30S_{f_type.upper()}.{f_type.upper()}.gz' - elif f_type == "bia": - f = f'{pref.upper()}0R03FIN_{dt.year}{dt.strftime("%j")}0000_01D_01D_OSB.{f_type.upper()}.gz' - elif f_type == "sp3": - f = f'{pref.upper()}0R03FIN_{dt.year}{dt.strftime("%j")}0000_01D_05M_ORB.{f_type.upper()}.gz' - elif f_type == "snx": - f = f'{pref.upper()}0R03FIN_{dt.year}{dt.strftime("%j")}0000_01D_01D_SOL.{f_type.upper()}.gz' - elif f_type == "rnx": - f = f'BRDC00{pref.upper()}_R_{dt.year}{dt.strftime("%j")}0000_01D_MN.rnx.gz' - elif (pref == "igs") & (f_type == "snx") & wkly_file: - f = f"{pref}{str(dt.year)[2:]}P{gpswk}.{f_type}.Z" - elif (pref == "igs") & (f_type == "snx"): - f = f"{pref}{str(dt.year)[2:]}P{gpswkD}.{f_type}.Z" - elif f_type == "rnx": - f = f'BRDC00{pref.upper()}_R_{dt.year}{dt.strftime("%j")}0000_01D_MN.rnx.gz' - elif wkly_file: - f = f"{pref}{gpswk}{suff}.{f_type}.Z" - else: - f = f"{pref}{gpswkD}{suff}.{f_type}.Z" - return f, gpswk - - def generate_uncompressed_filename(filename: str) -> str: """Returns a string of the uncompressed filename given the [assumed compressed] filename @@ -396,16 +369,14 @@ def generate_product_filename( return product_filename, gps_date, reference_start -def check_whether_to_download( - filename: str, download_dir: _Path, if_file_present: str = "prompt_user" -) -> _Union[_Path, None]: +def check_whether_to_download(filename: str, download_dir: _Path, if_file_present: str = "prompt_user") -> _Path | None: """Determine whether to 
download given file (filename) to the desired location (download_dir) based on whether it is already present and what action to take if it is (if_file_present) :param str filename: Filename of the downloaded file - :param _Path download_dir: Path obj to download directory - :param str if_file_present: How to handle files that are already present ["replace","dont_replace","prompt_user"], defaults to "prompt_user" - :return _Union[_Path, None]: Path obj to the downloaded file if file should be downloaded, otherwise returns None + :param _Path download_dir: Where to download files (local directory) + :param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user" + :return _Path | None: pathlib.Path to the downloaded file if file should be downloaded, otherwise returns None """ # Flag to determine whether to download: download = None @@ -455,12 +426,12 @@ def attempt_ftps_download( ) -> _Path: """Attempt download of file (filename) given the ftps client object (ftps) to chosen location (download_dir) - :param _Path download_dir: Path obj to download directory + :param _Path download_dir: Where to download files (local directory) :param _ftplib.FTP_TLS ftps: FTP_TLS client pointed at download source :param str filename: Filename to assign for the downloaded file :param str type_of_file: How to label the file for STDOUT messages, defaults to None - :param str if_file_present: How to handle files that are already present ["replace","dont_replace","prompt_user"], defaults to "prompt_user" - :return _Path: Path obj to the downloaded file + :param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user" + :return _Path: The pathlib.Path of the downloaded file """ "" logging.info(f"Attempting FTPS Download of {type_of_file} file - {filename} to {download_dir}") @@ -480,12 +451,12 @@ def attempt_url_download( ) -> _Path: """Attempt download of file given URL (url) to chosen location (download_dir) - :param Path download_dir: Path obj to download directory + :param _Path download_dir: Where to download files (local directory) :param str url: URL to download :param str filename: Filename to assign for the downloaded file, defaults to None :param str type_of_file: How to label the file for STDOUT messages, defaults to None - :param str if_file_present: How to handle files that are already present ["replace","dont_replace","prompt_user"], defaults to "prompt_user" - :return Path: Path obj to the downloaded file + :param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user" + :return _Path: The pathlib.Path of the downloaded file """ # If the filename is not provided, use the filename from the URL if not filename: @@ -656,7 +627,7 @@ def connect_cddis(verbose=False): if verbose: logging.info("\nConnecting to CDDIS server...") - ftps = _FTP_TLS("gdc.cddis.eosdis.nasa.gov") + ftps = _FTP_TLS(CDDIS_FTP) ftps.login() ftps.prot_p() @@ -680,108 +651,6 @@ def ftp_tls(url: str, **kwargs) -> None: ftps.quit() -@_contextmanager -def ftp_tls_cddis(connection: _FTP_TLS = None, **kwargs) -> None: - """Establish an ftp tls connection to CDDIS. Opens a new connection if one does not already exist. 
- - :param connection: Active connection which is passed through to allow reuse - """ - if connection is None: - with ftp_tls(CDDIS_FTP, **kwargs) as ftps: - yield ftps - else: - yield connection - - -def select_mr_file(mr_files, f_typ, ac): - """ - Given a list of most recent files, find files matching type and AC of interest - """ - if ac == "any": - search_str = f".{f_typ}.Z" - mr_typ_files = [f for f in mr_files if f.endswith(search_str)] - else: - search_str_end = f".{f_typ}.Z" - search_str_sta = f"{ac}" - mr_typ_files = [f for f in mr_files if ((f.startswith(search_str_sta)) & (f.endswith(search_str_end)))] - - return mr_typ_files - - -def find_mr_file(dt, f_typ, ac, ftps): - """Given connection to the ftps server, find the most recent file of type f_typ and analysis centre ac""" - c_gpswk = dt2gpswk(dt) - - ftps.cwd(f"gnss/products/{c_gpswk}") - mr_files = ftps.nlst() - mr_typ_files = select_mr_file(mr_files, f_typ, ac) - - if mr_typ_files == []: - while mr_typ_files == []: - logging.info(f"GPS Week {c_gpswk} too recent") - logging.info(f"No {ac} {f_typ} files found in GPS week {c_gpswk}") - logging.info(f"Moving to GPS week {int(c_gpswk) - 1}") - c_gpswk = str(int(c_gpswk) - 1) - ftps.cwd(f"../{c_gpswk}") - mr_files = ftps.nlst() - mr_typ_files = select_mr_file(mr_files, f_typ, ac) - mr_file = mr_typ_files[-1] - return mr_file, ftps, c_gpswk - - -def download_most_recent( - dest, f_type, ftps=None, ac="any", dwn_src="cddis", f_dict_out=False, gpswkD_out=False, ftps_out=False -): - """ - Download the most recent version of a product file - """ - # File types should be converted to lists if not already a list - if isinstance(f_type, list): - f_types = f_type - else: - f_types = [f_type] - - # Create directory if doesn't exist: - if not _Path(dest).is_dir(): - _Path(dest).mkdir(parents=True) - - # Create list to hold filenames that will be downloaded: - if f_dict_out: - f_dict = {f_typ: [] for f_typ in f_types} - if gpswkD_out: - gpswk_dict = {f_typ + "_gpswkD": [] for f_typ in f_types} - # Connect to ftps if not already: - if not ftps: - # Connect to chosen server - if dwn_src == "cddis": - ftps = connect_cddis() - - for f_typ in f_types: - logging.info(f"\nSearching for most recent {ac} {f_typ}...\n") - - dt = (_np.datetime64("today") - 1).astype(_datetime.datetime) - mr_file, ftps, c_gpswk = find_mr_file(dt, f_typ, ac, ftps) - check_n_download(mr_file, dwndir=dest, ftps=ftps, uncomp=True) - ftps.cwd(f"/") - if f_dict_out: - f_uncomp = gen_uncomp_filename(mr_file) - if f_uncomp not in f_dict[f_typ]: - f_dict[f_typ].append(f_uncomp) - c_gpswkD = mr_file[3:8] - if gpswkD_out: - gpswk_dict[f_typ + "_gpswkD"].append(c_gpswkD) - - ret_vars = [] - if f_dict_out: - ret_vars.append(f_dict) - if gpswkD_out: - ret_vars.append(gpswk_dict) - if ftps_out: - ret_vars.append(ftps) - - return ret_vars - - def download_file_from_cddis( filename: str, ftp_folder: str, @@ -791,16 +660,19 @@ def download_file_from_cddis( if_file_present: str = "prompt_user", note_filetype: str = None, ) -> _Path: - """Downloads a single file from the cddis ftp server. 
- - :param filename: Name of the file to download - :ftp_folder: Folder where the file is stored on the remote - :output_folder: Folder to store the output file - :ftps: Optional active connection object which is reused - :max_retries: Number of retries before raising error - :uncomp: If true, uncompress files on download - """ - with ftp_tls("gdc.cddis.eosdis.nasa.gov") as ftps: + """Downloads a single file from the CDDIS ftp server + + :param str filename: Name of the file to download + :param str ftp_folder: Folder where the file is stored on the remote server + :param _Path output_folder: Local folder to store the output file + :param int max_retries: Number of retries before raising error, defaults to 3 + :param bool decompress: If true, decompresses files on download, defaults to True + :param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user" + :param str note_filetype: How to label the file for STDOUT messages, defaults to None + :raises e: Raise any error that is run into by ftplib + :return _Path: The pathlib.Path of the downloaded file + """ + with ftp_tls(CDDIS_FTP) as ftps: ftps.cwd(ftp_folder) retries = 0 download_done = False @@ -834,7 +706,7 @@ def download_file_from_cddis( return download_filepath -def download_multiple_files_from_cddis(files: [str], ftp_folder: str, output_folder: _Path) -> None: +def download_multiple_files_from_cddis(files: _List[str], ftp_folder: str, output_folder: _Path) -> None: """Downloads multiple files in a single folder from cddis in a thread pool. :param files: List of str filenames @@ -846,325 +718,201 @@ def download_multiple_files_from_cddis(files: [str], ftp_folder: str, output_fol list(executor.map(download_file_from_cddis, files, _repeat(ftp_folder), _repeat(output_folder))) -# TODO: Deprecate? Only supports legacy filenames -def download_prod( - dates, - dest, - ac="igs", - suff="", - f_type="sp3", - dwn_src="cddis", - ftps=False, - f_dict=False, - wkly_file=False, - repro3=False, -): - """ - Function used to get the product file/s from download server of choice, default: CDDIS - - Input: - dest - destination (str) - ac - Analysis Center / product of choice (e.g. igs, igr, cod, jpl, gfz, default = igs) - suff - optional suffix added to file name (e.g. _0 or _06 for ultra-rapid products) - f_type - file type to download (e.g. clk, cls, erp, sp3, sum, default = sp3) - dwn_src - Download Source (e.g. 
cddis, ga)
-    ftps - Optionally input active ftps connection object
-    wkly_file - optionally grab the weekly file rather than the daily
-    repro3 - option to download the REPRO3 version of the file
-
-    """
-
-    # Convert input to list of datetime dates (if not already)
-    if (type(dates) == list) & (type(dates[0]) == _datetime.date):
-        dt_list = dates
-    else:
-        dt_list = dates_type_convert(dates)
-
-    # File types should be converted to lists also, if not already so
-    if isinstance(f_type, list):
-        f_types = f_type
-    else:
-        f_types = [f_type]
-
-    # Create directory if doesn't exist:
-    if not _Path(dest).is_dir():
-        _Path(dest).mkdir(parents=True)
-
-    # Create list to hold filenames that will be downloaded:
-    if f_dict:
-        f_dict = {f_typ: [] for f_typ in f_types}
-
-    # Connect to ftps if not already:
-    if not ftps:
-        # Connect to chosen server
-        if dwn_src == "cddis":
-            logging.info("\nGathering product files...")
-            ftps = connect_cddis(verbose=True)
-            p_gpswk = 0
-    else:
-        p_gpswk = 0
-
-    for dt in dt_list:
-        for f_typ in f_types:
-            if dwn_src == "cddis":
-                if repro3:
-                    f, gpswk = gen_prod_filename(dt, pref=ac, suff=suff, f_type=f_typ, repro3=True)
-                elif (ac == "igs") and (f_typ == "erp"):
-                    f, gpswk = gen_prod_filename(dt, pref=ac, suff="7", f_type=f_typ, wkly_file=True)
-                elif f_typ == "snx":
-                    mr_file, ftps, gpswk = find_mr_file(dt, f_typ, ac, ftps)
-                    f = mr_file
-                elif wkly_file:
-                    f, gpswk = gen_prod_filename(dt, pref=ac, suff=suff, f_type=f_typ, wkly_file=True)
-                else:
-                    f, gpswk = gen_prod_filename(dt, pref=ac, suff=suff, f_type=f_typ)
-
-                if not check_file_present(comp_filename=f, dwndir=dest):
-                    # gpswk = dt2gpswk(dt)
-                    if gpswk != p_gpswk:
-                        ftps.cwd("/")
-                        ftps.cwd(f"gnss/products/{gpswk}")
-                        if repro3:
-                            ftps.cwd(f"repro3")
-
-                    if f_typ == "rnx":
-                        ftps.cwd("/")
-                        ftps.cwd(f"gnss/data/daily/{dt.year}/brdc")
-                        success = check_n_download(
-                            f, dwndir=dest, ftps=ftps, uncomp=True, remove_crx=True, no_check=True
-                        )
-                        ftps.cwd("/")
-                        ftps.cwd(f"gnss/products/{gpswk}")
-                    else:
-                        success = check_n_download(
-                            f, dwndir=dest, ftps=ftps, uncomp=True, remove_crx=True, no_check=True
-                        )
-                    p_gpswk = gpswk
-                else:
-                    success = True
-                if f_dict and success:
-                    f_uncomp = gen_uncomp_filename(f)
-                    if f_uncomp not in f_dict[f_typ]:
-                        f_dict[f_typ].append(f_uncomp)
-
+def download_product_from_cddis(
+    download_dir: _Path,
+    start_epoch: _datetime.datetime,
+    end_epoch: _datetime.datetime,
+    file_ext: str,
+    limit: _Optional[int] = None,
+    long_filename: _Optional[bool] = None,
+    analysis_center: str = "IGS",
+    solution_type: str = "ULT",
+    sampling_rate: str = "15M",
+    project_type: str = "OPS",
+    timespan: _datetime.timedelta = _datetime.timedelta(days=2),
+    if_file_present: str = "prompt_user",
+) -> None:
+    """Download the file/s from CDDIS based on start and end epoch, to the download directory (download_dir)
+
+    :param _Path download_dir: Where to download files (local directory)
+    :param _datetime.datetime start_epoch: Start date/time of files to find and download
+    :param _datetime.datetime end_epoch: End date/time of files to find and download
+    :param str file_ext: Extension of files to download (e.g. SP3, CLK, ERP, etc)
+    :param _Optional[int] limit: Maximum number of files to download, defaults to None (no limit)
+    :param _Optional[bool] long_filename: Search for IGS long filenames; if None, determine from start_epoch, defaults to None
+    :param str analysis_center: Which analysis center's files to download (e.g. COD, GFZ, WHU, etc), defaults to "IGS"
+    :param str solution_type: Which solution type to download (e.g. ULT, RAP, FIN), defaults to "ULT"
+    :param str sampling_rate: Sampling rate of file to download, defaults to "15M"
+    :param str project_type: Project type of file to download (e.g. OPS, MGX), defaults to "OPS"
+    :param _datetime.timedelta timespan: Timespan of the file/s to download, defaults to _datetime.timedelta(days=2)
+    :param str if_file_present: What to do if file already present: "replace", "dont_replace" or "prompt_user", defaults to "prompt_user"
+    :raises FileNotFoundError: Raised if the specified file cannot be found on CDDIS
+    """
+    # Download the correct IGS FIN ERP files
+    if file_ext == "ERP" and analysis_center == "IGS" and solution_type == "FIN":  # get the correct start_epoch
+        start_epoch = GPSDate(str(start_epoch))
+        start_epoch = gpswkD2dt(f"{start_epoch.gpswk}0")
+        timespan = _datetime.timedelta(days=7)
+    # Details for debugging purposes:
+    logging.debug("Attempting CDDIS Product download/s")
+    logging.debug(f"Start Epoch - {start_epoch}")
+    logging.debug(f"End Epoch - {end_epoch}")
+    if long_filename is None:
+        long_filename = long_filename_cddis_cutoff(start_epoch)
+
+    reference_start = _deepcopy(start_epoch)
+    product_filename, gps_date, reference_start = generate_product_filename(
+        reference_start,
+        file_ext,
+        long_filename=long_filename,
+        analysis_center=analysis_center,
+        timespan=timespan,
+        solution_type=solution_type,
+        sampling_rate=sampling_rate,
+        project=project_type,
+    )
+    logging.debug(
+        f"Generated filename: {product_filename}, with GPS Date: {gps_date.gpswkD} and reference: {reference_start}"
+    )
+    with ftp_tls(CDDIS_FTP) as ftps:
+        try:
+            ftps.cwd(f"gnss/products/{gps_date.gpswk}")
+        except _ftplib.all_errors as e:
+            logging.info(f"{reference_start} too recent")
+            logging.info(f"ftplib error: {e}")
+            product_filename, gps_date, reference_start = generate_product_filename(
+                reference_start,
+                file_ext,
+                shift=-6,
+                long_filename=long_filename,
+                analysis_center=analysis_center,
+                timespan=timespan,
+                solution_type=solution_type,
+                sampling_rate=sampling_rate,
+                project=project_type,
+            )
+            ftps.cwd(f"gnss/products/{gps_date.gpswk}")
+
+        all_files = ftps.nlst()
+        if product_filename not in all_files:
+            logging.info(f"{product_filename} not in gnss/products/{gps_date.gpswk} - too recent")
+            raise FileNotFoundError
+
+    # reference_start will be changed in the first run through while loop below
+    reference_start -= _datetime.timedelta(hours=24)
+    count = 0
+    remain = end_epoch - reference_start
+    while remain.total_seconds() > timespan.total_seconds():
+        if count == limit:
+            remain = _datetime.timedelta(days=0)
        else:
-            for dt in dt_list:
-                for f_typ in f_types:
-                    f = gen_prod_filename(dt, pref=ac, suff=suff, f_type=f_typ)
-                    success = check_n_download(
-                        f, dwndir=dest, ftps=ftps, uncomp=True, remove_crx=True, no_check=True
-                    )
-                    if f_dict and success:
-                        f_uncomp = gen_uncomp_filename(f)
-                        if f_uncomp not in f_dict[f_typ]:
-                            f_dict[f_typ].append(f_uncomp)
-    if f_dict:
-        return f_dict
-
-
-def download_pea_prods(
-    dest,
-    most_recent=True,
-    dates=None,
-    ac="igs",
-    out_dict=False,
-    trop_vmf3=False,
-    brd_typ="igs",
-    snx_typ="igs",
-    clk_sel="clk",
-    repro3=False,
-):
-    """
-    Download necessary pea product files for date/s provided
-    """
-    if dest[-1] != "/":
-        dest += "/"
-
-    if most_recent:
-        snx_vars_out = download_most_recent(
-            dest=dest, f_type="snx", ac=snx_typ, dwn_src="cddis", f_dict_out=True, gpswkD_out=True, ftps_out=True
+            product_filename, gps_date, reference_start = generate_product_filename(
+                reference_start,
+                file_ext,
+                shift=24,  # Shift at the start of the loop - speeds up total download time
+                long_filename=long_filename,
+                analysis_center=analysis_center,
+                timespan=timespan,
+                solution_type=solution_type,
+                sampling_rate=sampling_rate,
+                project=project_type,
+            )
+            download_filepath = check_whether_to_download(
+                filename=product_filename, download_dir=download_dir, if_file_present=if_file_present
+            )
+            if download_filepath:
+                download_file_from_cddis(
+                    filename=product_filename,
+                    ftp_folder=f"gnss/products/{gps_date.gpswk}",
+                    output_folder=download_dir,
+                    if_file_present=if_file_present,
+                    note_filetype=file_ext,
+                )
+                count += 1
+            remain = end_epoch - reference_start
+
+
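As a review aid, here is a minimal usage sketch of the new `download_product_from_cddis`. The epochs, analysis centre, solution type and the `timespan` override are illustrative assumptions rather than values taken from this PR (rapid products span one day, so the two-day ultra-rapid default is overridden):

```python
import datetime
from pathlib import Path

from gnssanalysis.gn_download import download_product_from_cddis
from gnssanalysis.gn_utils import ensure_folders

download_dir = Path("products")  # hypothetical local target directory
ensure_folders([download_dir])

# Fetch IGS rapid orbit (SP3) files covering 1-2 May 2023, one file per
# daily product window, skipping any file already present on disk.
download_product_from_cddis(
    download_dir=download_dir,
    start_epoch=datetime.datetime(2023, 5, 1),
    end_epoch=datetime.datetime(2023, 5, 3),
    file_ext="SP3",
    analysis_center="IGS",
    solution_type="RAP",
    sampling_rate="15M",
    timespan=datetime.timedelta(days=1),
    if_file_present="dont_replace",
)
```

Each file is fetched via `download_file_from_cddis`, which opens its own FTPS session and decompresses the product on arrival by default.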
+def download_atx(download_dir: _Path, reference_frame: str = "IGS20", if_file_present: str = "prompt_user") -> _Path:
+    """Download the ATX file needed to run the PEA, into the given download directory (download_dir)
+
+    :param _Path download_dir: Where to download files (local directory)
+    :param str reference_frame: Coordinate reference frame file to download, defaults to "IGS20"
+    :param str if_file_present: What to do if file already present: "replace", "dont_replace" or "prompt_user", defaults to "prompt_user"
+    :raises ValueError: If an invalid option is given for the reference_frame variable
+    :return _Path: The pathlib.Path of the downloaded file
+    """
+    match reference_frame:
+        case "IGS20":
+            atx_filename = "igs20.atx"
+        case "IGb14":
+            atx_filename = "igs14.atx"
+        case _:
+            raise ValueError("Invalid value passed for reference_frame var. Must be either 'IGS20' or 'IGb14'")
+
+    ensure_folders([download_dir])
+
+    url_igs = IGS_FILES_URL + f"station/general/{atx_filename}"
+    url_bern = BERN_URL + "BSWUSER54/REF/I20.ATX"
+
+    try:
+        download_filepath = attempt_url_download(
+            download_dir=download_dir,
+            url=url_igs,
+            filename=atx_filename,
+            type_of_file="ATX",
+            if_file_present=if_file_present,
-        )
-        f_dict, gpswkD_out, ftps = snx_vars_out
-
-        clk_vars_out = download_most_recent(
-            dest=dest, f_type=clk_sel, ac=ac, dwn_src="cddis", f_dict_out=True, gpswkD_out=True, ftps_out=True
+        )
+    except Exception:
+        # Fall back to the AIUB (Bern) mirror if the IGS server is unavailable
+        download_filepath = attempt_url_download(
+            download_dir=download_dir,
+            url=url_bern,
+            filename=atx_filename,
+            type_of_file="ATX",
+            if_file_present=if_file_present,
-        )
-        f_dict_update, gpswkD_out, ftps = clk_vars_out
+        )
-    f_dict.update(f_dict_update)
-    gpswkD = gpswkD_out["clk_gpswkD"][0]
-
-    if most_recent == True:
-        num = 1
-    else:
-        num = most_recent
-
-    dt0 = gpswkD2dt(gpswkD)
-    dtn = dt0 - _datetime.timedelta(days=num - 1)
+    return download_filepath
-    if dtn == dt0:
-        dt_list = [dt0]
-    else:
-        dates = _pd.date_range(start=str(dtn), end=str(dt0), freq="1D")
-        dates = list(dates)
-        dates.reverse()
-        dt_list = sorted(dates_type_convert(dates))
-    else:
-        dt_list = sorted(dates_type_convert(dates))
-
-    dest_pth = _Path(dest)
-    # Output dict for the files that are downloaded
-    if not out_dict:
-        out_dict = {"dates": dt_list, "atxfiles": ["igs14.atx"], "blqfiles": ["OLOAD_GO.BLQ"]}
-
-    # Get the ATX file if not present already:
-    if not (dest_pth / "igs14.atx").is_file():
-        if not dest_pth.is_dir():
-            dest_pth.mkdir(parents=True)
-        url = "https://files.igs.org/pub/station/general/igs14.atx"
-        check_n_download_url(url, dwndir=dest)
-
-    # Get the BLQ file if not present already:
-    if not (dest_pth / "OLOAD_GO.BLQ").is_file():
-        url = "https://peanpod.s3-ap-southeast-2.amazonaws.com/pea/examples/EX03/products/OLOAD_GO.BLQ"
-        check_n_download_url(url, dwndir=dest)
-
-    # For the troposphere, have two options: gpt2 or vmf3. 
If flag is set to True, download 6-hourly trop files: - if trop_vmf3: - # If directory for the Tropospheric model files doesn't exist, create it: - if not (dest_pth / "grid5").is_dir(): - (dest_pth / "grid5").mkdir(parents=True) - for dt in dt_list: - year = dt.strftime("%Y") - # Create urls to the four 6-hourly files associated with the tropospheric model - begin_url = f"https://vmf.geo.tuwien.ac.at/trop_products/GRID/5x5/VMF3/VMF3_OP/{year}/" - f_begin = "VMF3_" + dt.strftime("%Y%m%d") + ".H" - urls = [begin_url + f_begin + en for en in ["00", "06", "12", "18"]] - urls.append(begin_url + "VMF3_" + (dt + _datetime.timedelta(days=1)).strftime("%Y%m%d") + ".H00") - # Run through model files, downloading if they are not in directory - for url in urls: - if not (dest_pth / f"grid5/{url[-17:]}").is_file(): - check_n_download_url(url, dwndir=str(dest_pth / "grid5")) - else: - # Otherwise, check for GPT2 model file or download if necessary: - if not (dest_pth / "gpt_25.grd").is_file(): - url = "https://peanpod.s3-ap-southeast-2.amazonaws.com/pea/examples/EX03/products/gpt_25.grd" - check_n_download_url(url, dwndir=dest) - - if repro3: - snx_typ = ac - standards = ["sp3", "erp", clk_sel] - ac_typ_dict = {ac_sel: [] for ac_sel in [ac, brd_typ, snx_typ]} - for typ in standards: - ac_typ_dict[ac].append(typ) - ac_typ_dict[brd_typ].append("rnx") - - if not most_recent: - f_dict = {} - ac_typ_dict[snx_typ].append("snx") - - # Download product files of each type from CDDIS for the given dates: - for ac in ac_typ_dict: - if most_recent: - f_dict_update = download_prod( - dates=dt_list, dest=dest, ac=ac, f_type=ac_typ_dict[ac], dwn_src="cddis", f_dict=True, ftps=ftps - ) - elif repro3: - f_dict_update = download_prod( - dates=dt_list, dest=dest, ac=ac, f_type=ac_typ_dict[ac], dwn_src="cddis", f_dict=True, repro3=True - ) - else: - f_dict_update = download_prod( - dates=dt_list, dest=dest, ac=ac, f_type=ac_typ_dict[ac], dwn_src="cddis", f_dict=True - ) - f_dict.update(f_dict_update) - f_types = [] - for el in list(ac_typ_dict.values()): - for typ in el: - f_types.append(typ) - if most_recent: - f_types.append("snx") +def download_satellite_metadata_snx(download_dir: _Path, if_file_present: str = "prompt_user") -> _Path: + """Download the most recent IGS satellite metadata file - # Prepare the output dictionary based on the downloaded files: - for f_type in f_types: - if f_type == "rnx": - out_dict[f"navfiles"] = sorted(f_dict[f_type]) - out_dict[f"{f_type}files"] = sorted(f_dict[f_type]) + :param _Path download_dir: Where to download files (local directory) + :param str if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user" + :return _Path: The pathlib.Path of the downloaded file + """ + ensure_folders([download_dir]) + download_filepath = attempt_url_download( + download_dir=download_dir, + url=IGS_FILES_URL + "station/general/igs_satellite_metadata.snx", + filename="igs_satellite_metadata.snx", + type_of_file="IGS satellite metadata", + if_file_present=if_file_present, + ) + return download_filepath - return out_dict +def download_yaw_files(download_dir: _Path, if_file_present: str = "prompt_user") -> _List[_Path]: + """Download yaw rate / bias files needed to for Ginan's PEA -def download_rinex3(dates, stations, dest, dwn_src="cddis", ftps=False, f_dict=False): - """ - Function used to get the RINEX3 observation file from download server of choice, default: CDDIS + :param _Path download_dir: Where to download files (local directory) + :param str 
if_file_present: What to do if file already present: "replace", "dont_replace", defaults to "prompt_user" + :return _List[_Path]: Return list of download files """ - if dest[-1] != "/": - dest += "/" - # Convert input to list of datetime dates (if not already) - dt_list = dates_type_convert(dates) - - if isinstance(stations, str): - stations = [stations] - - # Create directory if doesn't exist: - if not _Path(dest).is_dir(): - _Path(dest).mkdir(parents=True) - - if f_dict: - f_dict = {"rnxfiles": []} - - # Connect to ftps if not already: - if not ftps: - # Connect to chosen server - if dwn_src == "cddis": - logging.info("\nGathering RINEX files...") - ftps = connect_cddis(verbose=True) - p_date = 0 - - for dt in dt_list: - for station in stations: - f_pref = f"{station}_R_" - f_suff_crx = f"0000_01D_30S_MO.crx.gz" - f = f_pref + dt.strftime("%Y%j") + f_suff_crx - - if not check_file_present(comp_filename=f, dwndir=dest): - if p_date == dt: - try: - success = check_n_download( - f, dwndir=dest, ftps=ftps, uncomp=True, remove_crx=True, no_check=True - ) - except: - logging.error(f"Download of {f} failed - file not found") - success = False - else: - ftps.cwd("/") - ftps.cwd(f"gnss/data/daily{dt.strftime('/%Y/%j/%yd/')}") - try: - success = check_n_download( - f, dwndir=dest, ftps=ftps, uncomp=True, remove_crx=True, no_check=True - ) - except: - logging.error(f"Download of {f} failed - file not found") - success = False - p_date = dt - else: - success = True - if f_dict and success: - f_dict["rnxfiles"].append(gen_uncomp_filename(f)) - else: - for dt in dt_list: - for station in stations: - f_pref = f"{station}_R_" - f_suff_crx = f"0000_01D_30S_MO.crx.gz" - f = f_pref + dt.strftime("%Y%j") + f_suff_crx - if not check_file_present(comp_filename=f, dwndir=dest): - success = check_n_download(f, dwndir=dest, ftps=ftps, uncomp=True, remove_crx=True, no_check=True) - else: - success = True - if f_dict and success: - f_dict["rnxfiles"].append(gen_uncomp_filename(f)) - if f_dict: - return f_dict + ensure_folders([download_dir]) + download_filepaths = [] + files = ["bds_yaw_modes.snx.gz", "qzss_yaw_modes.snx.gz", "sat_yaw_bias_rate.snx.gz"] + for filename in files: + download_filepath = attempt_url_download( + download_dir=download_dir, + url=PRODUCT_BASE_URL + "tables/" + filename, + filename=filename, + type_of_file="Yaw Model SNX", + if_file_present=if_file_present, + ) + if download_filepath: + download_filepaths.append(decompress_file(download_filepath, delete_after_decompression=True)) + + return download_filepaths def get_vars_from_file(path): diff --git a/gnssanalysis/gn_utils.py b/gnssanalysis/gn_utils.py index dd7062b..6dbd974 100644 --- a/gnssanalysis/gn_utils.py +++ b/gnssanalysis/gn_utils.py @@ -5,7 +5,7 @@ import click as _click -from typing import List +from typing import List as _List def diffutil_verify_input(input): @@ -52,6 +52,54 @@ def get_filetype(path): return suffix +def configure_logging(verbose: bool, output_logger: bool = False) -> _logging.Logger | None: + """Configure the logger object with the level of verbosity requested and output if desired + + :param bool verbose: Verbosity of logger object to use for encoding logging strings, True: DEBUG, False: INFO + :param bool output_logger: Flag to indicate whether to output the Logger object, defaults to False + :return _logging.Logger | None: Return the logger object or None (based on output_logger) + """ + if verbose: + logging_level = _logging.DEBUG + else: + logging_level = _logging.INFO + 
_logging.basicConfig(format="%(asctime)s [%(funcName)s] %(levelname)s: %(message)s") + _logging.getLogger().setLevel(logging_level) + if output_logger: + return _logging.getLogger() + else: + return None + + +def ensure_folders(paths: _List[_pathlib.Path]): + """Ensures the folders in the input list exist in the file system - if not, create them + + :param _List[_pathlib.Path] paths: list of pathlib.Path/s to check + """ + for path in paths: + if not isinstance(path, _pathlib.Path): + path = _pathlib.Path(path) + if not path.is_dir(): + path.mkdir(parents=True, exist_ok=True) + + +def delete_entire_directory(directory: _pathlib.Path): + """Recursively delete a directory, including all subdirectories and files in subdirectories + + :param Path directory: Directory to recursively delete + """ + # First, iterate through all the files and subdirectories + for item in directory.iterdir(): + if item.is_dir(): + # Recursively delete subdirectories + delete_entire_directory(item) + else: + # Delete files + item.unlink() + # Finally, delete the empty directory itself + directory.rmdir() + + @_click.group(invoke_without_command=True) @_click.option( "-i", @@ -577,12 +625,7 @@ def orbq( """ from gnssanalysis import gn_io, gn_aux, gn_diffaux - _logging.basicConfig(level="INFO") # seems that logging can only be configured before the first logging call - logger = _logging.getLogger() - # if verbose: - # logger.setLevel(_logging.INFO) - # else: - # _logging.disable() + logger = configure_logging(verbose=True, output_logger=True) sp3_a = gn_io.sp3.read_sp3(input[0], nodata_to_nan=nodata_to_nan) sp3_b = gn_io.sp3.read_sp3(input[1], nodata_to_nan=nodata_to_nan) @@ -770,12 +813,8 @@ def clkq( """ from gnssanalysis import gn_io, gn_aux, gn_diffaux, gn_const - _logging.basicConfig(level="INFO") # seems that logging can only be configured before the first logging call - logger = _logging.getLogger() - if verbose: - logger.setLevel(_logging.INFO) - else: - _logging.disable() + logger = configure_logging(verbose=verbose, output_logger=True) + clk_a, clk_b = gn_io.clk.read_clk(input_clk_paths[0]), gn_io.clk.read_clk(input_clk_paths[1]) if reject_re is not None: logger.log(msg=f"Excluding satellites based on regex expression: '{reject_re}'", level=_logging.INFO) diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 0000000..c1deead --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,66 @@ +import unittest +import logging +from unittest.mock import patch, mock_open +from pyfakefs.fake_filesystem_unittest import TestCase +from pathlib import Path, PosixPath + +from gnssanalysis.gn_utils import delete_entire_directory +import gnssanalysis.gn_utils as ga_utils + + +class TestUtils(TestCase): + def setUp(self): + self.setUpPyfakefs() + # Create directory + self.test_dir_1 = "/test_dir_1" + self.test_dir_2 = "/test_dir_2/a/b/" + Path(self.test_dir_1).mkdir(exist_ok=True) + Path(self.test_dir_2).mkdir(exist_ok=True, parents=True) + + def tearDown(self): + # Clean up test directory after tests: + if Path(self.test_dir_1).is_dir(): + delete_entire_directory(Path(self.test_dir_1)) + if Path(self.test_dir_2).is_dir(): + delete_entire_directory(Path(self.test_dir_2)) + + def test_ensure_folders(self): + + # Verify directories that do and dont exist: + self.assertTrue(Path(self.test_dir_1).is_dir()) + self.assertFalse((Path(self.test_dir_1) / "a/").is_dir()) + self.assertFalse((Path(self.test_dir_1) / "a/b/").is_dir()) + self.assertTrue(Path(self.test_dir_2).is_dir()) + 
self.assertFalse((Path(self.test_dir_2) / "c/d/").is_dir())
+
+        # Use the ensure_folders function to create the various directories:
+        ga_utils.ensure_folders([self.test_dir_1, self.test_dir_1 + "/a/b/", self.test_dir_2])
+
+        # Verify directories that do and don't exist:
+        self.assertTrue(Path(self.test_dir_1).is_dir())
+        self.assertTrue((Path(self.test_dir_1) / "a/").is_dir())
+        self.assertTrue((Path(self.test_dir_1) / "a/b/").is_dir())
+        self.assertTrue(Path(self.test_dir_2).is_dir())
+        self.assertFalse((Path(self.test_dir_2) / "c/d/").is_dir())
+
+    def test_configure_logging(self):
+
+        # Set up verbose logger:
+        logger_verbose = ga_utils.configure_logging(verbose=True, output_logger=True)
+
+        # Verify type and level:
+        self.assertEqual(type(logger_verbose), logging.RootLogger)
+        self.assertEqual(logger_verbose.level, logging.DEBUG)
+
+        # Set up non-verbose logger:
+        logger_not_verbose = ga_utils.configure_logging(verbose=False, output_logger=True)
+
+        # Verify type and level:
+        self.assertEqual(type(logger_not_verbose), logging.RootLogger)
+        self.assertEqual(logger_not_verbose.level, logging.INFO)
+
+        # Set up logger without output:
+        logger_not_output = ga_utils.configure_logging(verbose=True, output_logger=False)
+
+        # Verify nothing is returned:
+        self.assertIsNone(logger_not_output)
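Beyond the unit tests above, the new helpers are intended to compose into a short product-preparation script. A minimal end-to-end sketch, assuming a hypothetical target directory and illustrative option values:

```python
from pathlib import Path

from gnssanalysis.gn_utils import configure_logging, ensure_folders
from gnssanalysis.gn_download import (
    download_atx,
    download_satellite_metadata_snx,
    download_yaw_files,
)

product_dir = Path("pea_products")  # hypothetical download target

configure_logging(verbose=True)  # sets the root logger to DEBUG
ensure_folders([product_dir])

# Each downloader also calls ensure_folders itself and honours if_file_present
download_atx(product_dir, reference_frame="IGS20", if_file_present="dont_replace")
download_satellite_metadata_snx(product_dir, if_file_present="dont_replace")
yaw_files = download_yaw_files(product_dir, if_file_present="dont_replace")  # paths to decompressed .snx files
```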