From 610428453feed47d9e7a2357300a7c12d5a25b75 Mon Sep 17 00:00:00 2001 From: mox692 Date: Thu, 8 Feb 2024 00:23:36 +0900 Subject: [PATCH 1/3] Add decrease_permit method to semaphore --- tokio/src/sync/batch_semaphore.rs | 25 +++++++++++++++++ tokio/src/sync/semaphore.rs | 8 ++++++ tokio/src/sync/tests/loom_semaphore_batch.rs | 28 ++++++++++++++++++++ tokio/src/sync/tests/semaphore_batch.rs | 21 +++++++++++++++ 4 files changed, 82 insertions(+) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index aa23dea7d3c..04dd72718ee 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -368,6 +368,31 @@ impl Semaphore { assert_eq!(rem, 0); } + /// Decrease a semaphore's permits by a maximum of `n`. + /// + /// If there are insufficient permits and it's not possible to reduce by n, + /// return the number of permits that were actually reduced. + pub(crate) fn decrease_permits(&self, n: usize) -> usize { + if n == 0 { + return 0; + } + + let mut curr_bits = self.permits.load(Acquire); + loop { + let curr = curr_bits >> Self::PERMIT_SHIFT; + let new = curr.saturating_sub(n); + match self.permits.compare_exchange_weak( + curr_bits, + new << Self::PERMIT_SHIFT, + AcqRel, + Acquire, + ) { + Ok(_) => return std::cmp::min(curr, n), + Err(actual) => curr_bits = actual, + }; + } + } + fn poll_acquire( &self, cx: &mut Context<'_>, diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index 25e4134373c..bcebac08012 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -481,6 +481,14 @@ impl Semaphore { self.ll_sem.release(n); } + /// Decrease a semaphore's permits by a maximum of `n`. + /// + /// If there are insufficient permits and it's not possible to reduce by n, + /// return the number of permits that were actually reduced. + pub fn decrease_permits(&self, n: usize) -> usize { + self.ll_sem.decrease_permits(n) + } + /// Acquires a permit from the semaphore. /// /// If the semaphore has been closed, this returns an [`AcquireError`]. diff --git a/tokio/src/sync/tests/loom_semaphore_batch.rs b/tokio/src/sync/tests/loom_semaphore_batch.rs index 76a1bc00626..ea838bfdae9 100644 --- a/tokio/src/sync/tests/loom_semaphore_batch.rs +++ b/tokio/src/sync/tests/loom_semaphore_batch.rs @@ -213,3 +213,31 @@ fn release_during_acquire() { assert_eq!(10, semaphore.available_permits()); }) } + +#[test] +fn concurrent_permit_updates() { + loom::model(move || { + let semaphore = Arc::new(Semaphore::new(5)); + let t1 = { + let semaphore = semaphore.clone(); + thread::spawn(move || semaphore.release(3)) + }; + let t2 = { + let semaphore = semaphore.clone(); + thread::spawn(move || { + semaphore + .try_acquire(1) + .expect("try_acquire should succeed") + }) + }; + let t3 = { + let semaphore = semaphore.clone(); + thread::spawn(move || semaphore.decrease_permits(2)) + }; + + t1.join().unwrap(); + t2.join().unwrap(); + t3.join().unwrap(); + assert_eq!(semaphore.available_permits(), 5); + }) +} diff --git a/tokio/src/sync/tests/semaphore_batch.rs b/tokio/src/sync/tests/semaphore_batch.rs index 391797b3f66..813e0d6e646 100644 --- a/tokio/src/sync/tests/semaphore_batch.rs +++ b/tokio/src/sync/tests/semaphore_batch.rs @@ -287,3 +287,24 @@ fn release_permits_at_drop() { assert!(fut.as_mut().poll(&mut cx).is_pending()); } } + +#[test] +fn decrease_permits_basic() { + let s = Semaphore::new(10); + assert_eq!(s.decrease_permits(4), 4); + assert_eq!(s.available_permits(), 6); + assert_eq!(s.decrease_permits(10), 6); + assert_eq!(s.available_permits(), 0); +} + +#[test] +fn update_permits_many_times() { + let s = Semaphore::new(5); + let mut acquire = task::spawn(s.acquire(7)); + assert_pending!(acquire.poll()); + s.release(5); + assert_ready_ok!(acquire.poll()); + assert_eq!(s.available_permits(), 3); + assert_eq!(s.decrease_permits(3), 3); + assert_eq!(s.available_permits(), 0); +} From da1d394464fd0209e3c8fd3a1c2ca8805c4ffb85 Mon Sep 17 00:00:00 2001 From: mox692 Date: Thu, 8 Feb 2024 22:41:14 +0900 Subject: [PATCH 2/3] rename to forget_permits --- tokio/src/sync/batch_semaphore.rs | 2 +- tokio/src/sync/semaphore.rs | 4 ++-- tokio/src/sync/tests/loom_semaphore_batch.rs | 2 +- tokio/src/sync/tests/semaphore_batch.rs | 8 ++++---- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index 04dd72718ee..dd5e4541215 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -372,7 +372,7 @@ impl Semaphore { /// /// If there are insufficient permits and it's not possible to reduce by n, /// return the number of permits that were actually reduced. - pub(crate) fn decrease_permits(&self, n: usize) -> usize { + pub(crate) fn forget_permits(&self, n: usize) -> usize { if n == 0 { return 0; } diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index bcebac08012..ec7d6215384 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -485,8 +485,8 @@ impl Semaphore { /// /// If there are insufficient permits and it's not possible to reduce by n, /// return the number of permits that were actually reduced. - pub fn decrease_permits(&self, n: usize) -> usize { - self.ll_sem.decrease_permits(n) + pub fn forget_permits(&self, n: usize) -> usize { + self.ll_sem.forget_permits(n) } /// Acquires a permit from the semaphore. diff --git a/tokio/src/sync/tests/loom_semaphore_batch.rs b/tokio/src/sync/tests/loom_semaphore_batch.rs index ea838bfdae9..85cd584d2d4 100644 --- a/tokio/src/sync/tests/loom_semaphore_batch.rs +++ b/tokio/src/sync/tests/loom_semaphore_batch.rs @@ -232,7 +232,7 @@ fn concurrent_permit_updates() { }; let t3 = { let semaphore = semaphore.clone(); - thread::spawn(move || semaphore.decrease_permits(2)) + thread::spawn(move || semaphore.forget_permits(2)) }; t1.join().unwrap(); diff --git a/tokio/src/sync/tests/semaphore_batch.rs b/tokio/src/sync/tests/semaphore_batch.rs index 813e0d6e646..09610ce71f2 100644 --- a/tokio/src/sync/tests/semaphore_batch.rs +++ b/tokio/src/sync/tests/semaphore_batch.rs @@ -289,11 +289,11 @@ fn release_permits_at_drop() { } #[test] -fn decrease_permits_basic() { +fn forget_permits_basic() { let s = Semaphore::new(10); - assert_eq!(s.decrease_permits(4), 4); + assert_eq!(s.forget_permits(4), 4); assert_eq!(s.available_permits(), 6); - assert_eq!(s.decrease_permits(10), 6); + assert_eq!(s.forget_permits(10), 6); assert_eq!(s.available_permits(), 0); } @@ -305,6 +305,6 @@ fn update_permits_many_times() { s.release(5); assert_ready_ok!(acquire.poll()); assert_eq!(s.available_permits(), 3); - assert_eq!(s.decrease_permits(3), 3); + assert_eq!(s.forget_permits(3), 3); assert_eq!(s.available_permits(), 0); } From 8f2ec765258c706c3f1f13e6a228f37a24eafc0c Mon Sep 17 00:00:00 2001 From: mox692 Date: Thu, 8 Feb 2024 22:42:22 +0900 Subject: [PATCH 3/3] minor update --- tokio/src/sync/batch_semaphore.rs | 2 +- tokio/src/sync/semaphore.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/batch_semaphore.rs b/tokio/src/sync/batch_semaphore.rs index dd5e4541215..d7eb1d6b77e 100644 --- a/tokio/src/sync/batch_semaphore.rs +++ b/tokio/src/sync/batch_semaphore.rs @@ -370,7 +370,7 @@ impl Semaphore { /// Decrease a semaphore's permits by a maximum of `n`. /// - /// If there are insufficient permits and it's not possible to reduce by n, + /// If there are insufficient permits and it's not possible to reduce by `n`, /// return the number of permits that were actually reduced. pub(crate) fn forget_permits(&self, n: usize) -> usize { if n == 0 { diff --git a/tokio/src/sync/semaphore.rs b/tokio/src/sync/semaphore.rs index ec7d6215384..d0ee12591ee 100644 --- a/tokio/src/sync/semaphore.rs +++ b/tokio/src/sync/semaphore.rs @@ -483,7 +483,7 @@ impl Semaphore { /// Decrease a semaphore's permits by a maximum of `n`. /// - /// If there are insufficient permits and it's not possible to reduce by n, + /// If there are insufficient permits and it's not possible to reduce by `n`, /// return the number of permits that were actually reduced. pub fn forget_permits(&self, n: usize) -> usize { self.ll_sem.forget_permits(n)