Skip to content

Commit

Permalink
feature(framework): new filesystem debugger view
Browse files Browse the repository at this point in the history
  • Loading branch information
marcj committed Oct 17, 2023
1 parent 4c084dd commit 12e4ca3
Show file tree
Hide file tree
Showing 43 changed files with 2,440 additions and 196 deletions.
3 changes: 1 addition & 2 deletions packages/angular-universal/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@
"@deepkit/http": "^1.0.1-alpha.102",
"@deepkit/injector": "^1.0.1-alpha.102",
"@deepkit/logger": "^1.0.1-alpha.102",
"@deepkit/type": "^1.0.1-alpha.102",
"@types/node": "^14.0.0"
"@deepkit/type": "^1.0.1-alpha.102"
},
"jest": {
"transform": {
Expand Down
23 changes: 22 additions & 1 deletion packages/app/src/module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,27 @@ export function createModule<T extends CreateModuleDefinition>(options: T, name:

export type ListenerType = EventListener<any> | ClassType;

/**
* The AppModule is the base class for all modules.
*
* You can use `createModule` to create a new module class or extend from `AppModule` manually.
*
* @example
* ```typescript
*
* class MyModule extends AppModule {
* providers = [MyService];
* exports = [MyService];
*
* constructor(config: MyConfig) {
* super();
* this.setConfigDefinition(MyConfig);
* this.configure(config);
* this.name = 'myModule';
* }
* }
*
*/
export class AppModule<T extends RootModuleDefinition = {}, C extends ExtractClassType<T['config']> = any> extends InjectorModule<C, AppModule<any>> {
public setupConfigs: ((module: AppModule<any>, config: any) => void)[] = [];

Expand All @@ -244,7 +265,7 @@ export class AppModule<T extends RootModuleDefinition = {}, C extends ExtractCla
public uses: ((...args: any[]) => void)[] = [];

constructor(
public options: T,
public options: T = {} as T,
public name: string = '',
public setups: ((module: AppModule<any>, config: any) => void)[] = [],
public id: number = moduleId++,
Expand Down
1 change: 1 addition & 0 deletions packages/broker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@
export * from './src/client.js';
export * from './src/kernel.js';
export * from './src/model.js';
export * from './src/broker.js';
4 changes: 3 additions & 1 deletion packages/broker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,16 @@
"@deepkit/core-rxjs": "^1.0.1-alpha.39",
"@deepkit/rpc": "^1.0.1-alpha.41",
"@deepkit/type": "^1.0.1-alpha.40",
"@deepkit/event": "^1.0.1-alpha.40",
"rxjs": "*"
},
"devDependencies": {
"@deepkit/bson": "^1.0.1-alpha.102",
"@deepkit/core": "^1.0.1-alpha.100",
"@deepkit/core-rxjs": "^1.0.1-alpha.100",
"@deepkit/rpc": "^1.0.1-alpha.102",
"@deepkit/type": "^1.0.1-alpha.102"
"@deepkit/type": "^1.0.1-alpha.102",
"@deepkit/event": "^1.0.1-alpha.102"
},
"jest": {
"transform": {
Expand Down
70 changes: 70 additions & 0 deletions packages/broker/src/adapters/memory-adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { BrokerAdapter, BrokerCacheOptions, BrokerLockOptions } from '../broker.js';
import { Type } from '@deepkit/type';
import { ProcessLock } from '@deepkit/core';

export class BrokerMemoryAdapter implements BrokerAdapter {
protected cache: { [key: string]: any } = {};
protected channels: { [key: string]: ((m: any) => void)[] } = {};
protected locks: { [key: string]: ProcessLock } = {};

async disconnect(): Promise<void> {
}

async lock(id: string, options: BrokerLockOptions): Promise<void> {
const lock = new ProcessLock(id);
await lock.acquire(options.ttl, options.timeout);
this.locks[id] = lock;
}

async tryLock(id: string, options: BrokerLockOptions): Promise<boolean> {
const lock = new ProcessLock(id);
if (lock.tryLock(options.ttl)) {
this.locks[id] = lock;
return true;
}
return false;
}

async release(id: string): Promise<void> {
if (this.locks[id]) {
this.locks[id].unlock();
delete this.locks[id];
}
}

async getCache(key: string): Promise<any> {
return this.cache[key];
}

async setCache(key: string, value: any, options: BrokerCacheOptions) {
this.cache[key] = value;
}

async increase(key: string, value: any): Promise<void> {
if (!(key in this.cache)) this.cache[key] = 0;
this.cache[key] += value;
}

async subscribe(key: string, callback: (message: any) => void, type: Type): Promise<{ unsubscribe: () => Promise<void> }> {
if (!(key in this.channels)) this.channels[key] = [];
const fn = (m: any) => {
callback(m);
};
this.channels[key].push(fn);

return {
unsubscribe: async () => {
const index = this.channels[key].indexOf(fn);
if (index !== -1) this.channels[key].splice(index, 1);
}
};
}

async publish<T>(key: string, message: T): Promise<void> {
if (!(key in this.channels)) return;
for (const callback of this.channels[key]) {
callback(message);
}
}
}

209 changes: 209 additions & 0 deletions packages/broker/src/broker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
import { ReceiveType, reflect, ReflectionKind, resolveReceiveType, Type } from '@deepkit/type';
import { EventToken } from '@deepkit/event';

export interface BrokerLockOptions {
/**
* Time to live in seconds. Default 2 minutes.
*
* The lock is automatically released after this time.
* This is to prevent deadlocks.
*/
ttl: number;

/**
* Timeout when acquiring the lock in seconds. Default 30 seconds.
* Ween a lock is not acquired after this time, an error is thrown.
*/
timeout: number;
}

export interface BrokerAdapter {
lock(id: string, options: BrokerLockOptions): Promise<void>;

tryLock(id: string, options: BrokerLockOptions): Promise<boolean>;

release(id: string): Promise<void>;

getCache(key: string, type: Type): Promise<any>;

setCache(key: string, value: any, options: BrokerCacheOptions, type: Type): Promise<void>;

increase(key: string, value: any): Promise<void>;

publish(key: string, message: any, type: Type): Promise<void>;

subscribe(key: string, callback: (message: any) => void, type: Type): Promise<{ unsubscribe: () => Promise<void> }>;

disconnect(): Promise<void>;
}

export const onBrokerLock = new EventToken('broker.lock');

export interface BrokerCacheOptions {
ttl: number;
tags: string[];
}

export class CacheError extends Error {
}

export type BrokerBusChannel<Type, Channel extends string, Parameters extends object = {}> = [Channel, Parameters, Type];

export type BrokerCacheKey<Type, Key extends string, Parameters extends object = {}> = [Key, Parameters, Type];

export type CacheBuilder<T extends BrokerCacheKey<any, any, any>> = (parameters: T[1], options: BrokerCacheOptions) => T[2] | Promise<T[2]>;

export class BrokerBus<T> {
constructor(
private channel: string,
private adapter: BrokerAdapter,
private type: Type,
) {
}

async publish<T>(message: T) {
return this.adapter.publish(this.channel, message, this.type);
}

async subscribe(callback: (message: T) => void): Promise<{ unsubscribe: () => Promise<void> }> {
return this.adapter.subscribe(this.channel, callback, this.type);
}
}

export class BrokerCache<T extends BrokerCacheKey<any, any, any>> {
constructor(
private key: string,
private builder: CacheBuilder<T>,
private options: BrokerCacheOptions,
private adapter: BrokerAdapter,
private type: Type,
) {
}

protected getCacheKey(parameters: T[1]): string {
//this.key contains parameters e.g. user/:id, id comes from parameters.id. let's replace all of it.
//note: we could create JIT function for this, but it's probably not worth it.
return this.key.replace(/:([a-zA-Z0-9_]+)/g, (v, name) => {
if (!(name in parameters)) throw new CacheError(`Parameter ${name} not given`);
return String(parameters[name]);
});
}

async set(parameters: T[1], value: T[2], options: Partial<BrokerCacheOptions> = {}) {
const cacheKey = this.getCacheKey(parameters);
await this.adapter.setCache(cacheKey, value, { ...this.options, ...options }, this.type);
}

async increase(parameters: T[1], value: number) {
const cacheKey = this.getCacheKey(parameters);
await this.adapter.increase(cacheKey, value);
}

async get(parameters: T[1]): Promise<T[2]> {
const cacheKey = this.getCacheKey(parameters);
let entry = await this.adapter.getCache(cacheKey, this.type);
if (entry !== undefined) return entry;

const options: BrokerCacheOptions = { ...this.options };
entry = await this.builder(parameters, options);
await this.adapter.setCache(cacheKey, entry, options, this.type);

return entry;
}
}

export class BrokerLock {
public acquired: boolean = false;

constructor(
private id: string,
private adapter: BrokerAdapter,
private options: BrokerLockOptions,
) {
}

async acquire(): Promise<void> {
await this.adapter.lock(this.id, this.options);
this.acquired = true;
}

async try(): Promise<boolean> {
if (this.acquired) return true;

return this.acquired = await this.adapter.tryLock(this.id, this.options);
}

async release(): Promise<void> {
this.acquired = false;
await this.adapter.release(this.id);
}
}

export class Broker {
constructor(
private readonly adapter: BrokerAdapter
) {
}

public lock(id: string, options: Partial<BrokerLockOptions> = {}): BrokerLock {
return new BrokerLock(id, this.adapter, Object.assign({ ttl: 60*2, timeout: 30 }, options));
}

public disconnect(): Promise<void> {
return this.adapter.disconnect();
}

protected cacheProvider: { [path: string]: (...args: any[]) => any } = {};

public provideCache<T extends BrokerCacheKey<any, any, any>>(provider: (options: T[1]) => T[2] | Promise<T[2]>, type?: ReceiveType<T>) {
type = resolveReceiveType(type);
if (type.kind !== ReflectionKind.tuple) throw new CacheError(`Invalid type given`);
if (type.types[0].type.kind !== ReflectionKind.literal) throw new CacheError(`Invalid type given`);
const path = String(type.types[0].type.literal);
this.cacheProvider[path] = provider;
}

public cache<T extends BrokerCacheKey<any, any, any>>(type?: ReceiveType<T>): BrokerCache<T> {
type = resolveReceiveType(type);
if (type.kind !== ReflectionKind.tuple) throw new CacheError(`Invalid type given`);
if (type.types[0].type.kind !== ReflectionKind.literal) throw new CacheError(`Invalid type given`);
const path = String(type.types[0].type.literal);
const provider = this.cacheProvider[path];
if (!provider) throw new CacheError(`No cache provider for cache ${type.typeName} (${path}) registered`);

return new BrokerCache<T>(path, provider, { ttl: 30, tags: [] }, this.adapter, type.types[2].type);
}

public async get<T>(key: string, builder: (options: BrokerCacheOptions) => Promise<T>, type?: ReceiveType<T>): Promise<T> {
if (!type) {
//type not manually provided via Broker.get<Type>, so we try to extract it from the builder.
const fn = reflect(builder);
if (fn.kind !== ReflectionKind.function) throw new CacheError(`Can not detect type of builder function`);
type = fn.return;
while (type.kind === ReflectionKind.promise) type = type.type;
} else {
type = resolveReceiveType(type);
}

const cache = this.adapter.getCache(key, type);
if (cache !== undefined) return cache;

const options: BrokerCacheOptions = { ttl: 30, tags: [] };
const value = builder(options);
await this.adapter.setCache(key, value, options, type);
return value;
}

public bus<T extends BrokerBusChannel<any, any>>(type?: ReceiveType<T>): BrokerBus<T[2]> {
type = resolveReceiveType(type);
if (type.kind !== ReflectionKind.tuple) throw new CacheError(`Invalid type given`);
if (type.types[0].type.kind !== ReflectionKind.literal) throw new CacheError(`Invalid type given`);
const path = String(type.types[0].type.literal);

return new BrokerBus(path, this.adapter, type.types[2].type);
}

public queue<T>(channel: string, type?: ReceiveType<T>) {

}
}
1 change: 0 additions & 1 deletion packages/broker/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,6 @@ export class BrokerClient extends RpcBaseClient {
}
}


export class BrokerDirectClient extends BrokerClient {
constructor(rpcKernel: BrokerKernel) {
super(new RpcDirectClientAdapter(rpcKernel));
Expand Down
Loading

0 comments on commit 12e4ca3

Please sign in to comment.