Skip to content

Commit

Permalink
feat: support async iterator for paging method (#708)
Browse files Browse the repository at this point in the history
* async iteratoe logic & unit test

* run all unit tests

* add end-to-end test for async iterator

* add end-to-end test for page stream

* fix lint

* system-test

* clean up

* system-test

* fix

* fix test

* fix unit tests

* fix end-to-end test

* feedback

* clean

* change name to asyncIterate

* lint

* system-test

* proper type for iterable

* clean up

* feedback

* test

* put common code in a function

* lint

* clean up

* debugging

* make it work

* feedback

* remove extra params

* fix

* make it work first

* resolve request & func

* clean up

* polish test

* clean

* add timeout for end-to-end test

* expand timeout for testExpand

* make test work

* expand timeout

* fix

* test

* test will work

* feedback

* lint

* fix: no third party promises anymore

Co-authored-by: Alexander Fenster <[email protected]>
  • Loading branch information
xiaozhenliu-gg5 and alexander-fenster authored Mar 20, 2020
1 parent 511fc23 commit 3ac5afb
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 12 deletions.
104 changes: 103 additions & 1 deletion src/paginationCalls/pageDescriptor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,32 @@ import * as ended from 'is-stream-ended';
import {PassThrough, Transform} from 'stream';

import {APICaller} from '../apiCaller';
import {GaxCall, APICallback} from '../apitypes';
import {
GaxCall,
APICallback,
SimpleCallbackFunction,
RequestType,
} from '../apitypes';
import {Descriptor} from '../descriptor';
import {CallSettings} from '../gax';
import {NormalApiCaller} from '../normalCalls/normalApiCaller';

import {PagedApiCaller} from './pagedApiCaller';
import * as call from '../call';

export interface ResponseType {
[index: string]: string;
}
/**
* A descriptor for methods that support pagination.
*/
export class PageDescriptor implements Descriptor {
resolveParams: Function;
requestPageTokenField: string;
responsePageTokenField: string;
requestPageSizeField?: string;
resourceField: string;
cache: Array<{}>;

constructor(
requestPageTokenField: string,
Expand All @@ -42,6 +53,8 @@ export class PageDescriptor implements Descriptor {
this.requestPageTokenField = requestPageTokenField;
this.responsePageTokenField = responsePageTokenField;
this.resourceField = resourceField;
this.resolveParams = () => {};
this.cache = [];
}

/**
Expand Down Expand Up @@ -103,6 +116,95 @@ export class PageDescriptor implements Descriptor {
return stream;
}

/**
* Create an async iterable which can be recursively called for data on-demand.
*/
asyncIterate(
apiCall: GaxCall,
request: RequestType,
options: CallSettings
): AsyncIterable<{} | undefined> {
const iterable = this.createIterator(options);
const funcPromise =
typeof apiCall === 'function' ? Promise.resolve(apiCall) : apiCall;
funcPromise
.then((func: GaxCall) => {
this.makeCall(request, func, options);
})
.catch(error => {
throw new Error(error);
});
return iterable;
}

createIterator(options: CallSettings): AsyncIterable<{} | undefined> {
const self = this;
const asyncIterable = {
[Symbol.asyncIterator]() {
const paramPromise: Promise<[
RequestType,
SimpleCallbackFunction
]> = new Promise(resolve => {
self.resolveParams = resolve;
});
let nextPageRequest: RequestType | null = {};
let firstCall = true;
return {
async next() {
const ongoingCall = new call.OngoingCallPromise();
const [request, func] = await paramPromise;
if (self.cache.length > 0) {
return Promise.resolve({done: false, value: self.cache.shift()});
}
if (!firstCall && !nextPageRequest) {
return Promise.resolve({done: true, value: undefined});
}
nextPageRequest = await self.getNextPageRequest(
func,
firstCall ? request : nextPageRequest!,
ongoingCall
);
firstCall = false;
if (self.cache.length === 0) {
nextPageRequest = null;
return Promise.resolve({done: true, value: undefined});
}
return Promise.resolve({done: false, value: self.cache.shift()});
},
};
},
};
return asyncIterable;
}

async getNextPageRequest(
func: SimpleCallbackFunction,
request: RequestType,
ongoingCall: call.OngoingCallPromise
): Promise<RequestType | null> {
ongoingCall.call(func, request);
let nextPageRequest = null;
const [response, nextRequest, rawResponse] = await ongoingCall.promise;
const pageToken = (response as ResponseType)[this.responsePageTokenField];
if (pageToken) {
nextPageRequest = Object.assign({}, request);
nextPageRequest[this.requestPageTokenField] = pageToken;
}
const responses = (response as ResponseType)[this.resourceField];
this.cache.push(...responses);
return nextPageRequest;
}

makeCall(request: RequestType, func: GaxCall, settings: CallSettings) {
if (settings.pageToken) {
request[this.requestPageTokenField] = settings.pageToken;
}
if (settings.pageSize) {
request[this.requestPageSizeField!] = settings.pageSize;
}
this.resolveParams([request, func]);
}

getApiCaller(settings: CallSettings): APICaller {
if (!settings.autoPaginate) {
return new NormalApiCaller();
Expand Down
53 changes: 44 additions & 9 deletions test/fixtures/google-gax-packaging-test-app/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,11 @@ async function testShowcase() {
return {
getRequestHeaders: () => {
return {
'Authorization': 'Bearer zzzz'
Authorization: 'Bearer zzzz',
};
}
},
};
}
},
};

const fallbackClientOpts = {
Expand All @@ -75,6 +75,7 @@ async function testShowcase() {
await testEcho(grpcClient);
await testExpand(grpcClient);
await testPagedExpand(grpcClient);
await testPagedExpandAsync(grpcClient);
await testCollect(grpcClient);
await testChat(grpcClient);
await testWait(grpcClient);
Expand All @@ -85,24 +86,34 @@ async function testShowcase() {

// Fallback clients do not currently support streaming
try {
await testExpand(fallbackClient)
throw new Error("Expand did not throw an error: Streaming calls should fail with fallback clients")
await testExpand(fallbackClient);
throw new Error(
'Expand did not throw an error: Streaming calls should fail with fallback clients'
);
} catch (err) {}
try {
await testCollect(fallbackClient)
throw new Error("Collect did not throw an error: Streaming calls should fail with fallback clients")
await testCollect(fallbackClient);
throw new Error(
'Collect did not throw an error: Streaming calls should fail with fallback clients'
);
} catch (err) {}
try {
await testChat(fallbackClient)
throw new Error("Chat did not throw an error: Streaming calls should fail with fallback clients")
await testChat(fallbackClient);
throw new Error(
'Chat did not throw an error: Streaming calls should fail with fallback clients'
);
} catch (err) {}
}

async function testEcho(client) {
const request = {
content: 'test',
};
const timer = setTimeout(() => {
throw new Error('End-to-end testEcho method fails with timeout');
}, 12000);
const [response] = await client.echo(request);
clearTimeout(timer);
assert.deepStrictEqual(request.content, response.content);
}

Expand Down Expand Up @@ -131,11 +142,35 @@ async function testPagedExpand(client) {
content: words.join(' '),
pageSize: 2,
};
const timer = setTimeout(() => {
throw new Error('End-to-end testPagedExpand method fails with timeout');
}, 12000);
const [response] = await client.pagedExpand(request);
clearTimeout(timer);
const result = response.map(r => r.content);
assert.deepStrictEqual(words, result);
}

async function testPagedExpandAsync(client) {
const words = ['nobody', 'ever', 'reads', 'test', 'input'];
const request = {
content: words.join(' '),
pageSize: 2,
};
const response = [];
const iterable = client.pagedExpandAsync(request);
const timer = setTimeout(() => {
throw new Error(
'End-to-end testPagedExpandAsync method fails with timeout'
);
}, 12000);
for await (const resource of iterable) {
response.push(resource.content);
}
clearTimeout(timer);
assert.deepStrictEqual(words, response);
}

async function testCollect(client) {
const words = ['nobody', 'ever', 'reads', 'test', 'input'];
const result = await new Promise((resolve, reject) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,9 @@ class EchoClient {
'wait',
'pagedExpand',
];
this._innerCallPromises = {};
for (const methodName of echoStubMethods) {
const innerCallPromise = echoStub.then(
this._innerCallPromises[methodName] = echoStub.then(
stub => (...args) => {
return stub[methodName].apply(stub, args);
},
Expand All @@ -205,7 +206,7 @@ class EchoClient {
}
);
this._innerApiCalls[methodName] = gaxModule.createApiCall(
innerCallPromise,
this._innerCallPromises[methodName],
defaults[methodName],
this._descriptors.page[methodName] ||
this._descriptors.stream[methodName] ||
Expand Down Expand Up @@ -582,6 +583,13 @@ class EchoClient {

return this._innerApiCalls.pagedExpand(request, options, callback);
}

pagedExpandAsync(request, options) {
options = options || {};
request = request || {};
const callSettings = new gax.CallSettings(options);
return this._descriptors.page.pagedExpand.asyncIterate(this._innerCallPromises['pagedExpand'], request, callSettings);
}
}

module.exports = EchoClient;
35 changes: 35 additions & 0 deletions test/fixtures/google-gax-packaging-test-app/test/gapic-v1beta1.js
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,41 @@ describe('EchoClient', () => {
});
});

it('invokes pagedExpand using async iterator', async () => {
const client = new showcaseModule.v1beta1.EchoClient({
credentials: {client_email: 'bogus', private_key: 'bogus'},
projectId: 'bogus',
});

// Mock request
const request = {};
const expectedResponse = [1, 2, 3, 4, 5, 6, 7, 8, 9];

client._descriptors.page.pagedExpand.asyncIterate = (apiCall, request, options) => {
let count = 0;
const asyncIterable = {
[Symbol.asyncIterator]() {
return {
async next(){
count = count + 1;
if(count === 10) return Promise.resolve({done: true, value: undefined});
return Promise.resolve({done: false, value: count});
}
}
}
}
return asyncIterable;
}

// test paging method by async iterator
const response = [];
const iterable = client.pagedExpandAsync(request);
for await (const resource of iterable){
response.push(resource);
}
assert.deepStrictEqual(response, expectedResponse);
});

it('invokes pagedExpand with error', done => {
const client = new showcaseModule.v1beta1.EchoClient({
credentials: {client_email: 'bogus', private_key: 'bogus'},
Expand Down
24 changes: 24 additions & 0 deletions test/unit/pagedIteration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {APICallback, GaxCallPromise} from '../../src/apitypes';

import * as util from './utils';
import {Stream} from 'stream';
import * as gax from '../../src/gax';

describe('paged iteration', () => {
const pageSize = 3;
Expand Down Expand Up @@ -194,6 +195,29 @@ describe('paged iteration', () => {
});
});

describe('use async iterator', () => {
const spy = sinon.spy(func);
let apiCall: GaxCallPromise;
beforeEach(() => {
apiCall = util.createApiCall(spy, createOptions);
});

async function iterableChecker(iterable: AsyncIterable<{} | undefined>) {
let counter = 0;
for await (const resource of iterable) {
counter++;
if (counter === 10) break;
}
expect(counter).to.equal(10);
}
it('returns an iterable, count to 10', () => {
const settings = new gax.CallSettings(
(createOptions && createOptions.settings) || {}
);
iterableChecker(descriptor.asyncIterate(apiCall, {}, settings));
});
});

describe('stream conversion', () => {
// tslint:disable-next-line no-any
let spy: any;
Expand Down

0 comments on commit 3ac5afb

Please sign in to comment.