Skip to content

Commit

Permalink
lib: child process queue pending messages
Browse files Browse the repository at this point in the history
It fixes the problem for the child process not receiving messages.

Fixes: #41134
  • Loading branch information
ErickWendel committed Dec 17, 2021
1 parent 0d9f3bd commit 0f48517
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
25 changes: 24 additions & 1 deletion lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const {
ObjectDefineProperty,
ObjectSetPrototypeOf,
ReflectApply,
SafeSet,
StringPrototypeSlice,
Symbol,
Uint8Array,
Expand Down Expand Up @@ -81,6 +82,7 @@ let HTTPParser;
const MAX_HANDLE_RETRANSMISSIONS = 3;
const kChannelHandle = Symbol('kChannelHandle');
const kIsUsedAsStdio = Symbol('kIsUsedAsStdio');
const kPendingMessages = Symbol('pendingMessages');

// This object contain function to convert TCP objects to native handle objects
// and back again.
Expand Down Expand Up @@ -526,6 +528,7 @@ class Control extends EventEmitter {
constructor(channel) {
super();
this.#channel = channel;
this[kPendingMessages] = new SafeSet();
}

// The methods keeping track of the counter are being used to track the
Expand Down Expand Up @@ -699,6 +702,19 @@ function setupChannel(target, channel, serializationMode) {
});
});

target.on('newListener', function() {
if (!target.channel) return;

const messages = target.channel[kPendingMessages];
if (!messages.size) return;

for (const messageParams of messages) {
process.nextTick(() => ReflectApply(target.emit, target, messageParams));
}

messages.clear();
});

target.send = function(message, handle, options, callback) {
if (typeof handle === 'function') {
callback = handle;
Expand Down Expand Up @@ -912,7 +928,14 @@ function setupChannel(target, channel, serializationMode) {
};

function emit(event, message, handle) {
target.emit(event, message, handle);
const isInternalMessage = 'internalMessage' === event;
const hasListenersInstalled = target.listenerCount('message');
if (hasListenersInstalled || isInternalMessage) {
target.emit(event, message, handle);
return;
}

target.channel[kPendingMessages].add([event, message, handle]);
}

function handleMessage(message, handle, internal) {
Expand Down
20 changes: 20 additions & 0 deletions test/es-module/test-esm-child-process-fork-main.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import '../common/index.mjs';
import assert from 'assert';
import { fork } from 'child_process';
import { once } from 'events';

if (process.argv[2] !== 'child') {
const currentFile = 'test-esm-child-process-fork-main.mjs';
const { pathname: filename } = new URL(currentFile, import.meta.url);
const cp = fork(filename, ['child']);
const message = 'Hello World';
cp.send(message);

const [received] = await once(cp, 'message');

assert.deepStrictEqual(received, message);

cp.disconnect();
} else {
process.on('message', (msg) => process.send(msg));
}

0 comments on commit 0f48517

Please sign in to comment.