Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: NPI-3678 Fixes for IGS site log v2 parser #67

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 114 additions & 36 deletions gnssanalysis/gn_io/igslog.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import glob as _glob
import re as _re
from multiprocessing import Pool as _Pool
from typing import Union, List, Tuple
from typing import Literal, Union, List, Tuple, get_args

import numpy as _np
import pandas as _pd
Expand All @@ -16,6 +16,18 @@

logger = logging.getLogger(__name__)

# Defines what IGS Site Log format versions we currently support.
# Example logs for the first two versions can be found at:
# Version 1: https://files.igs.org/pub/station/general/blank.log
# Version 2: https://files.igs.org/pub/station/general/blank_v2.0.log
SUPPORTED_IGS_LOG_VERSION_STRINGS = Literal["v1.0", "v2.0"]

# Regex to extract the version string i.e. '' for v1, 'v2.0' for v2, etc. from the *first line* of a site log file.
# Uses a named capture group 'log_version' to make error handling cleaner when checking for a result.
_REGEX_IGS_SITE_LOG_VERSION_STRING = _re.compile(
rb"""(?:Site Information Form \(site log) ?(?P<log_version>|v\d.\d)\)$"""
)

_REGEX_ID_V1 = _re.compile(
rb"""
(?:Four\sCharacter\sID|Site\sID)\s+\:\s*(\w{4}).*\W+
Expand Down Expand Up @@ -113,11 +125,11 @@ class LogVersionError(Exception):
pass


def find_recent_logs(logs_glob_path: str, rnx_glob_path: str = None) -> _pd.DataFrame:
def find_recent_logs(logs_glob_path: str, rnx_glob_path: Union[str, None] = None) -> _pd.DataFrame:
"""Takes glob expression to create list of logs, parses names into site and date and selects most recent ones

:param str logs_glob_path: A glob expression for log files, e.g. /data/station_logs_IGS/*/*.log
:param str rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to None
:param Union[str, None] rnx_glob_path: A glob expression for rnx files, e.g. /data/pea/exs/data/*.rnx, defaults to None
:return _pd.DataFrame: Returns a dataframe containing information from all station logs processed
"""
paths = _pd.Series(_glob.glob(pathname=logs_glob_path, recursive=False), name="PATH")
Expand All @@ -138,29 +150,68 @@ def find_recent_logs(logs_glob_path: str, rnx_glob_path: str = None) -> _pd.Data
return recent_logs_df


_REGEX_VERSION_1 = _re.compile(rb"""(site log\))""")
_REGEX_VERSION_2 = _re.compile(rb"""(site log v2)""")
def ensure_supported_site_log_version(version_string: str):
"""
As the name implies, checks whether the given IGS Site Log version string is one we currently support, and raises
an exception if not.
NOTE: This function expects version 1 to be expressed as "v1.0", not an empty string '' as will be parsed by the regex.

:param str version_string: the version string to test
:raises LogVersionError: if the version string is not one we support.
"""
if not version_string in get_args(SUPPORTED_IGS_LOG_VERSION_STRINGS):
raise LogVersionError(f"IGS Log version '{version_string}' is not currently supported")

# No issue, implicitly return
pass


def determine_log_version(data: bytes) -> str:
"""Given the byes object that results from reading an IGS log file, determine the version ("v1.0" or "v2.0")
"""Given the byes object that results from reading an IGS site log file, determine the version e.g. "v1.0" or "v2.0"

:param bytes data: IGS log file bytes object to determine the version of
:return str: Return the version number: "v1.0" or "v2.0" (or "Unknown" if file does not conform to standard)
:return str: The version number string e.g. "v1.0", "v2.0", etc. Note: 'v1.0' is returned for version 1, which
could also be represented as an empty string as it is not present in the version 1 format.
:raises LogVersionError: If log file version doesn't match a known version string.
"""
first_data_line = data.split(b"\n")[0]

# Manual string manipulation based approach (as alternative to regex)
# Throw away everything up to and including '(site log', then the leading space if present before the >v1 version
# number, and finally the closing paren.
# I.e. from ' XXXXMRCCC Site Information Form (site log v2.0)', extract 'v2.0'.
# We work in binary for simplicity, but convert the eventual result into a string
# log_version = first_data_line.split(b"(site log")[1].strip(b" )").decode("utf-8")

# Try to find version string in first line of IGS Site Log file
if not (log_version_regex_match := _REGEX_IGS_SITE_LOG_VERSION_STRING.match(data)):
raise LogVersionError(
"Regex failed to find IGS Site Log version in first line of file: "
f"'{first_data_line.decode('utf-8', errors='ignore')}'"
)

result_v1 = _REGEX_VERSION_1.search(data)
if result_v1:
return "v1.0"
# Check that we actually got data in the relevant regex capture group (group name is 'log_version')
if not (log_version_str := log_version_regex_match.groupdict().get("log_version")):
raise LogVersionError(
"Regex found IGS Site Log version line, but failed to extract version from it: "
f"'{first_data_line.decode('utf-8', errors='ignore')}'"
)

result_v2 = _REGEX_VERSION_2.search(data)
if result_v2:
return "v2.0"
log_version_str = log_version_str.decode("utf-8")

raise LogVersionError("Log file does not conform to any known IGS version")
# Special case for version 1, which will be parsed as an empty string
if len(log_version_str) == 0:
log_version_str = "v1.0"

if not ensure_supported_site_log_version(log_version_str):
raise LogVersionError(f"IGS Log version '{log_version_str}' is not currently supported")

def extract_id_block(data: bytes, file_path: str, file_code: str, version: str = None) -> Union[List[str], _np.array]:
return log_version_str


def extract_id_block(
data: bytes, file_path: str, file_code: str, version: Union[str, None] = None
) -> Union[List[str], _np.ndarray]:
"""Extract the site ID block given the bytes object read from an IGS site log file

:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
Expand All @@ -171,14 +222,19 @@ def extract_id_block(data: bytes, file_path: str, file_code: str, version: str =
:return bytes: The site ID block of the IGS site log
"""
if version == None:
version = determine_log_version(data)
version = determine_log_version(data) # Raises exception if version not supported
# Check the version passed in is supported. Raises exception if not
ensure_supported_site_log_version(version)

if version == "v1.0":
_REGEX_ID = _REGEX_ID_V1
elif version == "v2.0":
_REGEX_ID = _REGEX_ID_V2
else:
raise LogVersionError("Incorrect version string passed to the extract_id_block() function")
raise NotImplementedError(
f"Internal error: version string passed in {version} was in set of supported versions, "
"but this function was not able to process it"
)

id_block = _REGEX_ID.search(data)
if id_block is None:
Expand All @@ -193,38 +249,52 @@ def extract_id_block(data: bytes, file_path: str, file_code: str, version: str =
return id_block


def extract_location_block(data: bytes, file_path: str, version: str = None) -> Union[_re.Match[bytes], _np.array]:
def extract_location_block(data: bytes, file_path: str, version: Union[str, None] = None) -> _np.ndarray:
"""Extract the location block given the bytes object read from an IGS site log file

:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:param str version: Version number of log file (e.g. "v2.0") - determined if version=None, defaults to None
:param str version: Version number of log file (e.g. "v2.0") - will be determined from input data unless
provided here.
:raises LogVersionError: Raises an error if an unknown version string is passed in
:return bytes: The location block of the IGS site log
:return _np.ndarray: The location block of the IGS site log, as a numpy NDArray of strings
"""
if version == None:
version = determine_log_version(data)
version = determine_log_version(data) # Raises exception if version not supported
# Check the version passed in is supported. Raises exception if not
ensure_supported_site_log_version(version)

# Could do something like this, but that would be overkill for two sets of regex.
# _REGEX_LOC = get_location_regex_for_version(version)

if version == "v1.0":
_REGEX_LOC = _REGEX_LOC_V1
elif version == "v2.0":
_REGEX_LOC = _REGEX_LOC_V2
else:
raise LogVersionError("Incorrect version string passed to extract_location_block() function")
raise NotImplementedError(
f"Internal error: version string passed in {version} was in set of supported versions, "
"but this function was not able to process it"
)

location_block = _REGEX_LOC.search(data)

# Did regex return data?
if location_block is None:
logger.warning(f"LOC rejected from {file_path}")
return _np.array([]).reshape(0, 12)
return location_block
return _np.array([]).reshape(0, 12) # Return empty ndarray of correct shape
# Decode data from regex and present it as ndarray
match_group_strings = [group.decode(encoding="utf-8", errors="ignore") for group in location_block.groups()]
return _np.asarray(match_group_strings)


def extract_receiver_block(data: bytes, file_path: str) -> Union[List[Tuple[bytes]], _np.array]:
def extract_receiver_block(data: bytes, file_path: str) -> Union[List[Tuple[bytes]], _np.ndarray]:
"""Extract the location block given the bytes object read from an IGS site log file

:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:return List[Tuple[bytes]]: The receiver block of the data. Each list element specifies an receiver
:return List[Tuple[bytes]] or _np.ndarray: The receiver block of the data. Each list element specifies an receiver.
If regex doesn't match, an empty numpy NDArray is returned instead.
"""
receiver_block = _REGEX_REC.findall(data)
if receiver_block == []:
Expand All @@ -233,12 +303,13 @@ def extract_receiver_block(data: bytes, file_path: str) -> Union[List[Tuple[byte
return receiver_block


def extract_antenna_block(data: bytes, file_path: str) -> Union[List[Tuple[bytes]], _np.array]:
def extract_antenna_block(data: bytes, file_path: str) -> Union[List[Tuple[bytes]], _np.ndarray]:
"""Extract the antenna block given the bytes object read from an IGS site log file

:param bytes data: The bytes object returned from an open() call on a IGS site log in "rb" mode
:param str file_path: The path to the file from which the "data" bytes object was obtained
:return List[Tuple[bytes]]: The antenna block of the data. Each list element specifies an antenna
:return List[Tuple[bytes]] or _np.ndarray: The antenna block of the data. Each list element specifies an antenna.
If regex doesn't match, an empty numpy NDArray is returned instead.
"""
antenna_block = _REGEX_ANT.findall(data)
if antenna_block == []:
Expand All @@ -247,12 +318,18 @@ def extract_antenna_block(data: bytes, file_path: str) -> Union[List[Tuple[bytes
return antenna_block


def parse_igs_log(filename_array: _np.ndarray) -> Union[_np.ndarray, None]:
def parse_igs_log(filename_array: _np.ndarray, continue_on_log_version_error: bool = True) -> Union[_np.ndarray, None]:
"""Parses igs log and outputs ndarray with parsed data

:param _np.ndarray filename_array: Metadata on input log file. Expects ndarray of the form [CODE DATE PATH]
:return _np.ndarray: Returns array with data from the IGS log file parsed
:param bool continue_on_log_version_error: Log warning but don't raise exception on LogVersionError. This is
default behaviour, set to False to raise the exception instead.
:return _np.ndarray or None: Returns array with data from the IGS log file parsed. Returns None on LogVersionError
unless continue_on_log_version_error is set to False, in which case the exception is raised instead.
:raises LogVersionError: (Only if continue_on_log_version_error is set to False, default is True), raises on
unknown IGS Log version string, or data that doesn't match log version standard.
"""
# Split filename_array out into its three components (CODE, DATE, PATH), discarding the second element (DATE):
file_code, _, file_path = filename_array

with open(file_path, "rb") as file:
Expand All @@ -261,24 +338,25 @@ def parse_igs_log(filename_array: _np.ndarray) -> Union[_np.ndarray, None]:
try:
version = determine_log_version(data)
except LogVersionError as e:
logger.warning(f"Error: {e}, skipping parsing the log file")
return
if continue_on_log_version_error:
logger.warning(f"Error: {e}, skipping parsing the log file")
return None
raise e # Re-raise the exception

blk_id = extract_id_block(data, version, file_path, file_code)
blk_loc = extract_location_block(data, version, file_path)
blk_loc_or_empty_as_array = extract_location_block(data, version, file_path)
blk_rec = extract_receiver_block(data, file_path)
blk_ant = extract_antenna_block(data, file_path)

blk_loc = [group.decode(encoding="utf8", errors="ignore") for group in blk_loc.groups()]
blk_rec = _np.asarray(blk_rec, dtype=str)
blk_ant = _np.asarray(blk_ant, dtype=str)

len_recs = blk_rec.shape[0]
len_ants = blk_ant.shape[0]

blk_id_loc = _np.asarray([0] + blk_id + blk_loc, dtype=object)[_np.newaxis]
blk_id_loc = _np.asarray([0] + blk_id + blk_loc_or_empty_as_array, dtype=object)[_np.newaxis]

code = [code]
code: str = file_code # TODO check whether this was the intended usage
blk_rec = _np.concatenate(
[
_np.asarray([1] * len_recs, dtype=object)[:, _np.newaxis],
Expand Down
Loading