Skip to content

Commit

Permalink
Merge branch 'main' into NPI-3669-utility-func-for-minimal-sp3-genera…
Browse files Browse the repository at this point in the history
…tion
  • Loading branch information
treefern committed Jan 7, 2025
2 parents 8f45be9 + f96026c commit d5b7ea4
Show file tree
Hide file tree
Showing 6 changed files with 769 additions and 101 deletions.
279 changes: 224 additions & 55 deletions gnssanalysis/gn_io/igslog.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
"""IGS log files parser"""

import logging
import glob as _glob
import re as _re
from multiprocessing import Pool as _Pool
from typing import Union, List, Tuple

import numpy as _np
import pandas as _pd
Expand All @@ -11,9 +13,10 @@
from .. import gn_frame as _gn_frame
from .. import gn_io as _gn_io
from .. import gn_transform as _gn_transform
from tqdm import tqdm

_REGEX_ID = _re.compile(
logger = logging.getLogger(__name__)

_REGEX_ID_V1 = _re.compile(
rb"""
(?:Four\sCharacter\sID|Site\sID)\s+\:\s*(\w{4}).*\W+
.*\W+
Expand All @@ -23,7 +26,17 @@
_re.IGNORECASE | _re.VERBOSE,
)

_REGEX_LOC = _re.compile(
_REGEX_ID_V2 = _re.compile(
rb"""
(?:Nine\sCharacter\sID|Site\sID)\s+\:\s*(\w{4}).*\W+
.*\W+
(?:\s{25}.+\W+|)
IERS.+\:\s*(\w{9}|)
""",
_re.IGNORECASE | _re.VERBOSE,
)

_REGEX_LOC_V1 = _re.compile(
rb"""
2.+\W+City\sor\sTown\s+\:\s*(\w[^\(\n\,/\?]+|).*\W+
State.+\W+Country\s+\:\s*([^\(\n\,\d]+|).*\W+(?:\s{25}.+\W+|)
Expand All @@ -38,6 +51,21 @@
_re.IGNORECASE | _re.VERBOSE,
)

_REGEX_LOC_V2 = _re.compile(
rb"""
2.+\W+City\sor\sTown\s+\:\s*(\w[^\(\n\,/\?]+|).*\W+
State.+\W+Country\sor\sRegion\s+\:\s*([^\(\n\,\d]+|).*\W+(?:\s{25}.+\W+|)
Tectonic.+\W+(?:\s{25}.+\W+|).+\W+
X.{22}\:?\s*([\d\-\+\.\,]+|).*\W+
Y.{22}\:?\s*([\d\-\+\.\,]+|).*\W+
Z.{22}\:?\s*([\d\-\+\.\,]+|).*\W+
Latitude.+\:\s*([\d\.\,\-\+]+|).*\W+
Longitud.+\:\s*([\d\.\,\-\+]+|).*\W+
Elevatio.+\:\s*([\d\.\,\-\+]+|).*
""",
_re.IGNORECASE | _re.VERBOSE,
)


_REGEX_REC = _re.compile(
rb"""
Expand Down Expand Up @@ -77,12 +105,21 @@
_REGEX_LOGNAME = r"(?:.*\/)(\w{4})(?:\w+_(\d{8})|_(\d{8})\-?\w?|(\d{8})|_.*|\d+|)"


def find_recent_logs(logs_glob_path: str, rnx_glob_path=None) -> _pd.DataFrame:
"""Takes glob expression to get the list of log files,
parses names into site and date and selects the ones
with most recent date
/data/station_logs/station_logs_IGS/*/*.log
/data/acs/pea/proc/exs/data/*.rnx"""
class LogVersionError(Exception):
"""
Log file does not conform to known IGS version standard
"""

pass


def find_recent_logs(logs_glob_path: str, rnx_glob_path: str = None) -> _pd.DataFrame:
"""Takes glob expression to create list of logs, parses names into site and date and selects most recent ones
:param str logs_glob_path: A glob expression for log files, e.g. /data/station_logs_IGS/*/*.log
:param str rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to None
:return _pd.DataFrame: Returns a dataframe containing information from all station logs processed
"""
paths = _pd.Series(_glob.glob(pathname=logs_glob_path, recursive=False), name="PATH")

logs_df = paths.str.extract(expand=True, pat=_REGEX_LOGNAME)
Expand All @@ -101,39 +138,136 @@ def find_recent_logs(logs_glob_path: str, rnx_glob_path=None) -> _pd.DataFrame:
return recent_logs_df


def parse_igs_log(filename_array: _np.ndarray) -> _np.ndarray:
"""Parses igs log and outputs ndarray with parsed data
Expects ndarray of the form [CODE DATE PATH]"""
file_code, __, file_path = filename_array
_REGEX_VERSION_1 = _re.compile(rb"""(site log\))""")
_REGEX_VERSION_2 = _re.compile(rb"""(site log v2)""")

with open(file_path, "rb") as file:
data = file.read()

blk_id = _REGEX_ID.search(data)
if blk_id is None:
tqdm.write(f"ID rejected from {file_path}")
def determine_log_version(data: bytes) -> str:
"""Given the byes object that results from reading an IGS log file, determine the version ("v1.0" or "v2.0")
:param bytes data: IGS log file bytes object to determine the version of
:return str: Return the version number: "v1.0" or "v2.0" (or "Unknown" if file does not conform to standard)
"""

result_v1 = _REGEX_VERSION_1.search(data)
if result_v1:
return "v1.0"

result_v2 = _REGEX_VERSION_2.search(data)
if result_v2:
return "v2.0"

raise LogVersionError("Log file does not conform to any known IGS version")


def extract_id_block(data: bytes, file_path: str, file_code: str, version: str = None) -> Union[List[str], _np.array]:
"""Extract the site ID block given the bytes object read from an IGS site log file
:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:param str file_code: Code from the filename_array passed to the parse_igs_log() function
:param str version: Version number of log file (e.g. "v2.0") - determined if version=None, defaults to None
:raises LogVersionError: Raises an error if an unknown version string is passed in
:return bytes: The site ID block of the IGS site log
"""
if version == None:
version = determine_log_version(data)

if version == "v1.0":
_REGEX_ID = _REGEX_ID_V1
elif version == "v2.0":
_REGEX_ID = _REGEX_ID_V2
else:
raise LogVersionError("Incorrect version string passed to the extract_id_block() function")

id_block = _REGEX_ID.search(data)
if id_block is None:
logger.warning(f"ID rejected from {file_path}")
return _np.array([]).reshape(0, 12)

blk_id = [blk_id[1].decode().upper(), blk_id[2].decode().upper()] # no .groups() thus 1 and 2
code = blk_id[0]
id_block = [id_block[1].decode().upper(), id_block[2].decode().upper()] # no .groups() thus 1 and 2
code = id_block[0]
if code != file_code:
tqdm.write(f"{code}!={file_code} at {file_path}")
logger.warning(f"{code}!={file_code} at {file_path}")
return _np.array([]).reshape(0, 12)
return id_block

blk_loc = _REGEX_LOC.search(data)
if blk_loc is None:
tqdm.write(f"LOC rejected from {file_path}")

def extract_location_block(data: bytes, file_path: str, version: str = None) -> Union[_re.Match[bytes], _np.array]:
"""Extract the location block given the bytes object read from an IGS site log file
:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:param str version: Version number of log file (e.g. "v2.0") - determined if version=None, defaults to None
:raises LogVersionError: Raises an error if an unknown version string is passed in
:return bytes: The location block of the IGS site log
"""
if version == None:
version = determine_log_version(data)

if version == "v1.0":
_REGEX_LOC = _REGEX_LOC_V1
elif version == "v2.0":
_REGEX_LOC = _REGEX_LOC_V2
else:
raise LogVersionError("Incorrect version string passed to extract_location_block() function")

location_block = _REGEX_LOC.search(data)
if location_block is None:
logger.warning(f"LOC rejected from {file_path}")
return _np.array([]).reshape(0, 12)
return location_block


blk_rec = _REGEX_REC.findall(data)
if blk_rec == []:
tqdm.write(f"REC rejected from {file_path}")
def extract_receiver_block(data: bytes, file_path: str) -> Union[List[Tuple[bytes]], _np.array]:
"""Extract the location block given the bytes object read from an IGS site log file
:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:return List[Tuple[bytes]]: The receiver block of the data. Each list element specifies an receiver
"""
receiver_block = _REGEX_REC.findall(data)
if receiver_block == []:
logger.warning(f"REC rejected from {file_path}")
return _np.array([]).reshape(0, 12)
return receiver_block


blk_ant = _REGEX_ANT.findall(data)
if blk_ant == []:
tqdm.write(f"ANT rejected from {file_path}")
def extract_antenna_block(data: bytes, file_path: str) -> Union[List[Tuple[bytes]], _np.array]:
"""Extract the antenna block given the bytes object read from an IGS site log file
:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:return List[Tuple[bytes]]: The antenna block of the data. Each list element specifies an antenna
"""
antenna_block = _REGEX_ANT.findall(data)
if antenna_block == []:
logger.warning(f"ANT rejected from {file_path}")
return _np.array([]).reshape(0, 12)
return antenna_block


def parse_igs_log(filename_array: _np.ndarray) -> Union[_np.ndarray, None]:
"""Parses igs log and outputs ndarray with parsed data
:param _np.ndarray filename_array: Metadata on input log file. Expects ndarray of the form [CODE DATE PATH]
:return _np.ndarray: Returns array with data from the IGS log file parsed
"""
file_code, _, file_path = filename_array

with open(file_path, "rb") as file:
data = file.read()

try:
version = determine_log_version(data)
except LogVersionError as e:
logger.warning(f"Error: {e}, skipping parsing the log file")
return

blk_id = extract_id_block(data, version, file_path, file_code)
blk_loc = extract_location_block(data, version, file_path)
blk_rec = extract_receiver_block(data, file_path)
blk_ant = extract_antenna_block(data, file_path)

blk_loc = [group.decode(encoding="utf8", errors="ignore") for group in blk_loc.groups()]
blk_rec = _np.asarray(blk_rec, dtype=str)
Expand Down Expand Up @@ -166,9 +300,12 @@ def parse_igs_log(filename_array: _np.ndarray) -> _np.ndarray:
return _np.concatenate([blk_uni, file_path_arr], axis=1)


def igslogdate2datetime64(stacked_rec_ant_dt: _np.ndarray):
"""2010-01-01T00:00
- can be any non-space character. If parsing fails - None"""
def igslogdate2datetime64(stacked_rec_ant_dt: _np.ndarray) -> _np.datetime64:
"""Function to convert datetimes for IGS log files to np.datetime64 objects, e.g. 2010-01-01T00:00
:param _np.ndarray stacked_rec_ant_dt: Array of IGS log datetimes to convert but need to be non-space values
:return _np.datetime64: Return datetime64 object - if parsing fails returns None
"""
dt_array_float = (
_pd.Series(stacked_rec_ant_dt)
.str.extract(pat=r"(\d{4})\S?(\d{2})\S?(\d+)\D?(?:(\d{1,2})\:(\d{1,2})\D?|)")
Expand Down Expand Up @@ -212,28 +349,39 @@ def igslogdate2datetime64(stacked_rec_ant_dt: _np.ndarray):
return dt_datetime64


def translate_series(series, translation):
"""changes values in the series according to the dictionary of old_value-new_value"""
def translate_series(series: _pd.Series, translation: dict) -> _pd.Series:
"""Changes values in the series according to the dictionary of input_value:output_value
:param _pd.Series series: _pd.Series to translate
:param dict translation: Dictionary that defines the translation (mapping) to carry out
:return _pd.Series: Return a _pd.Series with the resultant translation (mapping)
"""
series = series.copy()
series.index = series.values
series.update(translation)
return series


def gather_metadata(logs_glob_path="/data/station_logs/station_logs_IGS/*/*.log", rnx_glob_path=None, num_threads=1):
"""parses logiles found with glob expression"""
def gather_metadata(
logs_glob_path: str = "/data/station_logs/station_logs_IGS/*/*.log", rnx_glob_path: str = None, num_threads: int = 1
) -> List[_pd.DataFrame]:
"""Parses log files found with glob expressions into pd.DataFrames
:param str logs_glob_path: A glob expression for log files, defaults to "/data/station_logs_IGS/*/*.log"
:param str rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to None
:param int num_threads: Number of threads to run, defaults to 1
:return List[_pd.DataFrame]: List of DataFrames with [ID, Receiver, Antenna] data
"""
parsed_filenames = find_recent_logs(logs_glob_path=logs_glob_path, rnx_glob_path=rnx_glob_path).values

total = parsed_filenames.shape[0]
if num_threads == 1:
gather = []
for file in tqdm(parsed_filenames, miniters=total // 100, total=total):
for file in parsed_filenames:
gather.append(parse_igs_log(file))
else:
with _Pool(processes=num_threads) as pool:
gather = list(
tqdm(pool.imap_unordered(parse_igs_log, parsed_filenames), total=total, miniters=total // 100)
)
gather = list(pool.imap_unordered(parse_igs_log, parsed_filenames))

gather_raw = _np.concatenate(gather)

Expand Down Expand Up @@ -356,8 +504,12 @@ def gather_metadata(logs_glob_path="/data/station_logs/station_logs_IGS/*/*.log"
return id_loc_df, rec_df, ant_df


def frame2snx_string(frame_of_day):
"""frame_of_day dataframe to ESTIMATE sinex block"""
def frame2snx_string(frame_of_day: _pd.DataFrame) -> str:
"""Convert frame_of_day dataframe to ESTIMATE sinex block
:param _pd.DataFrame frame_of_day: Dataframe defining the reference frame of the day of interest
:return str: Returns a sinex block string from the frame definition
"""
code_pt = frame_of_day.index.to_series().str.split("_", expand=True) # .to_frame().values
code_pt.columns = ["CODE", "PT"]
frame_dt = _gn_datetime.j20002datetime(frame_of_day.attrs["REF_EPOCH"])
Expand Down Expand Up @@ -409,8 +561,14 @@ def frame2snx_string(frame_of_day):
return buf


def meta2sting(id_loc_df, rec_df, ant_df):
"""Converts three metadata dataframe to sinex blocks (string)"""
def meta2string(id_loc_df: _pd.DataFrame, rec_df: _pd.DataFrame, ant_df: _pd.DataFrame) -> str:
"""Converts the three metadata dataframes (Site ID, Receiver, Antenna) to sinex block
:param _pd.DataFrame id_loc_df: Dataframe detailing Site IDs / Locations
:param _pd.DataFrame rec_df: Dataframe detailing Receiver information
:param _pd.DataFrame ant_df: Dataframe detailing Antenna information
:return str: Returns a Sinex block str (in standard IGS Sinex format)
"""
rec_df["S/N"] = rec_df["S/N"].str.slice(0, 5)
rec_df["FW"] = rec_df["FW"].str.slice(0, 11)

Expand Down Expand Up @@ -505,15 +663,26 @@ def meta2sting(id_loc_df, rec_df, ant_df):


def write_meta_gather_master(
logs_glob_path="/data/station_logs/*/*.log",
rnx_glob_path="/data/acs/pea/proc/exs/data/*.rnx",
frame_datetime=None,
frame_snx_path="/data/ITRF/itrf2014/ITRF2014-IGS-TRF.SNX.gz",
frame_soln_path="/data/ITRF/itrf2014/ITRF2014-soln-gnss.snx",
frame_psd_path="/data/ITRF/itrf2014/ITRF2014-psd-gnss.snx",
out_path="/data/meta_gather.snx",
num_threads=None,
):
logs_glob_path: str,
rnx_glob_path: str,
frame_snx_path: str,
frame_soln_path: str,
frame_psd_path: str,
frame_datetime: _np.datetime64 = None,
out_path: str = "/data/meta_gather.snx",
num_threads: int = 1,
) -> None:
"""Create a SNX file of stations, based on given reference frame projected to a datetime using site logs + rnxs
:param str logs_glob_path: A glob path to find desired log files, e.g. "/data/site_logs/*/*.log"
:param str rnx_glob_path: A glob path to find desired RNX files (optional), e.g. "/data/rinex-files/*.rnx"
:param str frame_snx_path: Path to reference frame sinex file, e.g. "/data/itrf2014/ITRF2014-IGS-TRF.SNX.gz"
:param str frame_soln_path: Path to solution file of reference frame, e.g. "/data/itrf2014/ITRF2014-soln-gnss.snx"
:param str frame_psd_path: Path to post-seismic deformation file, e.g. "/data/itrf2014/ITRF2014-psd-gnss.snx"
:param _np.datetime64 frame_datetime: Datetime to project the dataframe to, defaults to None
:param str out_path: Path of file to output, defaults to "/data/meta_gather.snx"
:param int num_threads: Number of threads to run on parsing log / rnx files, defaults to 1
"""
if frame_datetime is None:
frame_datetime = _np.datetime64("today")
else:
Expand Down Expand Up @@ -568,7 +737,7 @@ def write_meta_gather_master(
]
)
# ant/rec
buf.extend(meta2sting(id_loc_df, rec_df, ant_df))
buf.extend(meta2string(id_loc_df, rec_df, ant_df))
# projected coordinates
if gather_itrf is not None:
buf.extend(frame2snx_string(gather_itrf))
Expand Down
Loading

0 comments on commit d5b7ea4

Please sign in to comment.