Skip to content

Commit

Permalink
feat: max queue size for pool
Browse files Browse the repository at this point in the history
  • Loading branch information
SkeLLLa committed Jan 2, 2025
1 parent f062a2d commit 2987196
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/client/broker/transport/json/undici.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import {
export class PinotBrokerJSONTransport implements IPinotBrokerTransport {
protected readonly pool: Pool;
protected readonly token: string;
protected readonly maxQueueSize: number | undefined;

constructor({
bodyTimeout = 60000,
Expand All @@ -25,6 +26,7 @@ export class PinotBrokerJSONTransport implements IPinotBrokerTransport {
headersTimeout = 60000,
keepAliveMaxTimeout = 60000,
token,
maxQueueSize,
}: IBrokerTransportConfig) {
this.pool = new Pool(brokerUrl, {
connections: connections ?? null,
Expand All @@ -38,6 +40,7 @@ export class PinotBrokerJSONTransport implements IPinotBrokerTransport {
// },
});
this.token = token;
this.maxQueueSize = maxQueueSize;
}

/**
Expand All @@ -55,6 +58,16 @@ export class PinotBrokerJSONTransport implements IPinotBrokerTransport {
path,
query,
}: IBrokerTransportRequestOptions): Promise<TResponse> {
if (this.maxQueueSize && this.pool.stats.queued >= this.maxQueueSize) {
// Throw error
throw new PinotError({
data: { body },
message: `Pinot transport error: Max queue size reached`,
type: EPinotErrorType.TRANSPORT,
code: EBrokerTransportErrorCode.LIMIT_EXCEEDED,
});
}

const reqOptions: Dispatcher.RequestOptions = {
method,
headers: {
Expand Down
11 changes: 11 additions & 0 deletions src/client/broker/transport/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ export interface IBrokerTransportConfig {
* @defaultValue 1000
*/
headersTimeout?: number;
/**
* Max pool queue size. If undefined or 0, queue is infinite.
* If a request comes and queue is already at maximum size it will be discarded with LIMIT_EXCEEDED error.
*
* @defaultValue value undefined
*/
maxQueueSize?: number;
}

/**
Expand Down Expand Up @@ -114,4 +121,8 @@ export const enum EBrokerTransportErrorCode {
* Timeout
*/
TIMEOUT,
/**
* Limit exceded
*/
LIMIT_EXCEEDED,
}

0 comments on commit 2987196

Please sign in to comment.