Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 42 additions & 16 deletions packages/grpc-js/src/retrying-call.ts
Original file line number Diff line number Diff line change
Expand Up @@ -760,11 +760,10 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
this.maybeStartHedgingTimer();
}

private handleChildWriteCompleted(childIndex: number) {
const childCall = this.underlyingCalls[childIndex];
const messageIndex = childCall.nextMessageToSend;
private handleChildWriteCompleted(childIndex: number, messageIndex: number) {
this.getBufferEntry(messageIndex).callback?.();
this.clearSentMessages();
const childCall = this.underlyingCalls[childIndex];
childCall.nextMessageToSend += 1;
this.sendNextChildMessage(childIndex);
}
Expand All @@ -774,19 +773,33 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
if (childCall.state === 'COMPLETED') {
return;
}
if (this.getBufferEntry(childCall.nextMessageToSend)) {
const bufferEntry = this.getBufferEntry(childCall.nextMessageToSend);
const messageIndex = childCall.nextMessageToSend;
if (this.getBufferEntry(messageIndex)) {
const bufferEntry = this.getBufferEntry(messageIndex);
switch (bufferEntry.entryType) {
case 'MESSAGE':
childCall.call.sendMessageWithContext(
{
callback: error => {
// Ignore error
this.handleChildWriteCompleted(childIndex);
this.handleChildWriteCompleted(childIndex, messageIndex);
},
},
bufferEntry.message!.message
);
// Optimization: if the next entry is HALF_CLOSE, send it immediately
// without waiting for the message callback. This is safe because the message
// has already been passed to the underlying transport.
const nextEntry = this.getBufferEntry(messageIndex + 1);
if (nextEntry.entryType === 'HALF_CLOSE') {
this.trace(
'Sending halfClose immediately after message to child [' +
childCall.call.getCallNumber() +
'] - optimizing for unary/final message'
);
childCall.nextMessageToSend += 1;
childCall.call.halfClose();
}
break;
case 'HALF_CLOSE':
childCall.nextMessageToSend += 1;
Expand All @@ -813,7 +826,11 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
};
this.writeBuffer.push(bufferEntry);
if (bufferEntry.allocated) {
context.callback?.();
// Run this in next tick to avoid suspending the current execution context
// otherwise it might cause half closing the call before sending message
process.nextTick(() => {
context.callback?.();
});
for (const [callIndex, call] of this.underlyingCalls.entries()) {
if (
call.state === 'ACTIVE' &&
Expand All @@ -823,7 +840,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
{
callback: error => {
// Ignore error
this.handleChildWriteCompleted(callIndex);
this.handleChildWriteCompleted(callIndex, messageIndex);
},
},
message
Expand All @@ -843,7 +860,7 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
{
callback: error => {
// Ignore error
this.handleChildWriteCompleted(this.committedCallIndex!);
this.handleChildWriteCompleted(this.committedCallIndex!, messageIndex);
},
},
message
Expand All @@ -868,12 +885,21 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
allocated: false,
});
for (const call of this.underlyingCalls) {
if (
call?.state === 'ACTIVE' &&
call.nextMessageToSend === halfCloseIndex
) {
call.nextMessageToSend += 1;
call.call.halfClose();
if (call?.state === 'ACTIVE') {
// Send halfClose to call when either:
// - nextMessageToSend === halfCloseIndex - 1: last message sent, callback pending (optimization)
// - nextMessageToSend === halfCloseIndex: all messages sent and acknowledged
if (call.nextMessageToSend === halfCloseIndex
|| call.nextMessageToSend === halfCloseIndex - 1) {
this.trace(
'Sending halfClose immediately to child [' +
call.call.getCallNumber() +
'] - all messages already sent'
);
call.nextMessageToSend += 1;
call.call.halfClose();
}
// Otherwise, halfClose will be sent by sendNextChildMessage when message callbacks complete
}
}
}
Expand All @@ -895,4 +921,4 @@ export class RetryingCall implements Call, DeadlineInfoProvider {
return null;
}
}
}
}
27 changes: 26 additions & 1 deletion packages/grpc-js/test/test-end-to-end.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import * as assert from 'assert';
import * as path from 'path';
import { loadProtoFile } from './common';
import { Metadata, Server, ServerDuplexStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, experimental, sendUnaryData } from '../src';
import { Metadata, Server, ServerCredentials, ServerDuplexStream, ServerReadableStream, ServerUnaryCall, ServiceClientConstructor, ServiceError, credentials, experimental, sendUnaryData } from '../src';
import { ServiceClient } from '../src/make-client';

const protoFile = path.join(__dirname, 'fixtures', 'echo_service.proto');
Expand All @@ -36,6 +36,15 @@ const echoServiceImplementation = {
call.end();
});
},
echoClientStream(call: ServerReadableStream<any, any>, callback: sendUnaryData<any>) {
const messages: any[] = [];
call.on('data', (message: any) => {
messages.push(message);
});
call.on('end', () => {
callback(null, { value: messages.map(m => m.value).join(','), value2: messages.length });
});
},
};

describe('Client should successfully communicate with server', () => {
Expand Down Expand Up @@ -77,4 +86,20 @@ describe('Client should successfully communicate with server', () => {
});
});
}).timeout(5000);

it('Client streaming with one message should work', done => {
server = new Server();
server.addService(EchoService.service, echoServiceImplementation);
server.bindAsync('localhost:0', ServerCredentials.createInsecure(), (error, port) => {
assert.ifError(error);
client = new EchoService(`localhost:${port}`, credentials.createInsecure());
const call = client.echoClientStream((error: ServiceError, response: any) => {
assert.ifError(error);
assert.deepStrictEqual(response, { value: 'test value', value2: 1 });
done();
});
call.write({ value: 'test value', value2: 42 });
call.end();
});
});
});