Skip to content

Commit

Permalink
integrity changes
Browse files Browse the repository at this point in the history
Let the alg decide if it wants to perform on a give frame. This required
giving it the VCDU header to make decisions. I also added an
Integrity::Skipped state.
  • Loading branch information
bmflynn committed Feb 5, 2025
1 parent f16c304 commit 7442de8
Show file tree
Hide file tree
Showing 9 changed files with 95 additions and 77 deletions.
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ tracing-subscriber = "0.3"
spacecrafts = "0.1.0-beta.5"
hifitime = "4.0.0"
pyo3 = { version = "^0.22.0" }
crc = "3.2.1"
2 changes: 1 addition & 1 deletion ccsds-cmd/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ ccsds = { path = "../ccsds-lib" }
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
spacecrafts.workspace = true
crc.workspace = true
tracing.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter"] }
handlebars = "6"
regex = "1.10.5"
hifitime.workspace = true
crc = "3.2.1"
4 changes: 4 additions & 0 deletions ccsds-cmd/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ pub fn info(config: FramingConfig, fpath: &Path, format: &Format) -> Result<()>
sum.uncorrectable += 1;
info.summary.uncorrectable += 1;
}
Integrity::Skipped => {
sum.not_performed += 1;
info.summary.not_performed += 1;
}
},
None => {
sum.not_performed += 1;
Expand Down
2 changes: 0 additions & 2 deletions ccsds-cmd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ struct Cli {

#[derive(Subcommand)]
enum FramingCommands {
//Info,
//CheckRs,
/// Byte-align and remove fill
///
/// Leaves ASM in place. Performs no PN or integrity checking
Expand Down
1 change: 1 addition & 0 deletions ccsds-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ repository.workspace = true
[dependencies]
hifitime.workspace = true
pyo3 = { workspace = true, optional = true }
crc.workspace = true
crossbeam = { version = "^0.8.4", features = ["crossbeam-channel"] }
ndarray = "0.16.1"
rayon = "^1.7.0"
Expand Down
79 changes: 25 additions & 54 deletions ccsds-lib/src/framing/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ pub struct FrameDecoder {
num_threads: Option<u32>,
derandomization: Option<Box<dyn Derandomizer>>,
integrity: Option<Box<dyn IntegrityAlgorithm>>,
integrity_noop: bool,
}

impl FrameDecoder {
Expand All @@ -71,13 +70,6 @@ impl FrameDecoder {
self
}

/// Do not perform integrity check. Useful when there are parity bytes to remove but you do not
/// want to perform the algorithm.
pub fn with_integrity_noop(mut self) -> Self {
self.integrity_noop = true;
self
}

/// Use this number of threads for integrity checks. By default the number of threads is
/// configured automatically and is typically the number of CPUs available on the system.
pub fn with_integrity_threads(mut self, num: u32) -> Self {
Expand Down Expand Up @@ -132,55 +124,34 @@ impl FrameDecoder {
continue;
};

// No integrity checking on FILL, however, we still remove parity bytes
if hdr.vcid == VCDUHeader::FILL || self.integrity_noop {
let data = match integrity_alg.clone().borrow() {
Some(alg) => alg.remove_parity(&block),
None => &block,
};

if let Some(frame) = Frame::decode(data.to_vec()) {
if future_tx
.send(Ok(DecodedFrame {
frame,
// Do integrity checking in the thread pool. Use spawn_fifo to make sure the frame
// order is maintained.
pool.spawn_fifo(move || {
let decoded_frame = if let Some(integrity_alg) = integrity_alg.clone().borrow()
{
match integrity_alg.perform(&hdr, &block) {
Ok((status, data)) => Ok(DecodedFrame {
frame: Frame { header: hdr, data },
missing: 0,
integrity: None,
}))
.is_err()
{
debug!(block_idx = idx, "failed to send fill frame");
integrity: Some(status),
}),
Err(err) => Err(err),
}
} else {
Ok(DecodedFrame {
frame: Frame {
header: hdr,
data: block,
},
missing: 0,
integrity: None,
})
};

if future_tx.send(decoded_frame).is_err() {
debug!(block_idx = idx, "failed to send frame");
}
} else {
// Do integrity checking in the thread pool. Use spawn_fifo to make sure the frame
// order is maintained.
pool.spawn_fifo(move || {
let decoded_frame =
if let Some(integrity_alg) = integrity_alg.clone().borrow() {
match integrity_alg.perform(&block) {
Ok((status, data)) => Ok(DecodedFrame {
frame: Frame { header: hdr, data },
missing: 0,
integrity: Some(status),
}),
Err(err) => Err(err),
}
} else {
Ok(DecodedFrame {
frame: Frame {
header: hdr,
data: block,
},
missing: 0,
integrity: None,
})
};

if future_tx.send(decoded_frame).is_err() {
debug!(block_idx = idx, "failed to send frame");
}
});
}
});
// All frames are forwarded, including fill
if let Err(err) = jobs_tx.send(future_rx) {
debug!("failed to send frame future: {err}");
Expand Down
36 changes: 29 additions & 7 deletions ccsds-lib/src/framing/integrity/crc32.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,43 @@
use super::IntegrityAlgorithm;
use super::{Integrity, IntegrityAlgorithm};
use crate::{
error::{Error, Result},
framing::VCDUHeader,
};

pub struct DefaultCrc32 {
/// Offset to the start of the crc checksum bytes
offset: usize,
size: usize,
alg: crc::Crc<u32>,
}

impl DefaultCrc32 {
const CRC_SIZE: usize = 4;
pub fn new(offset: usize) -> Self {
Self { offset }
Self {
offset,
size: offset + Self::CRC_SIZE,
alg: crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC),
}
}
}

impl IntegrityAlgorithm for DefaultCrc32 {
fn remove_parity<'a>(&self, _cadu_dat: &'a [u8]) -> &'a [u8] {
todo!();
}
fn perform(&self, _cadu_dat: &[u8]) -> super::Result<(super::Integrity, Vec<u8>)> {
todo!();
fn perform(&self, header: &VCDUHeader, cadu_dat: &[u8]) -> Result<(Integrity, Vec<u8>)> {
if cadu_dat.len() < self.size {
return Err(Error::NotEnoughData {
got: cadu_dat.len(),
wanted: self.size,
});
}
if header.vcid == VCDUHeader::FILL {
return Ok((Integrity::Skipped, cadu_dat.to_vec()));
}
let dat = &cadu_dat[self.offset..self.offset + Self::CRC_SIZE];
let expected = u32::from_be_bytes([dat[0], dat[1], dat[2], dat[3]]);
if expected != self.alg.checksum(cadu_dat) {
return Ok((Integrity::NoErrors, cadu_dat.to_vec()));
}
Ok((Integrity::HasErrors, cadu_dat.to_vec()))
}
}
17 changes: 11 additions & 6 deletions ccsds-lib/src/framing/integrity/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ pub use crate::prelude::*;
pub use crc32::*;
pub use reed_solomon::*;

use super::VCDUHeader;

#[derive(Clone, Debug, PartialEq)]
pub enum Integrity {
NoErrors,
Expand All @@ -14,14 +16,17 @@ pub enum Integrity {
/// Data was successfully corrected.
Corrected,
Uncorrectable,
/// The algorithm choose to skip performing integrity checks
Skipped,
}

pub trait IntegrityAlgorithm: Send + Sync {
/// Remove parity bytes from the CADU data.
/// Perform this integrity check.
///
/// This does not imply that this integrity check was performed.
fn remove_parity<'a>(&self, cadu_dat: &'a [u8]) -> &'a [u8];

/// Perform this integrity check. The returned data will have any parity bytes removed.
fn perform(&self, cadu_dat: &[u8]) -> Result<(Integrity, Vec<u8>)>;
/// `cadu_dat` must already be derandomized and be of expected lenght for this algorithm. This
/// algorithm may also choose to skip performance of the algorithm, e.g., for VCID fill frames.
///
/// The algorithm will remove any parity bytes such that the returned data is just the frame
/// bytes.
fn perform(&self, header: &VCDUHeader, cadu_dat: &[u8]) -> Result<(Integrity, Vec<u8>)>;
}
30 changes: 23 additions & 7 deletions ccsds-lib/src/framing/integrity/reed_solomon.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use rs2::{correct_message, RSState, N, PARITY_LEN};

use super::{Error, Integrity, IntegrityAlgorithm};
use crate::prelude::*;
use crate::{framing::VCDUHeader, prelude::*};

/// Deinterleave an interleaved RS block (code block + check symbols).
///
Expand Down Expand Up @@ -29,28 +29,39 @@ fn deinterleave(data: &[u8], interleave: u8) -> Vec<[u8; 255]> {
pub struct DefaultReedSolomon {
pub interleave: u8,
pub parity_len: usize,
noop: bool,
}

impl DefaultReedSolomon {
pub fn new(interleave: u8) -> Self {
Self {
interleave,
parity_len: PARITY_LEN,
noop: false,
}
}

pub fn noop(interleave: u8) -> Self {
Self {
interleave,
parity_len: PARITY_LEN,
noop: true,
}
}

fn can_correct(block: &[u8], interleave: u8) -> bool {
block.len() == N as usize * interleave as usize
}
}

//impl Corrector for DefaultReedSolomon {
impl IntegrityAlgorithm for DefaultReedSolomon {
fn remove_parity<'a>(&self, cadu_dat: &'a [u8]) -> &'a [u8] {
let parity_len = self.interleave as usize * self.parity_len;
&cadu_dat[..cadu_dat.len() - parity_len]
}
}

fn perform(&self, cadu_dat: &[u8]) -> Result<(Integrity, Vec<u8>)> {
//impl Corrector for DefaultReedSolomon {
impl IntegrityAlgorithm for DefaultReedSolomon {
fn perform(&self, header: &VCDUHeader, cadu_dat: &[u8]) -> Result<(Integrity, Vec<u8>)> {
if !DefaultReedSolomon::can_correct(cadu_dat, self.interleave) {
return Err(Error::IntegrityAlgorithm(format!(
"codeblock len={} cannot be corrected by this algorithm with interleave={}",
Expand All @@ -59,6 +70,10 @@ impl IntegrityAlgorithm for DefaultReedSolomon {
)));
}

if header.vcid == VCDUHeader::FILL || self.noop {
return Ok((Integrity::Skipped, self.remove_parity(cadu_dat).to_vec()));
}

let block: Vec<u8> = cadu_dat.to_vec();
let mut corrected = vec![0u8; block.len()];
let mut num_corrected = 0;
Expand Down Expand Up @@ -139,9 +154,10 @@ mod tests {

let rs = DefaultReedSolomon::new(interleave);
let expected_block_len = if interleave == 4 { 892 } else { 1115 };
let hdr = VCDUHeader::decode(&cadu).unwrap();

// Check original data tests out OK
let (status, block) = rs.perform(&cadu).unwrap();
let (status, block) = rs.perform(&hdr, &cadu).unwrap();
assert_eq!(
status,
Integrity::Ok,
Expand All @@ -151,7 +167,7 @@ mod tests {

// Introduce an error by just adding one with wrap to a byte and make sure it's corrected
cadu[100] += 1;
let (status, block) = rs.perform(&cadu).unwrap();
let (status, block) = rs.perform(&hdr, &cadu).unwrap();
assert_eq!(
status,
Integrity::Corrected,
Expand Down

0 comments on commit 7442de8

Please sign in to comment.