diff --git a/apps/oxlint/src/tsgolint.rs b/apps/oxlint/src/tsgolint.rs index 05b9fac6092f0..bd74cfecced12 100644 --- a/apps/oxlint/src/tsgolint.rs +++ b/apps/oxlint/src/tsgolint.rs @@ -1,6 +1,6 @@ use std::{ ffi::OsStr, - io::{Read, Write}, + io::{ErrorKind, Read, Write}, path::{Path, PathBuf}, sync::Arc, }; @@ -117,75 +117,133 @@ impl<'a> TsGoLintState<'a> { let mut stdin = child.stdin.take().expect("Failed to open tsgolint stdin"); - std::thread::spawn(move || { - let json = serde_json::to_string(&json_input).expect("Failed to serialize JSON"); - - stdin.write_all(json.as_bytes()).expect("Failed to write to tsgolint stdin"); - }); - - // TODO: Stream diagnostics as they are emitted, rather than waiting - let output = child.wait_with_output().expect("Failed to wait for tsgolint process"); - - if !output.status.success() { - return Err(format!("tsgolint process exited with status: {}", output.status)); + // Write the input synchronously and handle BrokenPipe gracefully in case the child + // exits early and closes its stdin. + let json = serde_json::to_string(&json_input).expect("Failed to serialize JSON"); + if let Err(e) = stdin.write_all(json.as_bytes()) { + // If the child closed stdin early, avoid crashing on SIGPIPE/BrokenPipe. + if e.kind() != ErrorKind::BrokenPipe { + return Err(format!("Failed to write to tsgolint stdin: {e}")); + } } + // Explicitly drop stdin to send EOF to the child. + drop(stdin); + + // Stream diagnostics as they are emitted, rather than waiting for all output + let mut stdout = child.stdout.take().expect("Failed to open tsgolint stdout"); + + // Process stdout stream in a separate thread to send diagnostics as they arrive + let cwd_clone = self.cwd.clone(); + + let stdout_handler = std::thread::spawn(move || -> Result<(), String> { + let mut buffer = Vec::with_capacity(8192); + let mut read_buf = [0u8; 8192]; + + loop { + match stdout.read(&mut read_buf) { + Ok(0) => break, // EOF + Ok(n) => { + buffer.extend_from_slice(&read_buf[..n]); + + // Try to parse complete messages from buffer + let mut cursor = std::io::Cursor::new(buffer.as_slice()); + let mut processed_up_to: u64 = 0; + + while cursor.position() < buffer.len() as u64 { + let start_pos = cursor.position(); + match parse_single_message(&mut cursor) { + Ok(Some(tsgolint_diagnostic)) => { + processed_up_to = cursor.position(); + + // For now, ignore any `tsgolint` errors. + if tsgolint_diagnostic.r#type == MessageType::Error { + continue; + } + + let path = tsgolint_diagnostic.file_path.clone(); + let Some(resolved_config) = resolved_configs.get(&path) + else { + // If we don't have a resolved config for this path, skip it. We should always + // have a resolved config though, since we processed them already above. + continue; + }; + + let severity = resolved_config.rules.iter().find_map( + |(rule, status)| { + if rule.name() == tsgolint_diagnostic.rule { + Some(*status) + } else { + None + } + }, + ); + + let oxc_diagnostic: OxcDiagnostic = + tsgolint_diagnostic.into(); + let Some(severity) = severity else { + // If the severity is not found, we should not report the diagnostic + continue; + }; + let oxc_diagnostic = oxc_diagnostic.with_severity( + if severity == AllowWarnDeny::Deny { + Severity::Error + } else { + Severity::Warning + }, + ); + + let diagnostics = DiagnosticService::wrap_diagnostics( + cwd_clone.clone(), + path.clone(), + &read_to_string(&path) + .unwrap_or_else(|_| String::new()), + vec![oxc_diagnostic], + ); + + if error_sender.send((path, diagnostics)).is_err() { + // Receiver has been dropped, stop processing + return Ok(()); + } + } + Ok(None) => { + // Successfully parsed but no diagnostic to add + processed_up_to = cursor.position(); + } + Err(_) => { + // Could not parse a complete message, break and keep remaining data + cursor.set_position(start_pos); + break; + } + } + } - match parse_tsgolint_output(&output.stdout) { - Ok(parsed) => { - let mut oxc_diagnostics: FxHashMap> = - FxHashMap::default(); - for tsgolint_diagnostic in parsed { - // For now, ignore any `tsgolint` errors. - if tsgolint_diagnostic.r#type == MessageType::Error { - continue; - } - - let path = tsgolint_diagnostic.file_path.clone(); - let Some(resolved_config) = resolved_configs.get(&path) else { - // If we don't have a resolved config for this path, skip it. We should always - // have a resolved config though, since we processed them already above. - continue; - }; - - let severity = resolved_config.rules.iter().find_map(|(rule, status)| { - if rule.name() == tsgolint_diagnostic.rule { - Some(*status) - } else { - None + // Keep unprocessed data for next iteration + if processed_up_to > 0 { + #[expect(clippy::cast_possible_truncation)] + buffer.drain(..processed_up_to as usize); } - }); - - let oxc_diagnostic: OxcDiagnostic = tsgolint_diagnostic.into(); - let Some(severity) = severity else { - // If the severity is not found, we should not report the diagnostic - continue; - }; - let oxc_diagnostic = - oxc_diagnostic.with_severity(if severity == AllowWarnDeny::Deny { - Severity::Error - } else { - Severity::Warning - }); - - oxc_diagnostics.entry(path.clone()).or_default().push(oxc_diagnostic); + } + Err(e) => { + return Err(format!("Failed to read from tsgolint stdout: {e}")); + } } + } - for (file_path, diagnostics) in oxc_diagnostics { - let diagnostics = DiagnosticService::wrap_diagnostics( - self.cwd.clone(), - file_path.clone(), - &read_to_string(&file_path).unwrap_or_else(|_| String::new()), - diagnostics, - ); - error_sender - .send((file_path.clone(), diagnostics)) - .expect("Failed to send diagnostic"); - } + Ok(()) + }); - Ok(()) - } + // Wait for process to complete and stdout processing to finish + let exit_status = child.wait().expect("Failed to wait for tsgolint process"); + let stdout_result = stdout_handler.join(); + + if !exit_status.success() { + return Err(format!("tsgolint process exited with status: {exit_status}")); + } - Err(err) => Err(format!("Failed to parse tsgolint output: {err}")), + match stdout_result { + Ok(Ok(())) => Ok(()), + Ok(Err(err)) => Err(err), + Err(_) => Err("Failed to join stdout processing thread".to_string()), } }); @@ -304,27 +362,9 @@ impl MessageType { } } -/// Parses the binary output from `tsgolint` and returns the diagnostic data. +/// Parses a single message from the binary tsgolint output. // Messages are encoded as follows: // | Payload Size (uint32 LE) - 4 bytes | Message Type (uint8) - 1 byte | Payload | -pub fn parse_tsgolint_output(output: &[u8]) -> Result, String> { - let mut diagnostics: Vec = Vec::new(); - - // Parse the output binary data - let mut cursor = std::io::Cursor::new(output); - - while cursor.position() < output.len() as u64 { - match parse_single_message(&mut cursor) { - Ok(Some(diagnostic)) => diagnostics.push(diagnostic), - // Do nothing if we successfully parsed a message but it was not a diagnostic we want to add - Ok(None) => {} - Err(e) => return Err(format!("Failed to parse tsgolint output: {e}")), - } - } - - Ok(diagnostics) -} - fn parse_single_message( cursor: &mut std::io::Cursor<&[u8]>, ) -> Result, String> {