diff --git a/mehsh_check/src/analysis/analysis_command.rs b/mehsh_check/src/analysis/analysis_command.rs index 86c9144..8a9be7f 100644 --- a/mehsh_check/src/analysis/analysis_command.rs +++ b/mehsh_check/src/analysis/analysis_command.rs @@ -11,8 +11,6 @@ use tokio::process::Command; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use tokio::task::JoinHandle; -type CliOutput = Vec; - pub struct ExecuteAnalysisCommandHandler { notify_send: UnboundedSender<()>, } @@ -25,7 +23,7 @@ struct CommandExecutionContext { #[derive(Debug)] enum ExecuteMsg { - CliOutput(CliOutput), + CliOutput(Vec), Finish(()), } @@ -203,42 +201,54 @@ async fn execute_analysis_command( .stderr(Stdio::piped()) .kill_on_drop(true) .spawn() - .context("could not start child")?; + .expect("could not start child"); - let mut stdout_buffer = [0; 4096]; - let mut stdout = BufReader::new( + let stdout = BufReader::new( child .stdout .take() - .context("could not take stdout from child")?, + .expect("could not take stdout from child"), ); - let mut stderr_buffer = [0; 4096]; - let mut stderr = BufReader::new( + let stderr = BufReader::new( child .stderr .take() - .context("could not take stderr from child")?, + .expect("could not take stderr from child"), ); - loop { - ::tokio::select! { - stdout_read_res = stdout.read(&mut stdout_buffer) => { - let out : &[u8] = &stdout_buffer[..stdout_read_res.context("could not read stdout_read_res")?]; - if out.len() == 0 { - continue; - } - sender.send(ExecuteMsg::CliOutput(out.to_vec()))?; - }, - stderr_read_res = stderr.read(&mut stderr_buffer) => { - let out : &[u8] = &stderr_buffer[..stderr_read_res.context("could not read stdout_read_res")?]; - if out.len() == 0 { - continue; - } - sender.send(ExecuteMsg::CliOutput(out.to_vec()))?; - }, - res = child.wait() => { - return res.map_err(|e| anyhow!(e)); - }, + let jh_sender = sender.clone(); + let mut jh_read = stdout; + let jh_stdout : JoinHandle> = ::tokio::spawn(async move { + let mut buffer = [0; 4096]; + loop { + match jh_read.read(&mut buffer).await? { + 0 => return Ok(()), + size => { + let out : &[u8] = &buffer[..size]; + jh_sender.send(ExecuteMsg::CliOutput(out.to_vec())).context("could not send")?; + }, + } } - } + }); + + let jh_sender = sender.clone(); + let mut jh_read = stderr; + let jh_stderr : JoinHandle> = ::tokio::spawn(async move { + let mut buffer = [0; 4096]; + loop { + match jh_read.read(&mut buffer).await? { + 0 => return Ok(()), + size => { + let out : &[u8] = &buffer[..size]; + jh_sender.send(ExecuteMsg::CliOutput(out.to_vec())).context("could not send")?; + }, + } + } + }); + + let (jh1, jh2) = ::tokio::join!(jh_stdout, jh_stderr); + jh1??; + jh2??; + + return child.wait().await.map_err(|e| anyhow!(e)); }