diff --git a/src/lib.rs b/src/lib.rs index edb941b25..75c4a1aeb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1395,7 +1395,7 @@ impl Build { } // Limit our parallelism globally with a jobserver. - let tokens = parallel::job_token::JobTokenServer::new(); + let tokens = parallel::job_token::ActiveJobTokenServer::new()?; // When compiling objects in parallel we do a few dirty tricks to speed // things up: diff --git a/src/parallel/job_token/mod.rs b/src/parallel/job_token/mod.rs index fda097da9..a04d7625b 100644 --- a/src/parallel/job_token/mod.rs +++ b/src/parallel/job_token/mod.rs @@ -21,7 +21,7 @@ impl Drop for JobToken { } } -pub(crate) enum JobTokenServer { +enum JobTokenServer { Inherited(inherited_jobserver::JobServer), InProcess(inprocess_jobserver::JobServer), } @@ -35,7 +35,7 @@ impl JobTokenServer { /// present), we will create a global in-process only jobserver /// that has to be static so that it will be shared by all cc /// compilation. - pub(crate) fn new() -> &'static Self { + fn new() -> &'static Self { static INIT: Once = Once::new(); static mut JOBSERVER: MaybeUninit = MaybeUninit::uninit(); @@ -50,11 +50,35 @@ impl JobTokenServer { &*JOBSERVER.as_ptr() } } +} + +pub(crate) struct ActiveJobTokenServer(&'static JobTokenServer); + +impl ActiveJobTokenServer { + pub(crate) fn new() -> Result { + let jobserver = JobTokenServer::new(); + + #[cfg(unix)] + if let JobTokenServer::Inherited(inherited_jobserver) = &jobserver { + inherited_jobserver.enter_active()?; + } + + Ok(Self(jobserver)) + } pub(crate) fn try_acquire(&self) -> Result, Error> { - match self { - Self::Inherited(jobserver) => jobserver.try_acquire(), - Self::InProcess(jobserver) => Ok(jobserver.try_acquire()), + match &self.0 { + JobTokenServer::Inherited(jobserver) => jobserver.try_acquire(), + JobTokenServer::InProcess(jobserver) => Ok(jobserver.try_acquire()), + } + } +} + +impl Drop for ActiveJobTokenServer { + fn drop(&mut self) { + #[cfg(unix)] + if let JobTokenServer::Inherited(inherited_jobserver) = &self.0 { + inherited_jobserver.exit_active(); } } } @@ -70,6 +94,9 @@ mod inherited_jobserver { }, }; + #[cfg(unix)] + use std::sync::{Mutex, MutexGuard, PoisonError}; + pub(crate) struct JobServer { /// Implicit token for this process which is obtained and will be /// released in parent. Since JobTokens only give back what they got, @@ -80,6 +107,10 @@ mod inherited_jobserver { /// the end of the process. global_implicit_token: AtomicBool, inner: sys::JobServerClient, + /// number of active clients is required to know when it is safe to clear non-blocking + /// flags + #[cfg(unix)] + active_clients_cnt: Mutex, } impl JobServer { @@ -117,9 +148,40 @@ mod inherited_jobserver { .map(|inner| Self { inner, global_implicit_token: AtomicBool::new(true), + #[cfg(unix)] + active_clients_cnt: Mutex::new(0), }) } + #[cfg(unix)] + fn get_locked_active_cnt(&self) -> MutexGuard<'_, usize> { + self.active_clients_cnt + .lock() + .unwrap_or_else(PoisonError::into_inner) + } + + #[cfg(unix)] + pub(super) fn enter_active(&self) -> Result<(), Error> { + let mut active_cnt = self.get_locked_active_cnt(); + if *active_cnt == 0 { + self.inner.prepare_for_acquires()?; + } + + *active_cnt += 1; + + Ok(()) + } + + #[cfg(unix)] + pub(super) fn exit_active(&self) { + let mut active_cnt = self.get_locked_active_cnt(); + *active_cnt -= 1; + + if *active_cnt == 0 { + self.inner.done_acquires(); + } + } + pub(super) fn try_acquire(&self) -> Result, Error> { if !self.global_implicit_token.swap(false, AcqRel) { // Cold path, no global implicit token, obtain one diff --git a/src/parallel/job_token/unix.rs b/src/parallel/job_token/unix.rs index 95dcdc846..7f8e1b881 100644 --- a/src/parallel/job_token/unix.rs +++ b/src/parallel/job_token/unix.rs @@ -7,7 +7,7 @@ use std::{ path::Path, }; -use crate::parallel::stderr::set_non_blocking; +use crate::parallel::stderr::{set_blocking, set_non_blocking}; pub(super) struct JobServerClient { read: File, @@ -74,13 +74,16 @@ impl JobServerClient { Some(libc::O_RDONLY) | Some(libc::O_RDWR), Some(libc::O_WRONLY) | Some(libc::O_RDWR), ) => { + // Optimization: Try converting it to a fifo by using /dev/fd + if let Some(jobserver) = + Self::from_fifo(Path::new(&format!("/dev/fd/{}", read.as_raw_fd()))) + { + return Some(jobserver); + } + let read = read.try_clone().ok()?; let write = write.try_clone().ok()?; - // Set read and write end to nonblocking - set_non_blocking(&read).ok()?; - set_non_blocking(&write).ok()?; - Some(Self { read, write: Some(write), @@ -90,6 +93,23 @@ impl JobServerClient { } } + pub(super) fn prepare_for_acquires(&self) -> Result<(), crate::Error> { + if let Some(write) = self.write.as_ref() { + set_non_blocking(&self.read)?; + set_non_blocking(write)?; + } + + Ok(()) + } + + pub(super) fn done_acquires(&self) { + if let Some(write) = self.write.as_ref() { + let _ = set_blocking(&self.read); + let _ = set_blocking(write); + } + } + + /// Must call `prepare_for_acquire` before using it. pub(super) fn try_acquire(&self) -> io::Result> { let mut fds = [libc::pollfd { fd: self.read.as_raw_fd(), diff --git a/src/parallel/stderr.rs b/src/parallel/stderr.rs index 8bdfd403e..9a4497f7d 100644 --- a/src/parallel/stderr.rs +++ b/src/parallel/stderr.rs @@ -7,34 +7,56 @@ use crate::{Error, ErrorKind}; compile_error!("Only unix and windows support non-blocking pipes! For other OSes, disable the parallel feature."); #[cfg(unix)] -pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> { - // On Unix, switch the pipe to non-blocking mode. - // On Windows, we have a different way to be non-blocking. - let fd = pipe.as_raw_fd(); +fn get_flags(fd: std::os::unix::io::RawFd) -> Result { let flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) }; if flags == -1 { - return Err(Error::new( + Err(Error::new( ErrorKind::IOError, format!( "Failed to get flags for pipe {}: {}", fd, std::io::Error::last_os_error() ), - )); + )) + } else { + Ok(flags) } +} +#[cfg(unix)] +fn set_flags(fd: std::os::unix::io::RawFd, flags: std::os::raw::c_int) -> Result<(), Error> { if unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) } == -1 { - return Err(Error::new( + Err(Error::new( ErrorKind::IOError, format!( "Failed to set flags for pipe {}: {}", fd, std::io::Error::last_os_error() ), - )); + )) + } else { + Ok(()) } +} + +#[cfg(unix)] +pub fn set_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> { + // On Unix, switch the pipe to non-blocking mode. + // On Windows, we have a different way to be non-blocking. + let fd = pipe.as_raw_fd(); + + let flags = get_flags(fd)?; + set_flags(fd, flags & (!libc::O_NONBLOCK)) +} + +#[cfg(unix)] +pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> { + // On Unix, switch the pipe to non-blocking mode. + // On Windows, we have a different way to be non-blocking. + let fd = pipe.as_raw_fd(); - Ok(()) + let flags = get_flags(fd)?; + set_flags(fd, flags | libc::O_NONBLOCK) } pub fn bytes_available(stderr: &mut ChildStderr) -> Result {