Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1395,7 +1395,7 @@ impl Build {
}

// Limit our parallelism globally with a jobserver.
let tokens = parallel::job_token::JobTokenServer::new();
let tokens = parallel::job_token::ActiveJobTokenServer::new()?;

// When compiling objects in parallel we do a few dirty tricks to speed
// things up:
Expand Down
72 changes: 67 additions & 5 deletions src/parallel/job_token/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ impl Drop for JobToken {
}
}

pub(crate) enum JobTokenServer {
enum JobTokenServer {
Inherited(inherited_jobserver::JobServer),
InProcess(inprocess_jobserver::JobServer),
}
Expand All @@ -35,7 +35,7 @@ impl JobTokenServer {
/// present), we will create a global in-process only jobserver
/// that has to be static so that it will be shared by all cc
/// compilation.
pub(crate) fn new() -> &'static Self {
fn new() -> &'static Self {
static INIT: Once = Once::new();
static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();

Expand All @@ -50,11 +50,35 @@ impl JobTokenServer {
&*JOBSERVER.as_ptr()
}
}
}

pub(crate) struct ActiveJobTokenServer(&'static JobTokenServer);

impl ActiveJobTokenServer {
pub(crate) fn new() -> Result<Self, Error> {
let jobserver = JobTokenServer::new();

#[cfg(unix)]
if let JobTokenServer::Inherited(inherited_jobserver) = &jobserver {
inherited_jobserver.enter_active()?;
}

Ok(Self(jobserver))
}

pub(crate) fn try_acquire(&self) -> Result<Option<JobToken>, Error> {
match self {
Self::Inherited(jobserver) => jobserver.try_acquire(),
Self::InProcess(jobserver) => Ok(jobserver.try_acquire()),
match &self.0 {
JobTokenServer::Inherited(jobserver) => jobserver.try_acquire(),
JobTokenServer::InProcess(jobserver) => Ok(jobserver.try_acquire()),
}
}
}

impl Drop for ActiveJobTokenServer {
fn drop(&mut self) {
#[cfg(unix)]
if let JobTokenServer::Inherited(inherited_jobserver) = &self.0 {
inherited_jobserver.exit_active();
}
}
}
Expand All @@ -70,6 +94,9 @@ mod inherited_jobserver {
},
};

#[cfg(unix)]
use std::sync::{Mutex, MutexGuard, PoisonError};

pub(crate) struct JobServer {
/// Implicit token for this process which is obtained and will be
/// released in parent. Since JobTokens only give back what they got,
Expand All @@ -80,6 +107,10 @@ mod inherited_jobserver {
/// the end of the process.
global_implicit_token: AtomicBool,
inner: sys::JobServerClient,
/// number of active clients is required to know when it is safe to clear non-blocking
/// flags
#[cfg(unix)]
active_clients_cnt: Mutex<usize>,
}

impl JobServer {
Expand Down Expand Up @@ -117,9 +148,40 @@ mod inherited_jobserver {
.map(|inner| Self {
inner,
global_implicit_token: AtomicBool::new(true),
#[cfg(unix)]
active_clients_cnt: Mutex::new(0),
})
}

#[cfg(unix)]
fn get_locked_active_cnt(&self) -> MutexGuard<'_, usize> {
self.active_clients_cnt
.lock()
.unwrap_or_else(PoisonError::into_inner)
}

#[cfg(unix)]
pub(super) fn enter_active(&self) -> Result<(), Error> {
let mut active_cnt = self.get_locked_active_cnt();
if *active_cnt == 0 {
self.inner.prepare_for_acquires()?;
}

*active_cnt += 1;

Ok(())
}

#[cfg(unix)]
pub(super) fn exit_active(&self) {
let mut active_cnt = self.get_locked_active_cnt();
*active_cnt -= 1;

if *active_cnt == 0 {
self.inner.done_acquires();
}
}

pub(super) fn try_acquire(&self) -> Result<Option<JobToken>, Error> {
if !self.global_implicit_token.swap(false, AcqRel) {
// Cold path, no global implicit token, obtain one
Expand Down
31 changes: 26 additions & 5 deletions src/parallel/job_token/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
path::Path,
};

use crate::parallel::stderr::set_non_blocking;
use crate::parallel::stderr::{set_blocking, set_non_blocking};

pub(super) struct JobServerClient {
read: File,
Expand Down Expand Up @@ -74,13 +74,17 @@ impl JobServerClient {
Some(libc::O_RDONLY) | Some(libc::O_RDWR),
Some(libc::O_WRONLY) | Some(libc::O_RDWR),
) => {
// Optimization: Try converting it to a fifo by using /dev/fd
#[cfg(target_os = "linux")]
if let Some(jobserver) =
Self::from_fifo(Path::new(&format!("/dev/fd/{}", read.as_raw_fd())))
{
return Some(jobserver);
}

let read = read.try_clone().ok()?;
let write = write.try_clone().ok()?;

// Set read and write end to nonblocking
set_non_blocking(&read).ok()?;
set_non_blocking(&write).ok()?;

Some(Self {
read,
write: Some(write),
Expand All @@ -90,6 +94,23 @@ impl JobServerClient {
}
}

pub(super) fn prepare_for_acquires(&self) -> Result<(), crate::Error> {
if let Some(write) = self.write.as_ref() {
set_non_blocking(&self.read)?;
set_non_blocking(write)?;
}

Ok(())
}

pub(super) fn done_acquires(&self) {
if let Some(write) = self.write.as_ref() {
let _ = set_blocking(&self.read);
let _ = set_blocking(write);
}
}

/// Must call `prepare_for_acquire` before using it.
pub(super) fn try_acquire(&self) -> io::Result<Option<()>> {
let mut fds = [libc::pollfd {
fd: self.read.as_raw_fd(),
Expand Down
40 changes: 31 additions & 9 deletions src/parallel/stderr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,34 +7,56 @@ use crate::{Error, ErrorKind};
compile_error!("Only unix and windows support non-blocking pipes! For other OSes, disable the parallel feature.");

#[cfg(unix)]
pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> {
// On Unix, switch the pipe to non-blocking mode.
// On Windows, we have a different way to be non-blocking.
let fd = pipe.as_raw_fd();
fn get_flags(fd: std::os::unix::io::RawFd) -> Result<i32, Error> {
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL, 0) };
if flags == -1 {
return Err(Error::new(
Err(Error::new(
ErrorKind::IOError,
format!(
"Failed to get flags for pipe {}: {}",
fd,
std::io::Error::last_os_error()
),
));
))
} else {
Ok(flags)
}
}

#[cfg(unix)]
fn set_flags(fd: std::os::unix::io::RawFd, flags: std::os::raw::c_int) -> Result<(), Error> {
if unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) } == -1 {
return Err(Error::new(
Err(Error::new(
ErrorKind::IOError,
format!(
"Failed to set flags for pipe {}: {}",
fd,
std::io::Error::last_os_error()
),
));
))
} else {
Ok(())
}
}

#[cfg(unix)]
pub fn set_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> {
// On Unix, switch the pipe to non-blocking mode.
// On Windows, we have a different way to be non-blocking.
let fd = pipe.as_raw_fd();

let flags = get_flags(fd)?;
set_flags(fd, flags & (!libc::O_NONBLOCK))
}

#[cfg(unix)]
pub fn set_non_blocking(pipe: &impl std::os::unix::io::AsRawFd) -> Result<(), Error> {
// On Unix, switch the pipe to non-blocking mode.
// On Windows, we have a different way to be non-blocking.
let fd = pipe.as_raw_fd();

Ok(())
let flags = get_flags(fd)?;
set_flags(fd, flags | libc::O_NONBLOCK)
}

pub fn bytes_available(stderr: &mut ChildStderr) -> Result<usize, Error> {
Expand Down