Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

diagnostics_channel: fix ref counting bug when reaching zero subscribers #47520

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 31 additions & 21 deletions lib/diagnostics_channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const {
ArrayPrototypeIndexOf,
ArrayPrototypePush,
ArrayPrototypeSplice,
SafeFinalizationRegistry,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Promise,
Expand All @@ -29,14 +30,29 @@ const { triggerUncaughtException } = internalBinding('errors');

const { WeakReference } = internalBinding('util');

function decRef(channel) {
if (channels.get(channel.name).decRef() === 0) {
channels.delete(channel.name);
// Can't delete when weakref count reaches 0 as it could increment again.
// Only GC can be used as a valid time to clean up the channels map.
class WeakRefMap extends SafeMap {
#finalizers = new SafeFinalizationRegistry((key) => {
this.delete(key);
});

set(key, value) {
this.#finalizers.register(value, key);
return super.set(key, new WeakReference(value));
}
}

function incRef(channel) {
channels.get(channel.name).incRef();
get(key) {
return super.get(key)?.get();
}

incRef(key) {
return super.get(key)?.incRef();
}

decRef(key) {
return super.get(key)?.decRef();
}
}

function markActive(channel) {
Expand Down Expand Up @@ -81,7 +97,7 @@ class ActiveChannel {
subscribe(subscription) {
validateFunction(subscription, 'subscription');
ArrayPrototypePush(this._subscribers, subscription);
incRef(this);
channels.incRef(this.name);
}

unsubscribe(subscription) {
Expand All @@ -90,15 +106,15 @@ class ActiveChannel {

ArrayPrototypeSplice(this._subscribers, index, 1);

decRef(this);
channels.decRef(this.name);
maybeMarkInactive(this);

return true;
}

bindStore(store, transform) {
const replacing = this._stores.has(store);
if (!replacing) incRef(this);
if (!replacing) channels.incRef(this.name);
this._stores.set(store, transform);
}

Expand All @@ -109,7 +125,7 @@ class ActiveChannel {

this._stores.delete(store);

decRef(this);
channels.decRef(this.name);
maybeMarkInactive(this);

return true;
Expand Down Expand Up @@ -154,7 +170,7 @@ class Channel {
this._stores = undefined;
this.name = name;

channels.set(name, new WeakReference(this));
channels.set(name, this);
}

static [SymbolHasInstance](instance) {
Expand Down Expand Up @@ -192,12 +208,10 @@ class Channel {
}
}

const channels = new SafeMap();
const channels = new WeakRefMap();

function channel(name) {
let channel;
const ref = channels.get(name);
if (ref) channel = ref.get();
const channel = channels.get(name);
if (channel) return channel;

if (typeof name !== 'string' && typeof name !== 'symbol') {
Expand All @@ -216,12 +230,8 @@ function unsubscribe(name, subscription) {
}

function hasSubscribers(name) {
let channel;
const ref = channels.get(name);
if (ref) channel = ref.get();
if (!channel) {
return false;
}
const channel = channels.get(name);
if (!channel) return false;

return channel.hasSubscribers;
}
Expand Down
7 changes: 7 additions & 0 deletions test/parallel/test-diagnostics-channel-pub-sub.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,10 @@ assert.ok(!dc.unsubscribe(name, subscriber));
assert.throws(() => {
dc.subscribe(name, null);
}, { code: 'ERR_INVALID_ARG_TYPE' });

// Reaching zero subscribers should not delete from the channels map as there
// will be no more weakref to incRef if another subscribe happens while the
// channel object itself exists.
channel.subscribe(subscriber);
Qard marked this conversation as resolved.
Show resolved Hide resolved
channel.unsubscribe(subscriber);
channel.subscribe(subscriber);