Skip to content

Commit

Permalink
feat: useNewWorker flag on request logger (#102)
Browse files Browse the repository at this point in the history
* feat: useNewWorker flag on request logger
  • Loading branch information
mariodu authored Jan 24, 2024
1 parent 5056090 commit 7105bf7
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 12 deletions.
17 changes: 14 additions & 3 deletions src/data_plane/data_flow_controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ export class DataFlowController extends BaseOf(EventEmitter) {
error,
invokeName,
(error as ErrorWithInvokeDetail).workerName || kDefaultWorkerName,
(error as ErrorWithInvokeDetail).useNewWorker,
metadata,
TriggerErrorStatus.INTERNAL,
{
Expand Down Expand Up @@ -529,7 +530,8 @@ export class DataFlowController extends BaseOf(EventEmitter) {
queueing: response!.queueing || 0,
rt: Date.now() - start,
},
bytesSent
bytesSent,
response.useNewWorker
);
});

Expand All @@ -539,6 +541,7 @@ export class DataFlowController extends BaseOf(EventEmitter) {
e,
invokeName,
response.workerName || kDefaultWorkerName,
response.useNewWorker,
metadata,
TriggerErrorStatus.ABORT,
{
Expand All @@ -555,19 +558,27 @@ export class DataFlowController extends BaseOf(EventEmitter) {
error: Error,
invokeName: string,
workerName: string,
useNewWorker: boolean,
metadata: Metadata,
status: number,
timing: RequestTiming,
bytesSent?: number
) {
this.requestLogger.error(invokeName, workerName, error, metadata.requestId);
this.requestLogger.error(
invokeName,
workerName,
error,
metadata.requestId,
useNewWorker
);
this.requestLogger.access(
invokeName,
workerName,
metadata,
String(status),
timing,
bytesSent ?? 0
bytesSent ?? 0,
useNewWorker
);
}

Expand Down
20 changes: 13 additions & 7 deletions src/data_plane/request_logger.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
import { Metadata } from '#self/delegate/request_response';
import { Loggers } from '#self/lib/loggers';
import { ILogger } from '@midwayjs/logger';
import { kDefaultRequestId, kDefaultWorkerName } from '#self/lib/constants';
import {
kDefaultRequestId,
kDefaultWorkerName,
UseNewWorkerState,
} from '#self/lib/constants';
import { Config } from '#self/config';
import dayjs from 'dayjs';

Expand All @@ -20,13 +24,14 @@ export class RequestLogger {
funcName: string,
workerName: string = kDefaultWorkerName,
err: Error,
requestId: string = kDefaultRequestId
requestId: string = kDefaultRequestId,
useNewWorker = false
) {
// logTime, requestId, dataPlanePid, functionName, workerName, error
// logTime, requestId, dataPlanePid, functionName, workerName, useNewWorker, error
this.errorLogger.write(
`${dayjs().format(this.timestampFormat)} ${requestId} ${
process.pid
} ${funcName} ${workerName} - `,
} ${funcName} ${workerName} ${useNewWorker ?? false} - `,
err
);
}
Expand All @@ -37,16 +42,17 @@ export class RequestLogger {
metadata: Metadata,
status = `${TriggerErrorStatus.DEFAULT}`,
performance: RequestTiming,
bytesSent = 0
bytesSent = 0,
useNewWorker = false
) {
// logTime, requestId, dataPlanePid, functionName, workerName, method, url, invokeSuccess, timeToFirstByte, timeForQueueing, rt, statusCode, responseSize
// logTime, requestId, dataPlanePid, functionName, workerName, useNewWorker, method, url, invokeSuccess, timeToFirstByte, timeForQueueing, rt, statusCode, responseSize
const { method = '-', url = '-', requestId = kDefaultRequestId } = metadata;
const { ttfb = 0, queueing = 0, rt = 0 } = performance;

this.accessLogger.write(
`${dayjs().format(this.timestampFormat)} ${requestId} ${
process.pid
} ${funcName} ${workerName} ${method} ${url} ` +
} ${funcName} ${workerName} ${useNewWorker ?? false} ${method} ${url} ` +
`${
`${status}` === '200'
} ${ttfb} ${queueing} ${rt} ${status} ${bytesSent}`
Expand Down
16 changes: 14 additions & 2 deletions src/data_plane/worker_broker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export class Worker extends EventEmitter {
debuggerTag: string | undefined;

private _dispatcherData: unknown;
private _unworked = true;

constructor(
public delegate: NoslatedDelegateService,
Expand Down Expand Up @@ -193,6 +194,7 @@ export class Worker extends EventEmitter {

ret.queueing = waitMs;
ret.workerName = this.name;
ret.useNewWorker = this._unworked;

// do not await the response body finishing.
ret.finish().finally(() => {
Expand All @@ -204,32 +206,42 @@ export class Worker extends EventEmitter {

return ret;
} catch (e: unknown) {
extendErrorWithInvokeDetail(e as Error, this.name, waitMs);
extendErrorWithInvokeDetail(
e as Error,
this.name,
waitMs,
this._unworked
);

this.activeRequestCount--;
if (this.activeRequestCount === 0) {
this.emit('downToZero');
}

throw e;
} finally {
this._unworked = false;
}
}
}

export function extendErrorWithInvokeDetail(
error: Error,
workerName: string,
queueing: number
queueing: number,
useNewWorker: boolean
) {
if (error instanceof Error) {
error['workerName'] = workerName;
error['queueing'] = queueing;
error['useNewWorker'] = useNewWorker;
}
}

export interface ErrorWithInvokeDetail extends Error {
workerName: string;
queueing: number;
useNewWorker: boolean;
}

interface BrokerOptions {
Expand Down
10 changes: 10 additions & 0 deletions src/delegate/request_response.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class TriggerResponse extends Readable {
// time for request wait to be invoked
#queueing: number;
#workerName: string;
#useNewWorker: boolean;

constructor(init?: TriggerResponseInit) {
super({
Expand All @@ -106,6 +107,7 @@ class TriggerResponse extends Readable {
this.#metadata = metadata as Metadata;
this.#queueing = kDefaultQueueingTime;
this.#workerName = kDefaultWorkerName;
this.#useNewWorker = false;
this.#finishDeferred = createDeferred<boolean>();

this.once('close', () => {
Expand Down Expand Up @@ -149,6 +151,14 @@ class TriggerResponse extends Readable {
return this.#workerName;
}

get useNewWorker(): boolean {
return this.#useNewWorker;
}

set useNewWorker(val: boolean) {
this.#useNewWorker = val;
}

async finish(): Promise<boolean> {
return this.#finishDeferred.promise;
}
Expand Down
6 changes: 6 additions & 0 deletions src/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,9 @@ export enum TurfStatusEvent {
export const kDefaultRequestId = 'unknown';
export const kDefaultWorkerName = 'unknown';
export const kDefaultQueueingTime = -1;

export enum UseNewWorkerState {
TRUE = 'true',
FALSE = 'false',
UNKNOWN = 'unknown',
}

0 comments on commit 7105bf7

Please sign in to comment.