Skip to content

Commit

Permalink
doc tweaks; remove deadcode
Browse files Browse the repository at this point in the history
  • Loading branch information
bmflynn committed Jan 28, 2025
1 parent effded9 commit 151b707
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 168 deletions.
3 changes: 3 additions & 0 deletions ccsds-cmd/src/info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ fn new_cds_decoder() -> TimecodeDecoder {
fn summarize(fpath: &Path, tc_format: &TCFormat) -> Result<Info> {
let reader = std::fs::File::open(fpath).context("opening input")?;
let packets = decode_packets(reader).filter_map(Result::ok);

let time_decoder: Option<TimecodeDecoder> = match tc_format {
TCFormat::Cds => Some(new_cds_decoder()),
TCFormat::None => None,
Expand Down Expand Up @@ -112,6 +113,8 @@ fn summarize(fpath: &Path, tc_format: &TCFormat) -> Result<Info> {
continue;
}

// NOTE: The resulting Epoch will have a reference time of Jan 1, 1900 and must be
// converted to the basetime for the specific APID.
if let Some(ref time_decoder) = time_decoder {
if let Ok(epoch) = time_decoder.decode(&packet) {
summary.first_packet_time = summary
Expand Down
20 changes: 1 addition & 19 deletions ccsds-lib/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,21 @@
#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum PacketError {
#[error("invaild length; expected [{min}, {max}), got {got}")]
Length { min: usize, max: usize, got: usize },
#[error("packet version != 0")]
Version,
#[error("packet type != 1")]
Telemetry,
}

#[derive(thiserror::Error, Debug)]
#[non_exhaustive]
pub enum Error {
#[error("Not enough bytes")]
NotEnoughData { actual: usize, minimum: usize },

#[error(transparent)]
Io(#[from] std::io::Error),

#[error("Packet length greater than maximum {0}: {1}")]
InvalidPacketLen(usize, usize),

#[error("Invalid timecode config: {0}")]
TimecodeConfig(String),

#[error("Overflow")]
Overflow,
#[error("Underflow")]
Underflow,

/// Integrity check or correct error executing the algorithm.
#[error("integrity algorithm error: {0}")]
IntegrityAlgorithm(String),

#[error("Packet error: {0}")]
Packet(#[from] PacketError),
}

#[cfg(feature = "python")]
Expand Down
20 changes: 13 additions & 7 deletions ccsds-lib/src/framing/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,23 +53,25 @@ impl FrameDecoder {
const DEFAULT_BUFFER_SIZE: usize = 1024;

pub fn new() -> Self {
FrameDecoder {
num_threads: None,
derandomization: None,
integrity: None,
}
FrameDecoder::default()
}

/// Apply derandomization using the provided algorithm. If not provided no derandomization is
/// performed.
pub fn with_derandomization(mut self, derandomizer: Box<dyn Derandomizer>) -> Self {
self.derandomization = Some(derandomizer);
self
}

/// Perform integrity checking with the give algorithm. If not provided, no configuration
/// checking is performed.
pub fn with_integrity(mut self, integrity: Box<dyn IntegrityAlgorithm>) -> Self {
self.integrity = Some(integrity);
self
}

/// Use this number of threads for integrity checks. By default the number of threads is
/// configured automatically and is typically the number of CPUs available on the system.
pub fn with_integrity_threads(mut self, num: u32) -> Self {
self.num_threads = Some(num);
self
Expand All @@ -78,8 +80,12 @@ impl FrameDecoder {
/// Returns an interator that performs the decode, including derandomization and integrity
/// checks, if configured.
///
/// The returned iterator performs all decoding in a background thread. Integrity checking is
/// further performed
/// Integrity checks are not performed on VCDU fill frames (vcid=63), however, fill frames are
/// not filtered and are produced by the returned iterator.
///
/// Integrity checking is handled in parallel with a distinct job per-CADU using an
/// automatically configured number of threads by default, otherwise the number of threads
/// set using [Self::with_integrity_threads].
///
/// # Errors
/// [Error] if integrity checking is used and fails.
Expand Down
69 changes: 38 additions & 31 deletions ccsds-lib/src/framing/packets.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::{
collections::{HashMap, VecDeque},
collections::{HashMap, HashSet, VecDeque},
fmt::Display,
};

use spacecrafts::Spacecraft;
use tracing::{debug, trace};
use tracing::{debug, trace, warn};

use crate::framing::{integrity::Integrity, DecodedFrame, Scid, Vcid};
use crate::spacepacket::{Packet, PrimaryHeader};
Expand Down Expand Up @@ -32,8 +32,9 @@ impl VcidTracker {
}
}

fn clear(&mut self) {
fn reset(&mut self) {
self.cache.clear();
self.sync = false;
self.rs_corrected = false;
}
}
Expand Down Expand Up @@ -66,8 +67,7 @@ where
frames: I,
izone_length: usize,
trailer_length: usize,
// Expected min/max total packet length
apid_sizes: HashMap<Apid, (usize, usize)>,
valid_apids: HashSet<Apid>,

// Cache of partial packet data from frames that has not yet been decoded into
// packets. There should only be up to about 1 frame worth of data in the cache
Expand Down Expand Up @@ -114,18 +114,16 @@ where
}
Some(Integrity::Uncorrectable) | Some(Integrity::HasErrors) => {
debug!(vcid = %frame.header.vcid, tracker = %tracker, "uncorrectable or errored frame, dropping tracker");
tracker.clear();
tracker.sync = false;
tracker.reset();
continue;
}
_ => {}
}
// Frame error indicates there are frames missing _before_ this one, so clear the
// existing cache and continue to process this frame.
// Frame error indicates there are frames missing _before_ this one -- this one is
// still useable, so clear the existing cache and continue to process this frame.
if missing > 0 {
trace!(vcid = frame.header.vcid, tracker=%tracker, missing=missing, "missing frames, dropping tracker");
tracker.clear();
tracker.sync = false;
tracker.reset();
}

if tracker.sync {
Expand All @@ -134,7 +132,7 @@ where
} else {
// No sync, check for the presence of a FPH (first packet header).

// No way to get sync if we don't have a header
// No way to get sync if we don't have a packet header
if !mpdu.has_header() {
trace!(vcid = %frame.header.vcid, tracker = %tracker, "frames w/o mpdu, dropping");
continue;
Expand Down Expand Up @@ -170,6 +168,10 @@ where
// The start of the cache should always contain a packet primary header
let mut header =
PrimaryHeader::decode(&tracker.cache).expect("failed to decode primary header");
if !valid_packet_header(&header, &self.valid_apids) {
tracker.reset();
continue;
}

// TODO: Add packet validations for length, version, and type

Expand Down Expand Up @@ -204,6 +206,10 @@ where
}
header =
PrimaryHeader::decode(&tracker.cache).expect("failed to decode primary header");
if !valid_packet_header(&header, &self.valid_apids) {
tracker.reset();
break;
}
need = header.len_minus1 as usize + 1 + PrimaryHeader::LEN;
if tracker.cache.len() < need {
break;
Expand All @@ -219,21 +225,28 @@ where
}
}

/// Perform sanity checks on packet header and return true if the packet header appears to be valid
/// and the APID is in `valid_apids`.
fn valid_packet_header(header: &PrimaryHeader, valid_apids: &HashSet<Apid>) -> bool {
if header.version != 0 || header.type_flag != 0 {
warn!("bad packet version or type, dropping {header:?}");
false
} else if !valid_apids.is_empty() && !valid_apids.contains(&header.apid) {
warn!("invalid apid for spacecraft, dropping {header:?}");
false
} else {
true
}
}

/// Decodes the provided frames into a packets contained within the frames' MPDUs.
///
/// While not strictly enforced, frames should all be from the same spacecraft, i.e., have
/// the same spacecraft id.
///
/// If `spacecraft` is provided its configuration is used to perform additional validation for
/// packet minimum and maximum length.
///
/// There are several cases when frame data cannot be fully recovered and is dropped,
/// i.e., not used to construct packets:
///
/// 1. Missing frames
/// 2. Frames with state ``rs2::RSState::Uncorrectable``
/// 3. Fill Frames
/// 4. Frames before the first header is available in an MPDU
/// If `spacecraft` is provided its configuration is used to perform additional checks to make sure
/// any decoded packets have APID present in the spacecraft config. Packets with APIDs not present
/// are dropped.
pub fn decode_framed_packets<I>(
frames: I,
izone_length: usize,
Expand All @@ -243,25 +256,19 @@ pub fn decode_framed_packets<I>(
where
I: Iterator<Item = DecodedFrame> + Send,
{
let mut apid_sizes = HashMap::default();
let mut valid_apids = HashSet::default();
if let Some(spacecraft) = spacecraft {
for vcid in spacecraft.vcids {
for apid in vcid.apids {
let val = match (apid.min_size, apid.max_size) {
(Some(min), Some(max)) => (min, max),
(Some(min), None) => (min, 65536),
(None, Some(max)) => (0, max),
(None, None) => continue,
};
apid_sizes.insert(apid.apid, val);
valid_apids.insert(apid.apid);
}
}
}
FramedPacketIter {
frames: frames.filter(|dc| !dc.frame.is_fill()),
izone_length,
trailer_length,
apid_sizes,
valid_apids,
cache: HashMap::new(),
ready: VecDeque::new(),
}
Expand Down
2 changes: 1 addition & 1 deletion ccsds-lib/src/framing/synchronizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ mod tests {
let asm = ASM.to_vec();
let mut scanner = Synchronizer::new(&pat[..], &asm, 0);
let msg = format!("expected sync for {pat:?}");
let loc = scanner.scan().expect(msg.as_str());
let loc = scanner.scan().unwrap_or_else(|_| panic!("{msg}"));

let expected = Loc {
offset: 5,
Expand Down
Loading

0 comments on commit 151b707

Please sign in to comment.