Skip to content

Commit

Permalink
Feat: borker concurrency stats (#93)
Browse files Browse the repository at this point in the history
* feat: broker concurrency stats
  • Loading branch information
mariodu authored Dec 29, 2023
1 parent 5b2cdaa commit 39f87b4
Show file tree
Hide file tree
Showing 24 changed files with 513 additions and 19 deletions.
1 change: 1 addition & 0 deletions proto/noslated/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ message FunctionProfileWorker {
int32 shrinkCooldown = 19;
float scaleFactor = 20;
float precisionZeroThreshold = 21;
string concurrencyStatsMode = 22;
}

message PlaneHealthyResponse {
Expand Down
1 change: 1 addition & 0 deletions proto/noslated/data-plane.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ message BrokerStats {
string functionName = 1;
bool inspector = 2;
repeated WorkerStats workers = 3;
float concurrency = 4;
}

message WorkerTrafficStatsResponse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ describe(common.testName(__filename), () => {
activeRequestCount: 6,
},
],
concurrency: 7,
};

const brokerData2: DeepRequired<noslated.data.IBrokerStats> = {
Expand All @@ -79,6 +80,7 @@ describe(common.testName(__filename), () => {
activeRequestCount: 4,
},
],
concurrency: 8,
};

describe('#autoScale()', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ function generateBrokerData(
activeRequestCount,
},
],
// instant concurrency stats concurrency = sum(worker.activeRequestCount)
concurrency: activeRequestCount,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ function generateBrokerData(
activeRequestCount,
},
],
concurrency: activeRequestCount,
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ function generateBrokerData(
activeRequestCount,
},
],
concurrency: activeRequestCount,
};
}

Expand Down
2 changes: 2 additions & 0 deletions src/control_plane/__test__/worker_stats/test_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export const brokerData: DeepRequired<noslated.data.IBrokerStats>[] = [
activeRequestCount: 1,
},
],
concurrency: 1,
},
{
functionName: 'func',
Expand All @@ -55,5 +56,6 @@ export const brokerData: DeepRequired<noslated.data.IBrokerStats>[] = [
activeRequestCount: 6,
},
],
concurrency: 6,
},
];
10 changes: 5 additions & 5 deletions src/control_plane/data_plane_client/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ import {
WorkerStatusReportEvent,
WorkerTrafficStatsEvent,
} from '../events';
import { Clock } from '#self/lib/clock';
import { Clock, TimerHandle } from '#self/lib/clock';

export class DataPlaneSubscription {
static SubscriptionNames = ['requestQueueing', 'containerStatusReport'];

private logger: Logger;
private closed = false;

private trafficStatsTimeout: NodeJS.Timeout | null = null;
private trafficStatsTimeout: TimerHandle | null = null;

constructor(
private eventBus: EventBus,
Expand Down Expand Up @@ -95,7 +95,7 @@ export class DataPlaneSubscription {
this.trafficStatsTimeout = this._clock.setTimeout(
this._onWorkerTrafficStatsTimeout,
this.pullingInterval
) as NodeJS.Timeout;
);
});
};

Expand All @@ -119,13 +119,13 @@ export class DataPlaneSubscription {
this.trafficStatsTimeout = this._clock.setTimeout(
this._onWorkerTrafficStatsTimeout,
this.pullingInterval
) as NodeJS.Timeout;
);
}

unsubscribe() {
this.closed = true;
if (this.trafficStatsTimeout) {
clearTimeout(this.trafficStatsTimeout);
this._clock.clearTimeout(this.trafficStatsTimeout);
}
}
}
6 changes: 2 additions & 4 deletions src/control_plane/worker_stats/broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,12 +205,10 @@ class Broker {
return a;
}

