Skip to content
This repository was archived by the owner on Feb 8, 2024. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/teleterm/src/services/tshd/createClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import Logger from 'teleterm/logger';

import middleware, { withLogging } from './middleware';
import createAbortController from './createAbortController';
import { createClusterEvenstsStream } from './createClusterEventsStream';

export default function createClient(
addr: string,
Expand Down Expand Up @@ -413,6 +414,11 @@ export default function createClient(
});
});
},

clusterEvents() {
const stream = tshd.clusterEvents(new api.ClusterEventsRequest());
return createClusterEvenstsStream(stream);
},
};

return client;
Expand Down
64 changes: 64 additions & 0 deletions packages/teleterm/src/services/tshd/createClusterEventsStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { ClientReadableStream, StatusObject } from '@grpc/grpc-js';

import * as api from 'teleterm/services/tshd/v1/service_pb';

import * as types from './types';

export function createClusterEvenstsStream(
stream: ClientReadableStream<api.ClusterEvent>
): types.ClusterEventsStream {
return {
onNewGatewayConnectionAccepted(
callback: (event: types.NewGatewayConnectionAccepted) => void
): void {
stream.addListener('data', (clusterEvent: api.ClusterEvent) => {
if (clusterEvent.hasNewGatewayConnectionAccepted()) {
const event = {
...clusterEvent.getNewGatewayConnectionAccepted().toObject(),
clusterUri: clusterEvent.getClusterUri(),
};
callback(event);
}
});
},

/**
* From the docs:
*
* The 'error' event indicates that an error has occurred and the stream has been closed.
* Only one of 'error' or 'end' will be emitted.
*
* https://grpc.io/docs/languages/node/basics/#streaming-rpcs
*
* However, there's an issue in grpc-js where if 'error' is emitted, 'end' is emitted
* afterwards anyway. https://github.com/grpc/grpc-node/issues/1396
* We should not depend on this behavior.
*/
onError(callback: (error: Error) => void) {
stream.addListener('error', callback);
},

/**
* From the docs:
*
* The 'end' event indicates that the server has finished sending and no errors occurred.
* Only one of 'error' or 'end' will be emitted.
*
* https://grpc.io/docs/languages/node/basics/#streaming-rpcs
*
* However, there's an issue in grpc-js where if 'error' is emitted, 'end' is emitted
* afterwards anyway. https://github.com/grpc/grpc-node/issues/1396
* We should not depend on this behavior.
*/
onEnd(callback: () => void) {
stream.addListener('end', callback);
},

/**
* Fired when the server sends the status.
*/
onStatus(callback: (status: StatusObject) => void) {
stream.addListener('status', callback);
},
};
}
3 changes: 3 additions & 0 deletions packages/teleterm/src/services/tshd/fixtures/mocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
Application,
AuthSettings,
Cluster,
ClusterEventsStream,
CreateGatewayParams,
Database,
Gateway,
Expand Down Expand Up @@ -55,4 +56,6 @@ export class MockTshClient implements TshClient {
abortSignal?: TshAbortSignal
) => Promise<undefined>;
logout: (clusterUri: string) => Promise<undefined>;

clusterEvents: () => ClusterEventsStream;
}
22 changes: 22 additions & 0 deletions packages/teleterm/src/services/tshd/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { StatusObject } from '@grpc/grpc-js';

import apiCluster from './v1/cluster_pb';
import apiDb from './v1/database_pb';
import apigateway from './v1/gateway_pb';
Expand All @@ -7,6 +9,12 @@ import apiApp from './v1/app_pb';
import apiService from './v1/service_pb';
import apiAuthSettings from './v1/auth_settings_pb';

type RequiredKeys<T> = {
// eslint-disable-next-line @typescript-eslint/ban-types
[K in keyof T]-?: {} extends { [P in K]: T[K] } ? never : K;
}[keyof T];
type PickRequiredKeys<T> = Pick<T, RequiredKeys<T>>;

