Skip to content

Commit 72bf943

Browse files
authored
refactor(server): event emits (#10648)
* refactor(server): event emits * refactor: change default priority to 0
1 parent 7e99394 commit 72bf943

25 files changed

+222
-171
lines changed

Diff for: server/src/app.module.ts

+26-10
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { BullModule } from '@nestjs/bullmq';
2-
import { Module, OnModuleInit, ValidationPipe } from '@nestjs/common';
2+
import { Inject, Module, OnModuleDestroy, OnModuleInit, ValidationPipe } from '@nestjs/common';
33
import { ConfigModule } from '@nestjs/config';
4-
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE } from '@nestjs/core';
4+
import { APP_FILTER, APP_GUARD, APP_INTERCEPTOR, APP_PIPE, ModuleRef } from '@nestjs/core';
55
import { EventEmitterModule } from '@nestjs/event-emitter';
66
import { ScheduleModule, SchedulerRegistry } from '@nestjs/schedule';
77
import { TypeOrmModule } from '@nestjs/typeorm';
@@ -12,15 +12,15 @@ import { bullConfig, bullQueues, clsConfig, immichAppConfig } from 'src/config';
1212
import { controllers } from 'src/controllers';
1313
import { databaseConfig } from 'src/database.config';
1414
import { entities } from 'src/entities';
15+
import { IEventRepository } from 'src/interfaces/event.interface';
1516
import { AuthGuard } from 'src/middleware/auth.guard';
1617
import { ErrorInterceptor } from 'src/middleware/error.interceptor';
1718
import { FileUploadInterceptor } from 'src/middleware/file-upload.interceptor';
1819
import { HttpExceptionFilter } from 'src/middleware/http-exception.filter';
1920
import { LoggingInterceptor } from 'src/middleware/logging.interceptor';
2021
import { repositories } from 'src/repositories';
2122
import { services } from 'src/services';
22-
import { ApiService } from 'src/services/api.service';
23-
import { MicroservicesService } from 'src/services/microservices.service';
23+
import { setupEventHandlers } from 'src/utils/events';
2424
import { otelConfig } from 'src/utils/instrumentation';
2525

