Skip to content

Commit

Permalink
Merge branch 'main' into NPI-3676-warn-on-sp3-velocity-output
Browse files Browse the repository at this point in the history
  • Loading branch information
treefern authored Jan 8, 2025
2 parents ee7507a + 8cadd2e commit 0e8db5f
Show file tree
Hide file tree
Showing 3 changed files with 337 additions and 48 deletions.
154 changes: 150 additions & 4 deletions gnssanalysis/gn_io/sp3.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
import logging
import io as _io
import os as _os
Expand All @@ -9,6 +10,7 @@
import pandas as _pd
from scipy import interpolate as _interpolate

from .. import filenames
from .. import gn_aux as _gn_aux
from .. import gn_const as _gn_const
from .. import gn_datetime as _gn_datetime
Expand Down Expand Up @@ -226,6 +228,65 @@ def remove_offline_sats(sp3_df: _pd.DataFrame, df_friendly_name: str = ""):
return sp3_df


def filter_by_svs(
    sp3_df: _pd.DataFrame,
    filter_by_count: Optional[int] = None,
    filter_by_name: Optional[list[str]] = None,
    filter_to_sat_letter: Optional[str] = None,
) -> _pd.DataFrame:
    """
    Utility function to trim an SP3 DataFrame down, intended for creating small sample SP3 files for
    unit testing (but could be used for other purposes).
    Can filter to a specific number of SVs, to specific SV names, and to a specific constellation.
    These filters can be used together (though filter by name and filter by sat letter i.e. constellation, does
    not make sense).
    E.g. you may filter sats to a set of possible SV names, and also to a maximum of n sats. Or you might filter to
    a specific constellation, then cap at a max of n sats.
    :param _pd.DataFrame sp3_df: input SP3 DataFrame to perform filtering on
    :param Optional[int] filter_by_count: max number of sats to return
    :param Optional[list[str]] filter_by_name: names of sats to constrain to
    :param Optional[str] filter_to_sat_letter: name of constellation (single letter) to constrain to
    :raises ValueError: if filter_to_sat_letter is not a single character, if filter_by_count is negative,
        or if filter_by_count is >= the number of sats remaining after the other filters
    :return _pd.DataFrame: new SP3 DataFrame after filtering
    """

    # Get all SV names (level 1 of the index is the SV name)
    all_sv_names = sp3_df.index.get_level_values(1).unique().array
    total_svs = len(all_sv_names)
    logger.info(f"Total SVs: {total_svs}")

    # Drop SVs which don't match given names
    if filter_by_name:
        # Make set of every SV name to drop (exclude everything besides what we want to keep)
        exclusion_list: list[str] = list(set(all_sv_names) - set(filter_by_name))
        sp3_df = sp3_df.drop(exclusion_list, level=1)

    # Drop SVs which don't match a given constellation letter (i.e. 'G', 'E', 'R', 'C')
    if filter_to_sat_letter:
        if len(filter_to_sat_letter) != 1:
            raise ValueError(
                "Name of sat constellation to filter to, must be a single char. E.g. you cannot enter 'GR'"
            )
        # Make set of every SV name to drop (exclude everything besides what we want to keep)
        other_constellation_sats = [sv for sv in all_sv_names if filter_to_sat_letter.upper() not in sv]
        sp3_df = sp3_df.drop(other_constellation_sats, level=1)

    # Drop SVs beyond n (i.e. keep only the first n SVs)
    if filter_by_count:
        if filter_by_count < 0:
            raise ValueError("Cannot filter to a negative number of SVs!")
        # Recompute the SVs still present *after* the name / constellation filters above.
        # Slicing the original all_sv_names here would let already-dropped sats consume the
        # keep-quota and incorrectly drop (or fail to find) the sats that actually remain.
        remaining_sv_names = sp3_df.index.get_level_values(1).unique().array
        remaining_svs = len(remaining_sv_names)
        if remaining_svs <= filter_by_count:
            raise ValueError(
                f"Cannot filter to max of {filter_by_count} sats, as there are only {remaining_svs} sats total!"
            )
        # Exclusion list built by taking all sats *beyond* the amount we want to keep.
        exclusion_list = list(remaining_sv_names[filter_by_count:])
        sp3_df = sp3_df.drop(exclusion_list, level=1)

    return sp3_df