export type Application = apiApp.App.AsObject;
export type Kube = apiKube.Kube.AsObject;
export type Server = apiServer.Server.AsObject;
Expand All @@ -23,6 +31,9 @@ export type GatewayProtocol =
| 'redis'
| 'sqlserver';
export type Database = apiDb.Database.AsObject;
export type NewGatewayConnectionAccepted =
apiService.NewGatewayConnectionAccepted.AsObject &
PickRequiredKeys<apiService.ClusterEvent.AsObject>;
export type Cluster = apiCluster.Cluster.AsObject;
export type LoggedInUser = apiCluster.LoggedInUser.AsObject;
export type AuthProvider = apiAuthSettings.AuthProvider.AsObject;
Expand Down Expand Up @@ -88,6 +99,17 @@ export type TshClient = {
abortSignal?: TshAbortSignal
) => Promise<void>;
logout: (clusterUri: string) => Promise<void>;

clusterEvents: () => ClusterEventsStream;
};

export type ClusterEventsStream = {
onNewGatewayConnectionAccepted(
callback: (event: NewGatewayConnectionAccepted) => void
): void;
onError(callback: (error: Error) => void): void;
onEnd(callback: () => void): void;
onStatus(callback: (status: StatusObject) => void): void;
};

export type TshAbortController = {
Expand Down
15 changes: 15 additions & 0 deletions packages/teleterm/src/services/tshd/v1/service_grpc_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ interface ITerminalServiceService extends grpc.ServiceDefinition<grpc.UntypedSer
login: ITerminalServiceService_ILogin;
loginPasswordless: ITerminalServiceService_ILoginPasswordless;
logout: ITerminalServiceService_ILogout;
clusterEvents: ITerminalServiceService_IClusterEvents;
}

interface ITerminalServiceService_IListRootClusters extends grpc.MethodDefinition<v1_service_pb.ListClustersRequest, v1_service_pb.ListClustersResponse> {
Expand Down Expand Up @@ -217,6 +218,15 @@ interface ITerminalServiceService_ILogout extends grpc.MethodDefinition<v1_servi
responseSerialize: grpc.serialize<v1_service_pb.EmptyResponse>;
responseDeserialize: grpc.deserialize<v1_service_pb.EmptyResponse>;
}
interface ITerminalServiceService_IClusterEvents extends grpc.MethodDefinition<v1_service_pb.ClusterEventsRequest, v1_service_pb.ClusterEvent> {
path: "/teleport.terminal.v1.TerminalService/ClusterEvents";
requestStream: false;
responseStream: true;
requestSerialize: grpc.serialize<v1_service_pb.ClusterEventsRequest>;
requestDeserialize: grpc.deserialize<v1_service_pb.ClusterEventsRequest>;
responseSerialize: grpc.serialize<v1_service_pb.ClusterEvent>;
responseDeserialize: grpc.deserialize<v1_service_pb.ClusterEvent>;
}

export const TerminalServiceService: ITerminalServiceService;

Expand All @@ -241,6 +251,7 @@ export interface ITerminalServiceServer {
login: grpc.handleUnaryCall<v1_service_pb.LoginRequest, v1_service_pb.EmptyResponse>;
loginPasswordless: grpc.handleBidiStreamingCall<v1_service_pb.LoginPasswordlessRequest, v1_service_pb.LoginPasswordlessResponse>;
logout: grpc.handleUnaryCall<v1_service_pb.LogoutRequest, v1_service_pb.EmptyResponse>;
clusterEvents: grpc.handleServerStreamingCall<v1_service_pb.ClusterEventsRequest, v1_service_pb.ClusterEvent>;
}

