NPI-3669 Migrate SP3 transform and trim, introduce minimal SP3 creation utility script #63

Merged (9 commits, Jan 8, 2025)
157 changes: 153 additions & 4 deletions gnssanalysis/gn_io/sp3.py
@@ -1,3 +1,4 @@
from datetime import timedelta
import logging
import io as _io
import os as _os
@@ -9,6 +10,7 @@
import pandas as _pd
from scipy import interpolate as _interpolate

from .. import filenames
from .. import gn_aux as _gn_aux
from .. import gn_const as _gn_const
from .. import gn_datetime as _gn_datetime
@@ -226,6 +228,65 @@ def remove_offline_sats(sp3_df: _pd.DataFrame, df_friendly_name: str = ""):
return sp3_df


Collaborator: Very useful function and I've tested it now on a sample SP3 file - works great!

def filter_by_svs(
sp3_df: _pd.DataFrame,
filter_by_count: Optional[int] = None,
filter_by_name: Optional[list[str]] = None,
filter_to_sat_letter: Optional[str] = None,
) -> _pd.DataFrame:
"""
Utility function to trim an SP3 DataFrame down, intended for creating small sample SP3 files for
unit testing (but could be used for other purposes).
Can filter to a specific number of SVs, to specific SV names, and to a specific constellation.

These filters can be used together (though combining filter-by-name with filter-by-constellation-letter does
not make sense).
E.g. you may filter sats to a set of possible SV names, and also to a maximum of n sats. Or you might filter to
a specific constellation, then cap at a max of n sats.

:param _pd.DataFrame sp3_df: input SP3 DataFrame to perform filtering on
:param Optional[int] filter_by_count: max number of sats to return
:param Optional[list[str]] filter_by_name: names of sats to constrain to
:param Optional[str] filter_to_sat_letter: name of constellation (single letter) to constrain to
:return _pd.DataFrame: new SP3 DataFrame after filtering
"""

# Get all SV names
all_sv_names = sp3_df.index.get_level_values(1).unique().array
total_svs = len(all_sv_names)
logger.info(f"Total SVs: {total_svs}")

# Drop SVs which don't match given names
if filter_by_name:
# Make set of every SV name to drop (exclude everything besides what we want to keep)
exclusion_list: list[str] = list(set(all_sv_names) - set(filter_by_name))
sp3_df = sp3_df.drop(exclusion_list, level=1)

# Drop SVs which don't match a given constellation letter (i.e. 'G', 'E', 'R', 'C')
if filter_to_sat_letter:
if len(filter_to_sat_letter) != 1:
raise ValueError(
"Name of sat constellation to filter to, must be a single char. E.g. you cannot enter 'GR'"
)
# Make set of every SV name to drop (exclude everything besides what we want to keep)
other_constellation_sats = [sv for sv in all_sv_names if filter_to_sat_letter.upper() not in sv]
sp3_df = sp3_df.drop(other_constellation_sats, level=1)

# Drop SVs beyond n (i.e. keep only the first n SVs)
if filter_by_count:
if filter_by_count < 0:
raise ValueError("Cannot filter to a negative number of SVs!")
# Recompute the remaining SV names here, so this filter composes correctly with the
# name and constellation filters above (which may have already dropped some SVs).
remaining_sv_names = sp3_df.index.get_level_values(1).unique().array
if len(remaining_sv_names) <= filter_by_count:
raise ValueError(
f"Cannot filter to max of {filter_by_count} sats, as there are only {len(remaining_sv_names)} sats left!"
)
# Exclusion list built by taking all sats *beyond* the amount we want to keep.
exclusion_list = remaining_sv_names[filter_by_count:]
sp3_df = sp3_df.drop(exclusion_list, level=1)

return sp3_df
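
For illustration, a minimal usage sketch of this filter (not part of the diff; the input path is the sample file used by the utility script below):

from gnssanalysis.gn_io.sp3 import read_sp3, filter_by_svs

# Load an SP3 file into a DataFrame.
sp3_df = read_sp3("IGS0DEMULT_20243181800_02D_05M_ORB.SP3")

# Keep only Galileo ('E') satellites, then cap the result at two SVs.
galileo_only = filter_by_svs(sp3_df, filter_to_sat_letter="E")
two_galileo = filter_by_svs(galileo_only, filter_by_count=2)

# Alternatively, constrain to specific SV names:
named_subset = filter_by_svs(sp3_df, filter_by_name=["G02", "G03", "G19"])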


def mapparm(old: Tuple[float, float], new: Tuple[float, float]) -> Tuple[float, float]:
"""
Evaluate the offset and scale factor needed to map values from the old range to the new range.
@@ -578,6 +639,8 @@ def getVelPoly(sp3Df: _pd.DataFrame, deg: int = 35) -> _pd.DataFrame:
def gen_sp3_header(sp3_df: _pd.DataFrame) -> str:
"""
Generate the header for an SP3 file based on the given DataFrame.
NOTE: much of the header information is drawn from the DataFrame attrs structure. If this has not been
updated as the DataFrame has been transformed, the header will not reflect the data.

:param pandas.DataFrame sp3_df: The DataFrame containing the SP3 data.
:return str: The generated SP3 header as a string.
@@ -663,6 +726,8 @@ def gen_sp3_content(
# Rather than:
# PG01... X Y Z CLK ... VX VY VZ ...
# ?
# TODO raise warnings if VEL columns are still present, and drop them before writing out, to ensure we remain
# compliant with the spec.

out_buf = buf if buf is not None else _io.StringIO()
if sort_outputs:
@@ -892,7 +957,7 @@ def sp3merge(
:param Union[List[str], None] clkpaths: The list of paths to the clk files, or None if no clk files are provided.
:param bool nodata_to_nan: Flag indicating whether to convert nodata values to NaN.

:return DataFrame: The merged sp3 DataFrame.
"""
sp3_dfs = [read_sp3(sp3_file, nodata_to_nan=nodata_to_nan) for sp3_file in sp3paths]
# Create a new attrs dictionary to be used for the output DataFrame
@@ -908,14 +973,98 @@
return merged_sp3


def transform_sp3(src_sp3: str, dest_sp3: str, transform_fn, *args, **kwargs):
"""
Apply a transformation to an SP3 file: read the file from the given path, apply the supplied
transformation function (with its args), and write a new file out to the destination path given.

:param str src_sp3: Path of the source SP3 file to read in.
:param str dest_sp3: Path to write out the new SP3 file to.
:param callable transform_fn: The transformation function to apply to the SP3 data once loaded. The *args
and **kwargs which follow are passed through to this function.
"""
logger.info(f"Reading file: " + str(src_sp3))
sp3_df = read_sp3(src_sp3)
transformed_df = transform_fn(sp3_df, *args, **kwargs)
write_sp3(transformed_df, dest_sp3)
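
A sketch of how this helper might be invoked (not part of the diff; both paths are placeholders), using the trim_df function defined just below as the transform:

from datetime import timedelta
from gnssanalysis.gn_io.sp3 import transform_sp3, trim_df

# Read "in.sp3", drop the first and last 30 minutes of data, and write "out.sp3".
transform_sp3(
    "in.sp3",
    "out.sp3",
    trim_df,
    trim_start=timedelta(minutes=30),
    trim_end=timedelta(minutes=30),
)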


def trim_df(
sp3_df: _pd.DataFrame,
trim_start: timedelta = timedelta(),
trim_end: timedelta = timedelta(),
keep_first_delta_amount: Optional[timedelta] = None,
) -> _pd.DataFrame:
"""
Trim data from the start and end of an SP3 DataFrame.

:param DataFrame sp3_df: The input SP3 DataFrame.
:param timedelta trim_start: Amount of time to trim off the start of the dataframe.
:param timedelta trim_end: Amount of time to trim off the end of the dataframe.
:param Optional[timedelta] keep_first_delta_amount: If supplied, trim the dataframe to this length. Not
compatible with trim_start and trim_end.
:return DataFrame: Dataframe trimmed to the requested time range, or requested initial amount

"""
time_axis = sp3_df.index.get_level_values(0)
# Work out the new time range that we care about. Note the time axis is numeric (seconds),
# so timedelta values are converted via total_seconds().
first_time = min(time_axis)
first_keep_time = first_time + trim_start.total_seconds()
last_time = max(time_axis)
last_keep_time = last_time - trim_end.total_seconds()

# Operating in 'keep first x amount of time' mode: keep from the start epoch to start + delta, as opposed to trimming a delta from each end.
if keep_first_delta_amount:
first_keep_time = first_time
last_keep_time = first_time + keep_first_delta_amount.total_seconds()
if trim_start.total_seconds() != 0 or trim_end.total_seconds() != 0:
raise ValueError("keep_first_delta_amount option is not compatible with start/end time options")

# Slice to the subset that we actually care about
trimmed_df = sp3_df.loc[first_keep_time:last_keep_time]
trimmed_df.index = trimmed_df.index.remove_unused_levels()
return trimmed_df
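
For illustration, a sketch of the two trimming modes (not part of the diff), assuming sp3_df was loaded with read_sp3:

from datetime import timedelta

# Mode 1: trim a delta off each end of the data.
middle = trim_df(sp3_df, trim_start=timedelta(minutes=15), trim_end=timedelta(minutes=15))

# Mode 2: keep only the first hour, measured from the first epoch
# (mutually exclusive with trim_start / trim_end).
first_hour = trim_df(sp3_df, keep_first_delta_amount=timedelta(hours=1))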


def trim_to_first_n_epochs(
sp3_df: _pd.DataFrame,
epoch_count: int,
sp3_filename: Optional[str] = None,
sp3_sample_rate: Optional[timedelta] = None,
) -> _pd.DataFrame:
"""
Utility function to trim an SP3 dataframe to the first n epochs, given either the filename, or sample rate

:param DataFrame sp3_df: The input SP3 DataFrame.
:param int epoch_count: Trim to this many epochs from start of SP3 data (i.e. first n epochs).
:param Optional[str] sp3_filename: Name of SP3 file, just used to derive sample_rate.
:param Optional[timedelta] sp3_sample_rate: Sample rate of the SP3 data. Alternatively this can be
derived from a filename.
:return DataFrame: Dataframe trimmed to the requested number of epochs.
"""
sample_rate = sp3_sample_rate
if not sample_rate:
if not sp3_filename:
raise ValueError("Either sp3_sample_rate or sp3_filename must be provided")
sample_rate = filenames.convert_nominal_span(
filenames.determine_properties_from_filename(sp3_filename)["sampling_rate"]
)

time_offset_from_start: timedelta = sample_rate * (epoch_count - 1)
return trim_df(sp3_df, keep_first_delta_amount=time_offset_from_start)
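
A usage sketch (not part of the diff), reusing the sample filename from the utility script below; sp3_df is assumed to be already loaded:

from datetime import timedelta

# Derive the sample rate from an IGS long product filename ("05M" -> 5 minutes)...
trimmed = trim_to_first_n_epochs(sp3_df, epoch_count=3, sp3_filename="IGS0DEMULT_20243181800_02D_05M_ORB.SP3")

# ...or supply the sample rate directly.
trimmed = trim_to_first_n_epochs(sp3_df, epoch_count=3, sp3_sample_rate=timedelta(minutes=5))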


def sp3_hlm_trans(
a: _pd.DataFrame,
b: _pd.DataFrame,
) -> tuple[_pd.DataFrame, list]:
"""
Rotates sp3_b into sp3_a.

:param DataFrame a: The sp3_a DataFrame.
Collaborator: DataFrame --> _pd.DataFrame

Collaborator (author): Ah, I changed this one to be more generic because that seemed more readable (given the renamed import is an internal thing). If it's parsed by an IDE I can see that backfiring.

Collaborator (author): In other parts of the code, we have pandas.DataFrame. We should standardise on one format for docstrings. One of:

  • DataFrame
  • pandas.DataFrame
  • _pd.DataFrame (looks non-standard for human use but may have the benefit of being resolvable in our context by an IDE.)

Collaborator (author): These have now been updated to fit the convention applied to the majority of code in this codebase, which is using the imported module name, e.g. _pd.DataFrame

:param DataFrame b: The sp3_b DataFrame.

:return tuple[pandas.DataFrame, list]: A tuple containing the updated sp3_b DataFrame and the HLM array with applied computed parameters and residuals.
"""
hlm = _gn_transform.get_helmert7(pt1=a.EST[["X", "Y", "Z"]].values, pt2=b.EST[["X", "Y", "Z"]].values)
b.iloc[:, :3] = _gn_transform.transform7(xyz_in=b.EST[["X", "Y", "Z"]].values, hlm_params=hlm[0])
@@ -942,7 +1091,7 @@ def diff_sp3_rac(
:param bool use_offline_sat_removal: Flag indicating whether to remove satellites which are offline / have some
nodata position values. Caution: ensure you turn this on if using cubic spline interpolation with data
which may have holes in it (nodata).
:return DataFrame: The DataFrame containing the difference in RAC coordinates.
"""
hlm_modes = [None, "ECF", "ECI"]
if hlm_mode not in hlm_modes:
66 changes: 66 additions & 0 deletions gnssanalysis/test_file_creation_util.py
@@ -0,0 +1,66 @@
from datetime import timedelta
from typing import Optional
from gnssanalysis.filenames import convert_nominal_span, determine_properties_from_filename
from gnssanalysis.gn_io.sp3 import filter_by_svs, read_sp3, trim_to_first_n_epochs, write_sp3, remove_offline_sats
import logging

logger = logging.getLogger(__name__)


#### Configuration ####

src_path = "IGS0DEMULT_20243181800_02D_05M_ORB.SP3"
dest_path = "IGS0DEMULT_20243181800_02D_05M_ORB.SP3-trimmed"

# Constrain to x SVs, specific SV names, both, or neither
trim_to_sv_names: Optional[list[str]] = ["G02", "G03", "G19"]
trim_to_sv_count: Optional[int] = None # 1
trim_to_sat_letter: Optional[str] = None # "E"

# How many epochs to include in the trimmed file (offset from start)
trim_to_num_epochs: int = 3

drop_offline_sats: bool = False

#### End configuration ####


filename = src_path.rsplit("/", 1)[-1]
print(f"Filename is: {filename}")

# Raw data could instead come from determine_sp3_name_props(), but that retrieves the value in seconds.
# We want to be more generally applicable here (not just SP3), so we use the generic filename helpers instead.
sample_rate: timedelta = convert_nominal_span(determine_properties_from_filename(filename)["sampling_rate"])
print(f"sample_rate is: {sample_rate}")

# Load
print("Loading SP3 into DataFrame...")
sp3_df = read_sp3(src_path)

# Trim to first x epochs
print(f"Trimming to first {trim_to_num_epochs} epochs")
sp3_df = trim_to_first_n_epochs(sp3_df=sp3_df, epoch_count=trim_to_num_epochs, sp3_filename=filename)

# Filter to chosen SVs or number of SVs...
print(
f"Applying SV filters (max count: {trim_to_sv_count}, limit to names: {trim_to_sv_names}, limit to constellation: {trim_to_sat_letter})..."
)
sp3_df = filter_by_svs(
sp3_df, filter_by_count=trim_to_sv_count, filter_by_name=trim_to_sv_names, filter_to_sat_letter=trim_to_sat_letter
)

# Drop offline sats if requested
if drop_offline_sats:
print(f"Dropping offline sats...")
sp3_df = remove_offline_sats(sp3_df)

# Write out
print(
"Writing out new SP3 file... "
'CAUTION: at the time of writing the header is based on stale metadata in .attrs["HEADER"], not the contents '
"of the dataframe. It will need to be manually updated."
)
write_sp3(sp3_df, dest_path)

# Test if we can successfully read that file...
print("Testing re-read of the output file...")
re_read = read_sp3(dest_path)
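
A possible extension of this smoke test (an editor's sketch, not in the PR): assert that positions survive the write/read round trip, using pandas' testing helpers.

import pandas as _pd

# Compare the position block only (assert_frame_equal applies a small relative
# tolerance to floats by default, which covers any formatting-precision effects).
_pd.testing.assert_frame_equal(re_read["EST"][["X", "Y", "Z"]], sp3_df["EST"][["X", "Y", "Z"]])
print("Round-trip position data matches.")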