diff --git a/packages/desktop/src-tauri/src/cli.rs b/packages/desktop/src-tauri/src/cli.rs index 8d02ba7d4..cb6d3ad37 100644 --- a/packages/desktop/src-tauri/src/cli.rs +++ b/packages/desktop/src-tauri/src/cli.rs @@ -6,13 +6,17 @@ use process_wrap::tokio::ProcessGroup; use process_wrap::tokio::{JobObject, KillOnDrop}; #[cfg(unix)] use std::os::unix::process::ExitStatusExt; +use std::sync::Arc; use std::{process::Stdio, time::Duration}; use tauri::{AppHandle, Manager, path::BaseDirectory}; use tauri_plugin_store::StoreExt; use tauri_specta::Event; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::process::Command; -use tokio::sync::{mpsc, oneshot}; +use tokio::{ + io::{AsyncBufRead, AsyncBufReadExt, BufReader}, + process::Command, + sync::{mpsc, oneshot}, + task::JoinHandle, +}; use tokio_stream::wrappers::ReceiverStream; use tracing::Instrument; @@ -34,8 +38,8 @@ pub struct Config { #[derive(Clone, Debug)] pub enum CommandEvent { - Stdout(Vec), - Stderr(Vec), + Stdout(String), + Stderr(String), Error(String), Terminated(TerminatedPayload), } @@ -64,10 +68,11 @@ pub async fn get_config(app: &AppHandle) -> Option { events .fold(String::new(), async |mut config_str, event| { - if let CommandEvent::Stdout(stdout) = event - && let Ok(s) = str::from_utf8(&stdout) - { - config_str += s + if let CommandEvent::Stdout(s) = &event { + config_str += s.as_str() + } + if let CommandEvent::Stderr(s) = &event { + config_str += s.as_str() } config_str @@ -317,9 +322,9 @@ pub fn spawn_command( cmd }; - cmd.stdin(Stdio::null()); cmd.stdout(Stdio::piped()); cmd.stderr(Stdio::piped()); + cmd.stdin(Stdio::null()); #[cfg(windows)] cmd.creation_flags(0x0800_0000); @@ -337,32 +342,24 @@ pub fn spawn_command( } let mut child = wrap.spawn()?; - let stdout = child.stdout().take(); - let stderr = child.stderr().take(); + let guard = Arc::new(tokio::sync::RwLock::new(())); let (tx, rx) = mpsc::channel(256); let (kill_tx, mut kill_rx) = mpsc::channel(1); - if let Some(stdout) = stdout { - let tx = tx.clone(); - tokio::spawn(async move { - let mut lines = BufReader::new(stdout).lines(); - while let Ok(Some(line)) = lines.next_line().await { - let _ = tx.send(CommandEvent::Stdout(line.into_bytes())).await; - } - }); - } + let stdout = spawn_pipe_reader( + tx.clone(), + guard.clone(), + BufReader::new(child.stdout().take().unwrap()), + CommandEvent::Stdout, + ); + let stderr = spawn_pipe_reader( + tx.clone(), + guard.clone(), + BufReader::new(child.stderr().take().unwrap()), + CommandEvent::Stderr, + ); - if let Some(stderr) = stderr { - let tx = tx.clone(); - tokio::spawn(async move { - let mut lines = BufReader::new(stderr).lines(); - while let Ok(Some(line)) = lines.next_line().await { - let _ = tx.send(CommandEvent::Stderr(line.into_bytes())).await; - } - }); - } - - tokio::spawn(async move { + tokio::task::spawn(async move { let mut kill_open = true; let status = loop { match child.try_wait() { @@ -394,6 +391,9 @@ pub fn spawn_command( let _ = tx.send(CommandEvent::Error(err.to_string())).await; } } + + stdout.abort(); + stderr.abort(); }); let event_stream = ReceiverStream::new(rx); @@ -404,9 +404,7 @@ pub fn spawn_command( fn signal_from_status(status: std::process::ExitStatus) -> Option { #[cfg(unix)] - { - return status.signal(); - } + return status.signal(); #[cfg(not(unix))] { @@ -442,12 +440,10 @@ pub fn serve( events .for_each(move |event| { match event { - CommandEvent::Stdout(line_bytes) => { - let line = String::from_utf8_lossy(&line_bytes); + CommandEvent::Stdout(line) => { tracing::info!("{line}"); } - CommandEvent::Stderr(line_bytes) => { - let line = String::from_utf8_lossy(&line_bytes); + CommandEvent::Stderr(line) => { tracing::info!("{line}"); } CommandEvent::Error(err) => { @@ -499,11 +495,7 @@ pub mod sqlite_migration { } future::ready(match &event { - CommandEvent::Stdout(stdout) => { - let Ok(s) = str::from_utf8(stdout) else { - return future::ready(None); - }; - + CommandEvent::Stdout(s) | CommandEvent::Stderr(s) => { if let Some(s) = s.strip_prefix("sqlite-migration:").map(|s| s.trim()) { if let Ok(progress) = s.parse::() { let _ = SqliteMigrationProgress::InProgress(progress).emit(&app); @@ -522,3 +514,41 @@ pub mod sqlite_migration { }) } } + +fn spawn_pipe_reader CommandEvent + Send + Copy + 'static>( + tx: mpsc::Sender, + guard: Arc>, + pipe_reader: impl AsyncBufRead + Send + Unpin + 'static, + wrapper: F, +) -> JoinHandle<()> { + tokio::spawn(async move { + let _lock = guard.read().await; + let reader = BufReader::new(pipe_reader); + + read_line(reader, tx, wrapper).await; + }) +} + +async fn read_line CommandEvent + Send + Copy + 'static>( + reader: BufReader, + tx: mpsc::Sender, + wrapper: F, +) { + let mut lines = reader.lines(); + loop { + let line = lines.next_line().await; + + match line { + Ok(s) => { + if let Some(s) = s { + let _ = tx.clone().send(wrapper(s)).await; + } + } + Err(e) => { + let tx_ = tx.clone(); + let _ = tx_.send(CommandEvent::Error(e.to_string())).await; + break; + } + } + } +}