export interface ITerminalServiceClient {
Expand Down Expand Up @@ -304,6 +315,8 @@ export interface ITerminalServiceClient {
logout(request: v1_service_pb.LogoutRequest, callback: (error: grpc.ServiceError | null, response: v1_service_pb.EmptyResponse) => void): grpc.ClientUnaryCall;
logout(request: v1_service_pb.LogoutRequest, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: v1_service_pb.EmptyResponse) => void): grpc.ClientUnaryCall;
logout(request: v1_service_pb.LogoutRequest, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: v1_service_pb.EmptyResponse) => void): grpc.ClientUnaryCall;
clusterEvents(request: v1_service_pb.ClusterEventsRequest, options?: Partial<grpc.CallOptions>): grpc.ClientReadableStream<v1_service_pb.ClusterEvent>;
clusterEvents(request: v1_service_pb.ClusterEventsRequest, metadata?: grpc.Metadata, options?: Partial<grpc.CallOptions>): grpc.ClientReadableStream<v1_service_pb.ClusterEvent>;
}

export class TerminalServiceClient extends grpc.Client implements ITerminalServiceClient {
Expand Down Expand Up @@ -367,4 +380,6 @@ export class TerminalServiceClient extends grpc.Client implements ITerminalServi
public logout(request: v1_service_pb.LogoutRequest, callback: (error: grpc.ServiceError | null, response: v1_service_pb.EmptyResponse) => void): grpc.ClientUnaryCall;
public logout(request: v1_service_pb.LogoutRequest, metadata: grpc.Metadata, callback: (error: grpc.ServiceError | null, response: v1_service_pb.EmptyResponse) => void): grpc.ClientUnaryCall;
public logout(request: v1_service_pb.LogoutRequest, metadata: grpc.Metadata, options: Partial<grpc.CallOptions>, callback: (error: grpc.ServiceError | null, response: v1_service_pb.EmptyResponse) => void): grpc.ClientUnaryCall;
public clusterEvents(request: v1_service_pb.ClusterEventsRequest, options?: Partial<grpc.CallOptions>): grpc.ClientReadableStream<v1_service_pb.ClusterEvent>;
public clusterEvents(request: v1_service_pb.ClusterEventsRequest, metadata?: grpc.Metadata, options?: Partial<grpc.CallOptions>): grpc.ClientReadableStream<v1_service_pb.ClusterEvent>;
}
34 changes: 34 additions & 0 deletions packages/teleterm/src/services/tshd/v1/service_grpc_pb.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,28 @@ function deserialize_teleport_terminal_v1_Cluster(buffer_arg) {
return v1_cluster_pb.Cluster.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_teleport_terminal_v1_ClusterEvent(arg) {
if (!(arg instanceof v1_service_pb.ClusterEvent)) {
throw new Error('Expected argument of type teleport.terminal.v1.ClusterEvent');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_teleport_terminal_v1_ClusterEvent(buffer_arg) {
return v1_service_pb.ClusterEvent.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_teleport_terminal_v1_ClusterEventsRequest(arg) {
if (!(arg instanceof v1_service_pb.ClusterEventsRequest)) {
throw new Error('Expected argument of type teleport.terminal.v1.ClusterEventsRequest');
}
return Buffer.from(arg.serializeBinary());
}

function deserialize_teleport_terminal_v1_ClusterEventsRequest(buffer_arg) {
return v1_service_pb.ClusterEventsRequest.deserializeBinary(new Uint8Array(buffer_arg));
}

function serialize_teleport_terminal_v1_CreateGatewayRequest(arg) {
if (!(arg instanceof v1_service_pb.CreateGatewayRequest)) {
throw new Error('Expected argument of type teleport.terminal.v1.CreateGatewayRequest');
Expand Down Expand Up @@ -642,6 +664,18 @@ logout: {
responseSerialize: serialize_teleport_terminal_v1_EmptyResponse,
responseDeserialize: deserialize_teleport_terminal_v1_EmptyResponse,
},
// TODO: Add comment.
clusterEvents: {
path: '/teleport.terminal.v1.TerminalService/ClusterEvents',
requestStream: false,
responseStream: true,
requestType: v1_service_pb.ClusterEventsRequest,
responseType: v1_service_pb.ClusterEvent,
requestSerialize: serialize_teleport_terminal_v1_ClusterEventsRequest,
requestDeserialize: deserialize_teleport_terminal_v1_ClusterEventsRequest,
responseSerialize: serialize_teleport_terminal_v1_ClusterEvent,
responseDeserialize: deserialize_teleport_terminal_v1_ClusterEvent,
},
};

exports.TerminalServiceClient = grpc.makeGenericClientConstructor(TerminalServiceService);
106 changes: 106 additions & 0 deletions packages/teleterm/src/services/tshd/v1/service_pb.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -838,6 +838,112 @@ export namespace GetAuthSettingsRequest {
}
}

export class ClusterEventsRequest extends jspb.Message {

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): ClusterEventsRequest.AsObject;
static toObject(includeInstance: boolean, msg: ClusterEventsRequest): ClusterEventsRequest.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: ClusterEventsRequest, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): ClusterEventsRequest;
static deserializeBinaryFromReader(message: ClusterEventsRequest, reader: jspb.BinaryReader): ClusterEventsRequest;
}

export namespace ClusterEventsRequest {
export type AsObject = {
}
}

export class ClusterEvent extends jspb.Message {
getClusterUri(): string;
setClusterUri(value: string): ClusterEvent;


hasCertExpired(): boolean;
clearCertExpired(): void;
getCertExpired(): CertExpired | undefined;
setCertExpired(value?: CertExpired): ClusterEvent;


hasNewGatewayConnectionAccepted(): boolean;
clearNewGatewayConnectionAccepted(): void;
getNewGatewayConnectionAccepted(): NewGatewayConnectionAccepted | undefined;
setNewGatewayConnectionAccepted(value?: NewGatewayConnectionAccepted): ClusterEvent;


getEventCase(): ClusterEvent.EventCase;

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): ClusterEvent.AsObject;
static toObject(includeInstance: boolean, msg: ClusterEvent): ClusterEvent.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: ClusterEvent, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): ClusterEvent;
static deserializeBinaryFromReader(message: ClusterEvent, reader: jspb.BinaryReader): ClusterEvent;
}

