diff --git a/CHANGELOG.md b/CHANGELOG.md
index acf09a3..8a46956 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -16,6 +16,10 @@
 - Updated default block size to 256 bytes and generally improved handling of larger files in storage layer.
 - Archived unused `car-utility` and `cpp-transmit-example` projects.
 - Converted all crate dependencies to workspace dependencies, tightened version specificity, narrowed features when possible.
+- Increased default MTU to 512 bytes to accommodate more realistic systems.
+- Moved functions for fetching all DAG CIDs & blocks into the storage layer.
+- Increased default file block size to 100KB for better performance when importing larger files.
+- Made small revisions to the testing plan.

 ## [0.6.3] - 2023-05-04

diff --git a/controller/src/main.rs b/controller/src/main.rs
index ca74c88..d079627 100644
--- a/controller/src/main.rs
+++ b/controller/src/main.rs
@@ -18,7 +18,7 @@ pub struct Cli {

 impl Cli {
     pub async fn run(&self) -> Result<()> {
-        let transport = UdpTransport::new(&self.bind_address, 60)?;
+        let transport = UdpTransport::new(&self.bind_address, 512)?;
         let command = Message::ApplicationAPI(self.command.clone());
         let cmd_str = serde_json::to_string(&command)?;

diff --git a/local-storage/src/provider.rs b/local-storage/src/provider.rs
index b785e0c..a78b6e5 100644
--- a/local-storage/src/provider.rs
+++ b/local-storage/src/provider.rs
@@ -14,6 +14,15 @@
     fn get_links_by_cid(&self, cid: &str) -> Result<Vec<String>>;
     fn list_available_dags(&self) -> Result<Vec<String>>;
     fn get_missing_cid_blocks(&self, cid: &str) -> Result<Vec<String>>;
+    fn get_dag_blocks_by_window(
+        &self,
+        cid: &str,
+        offset: u32,
+        window_size: u32,
+    ) -> Result<Vec<StoredBlock>>;
+    fn get_all_dag_cids(&self, cid: &str) -> Result<Vec<String>>;
+    fn get_all_dag_blocks(&self, cid: &str) -> Result<Vec<StoredBlock>>;
+    fn get_all_blocks_under_cid(&self, cid: &str) -> Result<Vec<StoredBlock>>;
 }

 pub struct SqliteStorageProvider {
@@ -201,6 +210,140 @@ impl StorageProvider for SqliteStorageProvider {
             .collect();
         Ok(cids)
     }
+
+    fn get_dag_blocks_by_window(
+        &self,
+        cid: &str,
+        offset: u32,
+        window_size: u32,
+    ) -> Result<Vec<StoredBlock>> {
+        let blocks: Vec<StoredBlock> = self
+            .conn
+            .prepare(
+                "
+                WITH RECURSIVE cids(x,y) AS (
+                    SELECT cid,data FROM blocks WHERE cid = (?1)
+                    UNION
+                    SELECT cid,data FROM blocks b
+                        INNER JOIN links l ON b.cid==l.block_cid
+                        INNER JOIN cids ON (root_cid=x)
+                )
+                SELECT x,y FROM cids
+                LIMIT (?2) OFFSET (?3);
+                ",
+            )?
+            .query_map(
+                [cid, &format!("{window_size}"), &format!("{offset}")],
+                |row| {
+                    let cid_str: String = row.get(0)?;
+                    let data: Vec<u8> = row.get(1)?;
+                    let links = match self.get_links_by_cid(&cid_str) {
+                        Ok(links) => links,
+                        Err(_) => vec![],
+                    };
+                    Ok(StoredBlock {
+                        cid: cid_str,
+                        data,
+                        links,
+                    })
+                },
+            )?
+            .filter_map(|b| b.ok())
+            .collect();
+
+        Ok(blocks)
+    }
+
+    fn get_all_dag_cids(&self, cid: &str) -> Result<Vec<String>> {
+        let cids: Vec<String> = self
+            .conn
+            .prepare(
+                "
+                WITH RECURSIVE cids(x) AS (
+                    VALUES(?1)
+                    UNION
+                    SELECT block_cid FROM links JOIN cids ON root_cid=x
+                )
+                SELECT x FROM cids;
+                ",
+            )?
+            .query_map([cid], |row| {
+                let cid_str: String = row.get(0)?;
+                Ok(cid_str)
+            })?
+            .filter_map(|b| b.ok())
+            .collect();
+
+        Ok(cids)
+    }
+
+    fn get_all_dag_blocks(&self, cid: &str) -> Result<Vec<StoredBlock>> {
+        let blocks: Vec<StoredBlock> = self
+            .conn
+            .prepare(
+                "
+                WITH RECURSIVE cids(x,y) AS (
+                    SELECT cid,data FROM blocks WHERE cid = (?1)
+                    UNION
+                    SELECT cid,data FROM blocks b
+                        INNER JOIN links l ON b.cid==l.block_cid
+                        INNER JOIN cids ON (root_cid=x)
+                )
+                SELECT x,y FROM cids
+                ",
+            )?
+            .query_map([cid], |row| {
+                let cid_str: String = row.get(0)?;
+                let data: Vec<u8> = row.get(1)?;
+                let links = match self.get_links_by_cid(&cid_str) {
+                    Ok(links) => links,
+                    Err(_) => vec![],
+                };
+                Ok(StoredBlock {
+                    cid: cid_str,
+                    data,
+                    links,
+                })
+            })?
+            .filter_map(|b| b.ok())
+            .collect();
+
+        Ok(blocks)
+    }
+
+    fn get_all_blocks_under_cid(&self, cid: &str) -> Result<Vec<StoredBlock>> {
+        let blocks: Vec<StoredBlock> = self
+            .conn
+            .prepare(
+                "
+                WITH RECURSIVE cids(x,y) AS (
+                    SELECT cid,data FROM blocks WHERE cid = (?1)
+                    UNION
+                    SELECT cid,data FROM blocks b
+                        INNER JOIN links l ON b.cid==l.block_cid
+                        INNER JOIN cids ON (root_cid=x)
+                )
+                SELECT x,y FROM cids
+                ",
+            )?
+            .query_map([cid], |row| {
+                let cid_str: String = row.get(0)?;
+                let data: Vec<u8> = row.get(1)?;
+                let links = match self.get_links_by_cid(&cid_str) {
+                    Ok(links) => links,
+                    Err(_) => vec![],
+                };
+                Ok(StoredBlock {
+                    cid: cid_str,
+                    data,
+                    links,
+                })
+            })?
+            .filter_map(|b| b.ok())
+            .collect();
+
+        Ok(blocks)
+    }
 }

 #[cfg(test)]
diff --git a/local-storage/src/storage.rs b/local-storage/src/storage.rs
index acb8bfe..e0076d0 100644
--- a/local-storage/src/storage.rs
+++ b/local-storage/src/storage.rs
@@ -18,7 +18,8 @@ pub struct Storage {
 }

 // TODO: Make this configurable
-const BLOCK_SIZE: usize = 256;
+// Changing to 100KB to optimize for larger files
+const BLOCK_SIZE: usize = 1024 * 100;

 impl Storage {
     pub fn new(provider: Box<dyn StorageProvider>) -> Self {
@@ -39,6 +40,8 @@
         let blocks = blocks?;
         let mut root_cid: Option<String> = None;

+        let mut stored_blocks = vec![];
+
         blocks.iter().for_each(|b| {
             let links = b
                 .links()
@@ -50,13 +53,22 @@
                 data: b.data().to_vec(),
                 links,
             };
+            // First validate each block
+            if let Err(e) = stored.validate() {
+                error!("Failed to validate {}, {e}", b.cid());
+            }
             if let Err(e) = self.provider.import_block(&stored) {
                 error!("Failed to import block {e}");
             }
             if !stored.links.is_empty() {
-                root_cid = Some(stored.cid);
+                root_cid = Some(stored.cid.clone());
             }
+            stored_blocks.push(stored);
         });
+        info!("Validating imported blocks {}", blocks.len());
+        if let Err(e) = crate::block::validate_dag(&stored_blocks) {
+            error!("Failed to validate dag on import: {e}");
+        }
         if blocks.len() == 1 {
             if let Some(first) = blocks.first() {
                 root_cid = Some(first.cid().to_string());
@@ -77,7 +89,7 @@
             bail!(StorageError::DagIncomplete(cid.to_string()))
         }
         // Fetch all blocks tied to links under given cid
-        let child_blocks = self.get_all_blocks_under_cid(cid)?;
+        let child_blocks = self.get_all_dag_blocks(cid)?;
         // Open up file path for writing
         let mut output_file = FsFile::create(path)?;
         // Walk the StoredBlocks and write out to path
@@ -102,31 +114,12 @@
         self.provider.get_block_by_cid(cid)
     }

-    pub fn get_all_blocks_under_cid(&self, cid: &str) -> Result<Vec<StoredBlock>> {
-        // Get StoredBlock by cid and check for links
-        let root_block = self.provider.get_block_by_cid(cid)?;
-        // If links, grab all appropriate StoredBlocks
-        let mut child_blocks = vec![];
-        for link in root_block.links {
-            let block = self.provider.get_block_by_cid(&link)?;
-            if !block.links.is_empty() {
-                child_blocks.append(&mut self.get_all_blocks_under_cid(&block.cid)?);
-            }
-            child_blocks.push(block);
-        }
-        Ok(child_blocks)
+    pub fn get_all_dag_cids(&self, cid: &str) -> Result<Vec<String>> {
+        self.provider.get_all_dag_cids(cid)
     }

-    pub fn get_dag_blocks(&self, cid: &str) -> Result<Vec<StoredBlock>> {
-        // Get StoredBlock by cid and check for links
-        let root_block = self.provider.get_block_by_cid(cid)?;
-        // If links, grab all appropriate StoredBlocks
-        let mut blocks = vec![];
-        for link in &root_block.links {
-            blocks.push(self.provider.get_block_by_cid(link)?);
-        }
-        blocks.push(root_block);
-        Ok(blocks)
+    pub fn get_all_dag_blocks(&self, cid: &str) -> Result<Vec<StoredBlock>> {
+        self.provider.get_all_dag_blocks(cid)
     }

     pub fn import_block(&self, block: &StoredBlock) -> Result<()> {
@@ -141,6 +134,19 @@
     pub fn list_available_dags(&self) -> Result<Vec<String>> {
         self.provider.list_available_dags()
     }
+
+    pub fn get_dag_blocks_by_window(
+        &self,
+        cid: &str,
+        window_size: u32,
+        window_num: u32,
+    ) -> Result<Vec<StoredBlock>> {
+        println!("offset = {} * {}", window_size, window_num);
+        let offset = window_size * window_num;
+
+        self.provider
+            .get_dag_blocks_by_window(cid, offset, window_size)
+    }
 }

 #[cfg(test)]
@@ -198,7 +204,7 @@ pub mod tests {
         test_file
             .write_binary(
                 "654684646847616846846876168468416874616846416846846186468464684684648684684"
-                    .repeat(10)
+                    .repeat(500)
                    .as_bytes(),
             )
             .unwrap();
@@ -217,7 +223,7 @@

     #[test]
     pub fn export_from_storage_various_file_sizes_binary_data() {
-        for size in [100, 200, 300, 500, 1000] {
+        for size in [100, 200, 300, 500, 1_000] {
             let harness = TestHarness::new();
             let temp_dir = assert_fs::TempDir::new().unwrap();
             let test_file = temp_dir.child("data.txt");
@@ -243,6 +249,55 @@
         }
     }

+    #[test]
+    pub fn test_get_dag_blocks_by_window() {
+        let harness = TestHarness::new();
+        let temp_dir = assert_fs::TempDir::new().unwrap();
+        let test_file = temp_dir.child("data.txt");
+
+        let data_size = BLOCK_SIZE * 50;
+        let mut data = Vec::<u8>::new();
+        data.resize(data_size, 1);
+        thread_rng().fill_bytes(&mut data);
+
+        test_file.write_binary(&data).unwrap();
+        let cid = harness.storage.import_path(test_file.path()).unwrap();
+
+        let window_size: u32 = 10;
+        let mut window_num = 0;
+
+        let all_dag_blocks = harness.storage.get_all_dag_blocks(&cid).unwrap();
+
+        for chunk in all_dag_blocks.chunks(window_size as usize).into_iter() {
+            let window_blocks = harness
+                .storage
+                .get_dag_blocks_by_window(&cid, window_size, window_num)
+                .unwrap();
+            assert_eq!(chunk, &window_blocks);
+            window_num += 1;
+        }
+    }
+
+    #[test]
+    pub fn compare_get_blocks_to_get_cids() {
+        let harness = TestHarness::new();
+        let temp_dir = assert_fs::TempDir::new().unwrap();
+        let test_file = temp_dir.child("data.txt");
+
+        let data_size = BLOCK_SIZE * 50;
+        let mut data = Vec::<u8>::new();
+        data.resize(data_size, 1);
+        thread_rng().fill_bytes(&mut data);
+
+        test_file.write_binary(&data).unwrap();
+        let cid = harness.storage.import_path(test_file.path()).unwrap();
+
+        let blocks = harness.storage.get_all_dag_blocks(&cid).unwrap();
+        let cids = harness.storage.get_all_dag_cids(&cid).unwrap();
+
+        assert_eq!(blocks.len(), cids.len());
+    }
+
     // TODO: duplicated data is not being handled correctly right now, need to fix this
     // #[test]
     // pub fn export_from_storage_various_file_sizes_duplicated_data() {
diff --git a/messages/src/api.rs b/messages/src/api.rs
index 91c8731..ccea14b 100644
--- a/messages/src/api.rs
+++ b/messages/src/api.rs
@@ -60,13 +60,6 @@ pub enum ApplicationAPI {
     Receive {
         listen_addr: String,
     },
-    /// Information about the next pass used for calculating
-    /// data transfer parameters
-    NextPassInfo {
-        duration: u32,
-        send_bytes: u32,
-        receive_bytes: u32,
-    },
     /// Request Available Blocks
     RequestAvailableBlocks,
     /// Advertise all available blocks by CID
@@ -77,10 +70,6 @@
     DeleteCid {
         cid: String,
     },
-    /// Request available DAGs
-    RequestAvailableDags,
-    /// Advertise available DAGs as a map of CID to filename
-    // AvailableDags { dags: BTreeMap<String, String> },
     /// Delete block from local store
     DeleteBlock {
         cid: String,
@@ -99,4 +88,16 @@
     Version {
         version: String,
     },
+    // TODO: Implement later
+    // Information about the next pass used for calculating
+    // data transfer parameters
+    // NextPassInfo {
+    //     duration: u32,
+    //     send_bytes: u32,
+    //     receive_bytes: u32,
+    // },
+    // Request available DAGs
+    // RequestAvailableDags,
+    // Advertise available DAGs as a map of CID to filename
+    // AvailableDags { dags: BTreeMap<String, String> },
 }
diff --git a/messages/src/protocol.rs b/messages/src/protocol.rs
index e5e637b..a4e89ef 100644
--- a/messages/src/protocol.rs
+++ b/messages/src/protocol.rs
@@ -65,8 +65,4 @@
         cid: String,
         blocks: Vec<String>,
     },
-    /// Sets current connected state
-    SetConnected {
-        connected: bool,
-    },
 }
diff --git a/myceli/src/config.rs b/myceli/src/config.rs
index 6da64fc..9472704 100644
--- a/myceli/src/config.rs
+++ b/myceli/src/config.rs
@@ -11,7 +11,7 @@ pub struct Config {
     pub retry_timeout_duration: u64,
     pub storage_path: String,
     pub mtu: u16,
-    pub window_size: u8,
+    pub window_size: u32,
 }

 impl Default for Config {
diff --git a/myceli/src/handlers.rs b/myceli/src/handlers.rs
index a24ea35..f597da4 100644
--- a/myceli/src/handlers.rs
+++ b/myceli/src/handlers.rs
@@ -13,7 +13,7 @@ pub fn import_file(path: &str, storage: Rc<Storage>) -> Result<Message> {
 }

 pub fn validate_dag(cid: &str, storage: Rc<Storage>) -> Result<Message> {
-    let dag_blocks = storage.get_dag_blocks(cid)?;
+    let dag_blocks = storage.get_all_dag_blocks(cid)?;
     let resp = match local_storage::block::validate_dag(&dag_blocks) {
         Ok(_) => "Dag is valid".to_string(),
         Err(e) => e.to_string(),
@@ -167,7 +167,7 @@

         let blocks = harness
             .storage
-            .get_all_blocks_under_cid(&imported_file_cid)
+            .get_all_dag_blocks(&imported_file_cid)
             .unwrap();
         for block in blocks {
             let (validated_cid, result) = match validate_dag(&block.cid, harness.storage.clone()) {
diff --git a/myceli/src/listener.rs b/myceli/src/listener.rs
index ced6c66..8e5fb44 100644
--- a/myceli/src/listener.rs
+++ b/myceli/src/listener.rs
@@ -8,7 +8,7 @@
 use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::rc::Rc;
 use std::sync::mpsc::{self, Sender};
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::thread::spawn;
 use tracing::{debug, error, info};
 use transports::Transport;
@@ -17,7 +17,7 @@ pub struct Listener {
     storage_path: String,
     storage: Rc<Storage>,
     transport: Arc,
-    connected: bool,
+    connected: Arc<Mutex<bool>>,
 }

 impl Listener {
@@ -34,17 +34,17 @@
             storage_path: storage_path.to_string(),
             storage,
             transport,
-            connected: true,
+            connected: Arc::new(Mutex::new(true)),
         })
     }

-    pub fn start(&mut self, shipper_timeout_duration: u64, shipper_window_size: u8) -> Result<()> {
+    pub fn start(&mut self, shipper_timeout_duration: u64, shipper_window_size: u32) -> Result<()> {
         // First setup the shipper and its pieces
         let (shipper_sender, shipper_receiver) = mpsc::channel();
         let shipper_storage_path = self.storage_path.to_string();
         let shipper_sender_clone = shipper_sender.clone();
         let shipper_transport = Arc::clone(&self.transport);
-        let initial_connected = self.connected;
+        let initial_connected = Arc::clone(&self.connected);
         spawn(move || {
             let mut shipper = Shipper::new(
                 &shipper_storage_path,
@@ -143,16 +143,17 @@
             Message::ApplicationAPI(ApplicationAPI::SetConnected { connected }) => {
-                self.connected = connected;
-                shipper_sender.send((
-                    DataProtocol::SetConnected { connected },
-                    sender_addr.to_string(),
-                ))?;
+                let prev_connected = *self.connected.lock().unwrap();
+                *self.connected.lock().unwrap() = connected;
+                if !prev_connected && connected {
+                    shipper_sender
+                        .send((DataProtocol::ResumeTransmitAllDags, sender_addr.to_string()))?;
+                }
                 None
             }
             Message::ApplicationAPI(ApplicationAPI::GetConnected) => {
                 Some(Message::ApplicationAPI(ApplicationAPI::ConnectedState {
-                    connected: self.connected,
+                    connected: *self.connected.lock().unwrap(),
                 }))
             }
             Message::ApplicationAPI(ApplicationAPI::ResumeTransmitDag { cid }) => {
@@ -167,6 +168,14 @@
                     .send((DataProtocol::ResumeTransmitAllDags, sender_addr.to_string()))?;
                 None
             }
+            Message::ApplicationAPI(ApplicationAPI::ValidateDagResponse { cid, result }) => {
+                info!("Received ValidateDagResponse from {sender_addr} for {cid}: {result}");
+                None
+            }
+            Message::ApplicationAPI(ApplicationAPI::FileImported { path, cid }) => {
+                info!("Received FileImported from {sender_addr}: {path} -> {cid}");
+                None
+            }
             // Default case for valid messages which don't have handling code implemented yet
             message => {
                 info!("Received unhandled message: {:?}", message);
diff --git a/myceli/src/shipper.rs b/myceli/src/shipper.rs
index 44404c3..1b81efe 100644
--- a/myceli/src/shipper.rs
+++ b/myceli/src/shipper.rs
@@ -11,15 +11,17 @@
 use std::net::ToSocketAddrs;
 use std::rc::Rc;
 use std::sync::mpsc::{Receiver, Sender};
 use std::sync::Arc;
+use std::sync::Mutex;
 use std::thread::{sleep, spawn};
 use std::time::Duration;
 use tracing::{error, info};
 use transports::Transport;

+#[derive(Clone)]
 struct WindowSession {
     pub max_retries: u8,
     pub remaining_window_retries: u8,
-    pub window_num: u8,
+    pub window_num: u32,
     pub target_addr: String,
 }

@@ -37,9 +39,9 @@ pub struct Shipper {
     // Transport shared between listener and shipper for a consistent listening interface
     transport: Arc,
     // Default window size for dag transfers
-    window_size: u8,
+    window_size: u32,
     // Current connection status
-    connected: bool,
+    connected: Arc<Mutex<bool>>,
 }

 impl Shipper {
@@ -48,9 +50,9 @@
         receiver: Receiver<(DataProtocol, String)>,
         sender: Sender<(DataProtocol, String)>,
         retry_timeout_duration: u64,
-        window_size: u8,
+        window_size: u32,
         transport: Arc,
-        connected: bool,
+        connected: Arc<Mutex<bool>>,
     ) -> Result> {
         let provider = SqliteStorageProvider::new(storage_path)?;
         provider.setup()?;
@@ -81,7 +83,7 @@
     pub fn receive(&mut self, message: DataProtocol, sender_addr: &str) -> Result<()> {
         match message {
             DataProtocol::RequestTransmitBlock { cid, target_addr } => {
-                if self.connected {
+                if *self.connected.lock().unwrap() {
                     self.transmit_block(&cid, &target_addr)?;
                 }
             }
@@ -94,7 +96,7 @@
                 self.begin_dag_window_session(&cid, &target_addr, retries)?;
             }
             DataProtocol::RetryDagSession { cid, target_addr } => {
-                if self.connected && self.window_sessions.contains_key(&cid) {
+                if *self.connected.lock().unwrap() && self.window_sessions.contains_key(&cid) {
                     info!("Received retry dag session, sending get missing req to {target_addr}");
                     if let Some(session) = self.window_sessions.get(&cid) {
                         let blocks = self.get_dag_window_blocks(&cid, session.window_num)?;
@@ -111,50 +113,53 @@
                 }
             }
             DataProtocol::RequestMissingDagWindowBlocks { cid, blocks } => {
-                let missing_blocks_msg = handlers::get_missing_dag_blocks_window_protocol(
-                    &cid,
-                    blocks,
-                    Rc::clone(&self.storage),
-                )?;
-                self.transmit_msg(missing_blocks_msg, sender_addr)?;
+                if *self.connected.lock().unwrap() {
+                    let missing_blocks_msg = handlers::get_missing_dag_blocks_window_protocol(
+                        &cid,
+                        blocks,
+                        Rc::clone(&self.storage),
+                    )?;
+                    self.transmit_msg(missing_blocks_msg, sender_addr)?;
+                }
             }
             DataProtocol::RequestMissingDagBlocks { cid } => {
-                let missing_blocks_msg =
-                    handlers::get_missing_dag_blocks(&cid, Rc::clone(&self.storage))?;
-                self.transmit_msg(missing_blocks_msg, sender_addr)?;
+                if *self.connected.lock().unwrap() {
+                    let missing_blocks_msg =
+                        handlers::get_missing_dag_blocks(&cid, Rc::clone(&self.storage))?;
+                    self.transmit_msg(missing_blocks_msg, sender_addr)?;
+                }
             }
             DataProtocol::MissingDagBlocks { cid, blocks } => {
-                // If no blocks are missing, then attempt to move to next window
-                if blocks.is_empty() {
-                    self.increment_dag_window_session(&cid, sender_addr)?;
-                } else {
-                    info!(
-                        "Dag {cid} is missing {} blocks, sending again",
-                        blocks.len()
-                    );
-                    for b in blocks.clone() {
-                        self.transmit_block(&b, sender_addr)?;
+                if *self.connected.lock().unwrap() {
+                    // If no blocks are missing, then attempt to move to next window
+                    if blocks.is_empty() {
+                        self.increment_dag_window_session(&cid, sender_addr)?;
+                    } else {
+                        info!(
+                            "Dag {cid} is missing {} blocks, sending again",
+                            blocks.len()
+                        );
+                        for b in blocks.clone() {
+                            self.transmit_block(&b, sender_addr)?;
+                        }
+                        self.transmit_msg(
+                            Message::DataProtocol(DataProtocol::RequestMissingDagWindowBlocks {
+                                cid,
+                                blocks,
+                            }),
+                            sender_addr,
+                        )?;
                     }
-                    self.transmit_msg(
-                        Message::DataProtocol(DataProtocol::RequestMissingDagWindowBlocks {
-                            cid,
-                            blocks,
-                        }),
-                        sender_addr,
-                    )?;
                 }
             }
             DataProtocol::ResumeTransmitDag { cid } => {
-                info!("Shipper resume {cid}");
-                self.resume_dag_window_session(&cid)?;
+                if *self.connected.lock().unwrap() {
+                    info!("Shipper resume {cid}");
+                    self.resume_dag_window_session(&cid)?;
+                }
             }
             DataProtocol::ResumeTransmitAllDags => {
-                self.resume_all_dag_window_sessions()?;
-            }
-            DataProtocol::SetConnected { connected } => {
-                let prev_connected = self.connected;
-                self.connected = connected;
-                if !prev_connected && connected {
+                if *self.connected.lock().unwrap() {
                     self.resume_all_dag_window_sessions()?;
                 }
             }
@@ -175,7 +180,7 @@
     }

     // Helper function for incrementing a session's window and resetting the retries
-    fn next_dag_window_session(&mut self, cid: &str) -> Option<u8> {
+    fn next_dag_window_session(&mut self, cid: &str) -> Option<u32> {
         if let Some(session) = self.window_sessions.get_mut(cid) {
             session.window_num += 1;
             session.remaining_window_retries = session.max_retries;
@@ -220,27 +225,29 @@
     fn dag_window_session_run(
         &mut self,
         cid: &str,
-        window_num: u8,
+        window_num: u32,
         target_addr: &str,
     ) -> Result<()> {
-        let blocks = self.transmit_dag_window(cid, window_num, target_addr)?;
-        if !blocks.is_empty() {
-            info!(
-                "Transmitting window {} for {}, {} blocks",
-                window_num,
-                cid,
-                blocks.len()
-            );
-            self.transmit_msg(
-                Message::DataProtocol(DataProtocol::RequestMissingDagWindowBlocks {
-                    cid: cid.to_string(),
-                    blocks,
-                }),
-                target_addr,
-            )?;
-        } else {
-            info!("Dag transfer session for {cid} is complete");
-            self.end_dag_window_session(cid);
+        if *self.connected.lock().unwrap() {
+            let blocks = self.transmit_dag_window(cid, window_num, target_addr)?;
+            if !blocks.is_empty() {
+                info!(
+                    "Transmitted window {} for {}, {} blocks",
+                    window_num,
+                    cid,
+                    blocks.len()
+                );
+                self.transmit_msg(
+                    Message::DataProtocol(DataProtocol::RequestMissingDagWindowBlocks {
+                        cid: cid.to_string(),
+                        blocks,
+                    }),
+                    target_addr,
+                )?;
+            } else {
+                info!("Dag transfer session for {cid} is complete");
+                self.end_dag_window_session(cid);
+            }
         }
         Ok(())
     }
@@ -248,12 +255,17 @@ impl Shipper {
     // This function resumes the transmission of a DAG by fetching the relevant session
     // and running the last sent window again
     fn resume_dag_window_session(&mut self, cid: &str) -> Result<()> {
-        if let Some(session) = self.window_sessions.get(cid) {
+        if *self.connected.lock().unwrap() {
+            let session = if let Some(session) = self.window_sessions.get(cid) {
+                session.clone()
+            } else {
+                info!("session not found for {cid}");
+                return Ok(());
+            };
             info!("start dag window session for {cid}");
             // Need to reset the window retries here
-            self.dag_window_session_run(cid, session.window_num, &session.target_addr.clone())?;
-        } else {
-            info!("session not found for {cid}");
+            self.dag_window_session_run(cid, session.window_num, &session.target_addr)?;
+            self.start_dag_window_retry_timeout(cid, &session.target_addr);
         }

         Ok(())
@@ -274,7 +286,7 @@ impl Shipper {
         target_addr: &str,
         retries: u8,
     ) -> Result<()> {
-        if self.connected {
+        if *self.connected.lock().unwrap() {
             self.dag_window_session_run(cid, 0, target_addr)?;
             let retries = if retries == 0 { 0 } else { retries - 1 };
             self.open_dag_window_session(cid, retries, target_addr);
@@ -287,8 +299,10 @@
     }

     pub fn increment_dag_window_session(&mut self, cid: &str, target_addr: &str) -> Result<()> {
-        if let Some(next_window_num) = self.next_dag_window_session(cid) {
-            self.dag_window_session_run(cid, next_window_num, target_addr)?;
+        if *self.connected.lock().unwrap() {
+            if let Some(next_window_num) = self.next_dag_window_session(cid) {
+                self.dag_window_session_run(cid, next_window_num, target_addr)?;
+            }
         }

         Ok(())
@@ -321,7 +335,7 @@
     }

     pub fn transmit_block(&mut self, cid: &str, target_addr: &str) -> Result<()> {
-        if self.connected {
+        if *self.connected.lock().unwrap() {
             let block = self.storage.get_block_by_cid(cid)?;
             self.transmit_blocks(&[block], target_addr)?;
         }
@@ -329,9 +343,9 @@
     }

     pub fn transmit_dag(&mut self, cid: &str, target_addr: &str) -> Result<()> {
-        if self.connected {
+        if *self.connected.lock().unwrap() {
             let root_block = self.storage.get_block_by_cid(cid)?;
-            let blocks = self.storage.get_all_blocks_under_cid(cid)?;
+            let blocks = self.storage.get_all_dag_blocks(cid)?;
             let mut all_blocks = vec![root_block];
             all_blocks.extend(blocks);
             self.transmit_blocks(&all_blocks, target_addr)?;
@@ -342,10 +356,10 @@
     pub fn transmit_dag_window(
         &mut self,
         cid: &str,
-        window_num: u8,
+        window_num: u32,
         target_addr: &str,
     ) -> Result<Vec<String>> {
-        if self.connected {
+        if *self.connected.lock().unwrap() {
             let mut transmitted_cids = vec![];

             let window_blocks = self.get_dag_window_blocks(cid, window_num)?;
@@ -367,28 +381,11 @@
         }
     }

-    fn get_dag_window_blocks(&mut self, cid: &str, window_num: u8) -> Result<Vec<StoredBlock>> {
-        let root_block = self.storage.get_block_by_cid(cid)?;
-        let blocks = self.storage.get_all_blocks_under_cid(cid)?;
-        let mut all_blocks = vec![root_block];
-        all_blocks.extend(blocks);
-
-        // TODO: Push this windowing down into the storage layer instead of
-        // grabbing all blocks every time
-        let window_start: usize = (self.window_size * window_num) as usize;
-        let window_end: usize = window_start + self.window_size as usize;
-
-        if window_start > all_blocks.len() {
-            Ok(vec![])
-        } else {
-            all_blocks.drain(..window_start);
-
-            if window_end < all_blocks.len() {
-                all_blocks.drain(window_end..);
-            }
-
-            Ok(all_blocks)
-        }
+    fn get_dag_window_blocks(&mut self, cid: &str, window_num: u32) -> Result<Vec<StoredBlock>> {
+        let blocks = self
+            .storage
+            .get_dag_blocks_by_window(cid, self.window_size, window_num)?;
+        Ok(blocks)
     }

     fn receive_block(&mut self, block: TransmissionBlock) -> Result<()> {
@@ -433,7 +430,7 @@
     use rand::{thread_rng, Rng, RngCore};
     use std::path::PathBuf;
     use std::sync::mpsc;
-    use std::sync::Arc;
+    use std::sync::{Arc, Mutex};
     use std::time::Duration;
     use transports::UdpTransport;

@@ -472,7 +469,7 @@
                 10,
                 5,
                 shipper_transport,
-                true,
+                Arc::new(Mutex::new(true)),
             )
             .unwrap();
             TestShipper {
diff --git a/myceli/tests/utils/mod.rs b/myceli/tests/utils/mod.rs
index 87ddb83..1f9e191 100644
--- a/myceli/tests/utils/mod.rs
+++ b/myceli/tests/utils/mod.rs
@@ -49,7 +49,7 @@ impl TestListener {

     pub fn generate_file(&self) -> Result {
         let mut data = Vec::<u8>::new();
-        data.resize(256 * 5, 1);
+        data.resize(256 * 50, 1);
         thread_rng().fill_bytes(&mut data);

         let tmp_file = self.test_dir.child("test.file");
diff --git a/testing/testing-plan.md b/testing/testing-plan.md
index 3addbb8..b53284e 100644
--- a/testing/testing-plan.md
+++ b/testing/testing-plan.md
@@ -20,15 +20,15 @@
 Command Details:

 This test case passes if both steps pass.

-## Test Case - Retrieve an IPFS File
+## Test Case - Transmit an IPFS File (Ground to Space)

 Steps:

-1. Using the controller software, send the `ImportFile` command to the `myceli` space instance with a known good path for the one-pass payload file.
+1. Using the controller software, send the `ImportFile` command to the `myceli` ground instance with a known good path for the one-pass payload file.
    - This step passes if a `FileImported` response with a CID is received. Any other response / no response is a failure.
-1. Using the controller software, send the `TransmitDag` command to the `myceli` space instance with the CID obtained from the `FileImported` response and with the network address of the space-to-ground radio link.
-1. Using the controller software, send the `ValidateDag` command to the `myceli` ground instance with the CID obtained from the `FileImported` response.
+1. Using the controller software, send the `TransmitDag` command to the `myceli` ground instance with the CID obtained from the `FileImported` response and with the network address of the ground-to-space radio link.
+1. Using the controller software, send the `ValidateDag` command to the `myceli` space instance with the CID obtained from the `FileImported` response.
    - This step passes if a `ValidateDagResponse` response with a result of true is received. Any other response / no response is a failure.
-1. Using the controller software, send the `ExportDag` command to the `myceli` ground instance with the CID obtained from the `FileImported` response and a writeable file path.
+1. Using the controller software, send the `ExportDag` command to the `myceli` space instance with the CID obtained from the `FileImported` response and a writeable file path.
    - This step passes if `myceli` is able to correctly write a file to the given file path.

 Command Details:

@@ -43,7 +43,7 @@

 This test case passes if the final step is successful and the resulting written file matches the onboard payload file.

-## Test Case - Send and Retrieve New IPFS File
+## Test Case - Transmit Back & Forth, and Export File with IPFS

 Steps:

 1. Using the controller software, send the `ImportFile` command to the `myceli` ground instance with a known good path for the one-pass payload file.