recalculateConcurrency() {
recalculateConcurrency(concurrency: number) {
if (!this._useEmaScaling) return;

const current = this.getActiveRequestCount();

this._emaConcurrency!.recalculate(current);
this._emaConcurrency!.recalculate(concurrency);
}

getEMAConcurrency() {
Expand Down
19 changes: 12 additions & 7 deletions src/control_plane/worker_stats/state_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,26 +124,31 @@ export class StateManager extends Base {

async _syncBrokerData(data: root.noslated.data.IBrokerStats[]) {
const allSyncData = new Map<string, WorkerStats>();
const concurrencyStats = new Map<string, number>();

for (const item of data) {
const borkerKey = Broker.getKey(item.functionName!, item.inspector!);
concurrencyStats.set(borkerKey, item.concurrency!);

for (const workerData of item.workers ?? []) {
const name = `${Broker.getKey(item.functionName!, item.inspector!)}#${
workerData.name
}`;
const name = `${borkerKey}#${workerData.name}`;
allSyncData.set(name, workerData);
}
}

for (const broker of this.brokers()) {
const brokerKey = Broker.getKey(broker.name, broker.isInspector);

for (const worker of broker.workers.values()) {
const name = `${Broker.getKey(broker.name, broker.isInspector)}#${
worker.name
}`;
const name = `${brokerKey}#${worker.name}`;
worker.sync(allSyncData.get(name) ?? null);
}

broker.recalculateConcurrency();
const concurrency = concurrencyStats.get(brokerKey) ?? 0;
broker.recalculateConcurrency(concurrency);
}

concurrencyStats.clear();
allSyncData.clear();
}

Expand Down
113 changes: 113 additions & 0 deletions src/data_plane/__test__/concurrency_stats.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import { bufferFromStream, sleep } from '#self/lib/util';
import * as common from '#self/test/common';
import { DefaultEnvironment } from '#self/test/env/environment';
import sinon from 'sinon';
import assert from 'assert';
import { Metadata } from '#self/delegate/request_response';
import { TriggerErrorStatus } from '../request_logger';
import { Broker } from '#self/control_plane/worker_stats/broker';
import { ConcurrencyStatsMode } from '#self/lib/json/function_profile';
import { assertCloseTo } from '#self/test/util';

const { baselineDir } = common;

describe(common.testName(__filename), function () {
this.timeout(60_000);

const env = new DefaultEnvironment();

it('should instant concurrency stats work as legacy mode', async () => {
await env.agent.setFunctionProfile([
{
name: 'aworker_echo',
runtime: 'aworker',
url: `file://${baselineDir}/aworker_echo`,
sourceFile: 'sleep.js',
signature: 'md5:234234',
worker: {
maxActivateRequests: 1,
},
},
]);

request(env, 500);
request(env, 500);

await sleep(100);

const dataFlowController = env.data.dataFlowController;
const broker = dataFlowController.brokers.get('aworker_echo$$noinspect');

assert.strictEqual(broker?.toJSON().concurrency, 2);

await sleep(1000);

assert.strictEqual(broker?.toJSON().concurrency, 0);
});

it('should periodic_max concurrency stats work', async () => {
await env.agent.setFunctionProfile([
{
name: 'aworker_echo',
runtime: 'aworker',
url: `file://${baselineDir}/aworker_echo`,
sourceFile: 'sleep.js',
signature: 'md5:234234',
worker: {
maxActivateRequests: 1,
concurrencyStatsMode: ConcurrencyStatsMode.PERIODIC_MAX,
},
},
]);

request(env, 500);
request(env, 500);

await sleep(100);

const dataFlowController = env.data.dataFlowController;
const broker = dataFlowController.brokers.get('aworker_echo$$noinspect');

await sleep(1000);

assert.strictEqual(broker?.toJSON().concurrency, 2);
});

it('should periodic_avg concurrency stats work', async () => {
await env.agent.setFunctionProfile([
{
name: 'aworker_echo',
runtime: 'aworker',
url: `file://${baselineDir}/aworker_echo`,
sourceFile: 'sleep.js',
signature: 'md5:234234',
worker: {
maxActivateRequests: 1,
concurrencyStatsMode: ConcurrencyStatsMode.PERIODIC_AVG,
},
},
]);

request(env, 500);
request(env, 500);

await sleep(1000);

const dataFlowController = env.data.dataFlowController;
const broker = dataFlowController.brokers.get('aworker_echo$$noinspect');

assertCloseTo(broker?.toJSON().concurrency!, 1, 0.2);
});
});

async function request(env: DefaultEnvironment, timeout: number) {
const response = await env.agent.invoke(
'aworker_echo',
Buffer.from('' + timeout),
{
method: 'POST',
}
);

await bufferFromStream(response);
}
16 changes: 15 additions & 1 deletion src/data_plane/worker_broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ import { Dispatcher, DispatcherDelegate } from './dispatcher/dispatcher';
import { DisposableDispatcher } from './dispatcher/disposable';
import { LeastRequestCountDispatcher } from './dispatcher/least_request_count';
import { RoundRobinDispatcher } from './dispatcher/round_robin';
import {
ConcurrencyStatsFactory,
ConcurrencyStats,
} from '#self/lib/concurrency_stats/index';

enum CredentialStatus {
PENDING = 1,
Expand Down Expand Up @@ -253,6 +257,8 @@ export class WorkerBroker extends Base implements DispatcherDelegate {
private _workerMap: Map<string, WorkerItem>;
private tokenBucket: TokenBucket | undefined = undefined;

public concurrencyStats: ConcurrencyStats;

/**
* TODO(chengzhong.wcz): dependency review;
*/
Expand Down Expand Up @@ -287,6 +293,10 @@ export class WorkerBroker extends Base implements DispatcherDelegate {
if (rateLimit) {
this.tokenBucket = new TokenBucket(this.rateLimit as TokenBucketConfig);
}

this.concurrencyStats = ConcurrencyStatsFactory.factory(
this._profile.worker.concurrencyStatsMode
);
}

get workerCount() {
Expand Down Expand Up @@ -477,6 +487,7 @@ export class WorkerBroker extends Base implements DispatcherDelegate {
name: item.name,
activeRequestCount: item.worker?.activeRequestCount ?? 0,
})),
concurrency: this.concurrencyStats.getConcurrency(),
};
}

Expand Down Expand Up @@ -587,6 +598,7 @@ export class WorkerBroker extends Base implements DispatcherDelegate {
inputStream: Readable | Buffer,
metadata: Metadata
): Promise<TriggerResponse> {
const id = this.concurrencyStats.requestStarted();
await this.ready();
const acquiredToken = this.tokenBucket?.acquire() ?? true;
if (!acquiredToken) {
Expand All @@ -595,7 +607,9 @@ export class WorkerBroker extends Base implements DispatcherDelegate {
});
}

return this._dispatcher.invoke(inputStream, metadata);
return this._dispatcher.invoke(inputStream, metadata).finally(() => {
this.concurrencyStats.requestFinished(id);
});
}

/**
Expand Down
35 changes: 35 additions & 0 deletions src/lib/__test__/concurrency_stats/avg_concurrency_stats.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import * as common from '#self/test/common';
import { AvgConcurrencyStats } from '#self/lib/concurrency_stats/avg_concurrency_stats';
import assert from 'assert';
import { sleep } from '#self/lib/util';
import { assertCloseTo } from '#self/test/util';

describe(common.testName(__filename), function () {
this.timeout(30_000);

it('should calculate concurrency correctly', async () => {
const calculator: AvgConcurrencyStats = new AvgConcurrencyStats(console);
// 假设有三个请求同时到达
const r1 = calculator.requestStarted();
const r2 = calculator.requestStarted();
const r3 = calculator.requestStarted();

// 请求结束
setTimeout(() => calculator.requestFinished(r1), 100);
setTimeout(() => calculator.requestFinished(r2), 200);
setTimeout(() => calculator.requestFinished(r3), 1500);

await sleep(1000);
// 剩一个 1500 未结束
// (3 / 1) * (((100 + 200 + 1000) / 3) / 1000)
assertCloseTo(calculator.getConcurrency(), 1.3, 0.01);

await sleep(1000);
// 1500 的剩 500
// (1 / 1) * (((500) / 1) / 1000)
assertCloseTo(calculator.getConcurrency(), 0.5, 0.01);

await sleep(1000);
assert.strictEqual(calculator.getConcurrency(), 0);
});
});
Loading

0 comments on commit 39f87b4

Please sign in to comment.