export namespace ClusterEvent {
export type AsObject = {
clusterUri: string,
certExpired?: CertExpired.AsObject,
newGatewayConnectionAccepted?: NewGatewayConnectionAccepted.AsObject,
}

export enum EventCase {
EVENT_NOT_SET = 0,

CERT_EXPIRED = 2,

NEW_GATEWAY_CONNECTION_ACCEPTED = 3,

}

}

export class CertExpired extends jspb.Message {

serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): CertExpired.AsObject;
static toObject(includeInstance: boolean, msg: CertExpired): CertExpired.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: CertExpired, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): CertExpired;
static deserializeBinaryFromReader(message: CertExpired, reader: jspb.BinaryReader): CertExpired;
}

export namespace CertExpired {
export type AsObject = {
}
}

export class NewGatewayConnectionAccepted extends jspb.Message {
getGatewayUri(): string;
setGatewayUri(value: string): NewGatewayConnectionAccepted;

getTargetUri(): string;
setTargetUri(value: string): NewGatewayConnectionAccepted;


serializeBinary(): Uint8Array;
toObject(includeInstance?: boolean): NewGatewayConnectionAccepted.AsObject;
static toObject(includeInstance: boolean, msg: NewGatewayConnectionAccepted): NewGatewayConnectionAccepted.AsObject;
static extensions: {[key: number]: jspb.ExtensionFieldInfo<jspb.Message>};
static extensionsBinary: {[key: number]: jspb.ExtensionFieldBinaryInfo<jspb.Message>};
static serializeBinaryToWriter(message: NewGatewayConnectionAccepted, writer: jspb.BinaryWriter): void;
static deserializeBinary(bytes: Uint8Array): NewGatewayConnectionAccepted;
static deserializeBinaryFromReader(message: NewGatewayConnectionAccepted, reader: jspb.BinaryReader): NewGatewayConnectionAccepted;
}

export namespace NewGatewayConnectionAccepted {
export type AsObject = {
gatewayUri: string,
targetUri: string,
}
}

export class EmptyResponse extends jspb.Message {

serializeBinary(): Uint8Array;
Expand Down
Loading