Skip to content

Commit ecfdf7d

Browse files
committed
PinkyBroadcaster -> PinkyErrorBroadcaster
Signed-off-by: Marc-Antoine Perennou <[email protected]>
1 parent 62e5f89 commit ecfdf7d

File tree

2 files changed

+45
-28
lines changed

2 files changed

+45
-28
lines changed

src/lib.rs

+29-24
Original file line numberDiff line numberDiff line change
@@ -192,14 +192,27 @@ impl<T> fmt::Debug for Pinky<T> {
192192
}
193193
}
194194

195-
/// A PinkyBroadcaster allows you to broacast a promise resolution to several subscribers.
196-
#[must_use = "PinkyBroadcaster must be subscribed"]
197-
pub struct PinkyBroadcaster<T: Clone> {
195+
/// A PinkyErrorBroadcaster allows you to broacast the success/error of a promise resolution to several subscribers.
196+
pub struct PinkyErrorBroadcaster<T, E: Clone> {
198197
marker: Arc<RwLock<Option<String>>>,
199-
inner: Arc<Mutex<BroadcasterInner<T>>>,
198+
inner: Arc<Mutex<ErrorBroadcasterInner<E>>>,
199+
pinky: Pinky<Result<T, E>>,
200200
}
201201

202-
impl<T: Send + Clone + 'static> PinkyBroadcaster<T> {
202+
impl<T: Send + 'static, E: Send + Clone + 'static> PinkyErrorBroadcaster<T, E> {
203+
/// Create a new promise with associated error broadcaster
204+
pub fn new() -> (PinkySwear<Result<T, E>>, Self) {
205+
let (promise, pinky) = PinkySwear::new();
206+
(
207+
promise,
208+
Self {
209+
marker: Default::default(),
210+
inner: Arc::new(Mutex::new(ErrorBroadcasterInner(Vec::default()))),
211+
pinky,
212+
},
213+
)
214+
}
215+
203216
/// Add a marker to logs
204217
pub fn set_marker(&self, marker: String) {
205218
for subscriber in self.inner.lock().0.iter() {
@@ -209,42 +222,34 @@ impl<T: Send + Clone + 'static> PinkyBroadcaster<T> {
209222
}
210223

211224
/// Subscribe to receive a broacast when the underlying promise get henoured.
212-
pub fn subscribe(&self) -> PinkySwear<T> {
225+
pub fn subscribe(&self) -> PinkySwear<Result<(), E>> {
213226
self.inner.lock().subscribe(self.marker.read().clone())
214227
}
215228

216229
/// Unsubscribe a promise from the broadcast.
217-
pub fn unsubscribe(&self, promise: PinkySwear<T>) {
230+
pub fn unsubscribe(&self, promise: PinkySwear<Result<(), E>>) {
218231
self.inner.lock().unsubscribe(promise);
219232
}
220233

221234
/// Resolve the underlying promise and broadcast the result to subscribers.
222-
pub fn swear(&self, data: T) {
235+
pub fn swear(&self, data: Result<T, E>) {
223236
for subscriber in self.inner.lock().0.iter() {
224-
subscriber.swear(data.clone())
225-
}
226-
}
227-
}
228-
229-
impl<T: Send + Clone + 'static> Default for PinkyBroadcaster<T> {
230-
fn default() -> Self {
231-
Self {
232-
marker: Default::default(),
233-
inner: Arc::new(Mutex::new(BroadcasterInner(Vec::default()))),
237+
subscriber.swear(data.as_ref().map(|_| ()).map_err(Clone::clone))
234238
}
239+
self.pinky.swear(data);
235240
}
236241
}
237242

238-
impl<T: Clone> fmt::Debug for PinkyBroadcaster<T> {
243+
impl<T, E: Clone> fmt::Debug for PinkyErrorBroadcaster<T, E> {
239244
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
240-
write!(f, "PinkyBroadcaster")
245+
write!(f, "PinkyErrorBroadcaster")
241246
}
242247
}
243248

244-
struct BroadcasterInner<T>(Vec<Pinky<T>>);
249+
struct ErrorBroadcasterInner<E>(Vec<Pinky<Result<(), E>>>);
245250

246-
impl<T: Send + 'static> BroadcasterInner<T> {
247-
fn subscribe(&mut self, marker: Option<String>) -> PinkySwear<T> {
251+
impl<E: Send + 'static> ErrorBroadcasterInner<E> {
252+
fn subscribe(&mut self, marker: Option<String>) -> PinkySwear<Result<(), E>> {
248253
let (promise, pinky) = PinkySwear::new();
249254
self.0.push(pinky);
250255
if let Some(marker) = marker {
@@ -253,7 +258,7 @@ impl<T: Send + 'static> BroadcasterInner<T> {
253258
promise
254259
}
255260

256-
fn unsubscribe(&mut self, promise: PinkySwear<T>) {
261+
fn unsubscribe(&mut self, promise: PinkySwear<Result<(), E>>) {
257262
self.0.retain(|pinky| pinky != &promise.pinky)
258263
}
259264
}

tests/broadcast.rs

+16-4
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,23 @@
1-
use pinky_swear::PinkyBroadcaster;
1+
use pinky_swear::PinkyErrorBroadcaster;
22

33
#[test]
4-
fn broadcaster() {
5-
let broadcaster = PinkyBroadcaster::<Result<(), ()>>::default();
4+
fn broadcaster_ok() {
5+
let (promise, broadcaster) = PinkyErrorBroadcaster::<u8, ()>::new();
66
let sub1 = broadcaster.subscribe();
77
let sub2 = broadcaster.subscribe();
8-
broadcaster.swear(Ok(()));
8+
broadcaster.swear(Ok(42));
9+
assert_eq!(promise.wait(), Ok(42));
910
assert_eq!(sub1.wait(), Ok(()));
1011
assert_eq!(sub2.wait(), Ok(()));
1112
}
13+
14+
#[test]
15+
fn broadcaster_err() {
16+
let (promise, broadcaster) = PinkyErrorBroadcaster::<(), u8>::new();
17+
let sub1 = broadcaster.subscribe();
18+
let sub2 = broadcaster.subscribe();
19+
broadcaster.swear(Err(42));
20+
assert_eq!(promise.wait(), Err(42));
21+
assert_eq!(sub1.wait(), Err(42));
22+
assert_eq!(sub2.wait(), Err(42));
23+
}

0 commit comments

Comments
 (0)