Skip to content

Commit

Permalink
NPI-3669 more unit tests for sp3 processing, filter_by_svs(), trim_df…
Browse files Browse the repository at this point in the history
…() including new functionality to take first n epochs
  • Loading branch information
treefern committed Dec 24, 2024
1 parent 7a8ae97 commit 8dd48a2
Showing 1 changed file with 105 additions and 2 deletions.
107 changes: 105 additions & 2 deletions tests/test_sp3.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from datetime import timedelta
import unittest
from unittest.mock import patch, mock_open
from pyfakefs.fake_filesystem_unittest import TestCase

import numpy as np
import pandas as pd

from gnssanalysis.filenames import convert_nominal_span, determine_properties_from_filename
import gnssanalysis.gn_io.sp3 as sp3

from test_datasets.sp3_test_data import (
Expand Down Expand Up @@ -185,18 +187,119 @@ def test_velinterpolation(self, mock_file):
def test_sp3_offline_sat_removal(self, mock_file):
sp3_df = sp3.read_sp3("mock_path", pOnly=False)
self.assertEqual(
sp3_df.index.get_level_values(1).unique().array,
sp3_df.index.get_level_values(1).unique().array.tolist(),
["G02", "G03", "G19"],
"Should be three SVs in test file before removing offline ones",
)

sp3_df = sp3.remove_offline_sats(sp3_df)
self.assertEqual(
sp3_df.index.get_level_values(1).unique().array,
sp3_df.index.get_level_values(1).unique().array.tolist(),
["G02", "G03"],
"Should be two SVs after removing offline ones",
)

# sp3_test_data_truncated_cod_final is input_data2
@patch("builtins.open", new_callable=mock_open, read_data=input_data2)
def test_filter_by_svs(self, mock_file):
sp3_df = sp3.read_sp3("mock_path", pOnly=False)
self.assertEqual(
len(sp3_df.index.get_level_values(1).unique().array),
34,
"Should be 34 unique SVs in test file before filtering",
)

sp3_df_filtered_by_count = sp3.filter_by_svs(sp3_df, filter_by_count=2)
self.assertEqual(
sp3_df_filtered_by_count.index.get_level_values(1).unique().array.tolist(),
["G01", "G02"],
"Should be two SVs after trimming to max 2",
)

sp3_df_filtered_by_constellation = sp3.filter_by_svs(sp3_df, filter_to_sat_letter="R")
self.assertEqual(
sp3_df_filtered_by_constellation.index.get_level_values(1).unique().array.tolist(),
["R01", "R02"],
"Should have only Glonass sats after filtering to constellation R",
)

sp3_df_filtered_by_name = sp3.filter_by_svs(sp3_df, filter_by_name=["G19", "G03"])
self.assertEqual(
sp3_df_filtered_by_name.index.get_level_values(1).unique().array.tolist(),
["G03", "G19"],
"Should have only specific sats after filtering by name",
)

@patch("builtins.open", new_callable=mock_open, read_data=offline_sat_test_data)
def test_trim_df(self, mock_file):
sp3_df = sp3.read_sp3("mock_path", pOnly=False)
# offline_sat_test_data is based on the following file, but 3 epochs, not 2 days:
filename = "IGS0DEMULT_20243181800_02D_05M_ORB.SP3"
# Expected starting set of epochs, in j2000 seconds
expected_initial_epochs = [784792800, 784793100, 784793400]
# Those epochs as datetimes are:
# ['2024-11-13T18:00:00', '2024-11-13T18:05:00', '2024-11-13T18:10:00'], dtype='datetime64[s]'
# Our sample rate is 5 mins, so indexing from here on, is in timedeltas in multiples of 5 mins
self.assertEqual(
sp3_df.index.get_level_values(0).unique().array.tolist(),
expected_initial_epochs,
"Should be 3 epochs in test file before trimming",
)

# Trimming 5 mins from end should result in first two epochs only
sp3_df_start_trim = sp3.trim_df(sp3_df=sp3_df, trim_start=timedelta(0), trim_end=timedelta(minutes=5))
self.assertEqual(sp3_df_start_trim.index.get_level_values(0).unique().array.tolist(), [784792800, 784793100])

# After trimming end by 3 epochs, expect no data
sp3_df_start_trim = sp3.trim_df(sp3_df=sp3_df, trim_start=timedelta(0), trim_end=timedelta(minutes=15))
self.assertEqual(sp3_df_start_trim.index.get_level_values(0).unique().array.tolist(), [])

# Expected resulting epochs after trimming start by 1 epoch
sp3_df_start_trim = sp3.trim_df(sp3_df=sp3_df, trim_start=timedelta(minutes=5), trim_end=timedelta(0))
self.assertEqual(sp3_df_start_trim.index.get_level_values(0).unique().array.tolist(), [784793100, 784793400])

# Expected resulting epochs after trimming start by 3 epochs (no data)
sp3_df_start_trim = sp3.trim_df(sp3_df=sp3_df, trim_start=timedelta(minutes=15), trim_end=timedelta(0))
self.assertEqual(sp3_df_start_trim.index.get_level_values(0).unique().array.tolist(), [])

# Trim start and end by one epoch (test you can do both at once)
sp3_df_start_trim = sp3.trim_df(sp3_df=sp3_df, trim_start=timedelta(minutes=5), trim_end=timedelta(minutes=5))
self.assertEqual(sp3_df_start_trim.index.get_level_values(0).unique().array.tolist(), [784793100])

# Test trimming by epoch count
trim_to_num_epochs = 2
sample_rate = convert_nominal_span(determine_properties_from_filename(filename=filename)["sampling_rate"])
self.assertEqual(
sample_rate, timedelta(minutes=5), "Sample rate should've been parsed as 5 minutes, from filename"
)

sp3_df_trimmed = sp3.trim_to_epoch_count(sp3_df, epoch_count=2, sp3_sample_rate=sample_rate)
self.assertEqual(
sp3_df_trimmed.index.get_level_values(0).unique().array.tolist(),
[784792800, 784793100],
"Should be first two epochs after trimming with trim_to_epoch_count() using sample_rate",
)

sp3_df_trimmed = sp3.trim_to_epoch_count(sp3_df, epoch_count=2, sp3_filename=filename)
self.assertEqual(
sp3_df_trimmed.index.get_level_values(0).unique().array.tolist(),
[784792800, 784793100],
"Should be first two epochs after trimming with trim_to_epoch_count() using filename to derive sample_rate",
)

# Test the keep_first_delta_amount parameter of trim_df(), used above
trim_to_num_epochs = 2
sample_rate = timedelta(minutes=5)
time_offset_from_start: timedelta = sample_rate * (trim_to_num_epochs - 1)
self.assertEqual(time_offset_from_start, timedelta(minutes=5))
# Now the actual test
sp3_df_trimmed = sp3.trim_df(sp3_df, keep_first_delta_amount=time_offset_from_start)
self.assertEqual(
sp3_df_trimmed.index.get_level_values(0).unique().array.tolist(),
[784792800, 784793100],
"Should be two epochs after trimming with keep_first_delta_amount parameter",
)


class TestMergeSP3(TestCase):
def setUp(self):
Expand Down

0 comments on commit 8dd48a2

Please sign in to comment.