|
| 1 | +use jobserver::{Acquired, Client, HelperThread}; |
| 2 | +use std::{ |
| 3 | + env, |
| 4 | + mem::MaybeUninit, |
| 5 | + sync::{ |
| 6 | + mpsc::{self, Receiver, Sender}, |
| 7 | + Once, |
| 8 | + }, |
| 9 | +}; |
| 10 | + |
| 11 | +pub(crate) struct JobToken { |
| 12 | + /// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process. |
| 13 | + /// Both are valid values to put into queue. |
| 14 | + token: Option<Acquired>, |
| 15 | + /// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead |
| 16 | + /// of storing it back in the pool - see [`Self::forget()`] function for that. |
| 17 | + pool: Option<Sender<Option<Result<Acquired, crate::Error>>>>, |
| 18 | +} |
| 19 | + |
| 20 | +impl Drop for JobToken { |
| 21 | + fn drop(&mut self) { |
| 22 | + if let Some(pool) = &self.pool { |
| 23 | + // Always send back an Ok() variant as we know that the acquisition for this token has succeeded. |
| 24 | + let _ = pool.send(self.token.take().map(|token| Ok(token))); |
| 25 | + } |
| 26 | + } |
| 27 | +} |
| 28 | + |
| 29 | +impl JobToken { |
| 30 | + /// Ensure that this token is not put back into queue once it's dropped. |
| 31 | + /// This also leads to releasing it sooner for other processes to use, |
| 32 | + /// which is a correct thing to do once it is known that there won't be |
| 33 | + /// any more token acquisitions. |
| 34 | + pub(crate) fn forget(&mut self) { |
| 35 | + self.pool.take(); |
| 36 | + } |
| 37 | +} |
| 38 | + |
| 39 | +/// A thin wrapper around jobserver's Client. |
| 40 | +/// It would be perfectly fine to just use jobserver's Client, but we also want to reuse |
| 41 | +/// our own implicit token assigned for this build script. This struct manages that and |
| 42 | +/// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver. |
| 43 | +/// Furthermore, instead of giving up job tokens, it keeps them around |
| 44 | +/// for reuse if we know we're going to request another token after freeing the current one. |
| 45 | +pub(crate) struct JobTokenServer { |
| 46 | + helper: HelperThread, |
| 47 | + tx: Sender<Option<Result<Acquired, crate::Error>>>, |
| 48 | + rx: Receiver<Option<Result<Acquired, crate::Error>>>, |
| 49 | +} |
| 50 | + |
| 51 | +impl JobTokenServer { |
| 52 | + pub(crate) fn new() -> &'static Self { |
| 53 | + jobserver() |
| 54 | + } |
| 55 | + fn new_inner(client: Client) -> Result<Self, crate::Error> { |
| 56 | + let (tx, rx) = mpsc::channel(); |
| 57 | + // Push the implicit token. Since JobTokens only give back what they got, |
| 58 | + // there should be at most one global implicit token in the wild. |
| 59 | + tx.send(None).unwrap(); |
| 60 | + let pool = tx.clone(); |
| 61 | + let helper = client.into_helper_thread(move |acq| { |
| 62 | + let _ = pool.send(Some(acq.map_err(|e| e.into()))); |
| 63 | + })?; |
| 64 | + Ok(Self { helper, tx, rx }) |
| 65 | + } |
| 66 | + |
| 67 | + pub(crate) fn acquire(&self) -> Result<JobToken, crate::Error> { |
| 68 | + let token = if let Ok(token) = self.rx.try_recv() { |
| 69 | + // Opportunistically check if there's a token that can be reused. |
| 70 | + token |
| 71 | + } else { |
| 72 | + // Cold path, request a token and block |
| 73 | + self.helper.request_token(); |
| 74 | + self.rx.recv().unwrap() |
| 75 | + }; |
| 76 | + let token = if let Some(token) = token { |
| 77 | + Some(token?) |
| 78 | + } else { |
| 79 | + None |
| 80 | + }; |
| 81 | + Ok(JobToken { |
| 82 | + token, |
| 83 | + pool: Some(self.tx.clone()), |
| 84 | + }) |
| 85 | + } |
| 86 | +} |
| 87 | + |
| 88 | +/// Returns a suitable `JobTokenServer` used to coordinate |
| 89 | +/// parallelism between build scripts. A global `JobTokenServer` is used as this ensures |
| 90 | +/// that only one implicit job token is used in the wild. |
| 91 | +/// Having multiple separate job token servers would lead to each of them assuming that they have control |
| 92 | +/// over the implicit job token. |
| 93 | +/// As it stands, each caller of `jobserver` can receive an implicit job token and there will be at most |
| 94 | +/// one implicit job token in the wild. |
| 95 | +fn jobserver() -> &'static JobTokenServer { |
| 96 | + static INIT: Once = Once::new(); |
| 97 | + static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit(); |
| 98 | + |
| 99 | + fn _assert_sync<T: Sync>() {} |
| 100 | + _assert_sync::<jobserver::Client>(); |
| 101 | + |
| 102 | + unsafe { |
| 103 | + INIT.call_once(|| { |
| 104 | + let server = default_jobserver(); |
| 105 | + JOBSERVER = MaybeUninit::new( |
| 106 | + JobTokenServer::new_inner(server).expect("Job server initialization failed"), |
| 107 | + ); |
| 108 | + }); |
| 109 | + // Poor man's assume_init_ref, as that'd require a MSRV of 1.55. |
| 110 | + &*JOBSERVER.as_ptr() |
| 111 | + } |
| 112 | +} |
| 113 | + |
| 114 | +unsafe fn default_jobserver() -> jobserver::Client { |
| 115 | + // Try to use the environmental jobserver which Cargo typically |
| 116 | + // initializes for us... |
| 117 | + if let Some(client) = jobserver::Client::from_env() { |
| 118 | + return client; |
| 119 | + } |
| 120 | + |
| 121 | + // ... but if that fails for whatever reason select something |
| 122 | + // reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's |
| 123 | + // configured by Cargo) and otherwise just fall back to a |
| 124 | + // semi-reasonable number. Note that we could use `num_cpus` here |
| 125 | + // but it's an extra dependency that will almost never be used, so |
| 126 | + // it's generally not too worth it. |
| 127 | + let mut parallelism = 4; |
| 128 | + if let Ok(amt) = env::var("NUM_JOBS") { |
| 129 | + if let Ok(amt) = amt.parse() { |
| 130 | + parallelism = amt; |
| 131 | + } |
| 132 | + } |
| 133 | + |
| 134 | + // If we create our own jobserver then be sure to reserve one token |
| 135 | + // for ourselves. |
| 136 | + let client = jobserver::Client::new(parallelism).expect("failed to create jobserver"); |
| 137 | + client.acquire_raw().expect("failed to acquire initial"); |
| 138 | + return client; |
| 139 | +} |
0 commit comments