diff --git a/00_Base/src/interfaces/router/AbstractRouter.ts b/00_Base/src/interfaces/router/AbstractRouter.ts index e6f4ff525..dde4a694b 100644 --- a/00_Base/src/interfaces/router/AbstractRouter.ts +++ b/00_Base/src/interfaces/router/AbstractRouter.ts @@ -4,6 +4,7 @@ // SPDX-License-Identifier: Apache 2.0 import Ajv, { ErrorObject } from "ajv"; + import { Call, CallAction, CallResult, ICache, SystemConfig, CALL_SCHEMA_MAP, CALL_RESULT_SCHEMA_MAP, IMessageConfirmation, MessageOrigin, OcppError, OcppRequest, OcppResponse, IMessageHandler, IMessageSender, IMessage, MessageState } from "../.."; import { ILogObj, Logger } from "tslog"; import { IMessageRouter } from "./Router"; @@ -23,7 +24,7 @@ export abstract class AbstractMessageRouter implements IMessageRouter { protected _networkHook: (identifier: string, message: string) => Promise; /** - * Constructor of abstract central system. + * Constructor of abstract ocpp router. * * @param {Ajv} ajv - The Ajv instance to use for schema validation. */ @@ -69,7 +70,7 @@ export abstract class AbstractMessageRouter implements IMessageRouter { set config(config: SystemConfig) { this._config = config; // Update all necessary settings for hot reload - this._logger.info(`Updating system configuration for central system...`); + this._logger.info(`Updating system configuration for ocpp router...`); this._logger.settings.minLevel = this._config.logLevel; } @@ -79,39 +80,19 @@ export abstract class AbstractMessageRouter implements IMessageRouter { abstract onMessage(identifier: string, message: string): Promise; + abstract registerConnection(connectionIdentifier: string): Promise; + abstract deregisterConnection(connectionIdentifier: string): Promise; + abstract sendCall(identifier: string, tenantId: string, action: CallAction, payload: OcppRequest, correlationId?: string, origin?: MessageOrigin): Promise; abstract sendCallResult(correlationId: string, identifier: string, tenantId: string, action: CallAction, payload: OcppResponse, origin?: MessageOrigin): Promise; abstract sendCallError(correlationId: string, identifier: string, tenantId: string, action: CallAction, error: OcppError, origin?: MessageOrigin): Promise; - abstract shutdown(): void; /** * Public Methods */ - async registerConnection(connectionIdentifier: string): Promise { - const requestSubscription = await this._handler.subscribe(connectionIdentifier, undefined, { - stationId: connectionIdentifier, - state: MessageState.Request.toString(), - origin: MessageOrigin.CentralSystem.toString() - }); - - const responseSubscription = await this._handler.subscribe(connectionIdentifier, undefined, { - stationId: connectionIdentifier, - state: MessageState.Response.toString(), - origin: MessageOrigin.ChargingStation.toString() - }); - - return requestSubscription && responseSubscription; - } - - async deregisterConnection(connectionIdentifier: string): Promise { - // TODO: ensure that all queue implementations in 02_Util only unsubscribe 1 queue per call - // ...which will require refactoring this method to unsubscribe request and response queues separately - return await this._handler.unsubscribe(connectionIdentifier) - } - async handle(message: IMessage): Promise { this._logger.debug("Received message:", message); diff --git a/00_Base/src/interfaces/router/Router.ts b/00_Base/src/interfaces/router/Router.ts index bfdb54c2a..255a93a0b 100644 --- a/00_Base/src/interfaces/router/Router.ts +++ b/00_Base/src/interfaces/router/Router.ts @@ -6,7 +6,7 @@ import { IModule } from "../.."; /** - * Interface for the central system + * Interface for the ocpp router */ export interface IMessageRouter extends IModule { /** diff --git a/00_Base/src/ocpp/persistence/namespace.ts b/00_Base/src/ocpp/persistence/namespace.ts index 0ce53fb86..1554e8803 100644 --- a/00_Base/src/ocpp/persistence/namespace.ts +++ b/00_Base/src/ocpp/persistence/namespace.ts @@ -19,11 +19,12 @@ export enum Namespace { MeterValueType = 'MeterValue', ModemType = 'Modem', SecurityEventNotificationRequest = 'SecurityEvent', + Subscription = 'Subscription', + SystemConfig = 'SystemConfig', TransactionEventRequest = 'TransactionEvent', TransactionType = 'Transaction', VariableAttributeType = 'VariableAttribute', VariableCharacteristicsType = 'VariableCharacteristics', VariableStatus = 'VariableStatus', - VariableType = 'Variable', - SystemConfig = 'SystemConfig' + VariableType = 'Variable' } \ No newline at end of file diff --git a/02_Util/src/networkconnection/WebsocketNetworkConnection.ts b/02_Util/src/networkconnection/WebsocketNetworkConnection.ts index 1101cfec0..9c3a2e3cc 100644 --- a/02_Util/src/networkconnection/WebsocketNetworkConnection.ts +++ b/02_Util/src/networkconnection/WebsocketNetworkConnection.ts @@ -20,9 +20,6 @@ export class WebsocketNetworkConnection { private _httpServers: (http.Server | https.Server)[]; private _authenticator: IAuthenticator; private _router: IMessageRouter; - // private _onConnectionCallbacks: ((identifier: string, info?: Map) => Promise)[] = []; - // private _onCloseCallbacks: ((identifier: string, info?: Map) => Promise)[] = []; - // private _onMessageCallbacks: ((identifier: string, message: string, info?: Map) => Promise)[] = []; constructor( config: SystemConfig, @@ -87,18 +84,6 @@ export class WebsocketNetworkConnection { }); } - // addOnConnectionCallback(onConnectionCallback: (identifier: string, info?: Map) => Promise): void { - // this._onConnectionCallbacks.push(onConnectionCallback); - // } - - // addOnCloseCallback(onCloseCallback: (identifier: string, info?: Map) => Promise): void { - // this._onCloseCallbacks.push(onCloseCallback); - // } - - // addOnMessageCallback(onMessageCallback: (identifier: string, message: string, info?: Map) => Promise): void { - // this._onMessageCallbacks.push(onMessageCallback); - // } - /** * Send a message to the charging station specified by the identifier. * @@ -107,29 +92,34 @@ export class WebsocketNetworkConnection { * @return {boolean} True if the method sends the message successfully, false otherwise. */ sendMessage(identifier: string, message: string): Promise { - return this._cache.get(identifier, CacheNamespace.Connections).then(clientConnection => { - if (clientConnection) { - const websocketConnection = this._identifierConnections.get(identifier); - if (websocketConnection && websocketConnection.readyState === WebSocket.OPEN) { - websocketConnection.send(message, (error) => { - if (error) { - this._logger.error("On message send error", error); - } - }); // TODO: Handle errors - // TODO: Embed error handling into websocket message flow - return true; + return new Promise((resolve, reject) => { + this._cache.get(identifier, CacheNamespace.Connections).then(clientConnection => { + if (clientConnection) { + const websocketConnection = this._identifierConnections.get(identifier); + if (websocketConnection && websocketConnection.readyState === WebSocket.OPEN) { + websocketConnection.send(message, (error) => { + if (error) { + this._logger.error("On message send error", error); + reject(error); // Reject the promise with the error + } else { + resolve(true); // Resolve the promise with true indicating success + } + }); + } else { + const errorMsg = "Websocket connection is not ready - " + identifier; + this._logger.fatal(errorMsg); + websocketConnection?.close(1011, errorMsg); + reject(new Error(errorMsg)); // Reject with a new error + } } else { - this._logger.fatal("Websocket connection is not ready -", identifier); - websocketConnection?.close(1011, "Websocket connection is not ready - " + identifier); - return false; + const errorMsg = "Cannot identify client connection for " + identifier; + // This can happen when a charging station disconnects in the moment a message is trying to send. + // Retry logic on the message sender might not suffice as charging station might connect to different instance. + this._logger.error(errorMsg); + this._identifierConnections.get(identifier)?.close(1011, "Failed to get connection information for " + identifier); + reject(new Error(errorMsg)); // Reject with a new error } - } else { - // This can happen when a charging station disconnects in the moment a message is trying to send. - // Retry logic on the message sender might not suffice as charging station might connect to different instance. - this._logger.error("Cannot identify client connection for", identifier); - this._identifierConnections.get(identifier)?.close(1011, "Failed to get connection information for " + identifier); - return false; - } + }).catch(reject); // In case `_cache.get` fails }); } @@ -248,11 +238,6 @@ export class WebsocketNetworkConnection { this._router.registerConnection(identifier); - // await this._onConnectionCallbacks.forEach(async callback => { - // const info = new Map([["ip", ip], ["port", port.toString()]]); - // await callback(identifier, info); - // }); - this._logger.info("Successfully connected new charging station.", identifier); // Register all websocket events @@ -290,9 +275,6 @@ export class WebsocketNetworkConnection { this._cache.remove(identifier, CacheNamespace.Connections); this._identifierConnections.delete(identifier); this._router.deregisterConnection(identifier); - // this._onCloseCallbacks.forEach(callback => { - // callback(identifier); - // }); }); ws.on("pong", async () => { @@ -321,9 +303,6 @@ export class WebsocketNetworkConnection { */ private _onMessage(identifier: string, message: string): void { this._router.onMessage(identifier, message); - // this._onMessageCallbacks.forEach(callback => { - // callback(identifier, message); - // }); } /** diff --git a/02_Util/src/networkconnection/index.ts b/02_Util/src/networkconnection/index.ts index e2bda37fc..4b53163f0 100644 --- a/02_Util/src/networkconnection/index.ts +++ b/02_Util/src/networkconnection/index.ts @@ -4,4 +4,3 @@ export { Authenticator } from "./authenticator/Authenticator" export { WebsocketNetworkConnection } from "./WebsocketNetworkConnection" -export { MessageRouterImpl } from "./router/MessageRouter" \ No newline at end of file diff --git a/03_Modules/Configuration/src/module/module.ts b/03_Modules/Configuration/src/module/module.ts index dffc1b633..b7db55f39 100644 --- a/03_Modules/Configuration/src/module/module.ts +++ b/03_Modules/Configuration/src/module/module.ts @@ -354,7 +354,7 @@ export class ConfigurationModule extends AbstractModule { } // Handle post-response actions if (bootNotificationResponseMessageConfirmation.success) { - this._logger.debug("BootNotification response successfully sent to central system: ", bootNotificationResponseMessageConfirmation); + this._logger.debug("BootNotification response successfully sent to ocpp router: ", bootNotificationResponseMessageConfirmation); // Update charger-specific boot config with details of most recently sent BootNotificationResponse let bootConfigDbEntity: Boot | undefined = await this._bootRepository.readByKey(stationId); diff --git a/03_Modules/OcppRouter/.eslintrc.js b/03_Modules/OcppRouter/.eslintrc.js new file mode 100644 index 000000000..acaf724fa --- /dev/null +++ b/03_Modules/OcppRouter/.eslintrc.js @@ -0,0 +1,11 @@ +module.exports = { + "env": { + "browser": false, + "es2021": true + }, + "extends": "eslint:recommended", + "parserOptions": { + "ecmaVersion": "latest", + "sourceType": "module" + }, +} diff --git a/03_Modules/OcppRouter/package.json b/03_Modules/OcppRouter/package.json new file mode 100644 index 000000000..9a929078f --- /dev/null +++ b/03_Modules/OcppRouter/package.json @@ -0,0 +1,40 @@ +{ + "name": "@citrineos/ocpprouter", + "version": "1.0.0", + "description": "The ocpprouter module for OCPP v2.0.1. This module is not intended to be used directly, but rather as a dependency for other modules.", + "main": "lib/index.js", + "types": "lib/index.d.ts", + "files": [ + "lib" + ], + "scripts": { + "prepublish": "npx eslint", + "prepare": "npm run build", + "build": "tsc", + "refresh-base": "cd ../../00_Base && npm run build && npm pack && cd ../03_Modules/OcppRouter && npm install ../../00_Base/citrineos-base-1.0.0.tgz", + "refresh-data": "cd ../../01_Data && npm run build && npm pack && cd ../03_Modules/OcppRouter && npm install ../../01_Data/citrineos-data-1.0.0.tgz", + "refresh-util": "cd ../../02_Util && npm run build && npm pack && cd ../03_Modules/OcppRouter && npm install ../../02_Util/citrineos-util-1.0.0.tgz", + "install-all": "npm install ../../00_Base/citrineos-base-1.0.0.tgz && npm install ../../02_Util/citrineos-util-1.0.0.tgz && npm install ../../01_Data/citrineos-data-1.0.0.tgz", + "test": "echo \"Error: no test specified\" && exit 1" + }, + "keywords": [ + "ocpp", + "ocpp_v201" + ], + "license": "Apache-2.0", + "devDependencies": { + "@types/deasync-promise": "^1.0.0", + "@types/node-forge": "^1.3.1", + "@types/uuid": "^9.0.1", + "eslint": "^8.48.0", + "typescript": "^5.0.4" + }, + "dependencies": { + "@citrineos/base": "file:../../00_Base/citrineos-base-1.0.0.tgz", + "@citrineos/data": "file:../../01_Data/citrineos-data-1.0.0.tgz", + "@citrineos/util": "file:../../02_Util/citrineos-util-1.0.0.tgz", + "fastify": "^4.22.2", + "node-forge": "^1.3.1", + "uuid": "^9.0.0" + } +} \ No newline at end of file diff --git a/03_Modules/OcppRouter/src/index.ts b/03_Modules/OcppRouter/src/index.ts new file mode 100644 index 000000000..82f6922b8 --- /dev/null +++ b/03_Modules/OcppRouter/src/index.ts @@ -0,0 +1,8 @@ +// Copyright (c) 2023 S44, LLC +// Copyright Contributors to the CitrineOS Project +// +// SPDX-License-Identifier: Apache 2.0 + +export { AdminApi } from './module/api'; +export { IAdminApi } from './module/interface'; +export { MessageRouterImpl } from './module/router'; \ No newline at end of file diff --git a/03_Modules/OcppRouter/src/module/api.ts b/03_Modules/OcppRouter/src/module/api.ts new file mode 100644 index 000000000..b933d7c58 --- /dev/null +++ b/03_Modules/OcppRouter/src/module/api.ts @@ -0,0 +1,136 @@ +// Copyright (c) 2023 S44, LLC +// Copyright Contributors to the CitrineOS Project +// +// SPDX-License-Identifier: Apache 2.0 + +import { AbstractModuleApi, AsDataEndpoint, CallAction, HttpMethod, MessageOrigin, Namespace } from '@citrineos/base'; +import { FastifyInstance, FastifyRequest } from 'fastify'; +import { ILogObj, Logger } from 'tslog'; +import { IAdminApi } from './interface'; +import { MessageRouterImpl, Subscription } from './router'; + +/** + * Server API for the Certificates module. + */ +export class AdminApi extends AbstractModuleApi implements IAdminApi { + + /** + * Constructs a new instance of the class. + * + * @param {MessageRouterImpl} ocppRouter - The OcppRouter module. + * @param {FastifyInstance} server - The Fastify server instance. + * @param {Logger} [logger] - The logger instance. + */ + constructor(ocppRouter: MessageRouterImpl, server: FastifyInstance, logger?: Logger) { + super(ocppRouter, server, logger); + } + + /** + * Data endpoints + */ + + @AsDataEndpoint(Namespace.Subscription, HttpMethod.Put) + async putSubscription(request: FastifyRequest<{ Body: Subscription }>): Promise { + if (request.body.onConnect) { + this._module.addOnConnectionCallback(async (identifier: string) => { + if (identifier == request.body.stationId) { + return fetch(request.body.url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ stationId: identifier, event: 'connected' }) + }).then(res => res.status === 200).catch(error => { + this._logger.error(error); + return false; + }); + } else { // Ignore + return true; + } + }); + this._logger.debug(`Added onConnect callback to ${request.body.url} for station ${request.body.stationId}`); + } + if (request.body.onClose) { + this._module.addOnCloseCallback(async (identifier: string) => { + if (identifier == request.body.stationId) { + return fetch(request.body.url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ stationId: identifier, event: 'closed' }) + }).then(res => res.status === 200).catch(error => { + this._logger.error(error); + return false; + }); + } else { // Ignore + return true; + } + }); + this._logger.debug(`Added onClose callback to ${request.body.url} for station ${request.body.stationId}`); + } + if (request.body.onMessage) { + this._module.addOnMessageCallback(async (identifier: string, message: string) => { + if (identifier == request.body.stationId && + (!request.body.messageOptions?.regexFilter || new RegExp(request.body.messageOptions.regexFilter).test(message))) { + return fetch(request.body.url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ stationId: identifier, event: 'message', origin: MessageOrigin.ChargingStation, message: message }) + }).then(res => res.status === 200).catch(error => { + this._logger.error(error); + return false; + }); + } else { // Ignore + return true; + } + }); + this._logger.debug(`Added onMessage callback to ${request.body.url} for station ${request.body.stationId}`); + } + if (request.body.sentMessage) { + this._module.addSentMessageCallback(async (identifier: string, message: string, error?: any) => { + if (identifier == request.body.stationId && + (!request.body.messageOptions?.regexFilter || new RegExp(request.body.messageOptions.regexFilter).test(message))) { + return fetch(request.body.url, { + method: 'POST', + headers: { + 'Content-Type': 'application/json' + }, + body: JSON.stringify({ stationId: identifier, event: 'message', origin: MessageOrigin.CentralSystem, message: message, error: error }) + }).then(res => res.status === 200).catch(error => { + this._logger.error(error); + return false; + }); + } else { // Ignore + return true; + } + }); + this._logger.debug(`Added sentMessage callback to ${request.body.url} for station ${request.body.stationId}`); + } + return; + } + + /** + * Overrides superclass method to generate the URL path based on the input {@link CallAction} and the module's endpoint prefix configuration. + * + * @param {CallAction} input - The input {@link CallAction}. + * @return {string} - The generated URL path. + */ + protected _toMessagePath(input: CallAction): string { + const endpointPrefix = '/ocpprouter'; + return super._toMessagePath(input, endpointPrefix); + } + + /** + * Overrides superclass method to generate the URL path based on the input {@link Namespace} and the module's endpoint prefix configuration. + * + * @param {CallAction} input - The input {@link Namespace}. + * @return {string} - The generated URL path. + */ + protected _toDataPath(input: Namespace): string { + const endpointPrefix = '/ocpprouter'; + return super._toDataPath(input, endpointPrefix); + } +} \ No newline at end of file diff --git a/03_Modules/OcppRouter/src/module/interface.ts b/03_Modules/OcppRouter/src/module/interface.ts new file mode 100644 index 000000000..436c81ce4 --- /dev/null +++ b/03_Modules/OcppRouter/src/module/interface.ts @@ -0,0 +1,11 @@ +// Copyright (c) 2023 S44, LLC +// Copyright Contributors to the CitrineOS Project +// +// SPDX-License-Identifier: Apache 2.0 + +/** + * TODO: add actual interface + * Interface for the admin api. + */ +export interface IAdminApi { +} \ No newline at end of file diff --git a/02_Util/src/networkconnection/router/MessageRouter.ts b/03_Modules/OcppRouter/src/module/router.ts similarity index 84% rename from 02_Util/src/networkconnection/router/MessageRouter.ts rename to 03_Modules/OcppRouter/src/module/router.ts index 8f0ded0be..5ac2bf917 100644 --- a/02_Util/src/networkconnection/router/MessageRouter.ts +++ b/03_Modules/OcppRouter/src/module/router.ts @@ -8,8 +8,20 @@ import Ajv from "ajv"; import { v4 as uuidv4 } from "uuid"; import { ILogObj, Logger } from "tslog"; +export interface Subscription { + stationId: string; + onConnect: boolean; + onClose: boolean; + onMessage: boolean; + sentMessage: boolean; + messageOptions?: { + regexFilter?: string; + } + url: string; +} + /** - * Implementation of the central system + * Implementation of the ocpp router */ export class MessageRouterImpl extends AbstractMessageRouter implements IMessageRouter { @@ -22,6 +34,11 @@ export class MessageRouterImpl extends AbstractMessageRouter implements IMessage protected _handler: IMessageHandler; protected _networkHook: (identifier: string, message: string) => Promise; + private _onConnectionCallbacks: ((identifier: string, info?: Map) => Promise)[] = []; + private _onCloseCallbacks: ((identifier: string, info?: Map) => Promise)[] = []; + private _onMessageCallbacks: ((identifier: string, message: string, info?: Map) => Promise)[] = []; + private _sentMessageCallbacks: ((identifier: string, message: string, error?: any, info?: Map) => Promise)[] = []; + /** * Constructor for the class. * @@ -43,41 +60,67 @@ export class MessageRouterImpl extends AbstractMessageRouter implements IMessage ) { super(config, cache, handler, sender, networkHook, logger, ajv); - // Initialize router before socket server to avoid race condition - // this._router = new OcppMessageRouter(cache, - // sender, - // handler); - - // networkConnection.addOnConnectionCallback((identifier: string) => - // this.registerConnection(identifier) - // ); - - // networkConnection.addOnCloseCallback((identifier: string) => - // this.deregisterConnection(identifier) - // ); - - // networkConnection.addOnMessageCallback((identifier: string, message: string) => - // this.onMessage(identifier, message) - // ); - this._cache = cache; this._sender = sender; this._handler = handler; this._networkHook = networkHook; } + addOnConnectionCallback(onConnectionCallback: (identifier: string, info?: Map) => Promise): void { + this._onConnectionCallbacks.push(onConnectionCallback); + } + + addOnCloseCallback(onCloseCallback: (identifier: string, info?: Map) => Promise): void { + this._onCloseCallbacks.push(onCloseCallback); + } + + addOnMessageCallback(onMessageCallback: (identifier: string, message: string, info?: Map) => Promise): void { + this._onMessageCallbacks.push(onMessageCallback); + } + + addSentMessageCallback(sentMessageCallback: (identifier: string, message: string, error: any, info?: Map) => Promise): void { + this._sentMessageCallbacks.push(sentMessageCallback); + } + /** * Interface implementation */ - shutdown(): void { - this._sender.shutdown(); - this._handler.shutdown(); + async registerConnection(connectionIdentifier: string): Promise { + await this._onConnectionCallbacks.forEach(async callback => { + await callback(connectionIdentifier); + }); + + const requestSubscription = await this._handler.subscribe(connectionIdentifier, undefined, { + stationId: connectionIdentifier, + state: MessageState.Request.toString(), + origin: MessageOrigin.CentralSystem.toString() + }); + + const responseSubscription = await this._handler.subscribe(connectionIdentifier, undefined, { + stationId: connectionIdentifier, + state: MessageState.Response.toString(), + origin: MessageOrigin.ChargingStation.toString() + }); + + return requestSubscription && responseSubscription; + } + + async deregisterConnection(connectionIdentifier: string): Promise { + this._onCloseCallbacks.forEach(callback => { + callback(connectionIdentifier); + }); + // TODO: ensure that all queue implementations in 02_Util only unsubscribe 1 queue per call + // ...which will require refactoring this method to unsubscribe request and response queues separately + return await this._handler.unsubscribe(connectionIdentifier) } // TODO: identifier may not be unique, may require combination of tenantId and identifier. // find way to include tenantId here async onMessage(identifier: string, message: string): Promise { + this._onMessageCallbacks.forEach(callback => { + callback(identifier, message); + }); let rpcMessage: any; let messageTypeId: MessageTypeId | undefined = undefined let messageId: string = "-1"; // OCPP 2.0.1 part 4, section 4.2.3, "When also the MessageId cannot be read, the CALLERROR SHALL contain "-1" as MessageId." @@ -109,7 +152,7 @@ export class MessageRouterImpl extends AbstractMessageRouter implements IMessage const callError = error instanceof OcppError ? error.asCallError() : [MessageTypeId.CallError, messageId, ErrorCode.InternalError, "Unable to process message", { error: error }]; const rawMessage = JSON.stringify(callError, (k, v) => v ?? undefined); - this._networkHook(identifier, rawMessage); + this._sendMessage(identifier, rawMessage); } // TODO: Publish raw payload for error reporting return false; @@ -130,7 +173,7 @@ export class MessageRouterImpl extends AbstractMessageRouter implements IMessage CacheNamespace.Transactions, this._config.maxCallLengthSeconds)) { // Intentionally removing NULL values from object for OCPP conformity const rawMessage = JSON.stringify(message, (k, v) => v ?? undefined); - const success = await this._networkHook(identifier, rawMessage); + const success = await this._sendMessage(identifier, rawMessage); return { success }; } else { this._logger.info("Call already in progress, throwing retry exception", identifier, message); @@ -161,7 +204,7 @@ export class MessageRouterImpl extends AbstractMessageRouter implements IMessage // Intentionally removing NULL values from object for OCPP conformity const rawMessage = JSON.stringify(message, (k, v) => v ?? undefined); const success = await Promise.all([ - this._networkHook(identifier, rawMessage), + this._sendMessage(identifier, rawMessage), this._cache.remove(identifier, CacheNamespace.Transactions) ]).then(successes => successes.every(Boolean)); return { success }; @@ -190,7 +233,7 @@ export class MessageRouterImpl extends AbstractMessageRouter implements IMessage // Intentionally removing NULL values from object for OCPP conformity const rawMessage = JSON.stringify(message, (k, v) => v ?? undefined); const success = await Promise.all([ - this._networkHook(identifier, rawMessage), + this._sendMessage(identifier, rawMessage), this._cache.remove(identifier, CacheNamespace.Transactions) ]).then(successes => successes.every(Boolean)); return { success }; @@ -200,6 +243,11 @@ export class MessageRouterImpl extends AbstractMessageRouter implements IMessage } } + shutdown(): void { + this._sender.shutdown(); + this._handler.shutdown(); + } + /** * Private Methods */ @@ -343,6 +391,20 @@ export class MessageRouterImpl extends AbstractMessageRouter implements IMessage return this._cache.exists(action, identifier).then(blacklisted => !blacklisted); } + private async _sendMessage(identifier: string, rawMessage: string): Promise { + try { + const success = await this._networkHook(identifier, rawMessage); + this._sentMessageCallbacks.forEach(callback => { + callback(identifier, rawMessage); + }); + return success; + } catch (error) { + this._sentMessageCallbacks.forEach(callback => { + callback(identifier, rawMessage, error); + }); + return false; + } + } private async _sendCallIsAllowed(identifier: string, message: Call): Promise { const status = await this._cache.get(BOOT_STATUS, identifier); diff --git a/03_Modules/OcppRouter/tsconfig.json b/03_Modules/OcppRouter/tsconfig.json new file mode 100644 index 000000000..5cebe347b --- /dev/null +++ b/03_Modules/OcppRouter/tsconfig.json @@ -0,0 +1,21 @@ +{ + "compilerOptions": { + "target": "es6", + "module": "commonjs", + "skipLibCheck": true, + "experimentalDecorators": true, + "emitDecoratorMetadata": true, + "declaration": true, + "outDir": "lib", + "strict": true, + "resolveJsonModule": true, + "esModuleInterop": true + }, + "include": [ + "src" + ], + "exclude": [ + "node_modules", + "**/__tests__/*" + ] +} \ No newline at end of file diff --git a/Server/docker/Dockerfile b/Server/docker/Dockerfile index 24b9f2a68..23968440e 100644 --- a/Server/docker/Dockerfile +++ b/Server/docker/Dockerfile @@ -36,6 +36,19 @@ COPY /02_Util/src /usr/02_Util/src RUN npm run build --prefix /usr/02_Util RUN cd /usr/02_Util && npm pack +# Build citrineos-ocpprouter module +FROM base as citrineos-ocpprouter-builder +COPY --from=citrineos-base-builder /usr/00_Base/*.tgz /usr/00_Base/ +COPY --from=citrineos-data-builder /usr/01_Data/*.tgz /usr/01_Data/ +COPY --from=citrineos-util-builder /usr/02_Util/*.tgz /usr/02_Util/ +COPY /03_Modules/OcppRouter/package.json /usr/03_Modules/OcppRouter/ +RUN npm install --ignore-scripts=true --prefix /usr/03_Modules/OcppRouter + +COPY /03_Modules/OcppRouter/tsconfig.json /usr/03_Modules/OcppRouter/ +COPY /03_Modules/OcppRouter/src /usr/03_Modules/OcppRouter/src +RUN npm run build --prefix /usr/03_Modules/OcppRouter +RUN cd /usr/03_Modules/OcppRouter && npm pack + # Build citrineos-certificates module FROM base as citrineos-certificates-builder COPY --from=citrineos-base-builder /usr/00_Base/*.tgz /usr/00_Base/ @@ -135,6 +148,7 @@ WORKDIR /usr/server COPY --from=citrineos-base-builder /usr/00_Base/*.tgz /usr/00_Base/ COPY --from=citrineos-data-builder /usr/01_Data/*.tgz /usr/01_Data/ COPY --from=citrineos-util-builder /usr/02_Util/*.tgz /usr/02_Util/ +COPY --from=citrineos-ocpprouter-builder /usr/03_Modules/OcppRouter/*.tgz /usr/03_Modules/OcppRouter/ COPY --from=citrineos-certificates-builder /usr/03_Modules/Certificates/*.tgz /usr/03_Modules/Certificates/ COPY --from=citrineos-configuration-builder /usr/03_Modules/Configuration/*.tgz /usr/03_Modules/Configuration/ COPY --from=citrineos-evdriver-builder /usr/03_Modules/EVDriver/*.tgz /usr/03_Modules/EVDriver/ diff --git a/Server/package.json b/Server/package.json index bacb98be8..bd93acc10 100644 --- a/Server/package.json +++ b/Server/package.json @@ -9,6 +9,7 @@ "install-base": "cd ../00_Base && npm install && npm run build && npm pack && cd ../Server && npm install ../00_Base/citrineos-base-1.0.0.tgz", "install-util": "cd ../02_Util && npm install ../00_Base/citrineos-base-1.0.0.tgz && npm run build && npm pack && cd ../Server && npm install ../02_Util/citrineos-util-1.0.0.tgz", "install-data": "cd ../01_Data && npm install ../00_Base/citrineos-base-1.0.0.tgz && npm run build && npm pack && cd ../Server && npm install ../01_Data/citrineos-data-1.0.0.tgz", + "install-ocpprouter": "cd ../03_Modules/OcppRouter && npm run install-all && npm install && npm run build && npm pack && cd ../../Server && npm install ../03_Modules/OcppRouter/citrineos-ocpprouter-1.0.0.tgz", "install-certificates": "cd ../03_Modules/Certificates && npm run install-all && npm install && npm run build && npm pack && cd ../../Server && npm install ../03_Modules/Certificates/citrineos-certificates-1.0.0.tgz", "install-configuration": "cd ../03_Modules/Configuration && npm run install-all && npm install && npm run build && npm pack && cd ../../Server && npm install ../03_Modules/Configuration/citrineos-configuration-1.0.0.tgz", "install-evdriver": "cd ../03_Modules/EVDriver && npm run install-all && npm install && npm run build && npm pack && cd ../../Server && npm install ../03_Modules/EVDriver/citrineos-evdriver-1.0.0.tgz", @@ -47,6 +48,7 @@ "@citrineos/data": "file:../01_Data/citrineos-data-1.0.0.tgz", "@citrineos/evdriver": "file:../03_Modules/EVDriver/citrineos-evdriver-1.0.0.tgz", "@citrineos/monitoring": "file:../03_Modules/Monitoring/citrineos-monitoring-1.0.0.tgz", + "@citrineos/ocpprouter": "file:../03_Modules/OcppRouter/citrineos-ocpprouter-1.0.0.tgz", "@citrineos/reporting": "file:../03_Modules/Reporting/citrineos-reporting-1.0.0.tgz", "@citrineos/smartcharging": "file:../03_Modules/SmartCharging/citrineos-smartcharging-1.0.0.tgz", "@citrineos/transactions": "file:../03_Modules/Transactions/citrineos-transactions-1.0.0.tgz", diff --git a/Server/src/index.ts b/Server/src/index.ts index 99b7233a7..b92cd7b24 100644 --- a/Server/src/index.ts +++ b/Server/src/index.ts @@ -5,13 +5,14 @@ import { IAuthenticator, ICache, IMessageHandler, IMessageSender, IModule, IModuleApi, SystemConfig } from '@citrineos/base'; import { MonitoringModule, MonitoringModuleApi } from '@citrineos/monitoring'; -import { Authenticator, MemoryCache, MessageRouterImpl, RabbitMqReceiver, RabbitMqSender, WebsocketNetworkConnection, initSwagger } from '@citrineos/util'; +import { Authenticator, MemoryCache, RabbitMqReceiver, RabbitMqSender, WebsocketNetworkConnection, initSwagger } from '@citrineos/util'; import { JsonSchemaToTsProvider } from '@fastify/type-provider-json-schema-to-ts'; import Ajv from "ajv"; import addFormats from "ajv-formats" import fastify, { FastifyInstance } from 'fastify'; import { ILogObj, Logger } from 'tslog'; import { systemConfig } from './config'; +import { MessageRouterImpl, AdminApi } from '@citrineos/ocpprouter'; import { ConfigurationModule, ConfigurationModuleApi } from '@citrineos/configuration'; import { TransactionsModule, TransactionsModuleApi } from '@citrineos/transactions'; import { CertificatesModule, CertificatesModuleApi } from '@citrineos/certificates'; @@ -99,6 +100,7 @@ class CitrineOSServer { const reportingModule = new ReportingModule(this._config, this._cache, this._createSender(), this._createHandler(), this._logger); const transactionsModule = new TransactionsModule(this._config, this._cache, this._createSender(), this._createHandler(), this._logger); this._modules = [ + router, configurationModule, evdriverModule, monitoringModule, @@ -106,6 +108,7 @@ class CitrineOSServer { transactionsModule ] this._apis = [ + new AdminApi(router, this._server, this._logger), new ConfigurationModuleApi(configurationModule, this._server, this._logger), new EVDriverModuleApi(evdriverModule, this._server, this._logger), new MonitoringModuleApi(monitoringModule, this._server, this._logger), @@ -138,7 +141,7 @@ class CitrineOSServer { shutdown() { - // Shut down all modules and central system + // Shut down all modules and ocpp router this._modules.forEach(module => { module.shutdown(); }); diff --git a/Server/unix-init-install-all.sh b/Server/unix-init-install-all.sh index 8a481ff0f..277f10683 100644 --- a/Server/unix-init-install-all.sh +++ b/Server/unix-init-install-all.sh @@ -42,6 +42,17 @@ util_commands=( "npm pack" ) +ocpprouter_commands=( + "cd ../03_Modules/OcppRouter" + "rm -rf ./lib" + "rm -f citrineos-ocpprouter-1.0.0.tgz" + "npm install ../../00_Base/citrineos-base-1.0.0.tgz" + "npm install ../../01_Data/citrineos-data-1.0.0.tgz" + "npm install ../../02_Util/citrineos-util-1.0.0.tgz" + "npm install" + "npm pack" +) + certificates_commands=( "cd ../03_Modules/Certificates" "rm -rf ./lib" @@ -125,6 +136,7 @@ ocpp_server_commands=( "npm install ../00_Base/citrineos-base-1.0.0.tgz" "npm install ../01_Data/citrineos-data-1.0.0.tgz" "npm install ../02_Util/citrineos-util-1.0.0.tgz" + "npm install ../03_Modules/OcppRouter/citrineos-ocpprouter-1.0.0.tgz" "npm install ../03_Modules/Certificates/citrineos-certificates-1.0.0.tgz" "npm install ../03_Modules/Configuration/citrineos-configuration-1.0.0.tgz" "npm install ../03_Modules/EVDriver/citrineos-evdriver-1.0.0.tgz" @@ -139,6 +151,8 @@ ocpp_server_commands=( execute_commands "${base_commands[@]}" execute_commands "${data_commands[@]}" execute_commands "${util_commands[@]}" +execute_commands "${ocpprouter_commands[@]}"& +pid_ocpprouter=$! execute_commands "${certificates_commands[@]}"& pid_certificates=$! execute_commands "${configuration_commands[@]}"& @@ -156,6 +170,7 @@ pid_transactions=$! +wait $pid_ocpprouter wait $pid_certificates wait $pid_configuration wait $pid_evdriver diff --git a/Swarm/docker/Dockerfile b/Swarm/docker/Dockerfile index 7c9d21da1..fb41560f2 100644 --- a/Swarm/docker/Dockerfile +++ b/Swarm/docker/Dockerfile @@ -36,6 +36,19 @@ COPY /02_Util/src /usr/02_Util/src RUN npm run build --prefix /usr/02_Util RUN cd /usr/02_Util && npm pack +# Build citrineos-ocpprouter module +FROM base as citrineos-ocpprouter-builder +COPY --from=citrineos-base-builder /usr/00_Base/*.tgz /usr/00_Base/ +COPY --from=citrineos-data-builder /usr/01_Data/*.tgz /usr/01_Data/ +COPY --from=citrineos-util-builder /usr/02_Util/*.tgz /usr/02_Util/ +COPY /03_Modules/OcppRouter/package.json /usr/03_Modules/OcppRouter/ +RUN npm install --ignore-scripts=true --prefix /usr/03_Modules/OcppRouter + +COPY /03_Modules/OcppRouter/tsconfig.json /usr/03_Modules/OcppRouter/ +COPY /03_Modules/OcppRouter/src /usr/03_Modules/OcppRouter/src +RUN npm run build --prefix /usr/03_Modules/OcppRouter +RUN cd /usr/03_Modules/OcppRouter && npm pack + # Build citrineos-certificates module FROM base as citrineos-certificates-builder COPY --from=citrineos-base-builder /usr/00_Base/*.tgz /usr/00_Base/ @@ -135,6 +148,7 @@ WORKDIR /usr/server COPY --from=citrineos-base-builder /usr/00_Base/*.tgz /usr/00_Base/ COPY --from=citrineos-data-builder /usr/01_Data/*.tgz /usr/01_Data/ COPY --from=citrineos-util-builder /usr/02_Util/*.tgz /usr/02_Util/ +COPY --from=citrineos-ocpprouter-builder /usr/03_Modules/OcppRouter/*.tgz /usr/03_Modules/OcppRouter/ COPY --from=citrineos-certificates-builder /usr/03_Modules/Certificates/*.tgz /usr/03_Modules/Certificates/ COPY --from=citrineos-configuration-builder /usr/03_Modules/Configuration/*.tgz /usr/03_Modules/Configuration/ COPY --from=citrineos-evdriver-builder /usr/03_Modules/EVDriver/*.tgz /usr/03_Modules/EVDriver/ diff --git a/Swarm/package.json b/Swarm/package.json index cfe4dd352..c9179735c 100644 --- a/Swarm/package.json +++ b/Swarm/package.json @@ -9,6 +9,7 @@ "install-base": "cd ../00_Base && npm install && npm run build && npm pack && cd ../Swarm && npm install ../00_Base/citrineos-base-1.0.0.tgz", "install-util": "cd ../02_Util && npm install ../00_Base/citrineos-base-1.0.0.tgz && npm run build && npm pack && cd ../Swarm && npm install ../02_Util/citrineos-util-1.0.0.tgz", "install-data": "cd ../01_Data && npm install ../00_Base/citrineos-base-1.0.0.tgz && npm run build && npm pack && cd ../Swarm && npm install ../01_Data/citrineos-data-1.0.0.tgz", + "install-ocpprouter": "cd ../03_Modules/OcppRouter && npm run install-all && npm install && npm run build && npm pack && cd ../../Server && npm install ../03_Modules/OcppRouter/citrineos-ocpprouter-1.0.0.tgz", "install-certificates": "cd ../03_Modules/Certificates && npm run install-all && npm install && npm run build && npm pack && cd ../../Swarm && npm install ../03_Modules/Certificates/citrineos-certificates-1.0.0.tgz", "install-configuration": "cd ../03_Modules/Configuration && npm run install-all && npm install && npm run build && npm pack && cd ../../Swarm && npm install ../03_Modules/Configuration/citrineos-configuration-1.0.0.tgz", "install-evdriver": "cd ../03_Modules/EVDriver && npm run install-all && npm install && npm run build && npm pack && cd ../../Swarm && npm install ../03_Modules/EVDriver/citrineos-evdriver-1.0.0.tgz", @@ -47,6 +48,7 @@ "@citrineos/data": "file:../01_Data/citrineos-data-1.0.0.tgz", "@citrineos/evdriver": "file:../03_Modules/EVDriver/citrineos-evdriver-1.0.0.tgz", "@citrineos/monitoring": "file:../03_Modules/Monitoring/citrineos-monitoring-1.0.0.tgz", + "@citrineos/ocpprouter": "file:../03_Modules/OcppRouter/citrineos-ocpprouter-1.0.0.tgz", "@citrineos/reporting": "file:../03_Modules/Reporting/citrineos-reporting-1.0.0.tgz", "@citrineos/smartcharging": "file:../03_Modules/SmartCharging/citrineos-smartcharging-1.0.0.tgz", "@citrineos/transactions": "file:../03_Modules/Transactions/citrineos-transactions-1.0.0.tgz", diff --git a/Swarm/src/index.ts b/Swarm/src/index.ts index 08d1d416a..5557a4c69 100644 --- a/Swarm/src/index.ts +++ b/Swarm/src/index.ts @@ -9,7 +9,7 @@ /* eslint-disable @typescript-eslint/consistent-type-imports */ import { EventGroup, IAuthenticator, ICache, IMessageHandler, IMessageSender, IModule, IModuleApi, SystemConfig } from '@citrineos/base'; import { MonitoringModule, MonitoringModuleApi } from '@citrineos/monitoring'; -import { Authenticator, initSwagger, MemoryCache, MessageRouterImpl, RabbitMqReceiver, RabbitMqSender, RedisCache, WebsocketNetworkConnection } from '@citrineos/util'; +import { Authenticator, initSwagger, MemoryCache, RabbitMqReceiver, RabbitMqSender, RedisCache, WebsocketNetworkConnection } from '@citrineos/util'; import { JsonSchemaToTsProvider } from '@fastify/type-provider-json-schema-to-ts'; import Ajv from "ajv"; import addFormats from "ajv-formats" @@ -20,6 +20,7 @@ import { ConfigurationModule, ConfigurationModuleApi } from '@citrineos/configur import { TransactionsModule, TransactionsModuleApi } from '@citrineos/transactions'; import { CertificatesModule, CertificatesModuleApi } from '@citrineos/certificates'; import { EVDriverModule, EVDriverModuleApi } from '@citrineos/evdriver'; +import { MessageRouterImpl, AdminApi } from '@citrineos/ocpprouter'; import { ReportingModule, ReportingModuleApi } from '@citrineos/reporting'; import { SmartChargingModule, SmartChargingModuleApi } from '@citrineos/smartcharging'; import { sequelize } from '@citrineos/data'; @@ -93,6 +94,8 @@ class CitrineOSServer { this._networkConnection = new WebsocketNetworkConnection(this._config, this._cache, this._authenticator, router, this._logger); + const api = new AdminApi(router, this._server, this._logger) + process.on('SIGINT', this.shutdown.bind(this)); process.on('SIGTERM', this.shutdown.bind(this)); process.on('SIGQUIT', this.shutdown.bind(this)); @@ -108,7 +111,7 @@ class CitrineOSServer { shutdown() { - // Shut down central system + // Shut down ocpp router this._networkConnection.shutdown(); // Shutdown server diff --git a/Swarm/unix-init-install-all.sh b/Swarm/unix-init-install-all.sh index f48fcbcf8..124c9d2a2 100644 --- a/Swarm/unix-init-install-all.sh +++ b/Swarm/unix-init-install-all.sh @@ -42,6 +42,17 @@ util_commands=( "npm pack" ) +ocpprouter_commands=( + "cd ../03_Modules/OcppRouter" + "rm -rf ./lib" + "rm -f citrineos-ocpprouter-1.0.0.tgz" + "npm install ../../00_Base/citrineos-base-1.0.0.tgz" + "npm install ../../01_Data/citrineos-data-1.0.0.tgz" + "npm install ../../02_Util/citrineos-util-1.0.0.tgz" + "npm install" + "npm pack" +) + certificates_commands=( "cd ../03_Modules/Certificates" "rm -rf ./lib" @@ -125,6 +136,7 @@ ocpp_server_commands=( "npm install ../00_Base/citrineos-base-1.0.0.tgz" "npm install ../01_Data/citrineos-data-1.0.0.tgz" "npm install ../02_Util/citrineos-util-1.0.0.tgz" + "npm install ../03_Modules/OcppRouter/citrineos-ocpprouter-1.0.0.tgz" "npm install ../03_Modules/Certificates/citrineos-certificates-1.0.0.tgz" "npm install ../03_Modules/Configuration/citrineos-configuration-1.0.0.tgz" "npm install ../03_Modules/EVDriver/citrineos-evdriver-1.0.0.tgz" @@ -139,6 +151,8 @@ ocpp_server_commands=( execute_commands "${base_commands[@]}" execute_commands "${data_commands[@]}" execute_commands "${util_commands[@]}" +execute_commands "${ocpprouter_commands[@]}"& +pid_ocpprouter=$! execute_commands "${certificates_commands[@]}"& pid_certificates=$! execute_commands "${configuration_commands[@]}"& @@ -156,6 +170,7 @@ pid_transactions=$! +wait $pid_ocpprouter wait $pid_certificates wait $pid_configuration wait $pid_evdriver