Skip to content

Commit

Permalink
feat: Support for log message handlers (#2347)
Browse files Browse the repository at this point in the history
DH-18086: Support for log message handlers in the gRPC transport
  • Loading branch information
bmingles authored Jan 23, 2025
1 parent bbba404 commit cbab7a2
Showing 1 changed file with 40 additions and 6 deletions.
46 changes: 40 additions & 6 deletions packages/jsapi-nodejs/src/NodeHttp2gRPCTransport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { assertNotNull } from '@deephaven/utils';

const logger = Log.module('@deephaven/jsapi-nodejs.NodeHttp2gRPCTransport');

type LogLevel = 'debug' | 'error';
type GrpcTransport = DhcType.grpc.GrpcTransport;
type GrpcTransportFactory = DhcType.grpc.GrpcTransportFactory;
type GrpcTransportOptions = DhcType.grpc.GrpcTransportOptions;
Expand All @@ -21,6 +22,24 @@ type GrpcTransportOptions = DhcType.grpc.GrpcTransportOptions;
* })
*/
export class NodeHttp2gRPCTransport implements GrpcTransport {
private static readonly logMessageHandlers = new Set<
(logLevel: LogLevel, ...args: unknown[]) => void
>();

/**
* Log a message to the logger + any registered log message handlers.
* @param logLevel The log level
* @param args Additional args to log
*/
static logMessage = (logLevel: LogLevel, ...args: unknown[]): void => {
logger[logLevel](...args);

// eslint-disable-next-line no-restricted-syntax
for (const handler of NodeHttp2gRPCTransport.logMessageHandlers) {
handler(logLevel, ...args);
}
};

private static sessionMap: Map<string, http2.ClientHttp2Session> = new Map();

/**
Expand All @@ -38,7 +57,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport {
if (!NodeHttp2gRPCTransport.sessionMap.has(origin)) {
const session = http2.connect(origin);
session.on('error', err => {
logger.error('Session error', err);
NodeHttp2gRPCTransport.logMessage('error', 'Session error', err);
});
NodeHttp2gRPCTransport.sessionMap.set(origin, session);
}
Expand All @@ -61,6 +80,20 @@ export class NodeHttp2gRPCTransport implements GrpcTransport {
},
};

/**
* Register a log message handler.
* @param handleLogMessage function to handle log messages
* @returns function to unregister the handler
*/
static onLogMessage = (
handleLogMessage: (logLevel: LogLevel, ...args: unknown[]) => void
): (() => void) => {
NodeHttp2gRPCTransport.logMessageHandlers.add(handleLogMessage);
return () => {
NodeHttp2gRPCTransport.logMessageHandlers.delete(handleLogMessage);
};
};

/**
* Private constructor to limit instantiation to the static factory method.
* @param options Transport options.
Expand Down Expand Up @@ -91,7 +124,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport {
): http2.ClientHttp2Stream => {
const url = new URL(this.options.url);

logger.debug('createRequest', url.pathname);
NodeHttp2gRPCTransport.logMessage('debug', 'createRequest', url.pathname);

const req = this.session.request({
...headers,
Expand Down Expand Up @@ -134,7 +167,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport {
* @param metadata - the headers to send the server when opening the connection
*/
start(metadata: { [key: string]: string | Array<string> }): void {
logger.debug('start', metadata.headersMap);
NodeHttp2gRPCTransport.logMessage('debug', 'start', metadata);

if (this.request != null) {
throw new Error('start called more than once');
Expand All @@ -153,7 +186,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport {
* @param msgBytes - bytes to send to the server
*/
sendMessage(msgBytes: Uint8Array): void {
logger.debug('sendMessage', msgBytes);
NodeHttp2gRPCTransport.logMessage('debug', 'sendMessage', msgBytes);
assertNotNull(this.request, 'request is required');

this.request.write(msgBytes);
Expand All @@ -164,7 +197,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport {
* be sent, but that the client is still open to receiving messages.
*/
finishSend(): void {
logger.debug('finishSend');
NodeHttp2gRPCTransport.logMessage('debug', 'finishSend');
assertNotNull(this.request, 'request is required');
this.request.end();
}
Expand All @@ -174,7 +207,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport {
* sent nor received, and preventing the client from receiving any more events.
*/
cancel(): void {
logger.debug('cancel');
NodeHttp2gRPCTransport.logMessage('debug', 'cancel');
assertNotNull(this.request, 'request is required');
this.request.close();
}
Expand All @@ -187,6 +220,7 @@ export class NodeHttp2gRPCTransport implements GrpcTransport {
for (const session of NodeHttp2gRPCTransport.sessionMap.values()) {
session.close();
}
NodeHttp2gRPCTransport.logMessageHandlers.clear();
NodeHttp2gRPCTransport.sessionMap.clear();
}
}
Expand Down

0 comments on commit cbab7a2

Please sign in to comment.