def mapparm(old: Tuple[float, float], new: Tuple[float, float]) -> Tuple[float, float]:
"""
Evaluate the offset and scale factor needed to map values from the old range to the new range.
Expand Down Expand Up @@ -577,6 +638,8 @@ def getVelPoly(sp3Df: _pd.DataFrame, deg: int = 35) -> _pd.DataFrame:
def gen_sp3_header(sp3_df: _pd.DataFrame) -> str:
"""
Generate the header for an SP3 file based on the given DataFrame.
NOTE: much of the header information is drawn from the DataFrame attrs structure. If this has not been
updated as the DataFrame has been transformed, the header will not reflect the data.
:param _pd.DataFrame sp3_df: The DataFrame containing the SP3 data.
:return str: The generated SP3 header as a string.
Expand Down Expand Up @@ -916,13 +979,96 @@ def sp3merge(
return merged_sp3


def sp3_hlm_trans(a: _pd.DataFrame, b: _pd.DataFrame) -> tuple[_pd.DataFrame, list]:
def transform_sp3(src_sp3: str, dest_sp3: str, transform_fn, *args, **kwargs):
    """
    Apply a transformation to an SP3 file, by reading the file from the given path, applying the supplied
    transformation function and args, and writing out a new file to the path given.

    :param str src_sp3: Path of the source SP3 file to read in.
    :param str dest_sp3: Path to write out the new SP3 file to.
    :param callable transform_fn: The transformation function to apply to the SP3 data once loaded. *args
        and **kwargs following, are passed to this function.
    """
    # Lazy %-style args: message is only built if INFO logging is enabled
    # (original used an f-string with no placeholder, plus string concatenation).
    logger.info("Reading file: %s", src_sp3)
    sp3_df = read_sp3(src_sp3)
    transformed_df = transform_fn(sp3_df, *args, **kwargs)
    write_sp3(transformed_df, dest_sp3)


def trim_df(
    sp3_df: _pd.DataFrame,
    trim_start: timedelta = timedelta(),
    trim_end: timedelta = timedelta(),
    keep_first_delta_amount: Optional[timedelta] = None,
):
    """
    Trim data from the start and end of an SP3 DataFrame.

    :param _pd.DataFrame sp3_df: The input SP3 DataFrame.
    :param timedelta trim_start: Amount of time to trim off the start of the dataframe.
    :param timedelta trim_end: Amount of time to trim off the end of the dataframe.
    :param Optional[timedelta] keep_first_delta_amount: If supplied, trim the dataframe to this length. Not
        compatible with trim_start and trim_end.
    :raises ValueError: if keep_first_delta_amount is combined with a non-zero trim_start or trim_end.
    :return _pd.DataFrame: Dataframe trimmed to the requested time range, or requested initial amount
    """
    # NOTE: the first index level is assumed to be a numeric epoch expressed in seconds,
    # as offsets are applied via timedelta.total_seconds() — TODO confirm (J2000 seconds?).
    time_axis = sp3_df.index.get_level_values(0)
    # Work out the new time range that we care about
    first_time = min(time_axis)
    first_keep_time = first_time + trim_start.total_seconds()
    last_time = max(time_axis)
    last_keep_time = last_time - trim_end.total_seconds()

    # Operating in mode of trimming from start, to start + x amount of time in. As opposed to trimming a delta from each end.
    # Explicit None check: keep_first_delta_amount=timedelta(0) is a valid request to keep
    # only the first epoch, and must not be silently ignored by a truthiness test.
    if keep_first_delta_amount is not None:
        if trim_start.total_seconds() != 0 or trim_end.total_seconds() != 0:
            raise ValueError("keep_first_delta_amount option is not compatible with start/end time options")
        first_keep_time = first_time
        last_keep_time = first_time + keep_first_delta_amount.total_seconds()

    # Slice to the subset that we actually care about
    trimmed_df = sp3_df.loc[first_keep_time:last_keep_time]
    trimmed_df.index = trimmed_df.index.remove_unused_levels()
    return trimmed_df


def trim_to_first_n_epochs(
    sp3_df: _pd.DataFrame,
    epoch_count: int,
    sp3_filename: Optional[str] = None,
    sp3_sample_rate: Optional[timedelta] = None,
) -> _pd.DataFrame:
    """
    Utility function to trim an SP3 dataframe to the first n epochs, given either the filename, or sample rate

    :param _pd.DataFrame sp3_df: The input SP3 DataFrame.
    :param int epoch_count: Trim to this many epochs from start of SP3 data (i.e. first n epochs).
    :param Optional[str] sp3_filename: Name of SP3 file, just used to derive sample_rate.
    :param Optional[timedelta] sp3_sample_rate: Sample rate of the SP3 data. Alternatively this can be
        derived from a filename.
    :raises ValueError: if neither sp3_sample_rate nor sp3_filename is provided.
    :return _pd.DataFrame: Dataframe trimmed to the requested number of epochs.
    """
    sample_rate = sp3_sample_rate
    # Explicit None check: a zero timedelta is falsy, and a truthiness test would wrongly
    # treat an (albeit nonsensical) explicitly-provided zero sample rate as "not provided".
    if sample_rate is None:
        if not sp3_filename:
            raise ValueError("Either sp3_sample_rate or sp3_filename must be provided")
        # Derive the sample rate from the standard IGS filename convention.
        sample_rate = filenames.convert_nominal_span(
            filenames.determine_properties_from_filename(sp3_filename)["sampling_rate"]
        )

    # n epochs span (n - 1) sample intervals measured from the first epoch.
    time_offset_from_start: timedelta = sample_rate * (epoch_count - 1)
    return trim_df(sp3_df, keep_first_delta_amount=time_offset_from_start)


def sp3_hlm_trans(
a: _pd.DataFrame,
b: _pd.DataFrame,
) -> tuple[_pd.DataFrame, list]:
"""
Rotates sp3_b into sp3_a.
:param _pd.DataFrame a: The sp3_a DataFrame.
:param _pd.DataFrame b: The sp3_b DataFrame.
:return tuple[_pd.DataFrame, list]: A tuple containing the updated sp3_b DataFrame and the HLM array with applied computed parameters and residuals.
"""
hlm = _gn_transform.get_helmert7(pt1=a.EST[["X", "Y", "Z"]].values, pt2=b.EST[["X", "Y", "Z"]].values)
Expand Down
66 changes: 66 additions & 0 deletions gnssanalysis/test_file_creation_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
from datetime import timedelta
from typing import Optional
from gnssanalysis.filenames import convert_nominal_span, determine_properties_from_filename
from gnssanalysis.gn_io.sp3 import filter_by_svs, read_sp3, trim_to_first_n_epochs, write_sp3, remove_offline_sats
import logging

logger = logging.getLogger(__name__)


#### Configuration ####

src_path = "IGS0DEMULT_20243181800_02D_05M_ORB.SP3"
dest_path = "IGS0DEMULT_20243181800_02D_05M_ORB.SP3-trimmed"

# Constrain to x SVs, specific SV names, both, or neither
trim_to_sv_names: Optional[list[str]] = ["G02", "G03", "G19"]
trim_to_sv_count: Optional[int] = None  # 1
trim_to_sat_letter: Optional[str] = None  # "E"

# How many epochs to include in the trimmed file (offset from start)
trim_to_num_epochs: int = 3

drop_offline_sats: bool = False

#### End configuration ####


# Bare filename (text after the last '/'), used below to derive the sample rate.
filename = src_path.rsplit("/")[-1]
print(f"Filename is: {filename}")

# Raw data would be: determine_sp3_name_props() - that retrieves in seconds. But we want to be more generally applicable, so not just SP3 here ideally.
sample_rate: timedelta = convert_nominal_span(determine_properties_from_filename(filename)["sampling_rate"])
print(f"sample_rate is: {sample_rate}")

# Load
print("Loading SP3 into DataFrame...")
sp3_df = read_sp3(src_path)

# Trim to first x epochs
print(f"Trimming to first {trim_to_num_epochs} epochs")
sp3_df = trim_to_first_n_epochs(sp3_df=sp3_df, epoch_count=trim_to_num_epochs, sp3_filename=filename)

# Filter to chosen SVs or number of SVs...
print(
    f"Applying SV filters (max count: {trim_to_sv_count}, limit to names: {trim_to_sv_names}, limit to constellation: {trim_to_sat_letter})..."
)
sp3_df = filter_by_svs(
    sp3_df, filter_by_count=trim_to_sv_count, filter_by_name=trim_to_sv_names, filter_to_sat_letter=trim_to_sat_letter
)

# Drop offline sats if requested
if drop_offline_sats:
    print("Dropping offline sats...")
    sp3_df = remove_offline_sats(sp3_df)

# Write out
print(
    "Writing out new SP3 file... "
    'CAUTION: at the time of writing the header is based on stale metadata in .attrs["HEADER"], not the contents '
    "of the dataframe. It will need to be manually updated."
)
write_sp3(sp3_df, dest_path)

# Test if we can successfully read that file...
print("Testing re-read of the output file...")
re_read = read_sp3(dest_path)
Loading

0 comments on commit 0e8db5f

Please sign in to comment.