Skip to content

Commit

Permalink
chore: split structure (#558)
Browse files Browse the repository at this point in the history
  • Loading branch information
metcoder95 authored May 20, 2024
1 parent db2226b commit 5b46247
Show file tree
Hide file tree
Showing 17 changed files with 369 additions and 278 deletions.
8 changes: 4 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
"name": "piscina",
"version": "4.5.0",
"description": "A fast, efficient Node.js Worker Thread Pool implementation",
"main": "./dist/src/main.js",
"main": "./dist/main.js",
"types": "./dist/index.d.ts",
"exports": {
"types": "./dist/src/index.d.ts",
"types": "./dist/index.d.ts",
"import": "./dist/esm-wrapper.mjs",
"require": "./dist/src/main.js"
"require": "./dist/main.js"
},
"types": "./dist/src/index.d.ts",
"scripts": {
"build": "tsc && gen-esm-wrapper . dist/esm-wrapper.mjs",
"lint": "standardx \"**/*.{ts,mjs,js,cjs}\" | snazzy",
Expand Down
41 changes: 41 additions & 0 deletions src/abort.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
interface AbortSignalEventTargetAddOptions {
once: boolean;
}

export interface AbortSignalEventTarget {
addEventListener: (
name: 'abort',
listener: () => void,
options?: AbortSignalEventTargetAddOptions
) => void;
removeEventListener: (name: 'abort', listener: () => void) => void;
aborted?: boolean;
reason?: unknown;
}

export interface AbortSignalEventEmitter {
off: (name: 'abort', listener: () => void) => void;
once: (name: 'abort', listener: () => void) => void;
}

export type AbortSignalAny = AbortSignalEventTarget | AbortSignalEventEmitter;

export class AbortError extends Error {
constructor (reason?: AbortSignalEventTarget['reason']) {
// TS does not recognizes the cause clause
// @ts-expect-error
super('The task has been aborted', { cause: reason });
}

get name () {
return 'AbortError';
}
}

export function onabort (abortSignal: AbortSignalAny, listener: () => void) {
if ('addEventListener' in abortSignal) {
abortSignal.addEventListener('abort', listener, { once: true });
} else {
abortSignal.once('abort', listener);
}
}
137 changes: 67 additions & 70 deletions src/common.ts
Original file line number Diff line number Diff line change
@@ -1,59 +1,42 @@
import type { MessagePort } from 'worker_threads';
import type { Histogram } from 'node:perf_hooks';
import { fileURLToPath, URL } from 'node:url';

export const READY = '_WORKER_READY';

export interface StartupMessage {
filename : string | null;
name : string;
port : MessagePort;
sharedBuffer : Int32Array;
useAtomics : boolean;
niceIncrement : number;
}
import type { HistogramSummary } from './types';
import { kMovable, kTransferable, kValue } from './symbols';

export interface RequestMessage {
taskId : number;
task : any;
filename: string;
name : string;
}

export interface ReadyMessage {
[READY]: true
};

export interface ResponseMessage {
taskId : number;
result : any;
error: Error | null;
}
export const commonState = {
isWorkerThread: false,
workerData: undefined
};

// Internal symbol used to mark Transferable objects returned
// by the Piscina.move() function
const kMovable = Symbol('Piscina.kMovable');
export const kTransferable = Symbol.for('Piscina.transferable');
export const kValue = Symbol.for('Piscina.valueOf');
export const kQueueOptions = Symbol.for('Piscina.queueOptions');
// States wether the worker is ready to receive tasks
export const READY = '_WORKER_READY';

// True if the object implements the Transferable interface
export function isTransferable (value : any) : boolean {
return value != null &&
typeof value === 'object' &&
kTransferable in value &&
kValue in value;
/**
* True if the object implements the Transferable interface
*
* @export
* @param {unknown} value
* @return {*} {boolean}
*/
export function isTransferable (value: unknown): boolean {
return (
value != null &&
typeof value === 'object' &&
kTransferable in value &&
kValue in value
);
}

// True if object implements Transferable and has been returned
// by the Piscina.move() function
export function isMovable (value : any) : boolean {
/**
* True if object implements Transferable and has been returned
* by the Piscina.move() function
*
* TODO: narrow down the type of value
* @export
* @param {(unknown & PiscinaMovable)} value
* @return {*} {boolean}
*/
export function isMovable (value: any): boolean {
return isTransferable(value) && value[kMovable] === true;
}

export function markMovable (value : object) : void {
export function markMovable (value: {}): void {
Object.defineProperty(value, kMovable, {
enumerable: false,
configurable: true,
Expand All @@ -62,31 +45,45 @@ export function markMovable (value : object) : void {
});
}

export interface Transferable {
readonly [kTransferable] : object;
readonly [kValue] : object;
}
// State of Piscina pool
export const commonState = {
isWorkerThread: false,
workerData: undefined
};

export interface Task {
readonly [kQueueOptions] : object | null;
}
export function createHistogramSummary (histogram: Histogram): HistogramSummary {
const { mean, stddev, min, max } = histogram;

export interface TaskQueue {
readonly size : number;
shift () : Task | null;
remove (task : Task) : void;
push (task : Task) : void;
return {
average: mean / 1000,
mean: mean / 1000,
stddev,
min: min / 1000,
max: max / 1000,
p0_001: histogram.percentile(0.001) / 1000,
p0_01: histogram.percentile(0.01) / 1000,
p0_1: histogram.percentile(0.1) / 1000,
p1: histogram.percentile(1) / 1000,
p2_5: histogram.percentile(2.5) / 1000,
p10: histogram.percentile(10) / 1000,
p25: histogram.percentile(25) / 1000,
p50: histogram.percentile(50) / 1000,
p75: histogram.percentile(75) / 1000,
p90: histogram.percentile(90) / 1000,
p97_5: histogram.percentile(97.5) / 1000,
p99: histogram.percentile(99) / 1000,
p99_9: histogram.percentile(99.9) / 1000,
p99_99: histogram.percentile(99.99) / 1000,
p99_999: histogram.percentile(99.999) / 1000
};
}

export function isTaskQueue (value : any) : boolean {
return typeof value === 'object' &&
value !== null &&
'size' in value &&
typeof value.shift === 'function' &&
typeof value.remove === 'function' &&
typeof value.push === 'function';
export function toHistogramIntegerNano (milliseconds: number): number {
return Math.max(1, Math.trunc(milliseconds * 1000));
}

export const kRequestCountField = 0;
export const kResponseCountField = 1;
export const kFieldCount = 2;
export function maybeFileURLToPath (filename : string) : string {
return filename.startsWith('file:')
? fileURLToPath(new URL(filename))
: filename;
}
3 changes: 2 additions & 1 deletion src/fixed-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
* Source: https://github.com/nodejs/node/blob/de7b37880f5a541d5f874c1c2362a65a4be76cd0/lib/internal/fixed_queue.js
*/
import assert from 'node:assert';
import { TaskQueue, Task } from './common';
import { TaskQueue } from './task_queue';
import { Task } from './types';
// Currently optimal queue size, tested on V8 6.0 - 6.6. Must be power of two.
const kSize = 2048;
const kMask = kSize - 1;
Expand Down
Loading

0 comments on commit 5b46247

Please sign in to comment.