-
Notifications
You must be signed in to change notification settings - Fork 29
Simple singleflight implementation. #449
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces a singleflight implementation to prevent duplicate work execution when multiple clients make concurrent calls for the same operation. The implementation provides a way to coalesce concurrent calls so only one execution happens, with all callers receiving the same result.
Key changes:
- Added
SingleFlightGroup<T>
for type-specific call coalescing - Added
SingleFlightManager
for managing multiple singleflight groups by string keys - Integrated the new module into the library's public interface
Reviewed Changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
xet_threadpool/src/simple_singleflight.rs |
Implements the core singleflight functionality with groups and manager |
xet_threadpool/src/lib.rs |
Adds the new module to the library |
Comments suppressed due to low confidence (1)
xet_threadpool/src/simple_singleflight.rs:14
- The module is added to lib.rs but not exported publicly. Users cannot access the
SingleFlightGroup
andSingleFlightManager
types. Consider addingpub use simple_singleflight::{SingleFlightGroup, SingleFlightManager};
to lib.rs to make these types available to library users.
///
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
Ok(v) => Ok(v), | ||
Err(e) => { | ||
let msg = format!( | ||
"Singleflight leader failed before broadcasing result, possible shutdown in progress. {e:?}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The word 'broadcasing' is misspelled and should be 'broadcasting'.
"Singleflight leader failed before broadcasing result, possible shutdown in progress. {e:?}" | |
"Singleflight leader failed before broadcasting result, possible shutdown in progress. {e:?}" |
Copilot uses AI. Check for mistakes.
let res = group.run(retrieval_function).await; | ||
|
||
// Now, clear out our key here; no need to keep a completed group around. | ||
let _ = self.groups.lock().await.remove(key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The group is removed from the registry after every run completion, which means the type-checking benefit is lost for subsequent calls with the same key. This could lead to unexpected behavior where a key that was previously used with type T can suddenly be used with a different type, potentially causing confusion. Consider keeping completed groups in the registry or documenting this cleanup behavior more clearly.
let _ = self.groups.lock().await.remove(key); | |
// Do not automatically remove the group from the registry here. | |
// If you want to reuse the key with a different type, call `clear(key)` explicitly. |
Copilot uses AI. Check for mistakes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's possibly a race condition that causes an issue (maybe/right?):
There's a leader (L) running, any number of waiters on the same key, and in the meanwhile a new call (N) to a SingleFlightManager.run_single_flight finds a group for the same key.
both L and N need to acquire the group lock, L when work is finished, N in entering SingleFlightGroup.run, L finishes the work right when N has SingleFlightGroup.run called and manages to acquire the lock first, it then sends all the results to the waiters and releases the lock. Now N adds itself to the waiters and never receives a finish signal, rather is should get an error when the oneshot sender is dropped.
state: Arc<Mutex<Option<Vec<oneshot::Sender<T>>>>>, | ||
} | ||
|
||
impl<T: Clone + Send + Sync + 'static> SingleFlightGroup<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
enforcing the Clone
binding is iffy, I would like to do it, but it forces us to make all Result/Error types clone-able, which is not always available cleanly.
any aws/http errors tend to contain a streaming or at least non-clonable body, so we have to Arc wrap a bunch of errors possibly.
That being said, I'm on board with this
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Although simpler than the existing singleflight implementation, it is more strict to use (can't use with non-Clone Error types (e.g. anyhow)) and doesn't behave well when there are panics/cancellations of the leader task. It also re-introduces the deadlock bug we had with singleflight in the past.
/// Notes: | ||
/// - If the retrieval future panics, followers will receive a canceled oneshot and this function will panic in | ||
/// followers with a clear message. | ||
/// - If you want error handling instead of panics, change `T` to `Result<U, E>` and broadcast the result. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This requires that the error type E
implement Clone
, which is not the case for many of the places we use singleflight.
&self, | ||
key: &str, | ||
retrieval_function: F, | ||
) -> Result<T, MultithreadedRuntimeError> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Callers may (and in our case do) want to know if they were the leader/owner or follower/waiter (e.g. for monitoring purposes), so we need to return that back.
} | ||
} else { | ||
// We're the leader: get and run the retrieval *without* holding the lock. | ||
let value = retrieval_function().await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the leader panics or is cancelled here, then the followers will never be informed and will thus wait forever.
}; | ||
|
||
// lock released; delegate | ||
let res = group.run(retrieval_function).await; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the retrieval_function is run as a part of the leader task, then we are susceptible to deadlock situations where a chain of dependencies occurs. This is a situation we ran into in the past, leading to us needing to spawn the leader as a new tokio task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See singleflight's test_deadlock for an example of how such a situation can arise.
let res = group.run(retrieval_function).await; | ||
|
||
// Now, clear out our key here; no need to keep a completed group around. | ||
let _ = self.groups.lock().await.remove(key); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is going to be done by every task (leader + followers) that runs. Ideally, the leader should be the one to remove this to minimize the contention.
This PR proposes a simple singleflight implementation that can be associated with the runtime and used globally to disambiguate client calls using singleflight.
The idea is to associate this with the runtime so that multiple clients can use a common instance.