Skip to content

Commit 59531b3

Browse files
committed
First version of the Delay crate
1 parent 7c183f6 commit 59531b3

File tree

6 files changed

+343
-2
lines changed

6 files changed

+343
-2
lines changed

.gitignore

+5
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,8 @@ Cargo.lock
88

99
# These are backup files generated by rustfmt
1010
**/*.rs.bk
11+
12+
# IDEs
13+
/.idea/
14+
/.vscode/
15+

Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
[workspace]
2+
members = [
3+
"delay"
4+
]

README.md

+7-2
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,7 @@
1-
# rs-holdup
2-
A collection of trait and classes to make your thread wait (and timeout)
1+
# rs-delay
2+
A collection of trait and classes to make your thread wait (and timeout).
3+
4+
# Usage
5+
6+
7+

delay/Cargo.toml

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
[package]
2+
name = "delay"
3+
version = "0.1.0"
4+
authors = ["Hans Larsen <[email protected]>"]
5+
edition = "2018"
6+
7+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
8+
9+
[dependencies]

delay/src/lib.rs

+275
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
use std::sync::{Arc, Mutex};
2+
use std::time::{Duration, Instant};
3+
4+
#[cfg(test)]
5+
mod tests;
6+
7+
/// An error happened while waiting.
8+
#[derive(Copy, Clone, Debug, Ord, PartialOrd, Eq, PartialEq)]
9+
pub enum WaiterError {
10+
Timeout,
11+
}
12+
13+
/// A waiter trait, that can be used for executing a delay. Waiters need to be
14+
/// multi-threaded and cloneable.
15+
pub trait Waiter {
16+
fn start(&mut self) {}
17+
fn wait(&self) -> Result<(), WaiterError>;
18+
fn stop(&self) {}
19+
}
20+
21+
/// A Delay struct that encapsulates a Waiter.
22+
///
23+
/// To use this class, first create an instance of it by either calling a method
24+
/// on it (like [Delay::timeout]) or create a builder and add multiple Waiters.
25+
/// Then when you're ready to start a process that needs to wait, use the [start()]
26+
/// function. Every wait period, call the [wait()] function on it (it may block the
27+
/// thread). When you're done, you may call [stop()].
28+
///
29+
/// Waiters can be reused and re-started, but most would expect to have [stop()]
30+
/// called on them when you do, to free any additional resources.
31+
#[derive(Clone)]
32+
pub struct Delay {
33+
inner: Arc<dyn Waiter>,
34+
}
35+
36+
impl Delay {
37+
fn from(inner: Arc<dyn Waiter>) -> Self {
38+
Delay { inner }
39+
}
40+
41+
/// A Delay that never waits. This can hog resources, so careful.
42+
pub fn instant() -> Self {
43+
Self::from(Arc::new(InstantWaiter {}))
44+
}
45+
46+
/// A Delay that doesn't wait, but times out after a while.
47+
pub fn timeout(timeout: Duration) -> Self {
48+
Self::from(Arc::new(TimeoutWaiter::new(timeout)))
49+
}
50+
51+
/// A Delay that times out after waiting a certain number of times.
52+
pub fn count_timeout(count: u64) -> Self {
53+
Self::from(Arc::new(CountTimeoutWaiter::new(count)))
54+
}
55+
56+
/// A delay that waits every wait() call for a certain time.
57+
pub fn throttle(throttle: Duration) -> Self {
58+
Self::from(Arc::new(ThrottleWaiter::new(throttle)))
59+
}
60+
61+
/// A delay that recalculate a wait time every wait() calls and exponentially waits.
62+
/// The calculation is new_wait_time = max(current_wait_time * multiplier, cap).
63+
pub fn exponential_backoff_capped(initial: Duration, multiplier: f32, cap: Duration) -> Self {
64+
Self::from(Arc::new(ExponentialBackoffWaiter::new(
65+
initial, multiplier, cap,
66+
)))
67+
}
68+
69+
/// A delay that recalculate a wait time every wait() calls and exponentially waits.
70+
/// The calculation is new_wait_time = current_wait_time * multiplier.
71+
/// There is no limit for this backoff.
72+
pub fn exponential_backoff(initial: Duration, multiplier: f32) -> Self {
73+
Self::exponential_backoff_capped(initial, multiplier, Duration::from_secs(std::u64::MAX))
74+
}
75+
76+
pub fn builder() -> DelayBuilder {
77+
DelayBuilder { inner: None }
78+
}
79+
}
80+
81+
impl Waiter for Delay {
82+
fn start(&mut self) {
83+
Arc::get_mut(&mut self.inner).unwrap().start()
84+
}
85+
fn wait(&self) -> Result<(), WaiterError> {
86+
self.inner.wait()
87+
}
88+
fn stop(&self) {
89+
self.inner.stop()
90+
}
91+
}
92+
93+
pub struct DelayBuilder {
94+
inner: Option<Delay>,
95+
}
96+
impl DelayBuilder {
97+
/// Add a delay to the current builder. If a builder implements multiple delays, they
98+
/// will run sequentially, so if you have 2 Throttle delays, they will wait one after the
99+
/// other. This composer can be used though with a Throttle and a Timeout to throttle a
100+
/// thread, and error out if it throttles too long.
101+
pub fn with(mut self, other: Delay) -> Self {
102+
self.inner = Some(match self.inner.take() {
103+
None => other,
104+
Some(w) => Delay::from(Arc::new(DelayComposer::new(w, other))),
105+
});
106+
self
107+
}
108+
pub fn timeout(self, timeout: Duration) -> Self {
109+
self.with(Delay::timeout(timeout))
110+
}
111+
pub fn throttle(self, throttle: Duration) -> Self {
112+
self.with(Delay::throttle(throttle))
113+
}
114+
pub fn exponential_backoff(self, initial: Duration, multiplier: f32) -> Self {
115+
self.with(Delay::exponential_backoff(initial, multiplier))
116+
}
117+
pub fn exponential_backoff_capped(
118+
self,
119+
initial: Duration,
120+
multiplier: f32,
121+
cap: Duration,
122+
) -> Self {
123+
self.with(Delay::exponential_backoff_capped(initial, multiplier, cap))
124+
}
125+
pub fn build(mut self) -> Delay {
126+
self.inner.take().unwrap_or_else(Delay::instant)
127+
}
128+
}
129+
130+
#[derive(Clone)]
131+
struct DelayComposer {
132+
a: Delay,
133+
b: Delay,
134+
}
135+
impl DelayComposer {
136+
fn new(a: Delay, b: Delay) -> Self {
137+
Self { a, b }
138+
}
139+
}
140+
impl Waiter for DelayComposer {
141+
fn start(&mut self) {
142+
self.a.start();
143+
self.b.start();
144+
}
145+
fn wait(&self) -> Result<(), WaiterError> {
146+
self.a.wait()?;
147+
self.b.wait()?;
148+
Ok(())
149+
}
150+
fn stop(&self) {
151+
self.a.stop();
152+
self.b.stop();
153+
}
154+
}
155+
156+
#[derive(Clone)]
157+
struct InstantWaiter {}
158+
impl Waiter for InstantWaiter {
159+
fn wait(&self) -> Result<(), WaiterError> {
160+
Ok(())
161+
}
162+
}
163+
164+
#[derive(Clone)]
165+
struct TimeoutWaiter {
166+
timeout: Duration,
167+
start: Instant,
168+
}
169+
impl TimeoutWaiter {
170+
pub fn new(timeout: Duration) -> Self {
171+
Self {
172+
timeout,
173+
start: Instant::now(),
174+
}
175+
}
176+
}
177+
impl Waiter for TimeoutWaiter {
178+
fn start(&mut self) {
179+
self.start = Instant::now();
180+
}
181+
fn wait(&self) -> Result<(), WaiterError> {
182+
if self.start.elapsed() > self.timeout {
183+
Err(WaiterError::Timeout)
184+
} else {
185+
Ok(())
186+
}
187+
}
188+
}
189+
190+
#[derive(Clone)]
191+
struct CountTimeoutWaiter {
192+
max_count: u64,
193+
count: Arc<Mutex<u64>>,
194+
}
195+
impl CountTimeoutWaiter {
196+
pub fn new(max_count: u64) -> Self {
197+
CountTimeoutWaiter {
198+
max_count,
199+
count: Arc::new(Mutex::new(0)),
200+
}
201+
}
202+
}
203+
impl Waiter for CountTimeoutWaiter {
204+
fn start(&mut self) {
205+
*self.count.lock().unwrap() = 0;
206+
}
207+
208+
fn wait(&self) -> Result<(), WaiterError> {
209+
let current = *self.count.lock().unwrap() + 1;
210+
*self.count.lock().unwrap() = current;
211+
212+
if current >= self.max_count {
213+
Err(WaiterError::Timeout)
214+
} else {
215+
Ok(())
216+
}
217+
}
218+
}
219+
220+
#[derive(Clone)]
221+
struct ThrottleWaiter {
222+
throttle: Duration,
223+
}
224+
impl ThrottleWaiter {
225+
pub fn new(throttle: Duration) -> Self {
226+
Self { throttle }
227+
}
228+
}
229+
impl Waiter for ThrottleWaiter {
230+
fn wait(&self) -> Result<(), WaiterError> {
231+
std::thread::sleep(self.throttle);
232+
233+
Ok(())
234+
}
235+
}
236+
237+
#[derive(Clone)]
238+
struct ExponentialBackoffWaiter {
239+
next: Arc<Mutex<Duration>>,
240+
initial: Duration,
241+
multiplier: f32,
242+
cap: Duration,
243+
}
244+
impl ExponentialBackoffWaiter {
245+
pub fn new(initial: Duration, multiplier: f32, cap: Duration) -> Self {
246+
ExponentialBackoffWaiter {
247+
next: Arc::new(Mutex::new(initial)),
248+
initial,
249+
multiplier,
250+
cap,
251+
}
252+
}
253+
}
254+
impl Waiter for ExponentialBackoffWaiter {
255+
fn start(&mut self) {
256+
self.next = Arc::new(Mutex::new(self.initial));
257+
}
258+
259+
fn wait(&self) -> Result<(), WaiterError> {
260+
let current = *self.next.lock().unwrap();
261+
let current_nsec = current.as_nanos() as f32;
262+
263+
// Find the next throttle.
264+
let mut next_duration = Duration::from_nanos((current_nsec * self.multiplier) as u64);
265+
if next_duration > self.cap {
266+
next_duration = self.cap;
267+
}
268+
269+
*self.next.lock().unwrap() = next_duration;
270+
271+
std::thread::sleep(current);
272+
273+
Ok(())
274+
}
275+
}

delay/src/tests.rs

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#![cfg(test)]
2+
use crate::{Delay, Waiter};
3+
use std::time::{Duration, Instant};
4+
5+
#[test]
6+
fn throttle_works() {
7+
let start = Instant::now();
8+
9+
let mut waiter = Delay::throttle(Duration::from_millis(50));
10+
waiter.start();
11+
waiter.wait().unwrap();
12+
waiter.stop();
13+
14+
assert!(Instant::now().duration_since(start).as_millis() >= 50);
15+
}
16+
17+
#[test]
18+
fn timeout_works() {
19+
let mut waiter = Delay::timeout(Duration::from_millis(50));
20+
waiter.start();
21+
22+
assert!(waiter.wait().is_ok());
23+
assert!(waiter.wait().is_ok());
24+
std::thread::sleep(Duration::from_millis(10));
25+
assert!(waiter.wait().is_ok());
26+
std::thread::sleep(Duration::from_millis(50));
27+
assert!(waiter.wait().is_err());
28+
29+
waiter.stop();
30+
}
31+
32+
#[test]
33+
fn counter_works() {
34+
let mut waiter = Delay::count_timeout(3);
35+
waiter.start();
36+
37+
assert!(waiter.wait().is_ok());
38+
assert!(waiter.wait().is_ok());
39+
assert!(waiter.wait().is_err());
40+
assert!(waiter.wait().is_err());
41+
42+
waiter.stop();
43+
}

0 commit comments

Comments
 (0)