Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 35 additions & 18 deletions rpc/src/rpc_subscription_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ pub struct SignatureSubscriptionParams {

#[derive(Clone)]
pub struct SubscriptionControl(Arc<SubscriptionControlInner>);
pub struct WeakSubscriptionTokenRef(Weak<SubscriptionTokenInner>, SubscriptionId);

struct SubscriptionControlInner {
subscriptions: DashMap<SubscriptionParams, Weak<SubscriptionTokenInner>>,
subscriptions: DashMap<SubscriptionParams, WeakSubscriptionTokenRef>,
next_id: AtomicU64,
max_active_subscriptions: usize,
sender: crossbeam_channel::Sender<TimestampedNotificationEntry>,
Expand Down Expand Up @@ -195,33 +196,44 @@ impl SubscriptionControl {
self.0.subscriptions.len()
);
let count = self.0.subscriptions.len();
match self.0.subscriptions.entry(params) {
DashEntry::Occupied(entry) => Ok(SubscriptionToken(
entry
.get()
.upgrade()
.expect("dead subscription encountered in SubscriptionControl"),
let create_token_and_weak_ref = |id, params| {
let token = SubscriptionToken(
Arc::new(SubscriptionTokenInner {
control: Arc::clone(&self.0),
params,
id,
}),
self.0.counter.create_token(),
)),
);
let weak_ref = WeakSubscriptionTokenRef(Arc::downgrade(&token.0), token.0.id);
(token, weak_ref)
};

match self.0.subscriptions.entry(params) {
DashEntry::Occupied(mut entry) => match entry.get().0.upgrade() {
Some(token_ref) => Ok(SubscriptionToken(token_ref, self.0.counter.create_token())),
// This means the last Arc for this Weak pointer entered the drop just before us,
// but could not remove the entry since we are holding the write lock.
// See `Drop` implementation for `SubscriptionTokenInner` for further info.
None => {
let (token, weak_ref) =
create_token_and_weak_ref(entry.get().1, entry.key().clone());
entry.insert(weak_ref);
Ok(token)
}
},
DashEntry::Vacant(entry) => {
if count >= self.0.max_active_subscriptions {
inc_new_counter_info!("rpc-subscription-refused-limit-reached", 1);
return Err(Error::TooManySubscriptions);
}
let id = SubscriptionId::from(self.0.next_id.fetch_add(1, Ordering::AcqRel));
let token = SubscriptionToken(
Arc::new(SubscriptionTokenInner {
control: Arc::clone(&self.0),
params: entry.key().clone(),
id,
}),
self.0.counter.create_token(),
);
let (token, weak_ref) = create_token_and_weak_ref(id, entry.key().clone());
let _ = self
.0
.sender
.send(NotificationEntry::Subscribed(token.0.params.clone(), id).into());
entry.insert(Arc::downgrade(&token.0));
entry.insert(weak_ref);
datapoint_info!(
"rpc-subscription",
("total", self.0.subscriptions.len(), i64)
Expand Down Expand Up @@ -505,7 +517,9 @@ impl Drop for SubscriptionTokenInner {
DashEntry::Vacant(_) => {
warn!("Subscriptions inconsistency (missing entry in by_params)");
}
DashEntry::Occupied(entry) => {
// Check the strong refs count to ensure no other thread recreated this subscription (not token)
// while we were acquiring the lock.
DashEntry::Occupied(entry) if entry.get().0.strong_count() == 0 => {
let _ = self
.control
.sender
Expand All @@ -516,6 +530,9 @@ impl Drop for SubscriptionTokenInner {
("total", self.control.subscriptions.len(), i64)
);
}
// This branch handles the case in which this entry got recreated
// while we were waiting for the lock (inside the `DashMap::entry` method).
DashEntry::Occupied(_entry) /* if _entry.get().0.strong_count() > 0 */ => (),
}
}
}
Expand Down