From 50560902914c8faabc79c566b3328defc36f367c Mon Sep 17 00:00:00 2001 From: MarioDu Date: Wed, 24 Jan 2024 11:08:08 +0800 Subject: [PATCH] feat: options for shrink cooldown on worker starter (#101) * feat: options for shrink cooldown on worker starter --- proto/noslated/common.proto | 1 + .../e2e/ema_startup_shrink_cooldown.test.ts | 97 +++++++++++++++++++ src/control_plane/worker_stats/broker.ts | 7 +- src/lib/json/function_profile.ts | 2 + src/lib/json/function_profile_schema.json | 4 + 5 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 src/control_plane/__test__/e2e/ema_startup_shrink_cooldown.test.ts diff --git a/proto/noslated/common.proto b/proto/noslated/common.proto index 7e14e26..6ae4990 100644 --- a/proto/noslated/common.proto +++ b/proto/noslated/common.proto @@ -74,6 +74,7 @@ message FunctionProfileWorker { float scaleFactor = 20; float precisionZeroThreshold = 21; string concurrencyStatsMode = 22; + bool shrinkCooldownOnStartup = 23; } message PlaneHealthyResponse { diff --git a/src/control_plane/__test__/e2e/ema_startup_shrink_cooldown.test.ts b/src/control_plane/__test__/e2e/ema_startup_shrink_cooldown.test.ts new file mode 100644 index 0000000..ce4c1b1 --- /dev/null +++ b/src/control_plane/__test__/e2e/ema_startup_shrink_cooldown.test.ts @@ -0,0 +1,97 @@ +import * as common from '#self/test/common'; +import { baselineDir } from '#self/test/common'; +import assert from 'assert'; +import { bufferFromStream, sleep } from '#self/lib/util'; +import { DefaultEnvironment } from '#self/test/env/environment'; + +describe(common.testName(__filename), function () { + // Debug version of Node.js may take longer time to bootstrap. + this.timeout(30_000); + + const env = new DefaultEnvironment({ + createTestClock: true, + config: common.extendDefaultConfig({ + virtualMemoryPoolSize: '4gb', + controlPlane: { + useEmaScaling: true, + workerTrafficStatsPullingMs: 1000, + }, + }), + }); + + it('first shrink should work on request finish', async () => { + await env.agent.setFunctionProfile([ + { + name: 'aworker_echo_ema', + runtime: 'aworker', + url: `file://${baselineDir}/aworker_echo`, + sourceFile: 'index.js', + signature: 'md5:234234', + worker: { + maxActivateRequests: 1, + shrinkCooldownOnStartup: false, + }, + resourceLimit: { + memory: 200 * 1024 * 1024, + }, + }, + ]); + + const statManager = env.control._ctx.getInstance('stateManager'); + + await request('aworker_echo_ema', env); + + const brokerBefore = statManager.getBroker('aworker_echo_ema', false); + + assert(brokerBefore?.workerCount === 1); + + await sleep(10000); + + const brokerAfter = statManager.getBroker('aworker_echo_ema', false); + assert(brokerAfter == null); + }); + + it('first shrink should cooldown on request finish when shrinkCooldownOnStartup=true', async () => { + await env.agent.setFunctionProfile([ + { + name: 'aworker_echo_ema', + runtime: 'aworker', + url: `file://${baselineDir}/aworker_echo`, + sourceFile: 'index.js', + signature: 'md5:234234', + worker: { + maxActivateRequests: 1, + shrinkCooldownOnStartup: true, + }, + resourceLimit: { + memory: 200 * 1024 * 1024, + }, + }, + ]); + + const statManager = env.control._ctx.getInstance('stateManager'); + + await request('aworker_echo_ema', env); + + const brokerBefore = statManager.getBroker('aworker_echo_ema', false); + + assert(brokerBefore?.workerCount === 1); + + await sleep(10000); + + const brokerAfter = statManager.getBroker('aworker_echo_ema', false); + assert(brokerAfter?.workerCount === 1); + }); +}); + +async function request(functionName: string, env: DefaultEnvironment) { + const data = Buffer.from('200'); + + const response = await env.agent.invoke(functionName, data, { + method: 'POST', + }); + + assert.strictEqual(response.status, 200); + + return await bufferFromStream(response); +} diff --git a/src/control_plane/worker_stats/broker.ts b/src/control_plane/worker_stats/broker.ts index 9a70fbc..e64a6cd 100644 --- a/src/control_plane/worker_stats/broker.ts +++ b/src/control_plane/worker_stats/broker.ts @@ -43,7 +43,8 @@ class Broker { this.redundantTimes = 0; this._lastExpandTime = 0; - this._lastShrinkTime = 0; + // 开启时,启动后缩容进入冷却,防止 worker 被过快回收 + this._lastShrinkTime = this.shrinkCooldownOnStartup ? Date.now() : 0; } get runtime() { @@ -110,6 +111,10 @@ class Broker { return this.#profile.worker.precisionZeroThreshold || 0.01; } + get shrinkCooldownOnStartup() { + return this.#profile.worker.shrinkCooldownOnStartup ?? true; + } + isExpandCooldown(now: number) { return now - this._lastExpandTime < this.expandCooldown; } diff --git a/src/lib/json/function_profile.ts b/src/lib/json/function_profile.ts index c00c75b..4f8c062 100644 --- a/src/lib/json/function_profile.ts +++ b/src/lib/json/function_profile.ts @@ -51,6 +51,8 @@ export interface ProcessFunctionProfile { precisionZeroThreshold?: number; // worker 并发度统计算法 concurrencyStatsMode?: ConcurrencyStatsMode; + // 启动后是否进入缩容冷却期,默认为 true + shrinkCooldownOnStartup?: boolean; }; environments?: { key: string; diff --git a/src/lib/json/function_profile_schema.json b/src/lib/json/function_profile_schema.json index db18216..9639790 100644 --- a/src/lib/json/function_profile_schema.json +++ b/src/lib/json/function_profile_schema.json @@ -136,6 +136,10 @@ "type": "string", "description": "The concurrency stats mode, default is instant", "enum": ["instant", "periodic_max", "periodic_avg"] + }, + "shrinkCooldownOnStartup": { + "type": "boolean", + "description": "whether shrink cooldown on worker startup" } }, "additionalProperties": true