Skip to content

Commit

Permalink
feat: close traffic timeout (#104)
Browse files Browse the repository at this point in the history
Feat: AONE#54728515
  • Loading branch information
mariodu authored Jan 25, 2024
1 parent 828ae00 commit f2c11f6
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 5 deletions.
22 changes: 22 additions & 0 deletions fixtures/baseline/aworker_stream/stream_with_delay.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
'use strict';

addEventListener('fetch', event => {
event.respondWith(
Promise.resolve().then(async () => {
const time = await event.request.text();
const body = new ReadableStream({
start(controller) {
controller.enqueue('foobar');
setTimeout(() => {
controller.enqueue('end');
controller.close();
}, parseInt(time, 10));
},
});

return new Response(body, {
status: 200,
});
})
);
});
1 change: 1 addition & 0 deletions src/config/default.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export default {
daprAdaptorModulePath: undefined,
daprAdaptorModuleOptions: undefined,
beaconHostModulePath: undefined,
closeTrafficTimeout: 30_000,
},

dirs: {
Expand Down
4 changes: 4 additions & 0 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ export interface DatePlaneConfig {
* Beacon Host 模块路径
*/
beaconHostModulePath?: string;
/**
* 关闭流量超时时间,单位毫秒,默认 30s
*/
closeTrafficTimeout: number;
}

export interface DirConfig {
Expand Down
122 changes: 122 additions & 0 deletions src/data_plane/__test__/e2e/close_traffic.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import * as common from '#self/test/common';
import { baselineDir } from '#self/test/common';
import assert from 'assert';
import { bufferFromStream, sleep } from '#self/lib/util';
import { DefaultEnvironment } from '#self/test/env/environment';

describe(common.testName(__filename), function () {
// Debug version of Node.js may take longer time to bootstrap.
this.timeout(60_000);

const env = new DefaultEnvironment({
config: common.extendDefaultConfig({
controlPlane: {
useEmaScaling: true,
dumpWorkerTrafficStats: true,
},
dataPlane: {
closeTrafficTimeout: 2000,
},
}),
});

it('should close worker', async () => {
await env.agent.setFunctionProfile([
{
name: 'aworker_stream',
runtime: 'aworker',
url: `file://${baselineDir}/aworker_stream`,
sourceFile: 'stream_with_delay.js',
signature: 'md5:234234',
worker: {
maxActivateRequests: 1,
disposable: true,
},
resourceLimit: {
memory: 200 * 1024 * 1024,
},
},
]);

await request('aworker_stream', env);

await sleep(1000);

const broker = env.data.dataFlowController.brokers.get(
'aworker_stream$$noinspect'
);

assert.strictEqual(broker?.workerCount, 0);
});

it('should close worker with timeout', async () => {
await env.agent.setFunctionProfile([
{
name: 'aworker_stream_delay',
runtime: 'aworker',
url: `file://${baselineDir}/aworker_stream`,
sourceFile: 'stream_with_delay.js',
signature: 'md5:234234',
worker: {
maxActivateRequests: 1,
concurrencyShrinkThreshold: 0.6,
},
resourceLimit: {
memory: 200 * 1024 * 1024,
},
},
]);

request('aworker_stream_delay', env, 30000).catch(error => {
assert(error);
assert(error.message.includes('Peer connection closed'));
});

await sleep(1000);

let broker = env.data.dataFlowController.brokers.get(
'aworker_stream_delay$$noinspect'
);

const worker = Array.from(
broker?.['_workerMap'].values() || []
).pop() as any;

env.data.dataFlowController.closeTraffic([
{
functionName: 'aworker_stream_delay',
inspector: false,
workers: [
{
credential: worker!.worker!.credential,
},
],
},
]);

await sleep(3000);

broker = env.data.dataFlowController.brokers.get(
'aworker_stream_delay$$noinspect'
);

assert.strictEqual(broker?.workerCount, 0);
});
});

async function request(
functionName: string,
env: DefaultEnvironment,
timeout = 0
) {
const data = Buffer.from(timeout + '');

const response = await env.agent.invoke(functionName, data, {
method: 'POST',
timeout: 30000,
});

assert.strictEqual(response.status, 200);

return await bufferFromStream(response);
}
9 changes: 7 additions & 2 deletions src/data_plane/data_flow_controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ export class DataFlowController extends BaseOf(EventEmitter) {
const tag = worker
? (worker as Worker).name || `{${credential}}`
: `{${credential}}`;
logger.info('worker(%s) disconnected %s', tag);
logger.info('worker(%s) disconnected', tag);

broker.removeWorker(credential);
this.credentialBrokerMap.delete(credential);
Expand Down Expand Up @@ -314,7 +314,12 @@ export class DataFlowController extends BaseOf(EventEmitter) {
code: RpcStatus.NOT_FOUND,
});
}
const broker = new WorkerBroker(this, profile, options);
const broker = new WorkerBroker(
this,
profile,
options,
this.config.dataPlane.closeTrafficTimeout
);
this.brokers.set(key, broker);
return broker;
}
Expand Down
25 changes: 22 additions & 3 deletions src/data_plane/worker_broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,38 @@ export class Worker extends EventEmitter {
/**
* Close this worker's traffic.
*/
async closeTraffic() {
async closeTraffic(timeout: number) {
this.trafficOff = true;
this.logger.info(
'traffic closed, waiting for activeRequestCount down to 0...'
);

if (this.activeRequestCount <= 0) {
return Promise.resolve(true);
}

const { promise, resolve } = utils.createDeferred<boolean>();

let timer: NodeJS.Timeout | undefined = undefined;

const downToZero: (...args: any[]) => void = () => {
clearTimeout(timer);
this.logger.debug('activeRequestCount down to 0.');
resolve(true);
};

this.once('downToZero', downToZero);

timer = setTimeout(() => {
this.logger.warn(
`waiting for activeRequestCount down to 0 timeout, activeRequestCount is ${this.activeRequestCount}, will force kill.`
);
resolve(true);
this.removeListener('downToZero', downToZero);
}, timeout);

timer.unref();

return promise;
}

Expand Down Expand Up @@ -277,7 +295,8 @@ export class WorkerBroker extends Base implements DispatcherDelegate {
constructor(
public dataFlowController: DataFlowController,
private _profile: RawWithDefaultsFunctionProfile,
public options: BrokerOptions = {}
public options: BrokerOptions = {},
private _closeTrafficTimeout = 10_000
) {
super();

Expand Down Expand Up @@ -354,7 +373,7 @@ export class WorkerBroker extends Base implements DispatcherDelegate {
async closeTraffic(worker: Worker) {
try {
this._dispatcher.unregisterWorker(worker);
await worker.closeTraffic();
await worker.closeTraffic(this._closeTrafficTimeout);

// 同步 RequestDrained
this.host.broadcastContainerStatusReport({
Expand Down

0 comments on commit f2c11f6

Please sign in to comment.