2626
const common = [...services, ...repositories];
@@ -50,23 +50,39 @@ const imports = [
5050
controllers: [...controllers],
5151
providers: [...common, ...middleware],
5252
})
53-
export class ApiModule implements OnModuleInit {
54-
constructor(private service: ApiService) {}
53+
export class ApiModule implements OnModuleInit, OnModuleDestroy {
54+
constructor(
55+
private moduleRef: ModuleRef,
56+
@Inject(IEventRepository) private eventRepository: IEventRepository,
57+
) {}
5558

5659
async onModuleInit() {
57-
await this.service.init();
60+
setupEventHandlers(this.moduleRef);
61+
await this.eventRepository.emit('onBootstrapEvent', 'api');
62+
}
63+
64+
async onModuleDestroy() {
65+
await this.eventRepository.emit('onShutdownEvent');
5866
}
5967
}
6068

6169
@Module({
6270
imports: [...imports],
6371
providers: [...common, SchedulerRegistry],
6472
})
65-
export class MicroservicesModule implements OnModuleInit {
66-
constructor(private service: MicroservicesService) {}
73+
export class MicroservicesModule implements OnModuleInit, OnModuleDestroy {
74+
constructor(
75+
private moduleRef: ModuleRef,
76+
@Inject(IEventRepository) private eventRepository: IEventRepository,
77+
) {}
6778

6879
async onModuleInit() {
69-
await this.service.init();
80+
setupEventHandlers(this.moduleRef);
81+
await this.eventRepository.emit('onBootstrapEvent', 'microservices');
82+
}
83+
84+
async onModuleDestroy() {
85+
await this.eventRepository.emit('onShutdownEvent');
7086
}
7187
}
7288

Diff for: server/src/decorators.ts

+9-2
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import { OnEventOptions } from '@nestjs/event-emitter/dist/interfaces';
44
import { ApiExtension, ApiOperation, ApiProperty, ApiTags } from '@nestjs/swagger';
55
import _ from 'lodash';
66
import { ADDED_IN_PREFIX, DEPRECATED_IN_PREFIX, LIFECYCLE_EXTENSION } from 'src/constants';
7-
import { ServerAsyncEvent, ServerEvent } from 'src/interfaces/event.interface';
7+
import { ServerEvent } from 'src/interfaces/event.interface';
8+
import { Metadata } from 'src/middleware/auth.guard';
89
import { setUnion } from 'src/utils/set';
910

1011
// PostgreSQL uses a 16-bit integer to indicate the number of bound parameters. This means that the
@@ -129,9 +130,15 @@ export interface GenerateSqlQueries {
129130
/** Decorator to enable versioning/tracking of generated Sql */
130131
export const GenerateSql = (...options: GenerateSqlQueries[]) => SetMetadata(GENERATE_SQL_KEY, options);
131132

132-
export const OnServerEvent = (event: ServerEvent | ServerAsyncEvent, options?: OnEventOptions) =>
133+
export const OnServerEvent = (event: ServerEvent, options?: OnEventOptions) =>
133134
OnEvent(event, { suppressErrors: false, ...options });
134135

136+
export type HandlerOptions = {
137+
/** lower value has higher priority, defaults to 0 */
138+
priority: number;
139+
};
140+
export const EventHandlerOptions = (options: HandlerOptions) => SetMetadata(Metadata.EVENT_HANDLER_OPTIONS, options);
141+
135142
type LifecycleRelease = 'NEXT_RELEASE' | string;
136143
type LifecycleMetadata = {
137144
addedAt?: LifecycleRelease;

Diff for: server/src/interfaces/event.interface.ts

+20-12
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,23 @@ import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server-i
44

55
export const IEventRepository = 'IEventRepository';
66

7+
type MaybePromise<T> = Promise<T> | T;
8+
9+
const noop = () => {};
10+
const dummyHandlers = {
11+
onBootstrapEvent: noop as (app: 'api' | 'microservices') => MaybePromise<void>,
12+
onShutdownEvent: noop as () => MaybePromise<void>,
13+
onConfigUpdateEvent: noop as (update: SystemConfigUpdate) => MaybePromise<void>,
14+
onConfigValidateEvent: noop as (update: SystemConfigUpdate) => MaybePromise<void>,
15+
};
16+
17+
export type SystemConfigUpdate = { newConfig: SystemConfig; oldConfig: SystemConfig };
18+
export type EventHandlers = typeof dummyHandlers;
19+
export type EmitEvent = keyof EventHandlers;
20+
export type EmitEventHandler<T extends EmitEvent> = (...args: Parameters<EventHandlers[T]>) => MaybePromise<void>;
21+
export const events = Object.keys(dummyHandlers) as EmitEvent[];
22+
export type OnEvents = Partial<EventHandlers>;
23+
724
export enum ClientEvent {
825
UPLOAD_SUCCESS = 'on_upload_success',
926
USER_DELETE = 'on_user_delete',
@@ -44,15 +61,10 @@ export interface ServerEventMap {
4461
[ServerEvent.WEBSOCKET_CONNECT]: { userId: string };
4562
}
4663

47-
export enum ServerAsyncEvent {
48-
CONFIG_VALIDATE = 'config.validate',
49-
}
50-
51-
export interface ServerAsyncEventMap {
52-
[ServerAsyncEvent.CONFIG_VALIDATE]: { newConfig: SystemConfig; oldConfig: SystemConfig };
53-
}
54-
5564
export interface IEventRepository {
65+
on<T extends EmitEvent>(event: T, handler: EmitEventHandler<T>): void;
66+
emit<T extends EmitEvent>(event: T, ...args: Parameters<EmitEventHandler<T>>): Promise<void>;
67+
5668
/**
5769
* Send to connected clients for a specific user
5870
*/
@@ -65,8 +77,4 @@ export interface IEventRepository {
6577
* Notify listeners in this and connected processes. Subscribe to an event with `@OnServerEvent`
6678
*/
6779
serverSend<E extends keyof ServerEventMap>(event: E, data: ServerEventMap[E]): boolean;
68-
/**
69-
* Notify and wait for responses from listeners in this process. Subscribe to an event with `@OnServerEvent`
70-
*/
71-
serverSendAsync<E extends keyof ServerAsyncEventMap>(event: E, data: ServerAsyncEventMap[E]): Promise<any>;
7280
}

Diff for: server/src/middleware/auth.guard.ts

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ export enum Metadata {
2020
ADMIN_ROUTE = 'admin_route',
2121
SHARED_ROUTE = 'shared_route',
2222
API_KEY_SECURITY = 'api_key',
23+
EVENT_HANDLER_OPTIONS = 'event_handler_options',
2324
}
2425

2526
type AdminRoute = { admin?: true };

Diff for: server/src/repositories/event.repository.ts

+16-5
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import {
1010
import { Server, Socket } from 'socket.io';
1111
import {
1212
ClientEventMap,
13+
EmitEvent,
14+
EmitEventHandler,
1315
IEventRepository,
14-
ServerAsyncEventMap,
1516
ServerEvent,
1617
ServerEventMap,
1718
} from 'src/interfaces/event.interface';
@@ -27,6 +28,8 @@ import { Instrumentation } from 'src/utils/instrumentation';
2728
})
2829
@Injectable()
2930
export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect, OnGatewayInit, IEventRepository {
31+
private emitHandlers: Partial<Record<EmitEvent, EmitEventHandler<EmitEvent>[]>> = {};
32+
3033
@WebSocketServer()
3134
private server?: Server;
3235

@@ -71,6 +74,18 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
7174
await client.leave(client.nsp.name);
7275
}
7376

77+
on<T extends EmitEvent>(event: T, handler: EmitEventHandler<T>): void {
78+
const handlers: EmitEventHandler<EmitEvent>[] = this.emitHandlers[event] || [];
79+
this.emitHandlers[event] = [...handlers, handler];
80+
}
81+
82+
async emit<T extends EmitEvent>(event: T, ...args: Parameters<EmitEventHandler<T>>): Promise<void> {
83+
const handlers = this.emitHandlers[event] || [];
84+
for (const handler of handlers) {
85+
await handler(...args);
86+
}
87+
}
88+
7489
clientSend<E extends keyof ClientEventMap>(event: E, userId: string, data: ClientEventMap[E]) {
7590
this.server?.to(userId).emit(event, data);
7691
}
@@ -84,8 +99,4 @@ export class EventRepository implements OnGatewayConnection, OnGatewayDisconnect
8499
this.server?.serverSideEmit(event, data);
85100
return this.eventEmitter.emit(event, data);
86101
}
87-
88-
serverSendAsync<E extends keyof ServerAsyncEventMap, R = any[]>(event: E, data: ServerAsyncEventMap[E]): Promise<R> {
89-
return this.eventEmitter.emitAsync(event, data) as Promise<R>;
90-
}
91102
}

Diff for: server/src/services/api.service.ts

-17
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,8 @@ import { join } from 'node:path';
66
import { ONE_HOUR, WEB_ROOT } from 'src/constants';
77
import { ILoggerRepository } from 'src/interfaces/logger.interface';
88
import { AuthService } from 'src/services/auth.service';
9-
import { DatabaseService } from 'src/services/database.service';
109
import { JobService } from 'src/services/job.service';
11-
import { ServerInfoService } from 'src/services/server-info.service';
1210
import { SharedLinkService } from 'src/services/shared-link.service';
13-
import { StorageService } from 'src/services/storage.service';
14-
import { SystemConfigService } from 'src/services/system-config.service';
1511
import { VersionService } from 'src/services/version.service';
1612
import { OpenGraphTags } from 'src/utils/misc';
1713

@@ -39,12 +35,8 @@ const render = (index: string, meta: OpenGraphTags) => {
3935
export class ApiService {
4036
constructor(
4137
private authService: AuthService,
42-
private configService: SystemConfigService,
4338
private jobService: JobService,
44-
private serverService: ServerInfoService,
4539
private sharedLinkService: SharedLinkService,
46-
private storageService: StorageService,
47-
private databaseService: DatabaseService,
4840
private versionService: VersionService,
4941
@Inject(ILoggerRepository) private logger: ILoggerRepository,
5042
) {
@@ -61,15 +53,6 @@ export class ApiService {
6153
await this.jobService.handleNightlyJobs();
6254
}
6355

64-
async init() {
65-
await this.databaseService.init();
66-
await this.configService.init();
67-
this.storageService.init();
68-
await this.serverService.init();
69-
await this.versionService.init();
70-
this.logger.log(`Feature Flags: ${JSON.stringify(await this.serverService.getFeatures(), null, 2)}`);
71-
}
72-
7356
ssr(excludePaths: string[]) {
7457
let index = '';
7558
try {

0 commit comments

Comments
 (0)