Skip to content

Commit

Permalink
feat: dump worker traffic stats (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
mariodu authored Jan 4, 2024
1 parent 0f04f1d commit a5c4ad5
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 1 deletion.
1 change: 1 addition & 0 deletions src/config/default.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export default {
workerRedundantVictimSpareTimes: 6,
capacityScalingStage: 0.7,
useEmaScaling: false,
dumpWorkerTrafficStats: false,
},

dataPlane: {
Expand Down
5 changes: 5 additions & 0 deletions src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ export interface ControlPlaneConfig {
* 此配置开启会强制设置 workerTrafficStatsPullingMs 为 1000ms,确保数据同步及时
*/
useEmaScaling: boolean;
/**
* 输出 worker traffic stats 同步数据
* 默认为 false
*/
dumpWorkerTrafficStats: boolean;
}

export interface DatePlaneConfig {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import * as common from '#self/test/common';
import { DefaultEnvironment } from '#self/test/env/environment';
import assert from 'assert';
import sinon from 'sinon';
import { PrefixedLogger } from '#self/lib/loggers';
import { sleep } from '#self/lib/util';

describe(common.testName(__filename), () => {
const env = new DefaultEnvironment({
config: common.extendDefaultConfig({
controlPlane: {
dumpWorkerTrafficStats: true,
workerTrafficStatsPullingMs: 1000,
},
}),
});

it('should dump worker traffic stats', async () => {
const stateManager = env.control._ctx.getInstance('stateManager');

assert(stateManager['_dumpLogger'] instanceof PrefixedLogger);

const spy = sinon.spy(stateManager['_dumpLogger'], 'info');

await env.agent.setFunctionProfile([
{
name: 'aworker_echo',
runtime: 'aworker',
url: `file://${common.baselineDir}/aworker_echo`,
sourceFile: 'index.js',
signature: 'md5:234234',
worker: {
maxActivateRequests: 1,
reservationCount: 1,
},
},
]);

await sleep(1100);

assert(
spy.calledWithMatch(
'sync broker %s concurrency %d.',
'aworker_echo:noinspector',
0
)
);
});
});
16 changes: 15 additions & 1 deletion src/control_plane/worker_stats/state_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {
WorkerStatus,
WorkerStatusReport,
} from '#self/lib/constants';
import { Logger, loggers } from '#self/lib/loggers';
import { Logger, loggers, PrefixedLogger } from '#self/lib/loggers';
import {
ContainerReconciledEvent,
WorkerStatusReportEvent,
Expand Down Expand Up @@ -34,12 +34,20 @@ export class StateManager extends Base {

private _useEmaScaling: boolean;

private _dumpLogger: Pick<PrefixedLogger, 'info'> = {
info: () => {},
};

constructor(ctx: ControlPlaneDependencyContext) {
super();
this._logger = loggers.get('state manager');
this._functionProfile = ctx.getInstance('functionProfile');
this._config = ctx.getInstance('config');

if (this._config.controlPlane.dumpWorkerTrafficStats) {
this._dumpLogger = new PrefixedLogger('state manager', 'dump');
}

this._eventBus = ctx.getInstance('eventBus');
this._eventBus.subscribe(WorkerTrafficStatsEvent, {
next: event => {
Expand Down Expand Up @@ -130,6 +138,12 @@ export class StateManager extends Base {
const borkerKey = Broker.getKey(item.functionName!, item.inspector!);
concurrencyStats.set(borkerKey, item.concurrency!);

this._dumpLogger.info(
`sync broker %s concurrency %d.`,
borkerKey,
item.concurrency!
);

for (const workerData of item.workers ?? []) {
const name = `${borkerKey}#${workerData.name}`;
allSyncData.set(name, workerData);
Expand Down

0 comments on commit a5c4ad5

Please sign in to comment.