From 5b2cdaa2ecae93696c536b8ad2d288b1a802c995 Mon Sep 17 00:00:00 2001 From: MarioDu Date: Thu, 28 Dec 2023 10:44:30 +0800 Subject: [PATCH] feat: use ema to scale (#92) * feat: use ema to scale --- proto/noslated/common.proto | 9 + src/config/default.ts | 1 + src/config/index.ts | 6 + src/config/loader.ts | 10 +- .../__test__/e2e/ema_scaling.test.ts | 113 ++++++++ src/control_plane/__test__/environment.ts | 1 + .../scaling_contrast_ema.test.ts | 186 ++++++++++++ .../scaling_contrast_legacy_1.test.ts | 145 ++++++++++ .../scaling_contrast_legacy_2.test.ts | 147 ++++++++++ src/control_plane/capacity_manager.ts | 106 ++++++- src/control_plane/data_plane_client/client.ts | 11 +- .../data_plane_client/manager.ts | 10 +- .../data_plane_client/subscription.ts | 12 +- src/control_plane/worker_stats/broker.ts | 91 +++++- .../worker_stats/state_manager.ts | 7 +- src/lib/__test__/ema_concurrency.test.ts | 101 +++++++ src/lib/ema_concurrency.ts | 79 ++++++ src/lib/json/function_profile.ts | 18 ++ src/lib/json/function_profile_schema.json | 266 ++++++++++-------- src/sdk/client.ts | 4 +- 20 files changed, 1174 insertions(+), 149 deletions(-) create mode 100644 src/control_plane/__test__/e2e/ema_scaling.test.ts create mode 100644 src/control_plane/__test__/scaling_contrast/scaling_contrast_ema.test.ts create mode 100644 src/control_plane/__test__/scaling_contrast/scaling_contrast_legacy_1.test.ts create mode 100644 src/control_plane/__test__/scaling_contrast/scaling_contrast_legacy_2.test.ts create mode 100644 src/lib/__test__/ema_concurrency.test.ts create mode 100644 src/lib/ema_concurrency.ts diff --git a/proto/noslated/common.proto b/proto/noslated/common.proto index 70990ad..bbf8cd4 100644 --- a/proto/noslated/common.proto +++ b/proto/noslated/common.proto @@ -64,6 +64,15 @@ message FunctionProfileWorker { bool disableSeed = 10; bool disableRequestQueue = 11; string dispatchMode = 12; + int32 concurrencySlidingWindowSize = 13; + int32 concurrencySlidingBucketCount = 14; + float emaConcurrencyAlpha = 15; + float concurrencyExpandThreshold = 16; + float concurrencyShrinkThreshold = 17; + int32 expandCooldown = 18; + int32 shrinkCooldown = 19; + float scaleFactor = 20; + float precisionZeroThreshold = 21; } message PlaneHealthyResponse { diff --git a/src/config/default.ts b/src/config/default.ts index d7b6fc8..8458a3f 100644 --- a/src/config/default.ts +++ b/src/config/default.ts @@ -17,6 +17,7 @@ export default { workerTrafficStatsPullingMs: 10_000, workerRedundantVictimSpareTimes: 6, capacityScalingStage: 0.7, + useEmaScaling: false, }, dataPlane: { diff --git a/src/config/index.ts b/src/config/index.ts index 935045e..a9421f6 100644 --- a/src/config/index.ts +++ b/src/config/index.ts @@ -114,6 +114,12 @@ export interface ControlPlaneConfig { * Default: 0.7 */ capacityScalingStage: number; + /** + * 使用基于时间窗口的移动加权平均算法进行扩容 + * 默认为 false + * 此配置开启会强制设置 workerTrafficStatsPullingMs 为 1000ms,确保数据同步及时 + */ + useEmaScaling: boolean; } export interface DatePlaneConfig { diff --git a/src/config/loader.ts b/src/config/loader.ts index 80a0f66..45dfc11 100644 --- a/src/config/loader.ts +++ b/src/config/loader.ts @@ -116,7 +116,8 @@ export function resolveConfig(): Config { ); const userFileConfig = loadConfig(process.env.NOSLATED_CONFIG_PATH); const envConfig = resolveEnvConfig(); - return extend( + + const finalConfig = extend( /** deep */ true, config, defaultConfig, @@ -125,6 +126,13 @@ export function resolveConfig(): Config { userFileConfig, envConfig ); + + // 开启 useEmaScaling 时,强制 workerTrafficStatsPullingMs 为 1000ms,确保数据同步及时 + if (finalConfig.controlPlane.useEmaScaling === true) { + finalConfig.controlPlane.workerTrafficStatsPullingMs = 1000; + } + + return finalConfig; } export function dumpConfig(name: string, config: Config) { diff --git a/src/control_plane/__test__/e2e/ema_scaling.test.ts b/src/control_plane/__test__/e2e/ema_scaling.test.ts new file mode 100644 index 0000000..db76a61 --- /dev/null +++ b/src/control_plane/__test__/e2e/ema_scaling.test.ts @@ -0,0 +1,113 @@ +import * as common from '#self/test/common'; +import { baselineDir } from '#self/test/common'; +import assert from 'assert'; +import { bufferFromStream, sleep } from '#self/lib/util'; +import { DefaultEnvironment } from '#self/test/env/environment'; + +/** + * 仅观测行为 + */ +describe(common.testName(__filename), function () { + // Debug version of Node.js may take longer time to bootstrap. + this.timeout(30_000); + + const env = new DefaultEnvironment({ + createTestClock: true, + config: common.extendDefaultConfig({ + virtualMemoryPoolSize: '4gb', + controlPlane: { + useEmaScaling: true, + workerTrafficStatsPullingMs: 1000, + }, + }), + }); + + it('should scaling smooth when traffic change', async () => { + await env.agent.setFunctionProfile([ + { + name: 'aworker_echo_ema', + runtime: 'aworker', + url: `file://${baselineDir}/aworker_echo`, + sourceFile: 'sleep.js', + signature: 'md5:234234', + worker: { + maxActivateRequests: 1, + replicaCountLimit: 15, + }, + resourceLimit: { + memory: 200 * 1024 * 1024, + }, + }, + ]); + + const sequence = [1, 1, 1, 10, 1, 6, 5, 0, 0, 0]; + + for (const concurrency of sequence) { + await makeConcurrencyRequest('aworker_echo_ema', concurrency, env); + await sleep(1000); + } + + await sleep(1000); + }); +}); + +async function request(functionName: string, env: DefaultEnvironment) { + const data = Buffer.from('1000'); + + const response = await env.agent.invoke(functionName, data, { + method: 'POST', + }); + + assert.strictEqual(response.status, 200); + + return await bufferFromStream(response); +} + +function makeConcurrencyRequest( + functionName: string, + concurrency: number, + env: DefaultEnvironment +) { + const requests = new Array(concurrency).fill(0).map(() => { + return request(functionName, env); + }); + + return Promise.all(requests); +} + +const WEIGHTS = { + 0: 10, + 1: 10, + 2: 8, + 3: 8, + 4: 7, + 5: 6, + 6: 5, + 7: 4, + 8: 3, + 9: 2, + 10: 1, +}; + +function generateWeightedSequence( + length: number, + weights: Record +): number[] { + const sequence = []; + const weightedValues = []; + + // 构造根据权重扩展的值数组 + for (const [value, weight] of Object.entries(weights)) { + for (let i = 0; i < weight; i++) { + weightedValues.push(parseInt(value)); + } + } + + // 生成序列 + while (sequence.length < length) { + const randomIndex = Math.floor(Math.random() * weightedValues.length); + sequence.push(weightedValues[randomIndex]); + } + + return sequence; +} diff --git a/src/control_plane/__test__/environment.ts b/src/control_plane/__test__/environment.ts index 8cd8ee4..800c3e2 100644 --- a/src/control_plane/__test__/environment.ts +++ b/src/control_plane/__test__/environment.ts @@ -38,6 +38,7 @@ export class TestEnvironment extends MochaEnvironment { this.control = new ControlPlane({ clock: this.clock, containerManager: this.containerManager, + config: this.options?.config, }); await this.control.ready(); diff --git a/src/control_plane/__test__/scaling_contrast/scaling_contrast_ema.test.ts b/src/control_plane/__test__/scaling_contrast/scaling_contrast_ema.test.ts new file mode 100644 index 0000000..a0aa88a --- /dev/null +++ b/src/control_plane/__test__/scaling_contrast/scaling_contrast_ema.test.ts @@ -0,0 +1,186 @@ +import * as common from '#self/test/common'; +import { baselineDir } from '#self/test/common'; +import assert from 'assert'; +import { ControlPlane } from '#self/control_plane'; +import { DefaultController } from '#self/control_plane/controllers'; +import { DataPlaneClientManager } from '#self/control_plane/data_plane_client/manager'; +import { WorkerStatusReportEvent } from '#self/control_plane/events'; +import { WorkerLauncher } from '#self/control_plane/worker_launcher'; +import { StateManager } from '#self/control_plane/worker_stats/state_manager'; +import { WorkerStatusReport } from '#self/lib/constants'; +import { FunctionProfileManager } from '#self/lib/function_profile'; +import { TurfContainerStates } from '#self/lib/turf'; +import { DeepRequired } from '#self/lib/util'; +import sinon from 'sinon'; +import { TestEnvironment } from '../environment'; +import { registerWorkers } from '../util'; +import { noslated } from '#self/proto/root'; + +describe(common.testName(__filename), function () { + // Debug version of Node.js may take longer time to bootstrap. + this.timeout(30_000); + + const env = new TestEnvironment({ + createTestClock: true, + config: common.extendDefaultConfig({ + virtualMemoryPoolSize: '2gb', + controlPlane: { + useEmaScaling: true, + }, + }), + }); + + let controlPlane: ControlPlane; + let stateManager: StateManager; + let functionProfile: FunctionProfileManager; + let workerLauncher: WorkerLauncher; + let dataPlaneClientManager: DataPlaneClientManager; + let defaultController: DefaultController; + + beforeEach(async () => { + controlPlane = env.control; + stateManager = controlPlane._ctx.getInstance('stateManager'); + functionProfile = controlPlane._ctx.getInstance('functionProfile'); + workerLauncher = controlPlane._ctx.getInstance('workerLauncher'); + dataPlaneClientManager = controlPlane._ctx.getInstance( + 'dataPlaneClientManager' + ); + defaultController = controlPlane._ctx.getInstance('defaultController'); + }); + + /** + * 对照场景一: + * 对照原始扩容方案,当设置 1s 拉取一次,拉取 60 次,此时原方案遇到 brust 流量会扩容较多 + */ + it('should expand smooth when brust traffic', async () => { + // concurrency brust 10 + // legacy will expand 9 worker + // ema will expand 1 worker + await initWorker(functionProfile, env, stateManager); + + await env.testClock.tickAsync(1000); + + // brust 10 + await stateManager._syncBrokerData([ + generateBrokerData('aworker_echo_ema', 'aworker_echo_ema_1', 10), + ]); + + let tryLaunchCalled = 0; + + const stubTryLaunch = sinon + .stub(workerLauncher, 'tryLaunch') + .callsFake(async () => { + tryLaunchCalled++; + }); + + await defaultController['autoScale'](); + + assert.strictEqual(tryLaunchCalled, 1); + + tryLaunchCalled = 0; + stubTryLaunch.restore(); + }); + + /** + * 对照场景二: + * 对照原始扩容方案,当设置 10s 拉取一次,拉取 6 次,此时原方案如果第六次为 0,则会选择缩容。 + */ + it('should shrink smooth when traffic down to 0', async () => { + // concurrency decline 0 + // legacy will shrink 1 worker + // ema will keep worker + await initWorker(functionProfile, env, stateManager); + + for (let i = 0; i < 5; i++) { + await stateManager._syncBrokerData([ + generateBrokerData('aworker_echo_ema', 'aworker_echo_ema_1', 1), + ]); + await env.testClock.tickAsync(1000); + } + + // down to 0 + await stateManager._syncBrokerData([ + generateBrokerData('aworker_echo_ema', 'aworker_echo_ema_1', 0), + ]); + + await defaultController['autoScale'](); + + const spyTryLaunch = sinon.spy(workerLauncher, 'tryLaunch'); + const spyReduceCapacity = sinon.spy( + dataPlaneClientManager, + 'reduceCapacity' + ); + + assert(spyTryLaunch.notCalled); + assert(spyReduceCapacity.notCalled); + + spyTryLaunch.restore(); + spyReduceCapacity.restore(); + }); +}); + +function generateBrokerData( + funcName: string, + workerName: string, + activeRequestCount: number +): DeepRequired { + return { + functionName: funcName, + inspector: false, + workers: [ + { + name: workerName, + activeRequestCount, + }, + ], + }; +} + +async function initWorker( + functionProfile: FunctionProfileManager, + env: TestEnvironment, + stateManager: StateManager +) { + await functionProfile.setProfiles([ + { + name: 'aworker_echo_ema', + runtime: 'aworker', + url: `file://${baselineDir}/aworker_echo`, + sourceFile: 'index.js', + signature: 'md5:234234', + worker: { + maxActivateRequests: 1, + }, + resourceLimit: { + memory: 200 * 1024 * 1024, + }, + }, + ]); + + env.containerManager.setTestContainers([ + { + pid: 1, + name: 'aworker_echo_ema_1', + status: TurfContainerStates.running, + }, + ]); + + registerWorkers(stateManager, [ + { + funcName: 'aworker_echo_ema', + processName: 'aworker_echo_ema_1', + credential: 'aworker_echo_ema_1', + options: { inspect: false }, + toReserve: false, + }, + ]); + + stateManager._updateWorkerStatusByReport( + new WorkerStatusReportEvent({ + functionName: 'aworker_echo_ema', + name: 'aworker_echo_ema_1', + isInspector: false, + event: WorkerStatusReport.ContainerInstalled, + }) + ); +} diff --git a/src/control_plane/__test__/scaling_contrast/scaling_contrast_legacy_1.test.ts b/src/control_plane/__test__/scaling_contrast/scaling_contrast_legacy_1.test.ts new file mode 100644 index 0000000..f16f91e --- /dev/null +++ b/src/control_plane/__test__/scaling_contrast/scaling_contrast_legacy_1.test.ts @@ -0,0 +1,145 @@ +import * as common from '#self/test/common'; +import { baselineDir } from '#self/test/common'; +import assert from 'assert'; +import { ControlPlane } from '#self/control_plane'; +import { DefaultController } from '#self/control_plane/controllers'; +import { WorkerStatusReportEvent } from '#self/control_plane/events'; +import { WorkerLauncher } from '#self/control_plane/worker_launcher'; +import { StateManager } from '#self/control_plane/worker_stats/state_manager'; +import { WorkerStatusReport } from '#self/lib/constants'; +import { FunctionProfileManager } from '#self/lib/function_profile'; +import { TurfContainerStates } from '#self/lib/turf'; +import { DeepRequired } from '#self/lib/util'; +import sinon from 'sinon'; +import { TestEnvironment } from '../environment'; +import { registerWorkers } from '../util'; +import { noslated } from '#self/proto/root'; + +describe(common.testName(__filename), function () { + // Debug version of Node.js may take longer time to bootstrap. + this.timeout(30_000); + + const env = new TestEnvironment({ + createTestClock: true, + config: common.extendDefaultConfig({ + virtualMemoryPoolSize: '2gb', + controlPlane: { + workerTrafficStatsPullingMs: 1000, + workerRedundantVictimSpareTimes: 60, + }, + }), + }); + + let controlPlane: ControlPlane; + let stateManager: StateManager; + let functionProfile: FunctionProfileManager; + let workerLauncher: WorkerLauncher; + let defaultController: DefaultController; + + beforeEach(async () => { + controlPlane = env.control; + stateManager = controlPlane._ctx.getInstance('stateManager'); + functionProfile = controlPlane._ctx.getInstance('functionProfile'); + workerLauncher = controlPlane._ctx.getInstance('workerLauncher'); + defaultController = controlPlane._ctx.getInstance('defaultController'); + }); + + /** + * 对照场景一: + * 对照原始扩容方案,当设置 1s 拉取一次,拉取 60 次,此时该方案遇到 brust 流量会扩容较多 + */ + it('legacy scaling when brust traffic', async () => { + // concurrency brust 10 + // legacy will expand 9 worker + // ema will expand 1 worker + await initWorker(functionProfile, env, stateManager); + + await env.testClock.tickAsync(1000); + + // brust 10 + await stateManager._syncBrokerData([ + generateBrokerData('aworker_echo', 'aworker_echo_1', 10), + ]); + + let tryLaunchCalled = 0; + + const stubTryLaunch = sinon + .stub(workerLauncher, 'tryLaunch') + .callsFake(async () => { + tryLaunchCalled++; + }); + + await defaultController['autoScale'](); + + assert.strictEqual(tryLaunchCalled, 9); + + tryLaunchCalled = 0; + stubTryLaunch.restore(); + }); +}); + +function generateBrokerData( + funcName: string, + workerName: string, + activeRequestCount: number +): DeepRequired { + return { + functionName: funcName, + inspector: false, + workers: [ + { + name: workerName, + activeRequestCount, + }, + ], + }; +} + +async function initWorker( + functionProfile: FunctionProfileManager, + env: TestEnvironment, + stateManager: StateManager +) { + await functionProfile.setProfiles([ + { + name: 'aworker_echo', + runtime: 'aworker', + url: `file://${baselineDir}/aworker_echo`, + sourceFile: 'index.js', + signature: 'md5:234234', + worker: { + maxActivateRequests: 1, + }, + resourceLimit: { + memory: 200 * 1024 * 1024, + }, + }, + ]); + + env.containerManager.setTestContainers([ + { + pid: 1, + name: 'aworker_echo_1', + status: TurfContainerStates.running, + }, + ]); + + registerWorkers(stateManager, [ + { + funcName: 'aworker_echo', + processName: 'aworker_echo_1', + credential: 'aworker_echo_1', + options: { inspect: false }, + toReserve: false, + }, + ]); + + stateManager._updateWorkerStatusByReport( + new WorkerStatusReportEvent({ + functionName: 'aworker_echo', + name: 'aworker_echo_1', + isInspector: false, + event: WorkerStatusReport.ContainerInstalled, + }) + ); +} diff --git a/src/control_plane/__test__/scaling_contrast/scaling_contrast_legacy_2.test.ts b/src/control_plane/__test__/scaling_contrast/scaling_contrast_legacy_2.test.ts new file mode 100644 index 0000000..94db1b5 --- /dev/null +++ b/src/control_plane/__test__/scaling_contrast/scaling_contrast_legacy_2.test.ts @@ -0,0 +1,147 @@ +import * as common from '#self/test/common'; +import { baselineDir } from '#self/test/common'; +import assert from 'assert'; +import { ControlPlane } from '#self/control_plane'; +import { DefaultController } from '#self/control_plane/controllers'; +import { DataPlaneClientManager } from '#self/control_plane/data_plane_client/manager'; +import { WorkerStatusReportEvent } from '#self/control_plane/events'; +import { WorkerLauncher } from '#self/control_plane/worker_launcher'; +import { StateManager } from '#self/control_plane/worker_stats/state_manager'; +import { WorkerStatusReport } from '#self/lib/constants'; +import { FunctionProfileManager } from '#self/lib/function_profile'; +import { TurfContainerStates } from '#self/lib/turf'; +import { DeepRequired } from '#self/lib/util'; +import sinon from 'sinon'; +import { TestEnvironment } from '../environment'; +import { registerWorkers } from '../util'; +import { noslated } from '#self/proto/root'; + +describe(common.testName(__filename), function () { + // Debug version of Node.js may take longer time to bootstrap. + this.timeout(30_000); + + const env = new TestEnvironment({ + createTestClock: true, + }); + + let controlPlane: ControlPlane; + let stateManager: StateManager; + let functionProfile: FunctionProfileManager; + let workerLauncher: WorkerLauncher; + let dataPlaneClientManager: DataPlaneClientManager; + let defaultController: DefaultController; + + beforeEach(async () => { + controlPlane = env.control; + stateManager = controlPlane._ctx.getInstance('stateManager'); + functionProfile = controlPlane._ctx.getInstance('functionProfile'); + workerLauncher = controlPlane._ctx.getInstance('workerLauncher'); + dataPlaneClientManager = controlPlane._ctx.getInstance( + 'dataPlaneClientManager' + ); + defaultController = controlPlane._ctx.getInstance('defaultController'); + }); + + /** + * 对照场景二: + * 对照原始扩容方案,当设置 10s 拉取一次,拉取 6 次,此时该方案如果第六次为 0,则会选择缩容。 + */ + it('legacy scaling when traffic down to 0', async () => { + // concurrency decline 0 + // legacy will shrink 1 worker + // ema will keep worker + await initWorker(functionProfile, env, stateManager); + + // down to 0 + await stateManager._syncBrokerData([ + generateBrokerData('aworker_echo', 'aworker_echo_1', 0), + ]); + + stateManager.getBroker('aworker_echo', false)!.redundantTimes = 6; + + let reduceCapacityCalled = 0; + + const stubReduceCapacity = sinon + .stub(dataPlaneClientManager, 'reduceCapacity') + .callsFake(async (data: any) => { + const brokers = data.brokers; + + assert.strictEqual(brokers.length, 1); + assert.strictEqual(brokers[0].functionName, 'aworker_echo'); + + reduceCapacityCalled++; + return []; + }); + + await defaultController['autoScale'](); + assert.strictEqual(reduceCapacityCalled, 1); + + stubReduceCapacity.restore(); + }); +}); + +function generateBrokerData( + funcName: string, + workerName: string, + activeRequestCount: number +): DeepRequired { + return { + functionName: funcName, + inspector: false, + workers: [ + { + name: workerName, + activeRequestCount, + }, + ], + }; +} + +async function initWorker( + functionProfile: FunctionProfileManager, + env: TestEnvironment, + stateManager: StateManager +) { + await functionProfile.setProfiles([ + { + name: 'aworker_echo', + runtime: 'aworker', + url: `file://${baselineDir}/aworker_echo`, + sourceFile: 'index.js', + signature: 'md5:234234', + worker: { + maxActivateRequests: 1, + }, + resourceLimit: { + memory: 200 * 1024 * 1024, + }, + }, + ]); + + env.containerManager.setTestContainers([ + { + pid: 1, + name: 'aworker_echo_1', + status: TurfContainerStates.running, + }, + ]); + + registerWorkers(stateManager, [ + { + funcName: 'aworker_echo', + processName: 'aworker_echo_1', + credential: 'aworker_echo_1', + options: { inspect: false }, + toReserve: false, + }, + ]); + + stateManager._updateWorkerStatusByReport( + new WorkerStatusReportEvent({ + functionName: 'aworker_echo', + name: 'aworker_echo_1', + isInspector: false, + event: WorkerStatusReport.ContainerInstalled, + }) + ); +} diff --git a/src/control_plane/capacity_manager.ts b/src/control_plane/capacity_manager.ts index d9cbd58..46b348c 100644 --- a/src/control_plane/capacity_manager.ts +++ b/src/control_plane/capacity_manager.ts @@ -26,17 +26,18 @@ export class CapacityManager extends Base { private virtualMemoryPoolSize: number; private logger: Logger; private stateManager: StateManager; + private _useEmaScaling: boolean; constructor(ctx: ControlPlaneDependencyContext) { super(); const config = ctx.getInstance('config'); this.stateManager = ctx.getInstance('stateManager'); - this.virtualMemoryPoolSize = bytes(config.virtualMemoryPoolSize); this._shrinkRedundantTimes = config.controlPlane.workerRedundantVictimSpareTimes; this._scalingStage = config.controlPlane.capacityScalingStage; this.logger = loggers.get('capacity manager'); + this._useEmaScaling = config.controlPlane.useEmaScaling; } /** @@ -169,19 +170,85 @@ export class CapacityManager extends Base { } /** - * Evaluate the water level. - * @param {boolean} [expansionOnly] Whether do the expansion action only or not. - * @return {number} How much processes (workers) should be scale. (> 0 means expand, < 0 means shrink) + * 使用 ema concurrency 计算扩容水位 + * TODO: 进一步可以引入历史指标趋势计算 + * return 0; 什么都不做 + * return >0; 扩容 + * return <0; 缩容 + * @returns */ - evaluateWaterLevel(broker: Broker, expansionOnly = false) { - if (broker.disposable) { - return 0; - } + private evaluateWaterLevelByEMAConcurrency(broker: Broker) { + const { totalMaxActivateRequests } = broker; + const emaConcurrency = broker.getEMAConcurrency(); + const waterLevel = emaConcurrency / totalMaxActivateRequests; + const requiredCapacity = emaConcurrency / broker.scaleFactor; + const requiredWorkers = Math.ceil( + requiredCapacity / broker.profile.worker.maxActivateRequests + ); - if (!broker.activeWorkerCount) { - return 0; + let delta = 0; + const now = Date.now(); + + if ( + waterLevel > broker.concurrencyExpandThreshold || + broker.activeWorkerCount < broker.reservationCount + ) { + if (broker.isExpandCooldown(now)) { + this.logger.info( + '[Auto Scale] [%s] expand cooldown, ema concurrency is %d, water level is %d, required worker is %d, skip.', + broker.name, + emaConcurrency, + waterLevel, + requiredWorkers + ); + return 0; + } + const workersToAdd = + Math.max(requiredWorkers, broker.reservationCount) - + broker.activeWorkerCount; + + broker.resetExpandCooldownTime(now); + + delta = workersToAdd; + } else if ( + waterLevel < broker.concurrencyShrinkThreshold && + broker.activeWorkerCount > requiredWorkers + ) { + if (broker.isShirnkCooldown(now)) { + this.logger.info( + '[Auto Scale] [%s] shrink cooldown, ema concurrency is %d, water level is %d, required worker is %d, skip.', + broker.name, + emaConcurrency, + waterLevel, + requiredWorkers + ); + return 0; + } + const shouldRetainOneWorker = + broker.activeWorkerCount === 1 && emaConcurrency > 0; + const workersToRemove = shouldRetainOneWorker + ? 0 + : broker.activeWorkerCount - + Math.max(requiredWorkers, broker.reservationCount); + + broker.resetShrinkCooldownTime(now); + + delta = -workersToRemove; } + this.logger.info( + '[Auto Scale] ema concurrency is %d, water level is %d, required workers is %d, plan scale delta %d workers %s. ', + emaConcurrency, + waterLevel, + requiredWorkers, + delta, + broker.name + ); + + return delta; + } + + private evaluateWaterLevelLegacy(broker: Broker, expansionOnly: boolean) { const { totalMaxActivateRequests } = broker; const activeRequestCount = broker.getActiveRequestCount(); const waterLevel = activeRequestCount / totalMaxActivateRequests; @@ -263,6 +330,25 @@ export class CapacityManager extends Base { } } + /** + * Evaluate the water level. + * @param {boolean} [expansionOnly] Whether do the expansion action only or not. + * @return {number} How much processes (workers) should be scale. (> 0 means expand, < 0 means shrink) + */ + evaluateWaterLevel(broker: Broker, expansionOnly = false) { + if (broker.disposable) { + return 0; + } + + if (!broker.activeWorkerCount) { + return 0; + } + + return this._useEmaScaling + ? this.evaluateWaterLevelByEMAConcurrency(broker) + : this.evaluateWaterLevelLegacy(broker, expansionOnly); + } + assertExpandingAllowed( funcName: string, inspect: boolean, diff --git a/src/control_plane/data_plane_client/client.ts b/src/control_plane/data_plane_client/client.ts index 91a813a..05152f7 100644 --- a/src/control_plane/data_plane_client/client.ts +++ b/src/control_plane/data_plane_client/client.ts @@ -5,6 +5,7 @@ import { BasePlaneClient } from '#self/lib/base_plane_client'; import { Config } from '#self/config'; import * as root from '#self/proto/root'; import { EventBus } from '#self/lib/event-bus'; +import { Clock } from '#self/lib/clock'; // @ts-ignore protobuf's proprietary EventEmitter export interface DataPlaneClient extends root.noslated.data.DataPlane {} // eslint-disable-line @typescript-eslint/no-empty-interface @@ -12,7 +13,12 @@ export class DataPlaneClient extends BasePlaneClient { #serverSockPath: string; subscription: Subscription | null; - constructor(private eventBus: EventBus, planeId: number, config: Config) { + constructor( + private eventBus: EventBus, + planeId: number, + config: Config, + private _clock: Clock + ) { const dataPlaneSockPath = path.join( config.dirs.noslatedSock, `dp-${planeId}.sock` @@ -30,7 +36,8 @@ export class DataPlaneClient extends BasePlaneClient { this.subscription = new Subscription( this.eventBus, this, - this.config.controlPlane.workerTrafficStatsPullingMs + this.config.controlPlane.workerTrafficStatsPullingMs, + this._clock ); this.subscription.subscribe(); diff --git a/src/control_plane/data_plane_client/manager.ts b/src/control_plane/data_plane_client/manager.ts index 49ceeca..92478d9 100644 --- a/src/control_plane/data_plane_client/manager.ts +++ b/src/control_plane/data_plane_client/manager.ts @@ -7,6 +7,7 @@ import { RawFunctionProfile } from '#self/lib/json/function_profile'; import { ControlPlaneDependencyContext } from '../deps'; import { FunctionProfileManager } from '#self/lib/function_profile'; import { EventBus } from '#self/lib/event-bus'; +import { Clock } from '#self/lib/clock'; /** * Data plane client manager @@ -14,6 +15,7 @@ import { EventBus } from '#self/lib/event-bus'; export class DataPlaneClientManager extends BasePlaneClientManager { private _functionProfile: FunctionProfileManager; private _eventBus: EventBus; + private _clock: Clock; constructor(ctx: ControlPlaneDependencyContext) { const config = ctx.getInstance('config'); @@ -24,6 +26,7 @@ export class DataPlaneClientManager extends BasePlaneClientManager { ); this._functionProfile = ctx.getInstance('functionProfile'); this._eventBus = ctx.getInstance('eventBus'); + this._clock = ctx.getInstance('clock'); } /** @@ -32,7 +35,12 @@ export class DataPlaneClientManager extends BasePlaneClientManager { * @return {DataPlaneGuest} The created plane client. */ _createPlaneClient(planeId: number): DataPlaneClient { - return new DataPlaneClient(this._eventBus, planeId, this.config); + return new DataPlaneClient( + this._eventBus, + planeId, + this.config, + this._clock + ); } /** diff --git a/src/control_plane/data_plane_client/subscription.ts b/src/control_plane/data_plane_client/subscription.ts index 8a97e01..056b53b 100644 --- a/src/control_plane/data_plane_client/subscription.ts +++ b/src/control_plane/data_plane_client/subscription.ts @@ -9,6 +9,7 @@ import { WorkerStatusReportEvent, WorkerTrafficStatsEvent, } from '../events'; +import { Clock } from '#self/lib/clock'; export class DataPlaneSubscription { static SubscriptionNames = ['requestQueueing', 'containerStatusReport']; @@ -21,7 +22,8 @@ export class DataPlaneSubscription { constructor( private eventBus: EventBus, private client: DataPlaneClient, - private pullingInterval: number + private pullingInterval: number, + private _clock: Clock ) { this.client = client; this.logger = loggers.get('data plane subscription'); @@ -90,10 +92,10 @@ export class DataPlaneSubscription { if (this.closed) { return; } - this.trafficStatsTimeout = setTimeout( + this.trafficStatsTimeout = this._clock.setTimeout( this._onWorkerTrafficStatsTimeout, this.pullingInterval - ); + ) as NodeJS.Timeout; }); }; @@ -114,10 +116,10 @@ export class DataPlaneSubscription { } }); } - this.trafficStatsTimeout = setTimeout( + this.trafficStatsTimeout = this._clock.setTimeout( this._onWorkerTrafficStatsTimeout, this.pullingInterval - ); + ) as NodeJS.Timeout; } unsubscribe() { diff --git a/src/control_plane/worker_stats/broker.ts b/src/control_plane/worker_stats/broker.ts index 75fcf93..789ce25 100644 --- a/src/control_plane/worker_stats/broker.ts +++ b/src/control_plane/worker_stats/broker.ts @@ -3,7 +3,8 @@ import { RawWithDefaultsFunctionProfile, ReadonlyProfile, } from '#self/lib/json/function_profile'; -import { Worker, WorkerMetadata, WorkerStats } from './worker'; +import { Worker, WorkerMetadata } from './worker'; +import { EMAConcurrency } from '#self/lib/ema_concurrency'; class Broker { redundantTimes: number; @@ -16,15 +17,33 @@ class Broker { private _initiatingWorkerCount = 0; private _activeWorkerCount = 0; - - constructor(profile: RawWithDefaultsFunctionProfile, isInspector: boolean) { + private _emaConcurrency: EMAConcurrency | null = null; + private _lastExpandTime: number; + private _lastShrinkTime: number; + + constructor( + profile: RawWithDefaultsFunctionProfile, + isInspector: boolean, + private _useEmaScaling: boolean + ) { this.#profile = profile; this.name = this.#profile.name; this.isInspector = !!isInspector; + if (this._useEmaScaling) { + this._emaConcurrency = new EMAConcurrency( + this.concurrencySlidingWindowSize, + this.concurrencySlidingBucketCount, + this.emaConcurrencyAlpha, + this.precisionZeroThreshold + ); + } + this.workers = new Map(); this.redundantTimes = 0; + this._lastExpandTime = 0; + this._lastShrinkTime = 0; } get runtime() { @@ -55,6 +74,58 @@ class Broker { return this.#profile; } + get concurrencySlidingWindowSize() { + return this.#profile.worker.concurrencySlidingWindowSize || 60 * 1000; + } + + get concurrencySlidingBucketCount() { + return this.#profile.worker.concurrencySlidingBucketCount || 6; + } + + get emaConcurrencyAlpha() { + return this.#profile.worker.emaConcurrencyAlpha || 0.5; + } + + get concurrencyExpandThreshold() { + return this.#profile.worker.concurrencyExpandThreshold || 0.7; + } + + get concurrencyShrinkThreshold() { + return this.#profile.worker.concurrencyShrinkThreshold || 0.3; + } + + get expandCooldown() { + return this.#profile.worker.expandCooldown || 1000; + } + + get shrinkCooldown() { + return this.#profile.worker.shrinkCooldown || 60 * 1000; + } + + get scaleFactor() { + return this.#profile.worker.scaleFactor || 0.5; + } + + get precisionZeroThreshold() { + return this.#profile.worker.precisionZeroThreshold || 0.01; + } + + isExpandCooldown(now: number) { + return now - this._lastExpandTime < this.expandCooldown; + } + + isShirnkCooldown(now: number) { + return now - this._lastShrinkTime < this.shrinkCooldown; + } + + resetExpandCooldownTime(now: number) { + this._lastExpandTime = now; + } + + resetShrinkCooldownTime(now: number) { + this._lastShrinkTime = now; + } + /** * Register worker. * @param processName The process name (worker name). @@ -134,6 +205,20 @@ class Broker { return a; } + recalculateConcurrency() { + if (!this._useEmaScaling) return; + + const current = this.getActiveRequestCount(); + + this._emaConcurrency!.recalculate(current); + } + + getEMAConcurrency() { + if (!this._useEmaScaling) return this.getActiveRequestCount(); + + return this._emaConcurrency!.concurrency(); + } + /** * Get worker. * @param processName The process name (worker name). diff --git a/src/control_plane/worker_stats/state_manager.ts b/src/control_plane/worker_stats/state_manager.ts index 3d67261..0d11de4 100644 --- a/src/control_plane/worker_stats/state_manager.ts +++ b/src/control_plane/worker_stats/state_manager.ts @@ -32,6 +32,8 @@ export class StateManager extends Base { private _gcWorkers: Set = new Set(); + private _useEmaScaling: boolean; + constructor(ctx: ControlPlaneDependencyContext) { super(); this._logger = loggers.get('state manager'); @@ -62,6 +64,7 @@ export class StateManager extends Base { }); this._statLogger = new StatLogger(this._config); + this._useEmaScaling = this._config.controlPlane.useEmaScaling; } _updateWorkerStatusByReport(eve: WorkerStatusReportEvent) { @@ -137,6 +140,8 @@ export class StateManager extends Base { }`; worker.sync(allSyncData.get(name) ?? null); } + + broker.recalculateConcurrency(); } allSyncData.clear(); @@ -154,7 +159,7 @@ export class StateManager extends Base { if (broker) return broker; const profile = this._functionProfile.getProfile(functionName); if (profile == null) return null; - broker = new Broker(profile, isInspector); + broker = new Broker(profile, isInspector, this._useEmaScaling); this._brokers.set(Broker.getKey(functionName, isInspector), broker); return broker; } diff --git a/src/lib/__test__/ema_concurrency.test.ts b/src/lib/__test__/ema_concurrency.test.ts new file mode 100644 index 0000000..130e9f1 --- /dev/null +++ b/src/lib/__test__/ema_concurrency.test.ts @@ -0,0 +1,101 @@ +import * as common from '#self/test/common'; +import { EMAConcurrency } from '#self/lib/ema_concurrency'; +import sinon from 'sinon'; +import assert from 'assert'; + +const DELTA = 1e-10; // 允许的误差范围 + +describe(common.testName(__filename), () => { + const windowSize = 60000; + const bucketCount = 6; + const alpha = 0.5; + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + }); + + afterEach(() => { + clock.restore(); + }); + + it('should initialize properly', () => { + const ema = new EMAConcurrency(windowSize, bucketCount, alpha); + assert.strictEqual(ema.concurrency(), 0); + }); + + it('should update concurrency correctly within the same bucket', () => { + const ema = new EMAConcurrency(windowSize, bucketCount, alpha); + ema.recalculate(10); + // 10 0 0 0 0 0 + assert.strictEqual(ema.concurrency(), (10 / 6) * alpha); + // bucket time size 10s,same bucket + clock.tick(5000); + // 20 0 0 0 0 0 + ema.recalculate(20); + // 需要参考之前的计算结果,新的 ema 结果是之前结果的 50% + 新的均值 50% + const expectedEMA = + alpha * (20 / bucketCount) + alpha * ((10 / bucketCount) * alpha); + assert(Math.abs(ema.concurrency() - expectedEMA) < DELTA); + }); + + it('should clear the correct buckets when time advances but not clear the emaConcurrency', () => { + const ema = new EMAConcurrency(windowSize, bucketCount, alpha); + ema.recalculate(10); + // 10 0 0 0 0 0 + // 跳两个 bucket + clock.tick(20000); + // 10 0 20 0 0 0 + ema.recalculate(20); + // 因为过程中没有重新计算 ema,旧值仍会影响新值 + const previousEMA = (10 / bucketCount) * alpha; + const newBucketValue = 20 / bucketCount; + const expectedEMA = alpha * newBucketValue + alpha * previousEMA; + assert(Math.abs(ema.concurrency() - expectedEMA) < DELTA); + }); + + it('should not reset emaConcurrency when time exceeds window size', () => { + const ema = new EMAConcurrency(windowSize, bucketCount, alpha); + ema.recalculate(10); + // 10 0 0 0 0 0 + // 跳过一个 windowSize + clock.tick(60000); + // 10 20 0 0 0 0 + ema.recalculate(20); + // 因为过程中没有重新计算 ema,旧值仍会影响新值 + const previousEMA = (10 / bucketCount) * alpha; + const newBucketValue = 20 / bucketCount; + const expectedEMA = alpha * newBucketValue + alpha * previousEMA; + assert(Math.abs(ema.concurrency() - expectedEMA) < DELTA); + }); + + it('should handle recalculation after long periods of inactivity', () => { + const ema = new EMAConcurrency(windowSize, bucketCount, alpha); + // 10 0 0 0 0 0 + ema.recalculate(10); + // 过了 5min + clock.tick(300000); + // 20 0 0 0 0 0 + ema.recalculate(20); + // 因为过程中没有重新计算 ema,旧值仍会影响新值 + const newBucketValue = 20 / bucketCount; + const previousEMA = (10 / bucketCount) * alpha; + const expectedEMA = alpha * newBucketValue + alpha * previousEMA; + assert(Math.abs(ema.concurrency() - expectedEMA) < DELTA); + }); + + it('should gradually reduce emaConcurrency to near zero with prolonged zero activity', () => { + const ema = new EMAConcurrency(windowSize, bucketCount, alpha); + + // 初始设定并发度为 1 + ema.recalculate(1); + + // 每个 windowSize 更新一次 recalculate(0), 模拟长时间的零活动 + for (let i = 0; i < 300000; i += windowSize) { + clock.tick(windowSize); + ema.recalculate(0); + } + + assert.strictEqual(ema.concurrency(), 0); + }); +}); diff --git a/src/lib/ema_concurrency.ts b/src/lib/ema_concurrency.ts new file mode 100644 index 0000000..263f5dc --- /dev/null +++ b/src/lib/ema_concurrency.ts @@ -0,0 +1,79 @@ +/** + * 使用 指数移动平均 计算并发度 + */ +export class EMAConcurrency { + // 指数移动平均并发度 + private emaConcurrency = 0; + private buckets: number[]; + private lastUpdateTime: number; + + /** + * @param windowSize 滑动窗口大小,单位 ms + * @param bucketCount 窗口时间分桶数 + * @param alpha 指数平滑系数(0 < alpha <= 1) + * @param precisionZeroThreshold emaConcurrency 小于该值则视为 0 + */ + constructor( + private windowSize: number, + private bucketCount: number, + private alpha: number, + private precisionZeroThreshold: number = 0.01 + ) { + this.buckets = Array(this.bucketCount).fill(0); + this.lastUpdateTime = Date.now(); + } + + recalculate(concurrency: number) { + const now = Date.now(); + const timeDelta = now - this.lastUpdateTime; + const bucketIndex = Math.floor( + (now % this.windowSize) / (this.windowSize / this.bucketCount) + ); + + // (] 左开右闭 + if (timeDelta >= this.windowSize) { + // 如果距离上次更新超过了窗口大小,则清空所有桶 + this.buckets.fill(0); + // 不重置 emaConcurrency,防止短期的负载峰值频繁扩缩容 + // TODO: 观察下如果需要快速响应,则直接重置为 0 + if (this.emaConcurrency < this.precisionZeroThreshold) { + this.emaConcurrency = 0; + } + } else { + // 清空即将更新的桶,因为它已经不再代表最新时间段的数据 + this.buckets[bucketIndex] = 0; + + // 如果时间跨度超过了单个桶的大小, + // 清空所有在上次更新时间和当前时间之间的桶 + if (timeDelta > this.windowSize / this.bucketCount) { + const bucketsToClear = Math.ceil( + timeDelta / (this.windowSize / this.bucketCount) + ); + for (let i = 1; i <= bucketsToClear; i++) { + const indexToClear = + (bucketIndex - i + this.bucketCount) % this.bucketCount; + this.buckets[indexToClear] = 0; + } + } + } + + // 更新当前桶的并发度,覆盖,防止累计值将并发度拉高 + this.buckets[bucketIndex] = concurrency; + + // 更新 EMA 并发度 + const averageConcurrency = + this.buckets.reduce((sum, concurrency) => sum + concurrency, 0) / + this.bucketCount; + this.emaConcurrency = + this.alpha * averageConcurrency + (1 - this.alpha) * this.emaConcurrency; + + if (this.emaConcurrency < this.precisionZeroThreshold) { + this.emaConcurrency = 0; + } + this.lastUpdateTime = now; + } + + concurrency() { + return this.emaConcurrency; + } +} diff --git a/src/lib/json/function_profile.ts b/src/lib/json/function_profile.ts index 6688429..645352b 100644 --- a/src/lib/json/function_profile.ts +++ b/src/lib/json/function_profile.ts @@ -31,6 +31,24 @@ export interface ProcessFunctionProfile { * Not applicable if `disposable` is true. */ dispatchMode?: DispatchMode; + // 并发度滑动时间窗口大小,单位 ms,默认 60s + concurrencySlidingWindowSize?: number; + // 并发度滑动时间窗口分桶数,默认 6 + concurrencySlidingBucketCount?: number; + // 指数移动平均平滑系数 (0,1],默认 0.5 + emaConcurrencyAlpha?: number; + // 并发度扩容水位阈值,默认 0.7 + concurrencyExpandThreshold?: number; + // 并发度缩容水位阈值,默认 0.3 + concurrencyShrinkThreshold?: number; + // 扩容冷却时间,单位 ms,默认 1s + expandCooldown?: number; + // 缩容冷却时间,单位 ms,默认 60s + shrinkCooldown?: number; + // 扩缩容后并发度水位,影响扩缩容操作数量 + scaleFactor?: number; + // ema concurrency 小于该值则视为 0 + precisionZeroThreshold?: number; }; environments?: { key: string; diff --git a/src/lib/json/function_profile_schema.json b/src/lib/json/function_profile_schema.json index 83266a3..b2591a7 100644 --- a/src/lib/json/function_profile_schema.json +++ b/src/lib/json/function_profile_schema.json @@ -2,7 +2,7 @@ "$schema": "http://json-schema.org/draft-04/schema#", "title": "Noslated Function Metadata", "description": "the metadata schema for Functions in Noslated", - "type":"array", + "type": "array", "uniqueItems": true, "additionalItems": false, "definitions": { @@ -34,10 +34,7 @@ "description": "environment value" } }, - "required": [ - "key", - "value" - ] + "required": ["key", "value"] } }, "worker": { @@ -70,7 +67,7 @@ "shrinkStrategy": { "type": "string", "description": "The shrink strategy", - "enum": [ "FILO", "FIFO", "LCC" ] + "enum": ["FILO", "FIFO", "LCC"] }, "v8Options": { "type": "array", @@ -98,6 +95,42 @@ "type": "string", "description": "The dispatching mode", "enum": ["least-request-count", "round-robin"] + }, + "concurrencySlidingWindowSize": { + "type": "number", + "description": "Slide window size for ema concurrency" + }, + "concurrencySlidingBucketCount": { + "type": "number", + "description": "Slide window size time bucket count" + }, + "emaConcurrencyAlpha": { + "type": "number", + "description": "EMA alpha" + }, + "concurrencyExpandThreshold": { + "type": "number", + "description": "Concurrency threshold to expand" + }, + "concurrencyShrinkThreshold": { + "type": "number", + "description": "Concurrency threshold to shrink" + }, + "expandCooldown": { + "type": "number", + "description": "Expand cooldown time" + }, + "shrinkCooldown": { + "type": "number", + "description": "Shrink cooldown time" + }, + "scaleFactor": { + "type": "number", + "description": "Scale worker count factor" + }, + "precisionZeroThreshold": { + "type": "number", + "description": "If ema concurrency small then this threshold, it will be set to zero" } }, "additionalProperties": true @@ -121,126 +154,111 @@ } }, "items": { - "oneOf": [{ - "additionalProperties": false, - "type": "object", - "properties": { - "name": { - "type": "string", - "description": "the function name" - }, - "runtime": { - "type": "string", - "enum": [ "nodejs" ], - "description": "the function runtime" - }, - "url": { - "type": "string", - "description": "the code download URL", - "examples": [ - "file:///home/admin/code/A", - "https://oss/foobar/A.zip" - ] - }, - "signature": { - "type": "string", - "description": "the code sign", - "examples": [ - "md5:xxx" - ] - }, - "handler": { - "type": "string", - "pattern": "^[^\\.]+\\.[^\\.]+$", - "description": "the function handler. e.g. foo.bar" - }, - "initializer": { - "type": "string", - "pattern": "^[^\\.]+\\.[^\\.]+$", - "description": "the function initializer. e.g. foo.bar" - }, - "rateLimit": { - "$ref": "#/definitions/rateLimit" - }, - "resourceLimit": { - "$ref": "#/definitions/resourceLimit" - }, - "worker": { - "$ref": "#/definitions/worker" - }, - "environments": { - "$ref": "#/definitions/environments" + "oneOf": [ + { + "additionalProperties": false, + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "the function name" + }, + "runtime": { + "type": "string", + "enum": ["nodejs"], + "description": "the function runtime" + }, + "url": { + "type": "string", + "description": "the code download URL", + "examples": [ + "file:///home/admin/code/A", + "https://oss/foobar/A.zip" + ] + }, + "signature": { + "type": "string", + "description": "the code sign", + "examples": ["md5:xxx"] + }, + "handler": { + "type": "string", + "pattern": "^[^\\.]+\\.[^\\.]+$", + "description": "the function handler. e.g. foo.bar" + }, + "initializer": { + "type": "string", + "pattern": "^[^\\.]+\\.[^\\.]+$", + "description": "the function initializer. e.g. foo.bar" + }, + "rateLimit": { + "$ref": "#/definitions/rateLimit" + }, + "resourceLimit": { + "$ref": "#/definitions/resourceLimit" + }, + "worker": { + "$ref": "#/definitions/worker" + }, + "environments": { + "$ref": "#/definitions/environments" + }, + "namespace": { + "type": "string", + "description": "worker related resources namespace, eg. kv_storage" + } }, - "namespace": { - "type": "string", - "description": "worker related resources namespace, eg. kv_storage" - } + "required": ["name", "runtime", "url", "signature", "handler"] }, - "required": [ - "name", - "runtime", - "url", - "signature", - "handler" - ] - }, { - "additionalProperties": false, - "type": "object", - "properties": { - "name": { - "type": "string", - "description": "the function name" - }, - "runtime": { - "type": "string", - "enum": [ - "aworker" - ], - "description": "the function runtime" - }, - "url": { - "type": "string", - "description": "the code download URL", - "examples": [ - "file:///home/admin/code/A", - "https://oss/foobar/A.zip" - ] - }, - "signature": { - "type": "string", - "description": "the code sign", - "examples": [ - "md5:xxx" - ] - }, - "sourceFile": { - "type": "string", - "description": "the source code file of aworker.js function" - }, - "rateLimit": { - "$ref": "#/definitions/rateLimit" - }, - "resourceLimit": { - "$ref": "#/definitions/resourceLimit" - }, - "worker": { - "$ref": "#/definitions/worker" - }, - "environments": { - "$ref": "#/definitions/environments" + { + "additionalProperties": false, + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "the function name" + }, + "runtime": { + "type": "string", + "enum": ["aworker"], + "description": "the function runtime" + }, + "url": { + "type": "string", + "description": "the code download URL", + "examples": [ + "file:///home/admin/code/A", + "https://oss/foobar/A.zip" + ] + }, + "signature": { + "type": "string", + "description": "the code sign", + "examples": ["md5:xxx"] + }, + "sourceFile": { + "type": "string", + "description": "the source code file of aworker.js function" + }, + "rateLimit": { + "$ref": "#/definitions/rateLimit" + }, + "resourceLimit": { + "$ref": "#/definitions/resourceLimit" + }, + "worker": { + "$ref": "#/definitions/worker" + }, + "environments": { + "$ref": "#/definitions/environments" + }, + "namespace": { + "type": "string", + "description": "worker related resources namespace, eg. kv_storage" + } }, - "namespace": { - "type": "string", - "description": "worker related resources namespace, eg. kv_storage" - } - }, - "required": [ - "name", - "runtime", - "url", - "signature", - "sourceFile" - ] - }] + "required": ["name", "runtime", "url", "signature", "sourceFile"] + } + ] } } diff --git a/src/sdk/client.ts b/src/sdk/client.ts index 10d6ca6..e1a1086 100644 --- a/src/sdk/client.ts +++ b/src/sdk/client.ts @@ -162,7 +162,7 @@ export class NoslatedClient extends EventEmitter { } async checkControlPlaneHealth( - timeout: number = 1000 + timeout = 1000 ): Promise { try { const client = this.controlPlaneClientManager.sample(); @@ -181,7 +181,7 @@ export class NoslatedClient extends EventEmitter { } async checkDataPlaneHealth( - timeout: number = 1000 + timeout = 1000 ): Promise { const client = this.dataPlaneClientManager.sample();