Skip to content

Commit

Permalink
feat: use ema to scale (#92)
Browse files Browse the repository at this point in the history
* feat: use ema to scale
  • Loading branch information
mariodu authored Dec 28, 2023
1 parent 3f79242 commit 5b2cdaa
Show file tree
Hide file tree
Showing 20 changed files with 1,174 additions and 149 deletions.
9 changes: 9 additions & 0 deletions proto/noslated/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ message FunctionProfileWorker {
bool disableSeed = 10;
bool disableRequestQueue = 11;
string dispatchMode = 12;
int32 concurrencySlidingWindowSize = 13;
int32 concurrencySlidingBucketCount = 14;
float emaConcurrencyAlpha = 15;
float concurrencyExpandThreshold = 16;
float concurrencyShrinkThreshold = 17;
int32 expandCooldown = 18;
int32 shrinkCooldown = 19;
float scaleFactor = 20;
float precisionZeroThreshold = 21;
}

message PlaneHealthyResponse {
Expand Down
1 change: 1 addition & 0 deletions src/config/default.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export default {
workerTrafficStatsPullingMs: 10_000,
workerRedundantVictimSpareTimes: 6,
capacityScalingStage: 0.7,
useEmaScaling: false,
},

dataPlane: {
Expand Down
6 changes: 6 additions & 0 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ export interface ControlPlaneConfig {
* Default: 0.7
*/
capacityScalingStage: number;
/**
* 使用基于时间窗口的移动加权平均算法进行扩容
* 默认为 false
* 此配置开启会强制设置 workerTrafficStatsPullingMs 为 1000ms,确保数据同步及时
*/
useEmaScaling: boolean;
}

export interface DatePlaneConfig {
Expand Down
10 changes: 9 additions & 1 deletion src/config/loader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ export function resolveConfig(): Config {
);
const userFileConfig = loadConfig(process.env.NOSLATED_CONFIG_PATH);
const envConfig = resolveEnvConfig();
return extend(

const finalConfig = extend(
/** deep */ true,
config,
defaultConfig,
Expand All @@ -125,6 +126,13 @@ export function resolveConfig(): Config {
userFileConfig,
envConfig
);

// 开启 useEmaScaling 时,强制 workerTrafficStatsPullingMs 为 1000ms,确保数据同步及时
if (finalConfig.controlPlane.useEmaScaling === true) {
finalConfig.controlPlane.workerTrafficStatsPullingMs = 1000;
}

return finalConfig;
}

export function dumpConfig(name: string, config: Config) {
Expand Down
113 changes: 113 additions & 0 deletions src/control_plane/__test__/e2e/ema_scaling.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import * as common from '#self/test/common';
import { baselineDir } from '#self/test/common';
import assert from 'assert';
import { bufferFromStream, sleep } from '#self/lib/util';
import { DefaultEnvironment } from '#self/test/env/environment';

/**
 * Observation-only scenario: replays a changing concurrency pattern with EMA
 * scaling enabled. Nothing is asserted here beyond the HTTP 200 check that
 * `request` performs for every call.
 */
describe(common.testName(__filename), function () {
  // Debug version of Node.js may take longer time to bootstrap.
  this.timeout(30_000);

  const emaConfig = common.extendDefaultConfig({
    virtualMemoryPoolSize: '4gb',
    controlPlane: {
      useEmaScaling: true,
      workerTrafficStatsPullingMs: 1000,
    },
  });

  const env = new DefaultEnvironment({
    createTestClock: true,
    config: emaConfig,
  });

  it('should scaling smooth when traffic change', async () => {
    const functionName = 'aworker_echo_ema';

    await env.agent.setFunctionProfile([
      {
        name: functionName,
        runtime: 'aworker',
        url: `file://${baselineDir}/aworker_echo`,
        sourceFile: 'sleep.js',
        signature: 'md5:234234',
        worker: {
          maxActivateRequests: 1,
          replicaCountLimit: 15,
        },
        resourceLimit: {
          memory: 200 * 1024 * 1024,
        },
      },
    ]);

    // Number of parallel requests fired in each one-second step.
    const concurrencyPerStep = [1, 1, 1, 10, 1, 6, 5, 0, 0, 0];

    for (let step = 0; step < concurrencyPerStep.length; step++) {
      await makeConcurrencyRequest(functionName, concurrencyPerStep[step], env);
      await sleep(1000);
    }

    // One extra second after the last step before the test finishes.
    await sleep(1000);
  });
});

/**
 * Invokes `functionName` once through the environment's agent with a POST
 * carrying the payload `'1000'`, asserts the response status is 200 and
 * resolves with the response body collected into a buffer.
 */
async function request(functionName: string, env: DefaultEnvironment) {
  const payload = Buffer.from('1000');
  const invokeOptions = { method: 'POST' };

  const res = await env.agent.invoke(functionName, payload, invokeOptions);
  assert.strictEqual(res.status, 200);

  const body = await bufferFromStream(res);
  return body;
}

/**
 * Starts `concurrency` invocations of `functionName` at once and returns a
 * promise that settles when all of them have completed (or one has failed).
 */
function makeConcurrencyRequest(
  functionName: string,
  concurrency: number,
  env: DefaultEnvironment
) {
  // One placeholder per in-flight request; only the array length matters.
  const slots = new Array(concurrency).fill(0);
  const inflight = slots.map(() => request(functionName, env));

  return Promise.all(inflight);
}

// Weight table keyed by the integer values 0 through 10. Its shape matches the
// `weights` argument of generateWeightedSequence, where a larger weight makes
// the corresponding key more likely to be drawn; here small values are
// weighted most heavily.
// NOTE(review): not referenced anywhere in the visible part of this file —
// confirm it is still needed.
const WEIGHTS = {
0: 10,
1: 10,
2: 8,
3: 8,
4: 7,
5: 6,
6: 5,
7: 4,
8: 3,
9: 2,
10: 1,
};

/**
 * Builds a random sequence whose values are drawn from the keys of `weights`,
 * each key being picked with probability proportional to its weight.
 *
 * Sampling walks a cumulative-weight table instead of materialising one array
 * slot per unit of weight. For non-negative integer weights every
 * `Math.random()` draw maps to exactly the same value as the expanded-array
 * approach would, while fractional weights are now honoured proportionally
 * and memory no longer grows with the sum of the weights.
 *
 * @param length  Number of values to produce; values `<= 0` (or NaN) yield `[]`.
 * @param weights Map from candidate value to its weight. Entries whose weight
 *                is not a positive finite number are ignored.
 * @returns The generated sequence.
 * @throws RangeError when values are requested but no entry has a usable
 *         weight (previously the result was silently filled with `undefined`).
 */
function generateWeightedSequence(
  length: number,
  weights: Record<number, number>
): number[] {
  const buckets: Array<{ value: number; upperBound: number }> = [];
  let totalWeight = 0;

  // Object.entries visits integer-like keys in ascending numeric order, so the
  // cumulative table is laid out deterministically.
  for (const [key, weight] of Object.entries(weights)) {
    if (!Number.isFinite(weight) || weight <= 0) {
      continue;
    }
    totalWeight += weight;
    // Number() keeps fractional keys intact; parseInt would truncate them.
    buckets.push({ value: Number(key), upperBound: totalWeight });
  }

  const sequence: number[] = [];

  if (buckets.length === 0) {
    if (length > 0) {
      throw new RangeError(
        'weights must contain at least one entry with a positive finite weight'
      );
    }
    return sequence;
  }

  const lastBucket = buckets[buckets.length - 1];

  while (sequence.length < length) {
    const target = Math.random() * totalWeight;
    // target < totalWeight always holds, so a bucket is normally found; the
    // fallback only guards against floating-point surprises.
    const picked = buckets.find(bucket => target < bucket.upperBound) ?? lastBucket;
    sequence.push(picked.value);
  }

  return sequence;
}
1 change: 1 addition & 0 deletions src/control_plane/__test__/environment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export class TestEnvironment extends MochaEnvironment {
this.control = new ControlPlane({
clock: this.clock,
containerManager: this.containerManager,
config: this.options?.config,
});

await this.control.ready();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import * as common from '#self/test/common';
import { baselineDir } from '#self/test/common';
import assert from 'assert';
import { ControlPlane } from '#self/control_plane';
import { DefaultController } from '#self/control_plane/controllers';
import { DataPlaneClientManager } from '#self/control_plane/data_plane_client/manager';
import { WorkerStatusReportEvent } from '#self/control_plane/events';
import { WorkerLauncher } from '#self/control_plane/worker_launcher';
import { StateManager } from '#self/control_plane/worker_stats/state_manager';
import { WorkerStatusReport } from '#self/lib/constants';
import { FunctionProfileManager } from '#self/lib/function_profile';
import { TurfContainerStates } from '#self/lib/turf';
import { DeepRequired } from '#self/lib/util';
import sinon from 'sinon';
import { TestEnvironment } from '../environment';
import { registerWorkers } from '../util';
import { noslated } from '#self/proto/root';

/**
 * Unit-level checks of the EMA based auto scaling: broker statistics are fed
 * into the state manager by hand and `autoScale` is triggered directly, so the
 * resulting launch / shrink decisions can be observed through sinon doubles.
 */
describe(common.testName(__filename), function () {
  // Debug version of Node.js may take longer time to bootstrap.
  this.timeout(30_000);

  const env = new TestEnvironment({
    createTestClock: true,
    config: common.extendDefaultConfig({
      virtualMemoryPoolSize: '2gb',
      controlPlane: {
        useEmaScaling: true,
      },
    }),
  });

  let controlPlane: ControlPlane;
  let stateManager: StateManager;
  let functionProfile: FunctionProfileManager;
  let workerLauncher: WorkerLauncher;
  let dataPlaneClientManager: DataPlaneClientManager;
  let defaultController: DefaultController;

  beforeEach(async () => {
    controlPlane = env.control;
    stateManager = controlPlane._ctx.getInstance('stateManager');
    functionProfile = controlPlane._ctx.getInstance('functionProfile');
    workerLauncher = controlPlane._ctx.getInstance('workerLauncher');
    dataPlaneClientManager = controlPlane._ctx.getInstance(
      'dataPlaneClientManager'
    );
    defaultController = controlPlane._ctx.getInstance('defaultController');
  });

  /**
   * Comparison scenario 1:
   * Compared with the original scaling scheme, configured to pull once per
   * second for 60 pulls, the original scheme expands a lot when it meets
   * burst traffic.
   */
  it('should expand smooth when brust traffic', async () => {
    // concurrency bursts to 10
    // legacy will expand 9 workers
    // ema will expand 1 worker
    await initWorker(functionProfile, env, stateManager);

    await env.testClock.tickAsync(1000);

    // burst to 10
    await stateManager._syncBrokerData([
      generateBrokerData('aworker_echo_ema', 'aworker_echo_ema_1', 10),
    ]);

    let tryLaunchCalled = 0;

    const stubTryLaunch = sinon
      .stub(workerLauncher, 'tryLaunch')
      .callsFake(async () => {
        tryLaunchCalled++;
      });

    try {
      await defaultController['autoScale']();

      assert.strictEqual(tryLaunchCalled, 1);
    } finally {
      // Restore even when the assertion throws so the stub cannot leak into
      // the following tests.
      stubTryLaunch.restore();
    }
  });

  /**
   * Comparison scenario 2:
   * Compared with the original scaling scheme, configured to pull once every
   * 10s for 6 pulls, the original scheme would choose to shrink if the sixth
   * sample is 0.
   */
  it('should shrink smooth when traffic down to 0', async () => {
    // concurrency declines to 0
    // legacy will shrink 1 worker
    // ema will keep the worker
    await initWorker(functionProfile, env, stateManager);

    for (let i = 0; i < 5; i++) {
      await stateManager._syncBrokerData([
        generateBrokerData('aworker_echo_ema', 'aworker_echo_ema_1', 1),
      ]);
      await env.testClock.tickAsync(1000);
    }

    // down to 0
    await stateManager._syncBrokerData([
      generateBrokerData('aworker_echo_ema', 'aworker_echo_ema_1', 0),
    ]);

    // The spies must be installed BEFORE autoScale runs; spies created
    // afterwards have never observed a call, which would make the `notCalled`
    // assertions below pass unconditionally.
    const spyTryLaunch = sinon.spy(workerLauncher, 'tryLaunch');
    const spyReduceCapacity = sinon.spy(
      dataPlaneClientManager,
      'reduceCapacity'
    );

    try {
      await defaultController['autoScale']();

      assert(spyTryLaunch.notCalled);
      assert(spyReduceCapacity.notCalled);
    } finally {
      spyTryLaunch.restore();
      spyReduceCapacity.restore();
    }
  });
});

/**
 * Creates a broker stats record for `funcName` that reports exactly one
 * worker (`workerName`) with the given number of active requests and the
 * inspector flag turned off.
 */
function generateBrokerData(
  funcName: string,
  workerName: string,
  activeRequestCount: number
): DeepRequired<noslated.data.IBrokerStats> {
  const workerStats = { name: workerName, activeRequestCount };

  return {
    functionName: funcName,
    inspector: false,
    workers: [workerStats],
  };
}

/**
 * Prepares a single worker for the `aworker_echo_ema` function:
 *   1. publishes the function profile (one active request per worker),
 *   2. seeds the container manager with a running test container,
 *   3. registers the worker with the state manager,
 *   4. reports the `ContainerInstalled` status for that worker.
 */
async function initWorker(
  functionProfile: FunctionProfileManager,
  env: TestEnvironment,
  stateManager: StateManager
) {
  const funcName = 'aworker_echo_ema';
  const workerName = 'aworker_echo_ema_1';

  await functionProfile.setProfiles([
    {
      name: funcName,
      runtime: 'aworker',
      url: `file://${baselineDir}/aworker_echo`,
      sourceFile: 'index.js',
      signature: 'md5:234234',
      worker: {
        maxActivateRequests: 1,
      },
      resourceLimit: {
        memory: 200 * 1024 * 1024,
      },
    },
  ]);

  env.containerManager.setTestContainers([
    { pid: 1, name: workerName, status: TurfContainerStates.running },
  ]);

  registerWorkers(stateManager, [
    {
      funcName,
      processName: workerName,
      credential: workerName,
      options: { inspect: false },
      toReserve: false,
    },
  ]);

  const installedReport = new WorkerStatusReportEvent({
    functionName: funcName,
    name: workerName,
    isInspector: false,
    event: WorkerStatusReport.ContainerInstalled,
  });
  stateManager._updateWorkerStatusByReport(installedReport);
}
Loading

0 comments on commit 5b2cdaa

Please sign in to comment.