From 52c8da8768a6d06da46f364ebbf8cedcebeb5268 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 13 Oct 2024 23:23:15 +0530 Subject: [PATCH] fix: swap with write buffer when no packets left to be read --- storage/src/lib.rs | 70 ++++++++++++++----------------- uplink/src/base/serializer/mod.rs | 2 +- 2 files changed, 32 insertions(+), 40 deletions(-) diff --git a/storage/src/lib.rs b/storage/src/lib.rs index 857c9b2ea..5eadc43d6 100644 --- a/storage/src/lib.rs +++ b/storage/src/lib.rs @@ -30,8 +30,6 @@ pub struct Storage { current_write_file: BytesMut, /// current_read_file current_read_file: BytesMut, - /// Marked true only if current_read_file buffer is loaded from file - is_read_from_file: bool, /// disk persistence persistence: Option, } @@ -43,7 +41,6 @@ impl Storage { max_file_size, current_write_file: BytesMut::with_capacity(max_file_size * 2), current_read_file: BytesMut::with_capacity(max_file_size * 2), - is_read_from_file: false, persistence: None, } } @@ -104,6 +101,13 @@ impl Storage { /// Force flush the contents of write buffer onto disk pub fn flush(&mut self) -> Result, Error> { + let read_buffer_requires_flushing = + !self.persistence.as_ref().is_some_and(|p| p.current_read_file_id.is_some()) + && !self.current_read_file.is_empty(); + if self.current_write_file.is_empty() && !read_buffer_requires_flushing { + return Err(Error::NoWrites); + } + let Some(persistence) = &mut self.persistence else { // TODO(RT): Make sure that disk files starts with id 1 to represent in memory file // with id 0 @@ -115,7 +119,7 @@ impl Storage { return Ok(Some(0)); }; - if !self.is_read_from_file && !self.current_read_file.is_empty() { + if read_buffer_requires_flushing { let NextFile { mut file, deleted } = persistence.open_next_write_file()?; if let Some(id) = deleted { error!("Deleted file while flushing to disk: {id}"); @@ -124,9 +128,6 @@ impl Storage { file.write(&mut self.current_read_file)?; } - if self.current_write_file.is_empty() { - return Err(Error::NoWrites); - } let NextFile { mut file, deleted } = persistence.open_next_write_file()?; info!("Flushing data to disk for stoarge: {}; path = {:?}", self.name, file.path()); file.write(&mut self.current_write_file)?; @@ -149,43 +150,34 @@ impl Storage { return Ok(()); } - let Some(persistence) = &mut self.persistence else { - mem::swap(&mut self.current_read_file, &mut self.current_write_file); - // If read buffer is 0 after swapping, all the data is caught up - if self.current_read_file.is_empty() { - return Err(Error::Done); + if let Some(persistence) = &mut self.persistence { + // Remove read file on completion in destructive-read mode + if let Some(id) = + persistence.current_read_file_id.take_if(|_| !persistence.non_destructive_read) + { + let deleted_file = persistence.remove(id)?; + debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name); } - return Ok(()); - }; - - // Remove read file on completion in destructive-read mode - let read_is_destructive = !persistence.non_destructive_read; - let read_file_id = persistence.current_read_file_id.take(); - if let Some(id) = read_is_destructive.then_some(read_file_id).flatten() { - let deleted_file = persistence.remove(id)?; - debug!("Completed reading a persistence file, deleting it; storage = {}, path = {deleted_file:?}", self.name); - } - - // Swap read buffer with write buffer to read data in inmemory write - // buffer when all the backlog disk files are done - if persistence.backlog_files.is_empty() { - self.is_read_from_file = false; - mem::swap(&mut self.current_read_file, &mut self.current_write_file); - // If read buffer is 0 after swapping, all the data is caught up - if self.current_read_file.is_empty() { - return Err(Error::Done); + match persistence.load_next_read_file(&mut self.current_read_file) { + Err(Error::Done) => {} + Err(e) => { + error!("Couldn't read persisted file: {e}"); + return Err(e); + } + _ => return Ok(()), } + }; - return Ok(()); - } - - if let Err(e) = persistence.load_next_read_file(&mut self.current_read_file) { - self.current_read_file.clear(); - persistence.current_read_file_id.take(); - return Err(e); + // Swap read buffer with write buffer to read data from inmemory write + // buffer when all the backlog disk files are done. + mem::swap(&mut self.current_read_file, &mut self.current_write_file); + // Write buffer is emptied to ensure fresh start. + self.current_write_file.clear(); + // If read buffer is 0 after swapping, all the data is caught up + if self.current_read_file.is_empty() { + return Err(Error::Done); } - self.is_read_from_file = true; Ok(()) } diff --git a/uplink/src/base/serializer/mod.rs b/uplink/src/base/serializer/mod.rs index bb33efee0..798bd862c 100644 --- a/uplink/src/base/serializer/mod.rs +++ b/uplink/src/base/serializer/mod.rs @@ -266,7 +266,7 @@ impl StorageHandler { return Some((stream.to_owned(), publish)); } - // Reset read_stream if it is empty + // All packets read from storage Err(storage::Error::Done) => { if self.read_stream.take_if(|s| s == stream).is_some() { debug!("Done reading from: {}", stream.topic);