Skip to content

Commit 83b52e1

Browse files
authored
fix: reduce duplicate code in streaming retries and add a test (#1636)
* fix: reduce duplicate code in streaming retries and add a test * add comments
1 parent b5d984a commit 83b52e1

File tree

2 files changed

+78
-14
lines changed

2 files changed

+78
-14
lines changed

gax/src/streamingCalls/streaming.ts

+12-14
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,15 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
119119
this.gaxServerStreamingRetries = gaxServerStreamingRetries;
120120
}
121121

122+
private shouldRetryRequest(error: Error, retry: RetryOptions): boolean {
123+
const e = GoogleError.parseGRPCStatusDetails(error);
124+
let shouldRetry = this.defaultShouldRetry(e!, retry);
125+
if (retry.shouldRetryFn) {
126+
shouldRetry = retry.shouldRetryFn(e!);
127+
}
128+
return shouldRetry;
129+
}
130+
122131
cancel() {
123132
if (this.stream) {
124133
this.stream.cancel();
@@ -228,13 +237,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
228237
}
229238

230239
this.retries!++;
231-
const e = GoogleError.parseGRPCStatusDetails(error);
232-
let shouldRetry = this.defaultShouldRetry(e!, retry);
233-
if (retry.shouldRetryFn) {
234-
shouldRetry = retry.shouldRetryFn(e!);
235-
}
236-
237-
if (shouldRetry) {
240+
if (this.shouldRetryRequest(error, retry)) {
238241
const toSleep = Math.random() * delay;
239242
setTimeout(() => {
240243
now = new Date();
@@ -246,6 +249,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
246249
timeout = Math.min(timeoutCal, rpcTimeout, newDeadline);
247250
}, toSleep);
248251
} else {
252+
const e = GoogleError.parseGRPCStatusDetails(error);
249253
e.note =
250254
'Exception occurred in retry method that was ' +
251255
'not classified as transient';
@@ -377,13 +381,7 @@ export class StreamProxy extends duplexify implements GRPCCallResult {
377381
const timeout = retry.backoffSettings.totalTimeoutMillis;
378382
const maxRetries = retry.backoffSettings.maxRetries!;
379383
if ((maxRetries && maxRetries > 0) || (timeout && timeout > 0)) {
380-
const e = GoogleError.parseGRPCStatusDetails(error);
381-
let shouldRetry = this.defaultShouldRetry(e!, retry);
382-
if (retry.shouldRetryFn) {
383-
shouldRetry = retry.shouldRetryFn(e!);
384-
}
385-
386-
if (shouldRetry) {
384+
if (this.shouldRetryRequest(error, retry)) {
387385
if (maxRetries && timeout!) {
388386
const newError = new GoogleError(
389387
'Cannot set both totalTimeoutMillis and maxRetries ' +

gax/test/test-application/src/index.ts

+66
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ async function testShowcase() {
151151
);
152152

153153
await testErrorMaxRetries0(grpcSequenceClientWithServerStreamingRetries);
154+
await testServerStreamingRetriesImmediatelywithRetryOptions(
155+
grpcSequenceClientWithServerStreamingRetries
156+
);
157+
154158
// ensure legacy tests pass with streaming retries client
155159
await testEcho(grpcClientWithServerStreamingRetries);
156160
await testEchoError(grpcClientWithServerStreamingRetries);
@@ -1219,6 +1223,68 @@ async function testErrorMaxRetries0(client: SequenceServiceClient) {
12191223
});
12201224
});
12211225
}
1226+
// a streaming call that retries two times and finishes successfully
1227+
async function testServerStreamingRetriesImmediatelywithRetryOptions(
1228+
client: SequenceServiceClient
1229+
) {
1230+
const finalData: string[] = [];
1231+
const backoffSettings = createBackoffSettings(
1232+
100,
1233+
1.2,
1234+
1000,
1235+
null,
1236+
1.5,
1237+
3000,
1238+
10000
1239+
);
1240+
1241+
// allow the two codes we are going to send as errors
1242+
const retryOptions = new RetryOptions([14, 4], backoffSettings);
1243+
1244+
const settings = {
1245+
retry: retryOptions,
1246+
};
1247+
1248+
client.initialize();
1249+
1250+
// errors immediately, then again after sending "This is"
1251+
const request = createStreamingSequenceRequestFactory(
1252+
[Status.UNAVAILABLE, Status.DEADLINE_EXCEEDED, Status.OK],
1253+
[0.1, 0.1, 0.1],
1254+
[0, 2, 11],
1255+
'This is testing the brand new and shiny StreamingSequence server 3'
1256+
);
1257+
1258+
const response = await client.createStreamingSequence(request);
1259+
await new Promise<void>((resolve, reject) => {
1260+
const sequence = response[0];
1261+
1262+
const attemptRequest =
1263+
new protos.google.showcase.v1beta1.AttemptStreamingSequenceRequest();
1264+
attemptRequest.name = sequence.name!;
1265+
1266+
const attemptStream = client.attemptStreamingSequence(
1267+
attemptRequest,
1268+
settings
1269+
);
1270+
attemptStream.on('data', (response: {content: string}) => {
1271+
finalData.push(response.content);
1272+
});
1273+
attemptStream.on('error', error => {
1274+
reject(error);
1275+
});
1276+
attemptStream.on('end', () => {
1277+
attemptStream.end();
1278+
1279+
resolve();
1280+
});
1281+
}).then(() => {
1282+
assert.equal(
1283+
finalData.join(' '),
1284+
'This is This is testing the brand new and shiny StreamingSequence server 3'
1285+
);
1286+
});
1287+
}
12221288

12231289
async function main() {
12241290
const showcaseServer = new ShowcaseServer();

0 commit comments

Comments
 (0)