diff --git a/src/rrdp.rs b/src/rrdp.rs index 64bc120..7238ba7 100644 --- a/src/rrdp.rs +++ b/src/rrdp.rs @@ -303,9 +303,318 @@ impl NotificationFile { } +//------------ LimitedNotificationFile --------------------------------------- + +/// A notification file with a limited delta list. +/// +/// This type can be used to parse a notification XML with a potentially +/// broken delta list. +#[derive(Clone, Debug)] +pub struct LimitedNotificationFile { + /// The identifier of the current session of the server. + session_id: Uuid, + + /// The serial number of the most recent update. + serial: u64, + + /// Information about the most recent snapshot. + snapshot: SnapshotInfo, + + /// The list of available delta updates. + deltas: Result, DeltaError>, +} + +impl LimitedNotificationFile { + /// Returns the identifier of the current session of the server. + /// + /// Delta updates can only be used if the session ID of the last processed + /// update matches this value. + pub fn session_id(&self) -> Uuid { + self.session_id + } + + /// Returns the serial number of the most recent update. + /// + /// Serial numbers increase by one between each update. + pub fn serial(&self) -> u64 { + self.serial + } + + /// Returns information about the most recent snapshot. + /// + /// The snapshot contains a complete set of all data published via the + /// repository. It can be processed using the [`ProcessSnapshot`] trait. + pub fn snapshot(&self) -> &SnapshotInfo { + &self.snapshot + } + + /// Returns the list of available delta updates. + /// + /// Deltas can be processed using the [`ProcessDelta`] trait. + pub fn deltas(&self) -> Result<&[DeltaInfo], DeltaError> { + match self.deltas { + Ok(ref deltas) => Ok(deltas.as_slice()), + Err(err) => Err(err) + } + } + + /// Parses the notification file from XML. + /// + /// Parses the XML and sanitizes the delta list while doing so. If the + /// delta list is suspicious in any way, it will not be included and + /// [`Self::deltas`] will return an error instead. + /// + /// If `current_serial` is given, only the deltas necessary to update + /// from that serial are included. If it is `None`, no deltas are included + /// at all. + /// + /// The `delta_use_limit` is the maximum number of deltas that the caller + /// is willing to use before falling back to the snapshot. If the delta + /// list would be larger than that, it will be empty instead. + /// + /// The `hard_delta_limit` is the maxium number of deltas to keep while + /// sanitizing the list. If a serial number is encountered that is smaller + /// than the target serial by this value, the delta list is discarded for + /// an error. + pub fn parse( + reader: R, + current_serial: Option, + delta_use_limit: usize, + hard_delta_limit: usize, + ) -> Result { + let mut reader = Reader::new(reader); + + let mut session_id = None; + let mut target_serial: Option = None; + let mut outer = reader.start(|element| { + if element.name() != NOTIFICATION { + return Err(XmlError::Malformed) + } + + element.attributes(|name, value| match name { + b"version" => { + if value.ascii_into::()? != 1 { + return Err(XmlError::Malformed) + } + Ok(()) + } + b"session_id" => { + session_id = Some(value.ascii_into()?); + Ok(()) + } + b"serial" => { + target_serial = Some(value.ascii_into()?); + Ok(()) + } + _ => Err(XmlError::Malformed) + }) + })?; + + let (session_id, target_serial) = match (session_id, target_serial) { + (Some(session_id), Some(serial)) => (session_id, serial), + _ => return Err(XmlError::Malformed), + }; + + // Now that we know the target serial, we can prepare for collecting + // deltas. We keep them in a vec `deltas` whose indexes represent + // reverse serials from the target. I.e., index 0 is the target + // serial, index 1 is target serial minus 1 and so on. If anything bad + // happens, we replace the vec with the error and stop processing + // deltas. + // + // The vec contains options, `None` represents a delta we haven’t seen + // yet. For deltas we will need to move from the current to the target + // serial, we keep the uri and hash in an `Ok(_)`. Beyond that we just + // use a dummy `Err(())` to mark that we’ve seen them. We are using a + // result here rather than an option to better distinguish between the + // outer option and the inner result. + // + // `use_limit` will keep the index where we start inserting `Err(())`. + // + // We will make sure the vec never grows beyond `hard_delta_limit` + // elements. + + let mut snapshot = None; + let mut deltas = Ok(Vec::new()); + let use_limit = match current_serial { + Some(serial) => { + //let serial = serial.saturating_add(1); + match target_serial.checked_sub(serial) { + Some(limit) => { + match usize::try_from(limit) { + Ok(limit) => { + if limit <= delta_use_limit { + limit + } + else { + deltas = Err(DeltaError::AboveUseLimit); + 0 + } + } + _ => 0, + } + } + None => 0, + } + }, + None => { + deltas = Err(DeltaError::NewRepository); + 0 + } + }; + + while let Some(mut content) = outer.take_opt_element( + &mut reader, |element| { + match element.name() { + SNAPSHOT => { + if snapshot.is_some() { + return Err(XmlError::Malformed) + } + let mut uri = None; + let mut hash = None; + element.attributes(|name, value| match name { + b"uri" => { + uri = Some(value.ascii_into()?); + Ok(()) + } + b"hash" => { + hash = Some(value.ascii_into()?); + Ok(()) + } + _ => Err(XmlError::Malformed) + })?; + match (uri, hash) { + (Some(uri), Some(hash)) => { + snapshot = Some(UriAndHash::new(uri, hash)); + Ok(()) + } + _ => Err(XmlError::Malformed) + } + } + DELTA => { + let mut serial = None; + let mut uri = None; + let mut hash = None; + element.attributes(|name, value| match name { + b"serial" => { + serial = Some(value.ascii_into()?); + Ok(()) + } + b"uri" => { + uri = Some(value.ascii_into()?); + Ok(()) + } + b"hash" => { + hash = Some(value.ascii_into()?); + Ok(()) + } + _ => Err(XmlError::Malformed) + })?; + let (serial, uri, hash) = match (serial, uri, hash) { + (Some(serial), Some(uri), Some(hash)) => { + (serial, uri, hash) + }, + _ => return Err(XmlError::Malformed) + }; + + if deltas.is_err() { + return Ok(()) + } + + let idx = match target_serial.checked_sub(serial) { + Some(idx) => { + match usize::try_from(idx) { + Ok(idx) => idx, + Err(_) => { + deltas = Err( + DeltaError::SerialTooOld(serial) + ); + return Ok(()) + } + } + } + None => { + deltas = Err(DeltaError::FutureSerial(serial)); + return Ok(()) + } + }; + if idx >= hard_delta_limit { + deltas = Err(DeltaError::SerialTooOld(serial)); + return Ok(()) + } + let info = if idx < use_limit { + Ok(UriAndHash::new(uri, hash)) + } + else { + Err(()) + }; + let err = if let Ok(deltas) = deltas.as_mut() { + if idx >= deltas.len() { + deltas.resize(idx + 1, None); + } + if deltas[idx].is_some() { + true + } + else { + deltas[idx] = Some(info); + false + } + } + else { + false + }; + if err { + deltas = Err(DeltaError::DuplicateSerial(serial)); + } + Ok(()) + } + _ => Err(XmlError::Malformed) + } + } + )? { + content.take_end(&mut reader)?; + } + + outer.take_end(&mut reader)?; + reader.end()?; + + match snapshot { + Some(snapshot) => { + Ok(LimitedNotificationFile { + session_id, + serial: target_serial, + snapshot, + deltas: Self::convert_deltas(deltas, target_serial), + }) + } + _ => Err(XmlError::Malformed) + } + } + + fn convert_deltas( + deltas: Result>>, DeltaError>, + mut serial: u64 + ) -> Result, DeltaError> { + let deltas = deltas?; + let mut res = Vec::new(); + for item in deltas { + let item = match item { + Some(item) => item, + None => return Err(DeltaError::MissingDelta(serial)) + }; + if let Ok(uri_and_hash) = item { + res.push(DeltaInfo { serial, uri_and_hash }) + } + serial = serial.checked_sub(1).expect("serial number underrun"); + } + Ok(res) + } +} + + //------------ PublishElement ------------------------------------------------ -/// Am RPKI object to be published for the first time. +/// An RPKI object to be published for the first time. /// /// This type defines an RRDP publish element as found in RRDP Snapshots and /// Deltas. See [`UpdateElement`] for the related element that replaces a @@ -1477,6 +1786,56 @@ impl fmt::Display for ParseHashError { impl error::Error for ParseHashError { } +//------------ DeltaError ---------------------------------------------------- + +/// The delta list in a notification file was unacceptable. +#[derive(Clone, Copy, Debug)] +pub enum DeltaError { + /// Too many deltas required to update to the target serial. + AboveUseLimit, + + /// This is the first time we use this repository. + NewRepository, + + /// A delta had a serial greater than the target serial. + FutureSerial(u64), + + /// A delta serial was smaller than the hard limit. + SerialTooOld(u64), + + /// A duplicate delta was found. + DuplicateSerial(u64), + + /// A delta was missing. + MissingDelta(u64), +} + +impl fmt::Display for DeltaError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match self { + Self::AboveUseLimit => { + write!(f, "too many deltas necessary for update") + } + Self::NewRepository => write!(f, "new repository"), + Self::FutureSerial(serial) => { + write!(f, "serial {serial} greater than target serial") + } + Self::SerialTooOld(serial) => { + write!(f, "serial {serial} below hard serial limit") + } + Self::DuplicateSerial(serial) => { + write!(f, "duplicate serial {serial}") + } + Self::MissingDelta(serial) => { + write!(f, "chain is missing serial {serial}") + } + } + } +} + +impl error::Error for DeltaError { } + + //------------ ProcessError -------------------------------------------------- /// An error occurred while processing RRDP data. @@ -1525,6 +1884,7 @@ impl error::Error for ProcessError { } #[cfg(test)] mod test { use std::str::from_utf8_unchecked; + use std::str::FromStr; use super::*; @@ -1597,6 +1957,16 @@ mod test { ).as_ref() ).is_err() ); + assert!( + LimitedNotificationFile::parse( + include_bytes!( + "../test-data/rrdp/lolz-notification.xml" + ).as_ref(), + None, + 0, + 0 + ).is_err() + ); } #[test] @@ -1612,6 +1982,29 @@ mod test { ).as_ref() ).unwrap(); assert!(!notification_with_gaps.sort_and_verify_deltas(None)); + + assert!(matches!( + LimitedNotificationFile::parse( + include_bytes!( + "../test-data/rrdp/ripe-notification-with-gaps.xml" + ).as_ref(), + Some(1740), 10, 1000 + ).unwrap().deltas(), + Err(DeltaError::MissingDelta(_)) + )); + } + + #[test] + fn duplicates_notification() { + assert!(matches!( + LimitedNotificationFile::parse( + include_bytes!( + "../test-data/rrdp/ripe-notification-duplicates.xml" + ).as_ref(), + Some(1740), 10, 1000 + ).unwrap().deltas(), + Err(DeltaError::DuplicateSerial(_)) + )); } #[test] @@ -1656,6 +2049,67 @@ mod test { assert_eq!(from_sorted, from_unsorted); } + #[test] + fn parse_limited_notification_file() { + let bytes = include_bytes!( + "../test-data/rrdp/ripe-notification-unsorted.xml" + ).as_ref(); + + // Get the full delta set. + let mut full = NotificationFile::parse(bytes).unwrap(); + full.reverse_sort_deltas(); + let deltas = full.deltas(); + let serial = full.serial(); + + // No current serial + let n = LimitedNotificationFile::parse( + bytes, None, 100, 1000 + ).unwrap(); + assert!(matches!(n.deltas(), Err(DeltaError::NewRepository))); + + // Current serial larger than target serial + let n = LimitedNotificationFile::parse( + bytes, Some(serial + 1), 100, 1000 + ).unwrap(); + assert_eq!(n.deltas().unwrap().len(), 0); + + // Current serial equal to target serial + let n = LimitedNotificationFile::parse( + bytes, Some(serial), 100, 1000 + ).unwrap(); + assert_eq!(n.deltas().unwrap().len(), 0); + + // Current serial one below target serial + let n = LimitedNotificationFile::parse( + bytes, Some(serial - 1), 100, 1000 + ).unwrap(); + assert_eq!(*n.deltas().unwrap(), deltas[..1]); + + // Current serial 10 below target and limit is 20 + let n = LimitedNotificationFile::parse( + bytes, Some(serial - 10), 20, 1000 + ).unwrap(); + assert_eq!(*n.deltas().unwrap(), deltas[..10]); + + // Current serial 10 below target and limit is 5 + let n = LimitedNotificationFile::parse( + bytes, Some(serial - 10), 5, 1000 + ).unwrap(); + assert!(matches!(n.deltas(), Err(DeltaError::AboveUseLimit))); + + // Hard limit is too low + let n = LimitedNotificationFile::parse( + bytes, Some(serial - 2), 5, deltas.len() - 1, + ).unwrap(); + assert!(matches!(n.deltas(), Err(DeltaError::SerialTooOld(_)))); + + // Hard limit is same as delta len. + let n = LimitedNotificationFile::parse( + bytes, Some(serial - 2), 5, deltas.len(), + ).unwrap(); + assert_eq!(n.deltas().unwrap().len(), 2); + } + #[test] fn ripe_snapshot() { ::process( @@ -1698,7 +2152,9 @@ mod test { from_utf8_unchecked(vec.as_ref()) }; - let notification_parsed = NotificationFile::parse(xml.as_bytes()).unwrap(); + let notification_parsed = NotificationFile::parse( + xml.as_bytes() + ).unwrap(); assert_eq!(notification, notification_parsed); } diff --git a/test-data/rrdp/ripe-notification-duplicates.xml b/test-data/rrdp/ripe-notification-duplicates.xml new file mode 100644 index 0000000..f82e831 --- /dev/null +++ b/test-data/rrdp/ripe-notification-duplicates.xml @@ -0,0 +1,4 @@ + + + +