diff --git a/packages/cactus-plugin-ledger-connector-sawtooth-socketio/package.json b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/package.json index 4995f9971c5..68c987630d0 100644 --- a/packages/cactus-plugin-ledger-connector-sawtooth-socketio/package.json +++ b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/package.json @@ -2,6 +2,9 @@ "name": "@hyperledger/cactus-plugin-ledger-connector-sawtooth-socketio", "version": "1.1.2", "license": "Apache-2.0", + "main": "dist/index.js", + "module": "dist/index.js", + "types": "dist/index.d.ts", "scripts": { "start": "cd ./dist && node common/core/bin/www.js", "debug": "nodemon --inspect ./dist/common/core/bin/www.js", @@ -28,6 +31,9 @@ "xmlhttprequest": "1.8.0" }, "devDependencies": { + "@hyperledger/cactus-test-tooling": "1.1.2", + "@hyperledger/cactus-common": "1.1.2", + "@hyperledger/cactus-api-client": "1.1.2", "@types/config": "0.0.41" } } diff --git a/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/common/core/bin/www.ts b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/common/core/bin/www.ts index f8ba3097496..785f23e9ca0 100755 --- a/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/common/core/bin/www.ts +++ b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/common/core/bin/www.ts @@ -1,7 +1,7 @@ #!/usr/bin/env node /* - * Copyright 2021 Hyperledger Cactus Contributors + * Copyright 2022 Hyperledger Cactus Contributors * SPDX-License-Identifier: Apache-2.0 * * www.js @@ -16,7 +16,6 @@ */ import app from "../app"; -const debug = require("debug")("connector:server"); import https = require("https"); // Overwrite config read path @@ -37,42 +36,8 @@ logger.level = configRead('logLevel', 'info'); // destination dependency (MONITOR) implementation class import { ServerMonitorPlugin } from "../../../connector/ServerMonitorPlugin"; -const Smonitor = new ServerMonitorPlugin(); - -/** - * Get port from environment and store in Express. - */ - -const sslport = normalizePort(process.env.PORT || configRead('sslParam.port')); -app.set("port", sslport); - -// Specify private key and certificate -const sslParam = { - key: fs.readFileSync(configRead('sslParam.key')), - cert: fs.readFileSync(configRead('sslParam.cert')), -}; - -/** - * Create HTTPS server. - */ - -const server = https.createServer(sslParam, app); // Start as an https server. -const io = new Server(server); - -/** - * Listen on provided port, on all network interfaces. - */ - -server.listen(sslport, function () { - console.log("listening on *:" + sslport); -}); -server.on("error", onError); -server.on("listening", onListening); - -/** - * Normalize a port into a number, string, or false. - */ +// Normalize a port into a number, string, or false. function normalizePort(val: string) { const port = parseInt(val, 10); @@ -89,65 +54,108 @@ function normalizePort(val: string) { return false; } -/** - * Event listener for HTTPS server "error" event. - */ - -function onError(error: any) { - if (error.syscall !== "listen") { - throw error; +export async function startSawtoothSocketIOConnector() { + const Smonitor = new ServerMonitorPlugin(); + + // Get port from environment and store in Express. + const sslport = normalizePort(process.env.PORT || configRead('sslParam.port')); + app.set("port", sslport); + + // Specify private key and certificate + let keyString: string; + let certString: string; + try { + keyString = configRead('sslParam.keyValue'); + certString = configRead('sslParam.certValue'); + } catch { + keyString = fs.readFileSync(configRead('sslParam.key'), "ascii"); + certString = fs.readFileSync(configRead('sslParam.cert'), "ascii"); } - const bind = - typeof sslport === "string" ? "Pipe " + sslport : "Port " + sslport; + // Create HTTPS server. + const server = https.createServer({ + key: keyString, + cert: certString, + }, app); // Start as an https server. + const io = new Server(server); - // handle specific listen errors with friendly messages - switch (error.code) { - case "EACCES": - console.error(bind + " requires elevated privileges"); - process.exit(1); - break; - case "EADDRINUSE": - console.error(bind + " is already in use"); - process.exit(1); - break; - default: + // Event listener for HTTPS server "error" event. + server.on("error", (error: any) => { + if (error.syscall !== "listen") { throw error; - } -} + } + + const bind = + typeof sslport === "string" ? "Pipe " + sslport : "Port " + sslport; + + // handle specific listen errors with friendly messages + switch (error.code) { + case "EACCES": + console.error(bind + " requires elevated privileges"); + process.exit(1); + case "EADDRINUSE": + console.error(bind + " is already in use"); + process.exit(1); + default: + throw error; + } + }); -/** - * Event listener for HTTPS server "listening" event. - */ + io.on("connection", function (client) { + logger.info("Client " + client.id + " connected."); -function onListening() { - const addr = server.address(); + // startMonitor: starting block generation event monitoring + client.on("startMonitor", function (data) { - if (!addr) { - logger.error("Could not get running server address - exit."); - process.exit(1); - } + if (!data || !data.filterKey) { + client.emit("error", { + status: 400, + errorDetail: "filterKey is required for startMonitor on sawtooth ledger", + }); + } - const bind = typeof addr === "string" ? "pipe " + addr : "port " + addr.port; - debug("Listening on " + bind); -} + Smonitor.startMonitor(client.id, data.filterKey, (callbackData) => { + let emitType = ""; + if (callbackData.status == 200) { + emitType = "eventReceived"; + logger.info("event data callbacked."); + } else { + emitType = "monitor_error"; + } + client.emit(emitType, callbackData); + }); + }); -io.on("connection", function (client) { - logger.info("Client " + client.id + " connected."); - - /** - * startMonitor: starting block generation event monitoring - **/ - client.on("startMonitor", function (data) { - Smonitor.startMonitor(client.id, data.filterKey, (callbackData) => { - let emitType = ""; - if (callbackData.status == 200) { - emitType = "eventReceived"; - logger.info("event data callbacked."); - } else { - emitType = "monitor_error"; - } - client.emit(emitType, callbackData); + + client.on("stopMonitor", function () { + Smonitor.stopMonitor(client.id); }); + + client.on("disconnect", function (reason) { + logger.info("Client " + client.id + " disconnected."); + logger.info("Reason :" + reason); + // Stop monitoring if disconnected client is for event monitoring + Smonitor.stopMonitor(client.id); + }); + }); + + // Listen on provided port, on all network interfaces. + return new Promise((resolve) => server.listen(sslport, () => resolve(server))); +} + +if (require.main === module) { + // When this file executed as a script, not loaded as module - run the connector + startSawtoothSocketIOConnector().then((server) => { + const addr = server.address(); + + if (!addr) { + logger.error("Could not get running server address - exit."); + process.exit(1); + } + + const bind = typeof addr === "string" ? "pipe " + addr : "port " + addr.port; + logger.debug("Listening on " + bind); + }).catch((err) => { + logger.error("Could not start sawtooth-socketio connector:", err); }); -}); +} diff --git a/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/connector/ServerMonitorPlugin.ts b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/connector/ServerMonitorPlugin.ts index 90b3724ea41..242fee0ec7c 100644 --- a/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/connector/ServerMonitorPlugin.ts +++ b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/connector/ServerMonitorPlugin.ts @@ -32,15 +32,8 @@ export type MonitorCallback = (callback: { * Class definitions of server monitoring */ export class ServerMonitorPlugin { - currentBlockNumber: number; - - /* - * constructors - */ - constructor() { - // Define dependent specific settings - this.currentBlockNumber = -1; - } + currentBlockNumber = -1; + runningMonitors = new Map>(); /* * startMonitor @@ -62,12 +55,20 @@ export class ServerMonitorPlugin { logger.debug(`responseObj = ${JSON.stringify(responseObj)}`); logger.debug(`currentBlockNumber = ${that.currentBlockNumber}`); that.periodicMonitoring(clientId, filterKey, cb); - }; + } + + const method = configRead("blockMonitor.request.method"); + const latestBlockURL = new URL( + configRead("blockMonitor.request.getLatestBlockNumberCommand"), + configRead("blockMonitor.request.host")) + .toString(); + logger.debug("latestBlockURL:", latestBlockURL); + httpReq.open( - configRead("blockMonitor.request.method"), - configRead("blockMonitor.request.host") + - configRead("blockMonitor.request.getLatestBlockNumberCommand"), + method, + latestBlockURL, ); + httpReq.send(); } @@ -135,7 +136,12 @@ export class ServerMonitorPlugin { status: 200, blockData: signedTransactionDataArray, }; - cb(retObj); + + if (that.runningMonitors.has(clientId)) { + cb(retObj); + } else { + logger.info("Monitoring seems to be stopped - don't send the response! ClientID:", clientId); + } } } @@ -144,15 +150,36 @@ export class ServerMonitorPlugin { } logger.debug(`currentBlockNumber = ${that.currentBlockNumber}`); }; + const timerBlockMonitoring = setInterval(function () { - const callURL = - configRead("blockMonitor.request.host") + + const newBlocksPath = configRead("blockMonitor.request.periodicMonitoringCommand1") + SplugUtil.convertBlockNumber(that.currentBlockNumber) + configRead("blockMonitor.request.periodicMonitoringCommand2"); - logger.debug(`call URL = ${callURL}`); - httpReq.open(configRead("blockMonitor.request.method"), callURL); + const newBlocksUrl = new URL( + newBlocksPath, + configRead("blockMonitor.request.host")) + .toString(); + logger.debug(`newBlocksUrl = ${newBlocksUrl}`); + + httpReq.open(configRead("blockMonitor.request.method"), newBlocksUrl); httpReq.send(); }, configRead("blockMonitor.pollingInterval")); + + this.runningMonitors.set(clientId, timerBlockMonitoring); + } + + /** + * Stop monitor for given client. + * + * @param {string} clientId: Client ID from which monitoring stop request was made + */ + stopMonitor(clientId: string) { + let monitorInterval = this.runningMonitors.get(clientId); + if (monitorInterval) { + logger.info("stop watching and remove interval."); + clearInterval(monitorInterval); + this.runningMonitors.delete(clientId); + } } } diff --git a/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/index.ts b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/index.ts new file mode 100644 index 00000000000..87cb558397c --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/index.ts @@ -0,0 +1 @@ +export * from "./public-api"; diff --git a/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/public-api.ts b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/public-api.ts new file mode 100644 index 00000000000..089662c6b7c --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/main/typescript/public-api.ts @@ -0,0 +1 @@ +export { startSawtoothSocketIOConnector } from "./common/core/bin/www" diff --git a/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/test/typescript/integration/sawtooth-socketio-connector.test.ts b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/test/typescript/integration/sawtooth-socketio-connector.test.ts new file mode 100644 index 00000000000..a0b2ed893eb --- /dev/null +++ b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/src/test/typescript/integration/sawtooth-socketio-connector.test.ts @@ -0,0 +1,244 @@ +/** + * Functional test of basic operations on sawtooth connector (packages/cactus-plugin-ledger-connector-sawtooth-socketio). + */ + +////////////////////////////////// +// Constants +////////////////////////////////// + +const testLogLevel: LogLevelDesc = "info"; +const sutLogLevel: LogLevelDesc = "info"; +const testTimeout = 1000 * 2 * 60; // 2 minutes timeout for some tests +const setupTimeout = 1000 * 3 * 60; // 3 minutes timeout for setup + +// Ledger settings +const containerImageName = "ghcr.io/hyperledger/cactus-sawtooth-all-in-one"; +const containerImageVersion = "2022-11-21-9da24a0"; +const useRunningLedger = false; + +// Use for development on local sawtooth network +// const containerImageName = "sawtooth_aio_1x"; +// const containerImageVersion = "1.0.0"; +// const useRunningLedger = true; + +// ApiClient settings +const syncReqTimeout = 1000 * 10; // 10 seconds + +import { + SawtoothTestLedger, + SelfSignedPkiGenerator, + pruneDockerAllIfGithubAction, +} from "@hyperledger/cactus-test-tooling"; + +import { + LogLevelDesc, + LoggerProvider, + Logger, +} from "@hyperledger/cactus-common"; + +import { SocketIOApiClient } from "@hyperledger/cactus-api-client"; + +import "jest-extended"; +import { Server as HttpsServer } from "https"; + +// Logger setup +const log: Logger = LoggerProvider.getOrCreate({ + label: "sawtooth-socketio-connector.test", + level: testLogLevel, +}); + +describe("Sawtooth-SocketIO connector tests", () => { + let ledger: SawtoothTestLedger; + let connectorPrivKeyValue: string; + let connectorCertValue: string; + let connectorServer: HttpsServer; + let apiClient: SocketIOApiClient; + + ////////////////////////////////// + // Environment Setup + ////////////////////////////////// + + beforeAll(async () => { + log.info("Prune Docker..."); + await pruneDockerAllIfGithubAction({ logLevel: testLogLevel }); + + log.info(`Start Ledger ${containerImageName}:${containerImageVersion}...`); + ledger = new SawtoothTestLedger({ + containerImageName, + containerImageVersion, + emitContainerLogs: false, + logLevel: sutLogLevel, + useRunningLedger, + }); + await ledger.start(); + const ledgerRestApi = await ledger.getRestApiHost(); + log.info(`Ledger started, API: ${ledgerRestApi}`); + + // Generate connector private key and certificate + const pkiGenerator = new SelfSignedPkiGenerator(); + const pki = pkiGenerator.create("localhost"); + connectorPrivKeyValue = pki.privateKeyPem; + connectorCertValue = pki.certificatePem; + const jwtAlgo = "RS512"; + + const connectorConfig: any = { + sslParam: { + port: 0, // random port + keyValue: connectorPrivKeyValue, + certValue: connectorCertValue, + jwtAlgo: jwtAlgo, + }, + blockMonitor: { + request: { + method: "GET", + host: ledgerRestApi, + getLatestBlockNumberCommand: "blocks?limit=1", + periodicMonitoringCommand1: "blocks?start=", + periodicMonitoringCommand2: "&reverse", + }, + pollingInterval: 5000 + }, + logLevel: sutLogLevel, + }; + const configJson = JSON.stringify(connectorConfig); + log.debug("Connector Config:", configJson); + + log.info("Export connector config before loading the module..."); + process.env["NODE_CONFIG"] = configJson; + + // Load connector module + const connectorModule = await import("../../../main/typescript/index"); + + // Run the connector + connectorServer = await connectorModule.startSawtoothSocketIOConnector(); + expect(connectorServer).toBeTruthy(); + const connectorAddress = connectorServer.address(); + if (!connectorAddress || typeof connectorAddress === "string") { + throw new Error("Unexpected sawtooth connector AddressInfo type"); + } + log.info( + "Sawtooth-SocketIO Connector started on:", + `${connectorAddress.address}:${connectorAddress.port}`, + ); + + // Create ApiClient instance + const apiConfigOptions = { + validatorID: "sawtooth-socketio-test", + validatorURL: `https://localhost:${connectorAddress.port}`, + validatorKeyValue: connectorCertValue, + logLevel: sutLogLevel, + maxCounterRequestID: 1000, + syncFunctionTimeoutMillisecond: syncReqTimeout, + socketOptions: { + rejectUnauthorized: false, + reconnection: false, + timeout: syncReqTimeout * 2, + }, + }; + log.debug("ApiClient config:", apiConfigOptions); + apiClient = new SocketIOApiClient(apiConfigOptions); + }, setupTimeout); + + afterAll(async () => { + log.info("FINISHING THE TESTS"); + + if (apiClient) { + log.info("Close ApiClient connection..."); + apiClient.close(); + } + + if (connectorServer) { + log.info("Stop the sawtooth connector..."); + await new Promise((resolve) => + connectorServer.close(() => resolve()), + ); + } + + if (ledger) { + log.info("Stop the sawtooth ledger..."); + await ledger.stop(); + await ledger.destroy(); + } + + // SocketIOApiClient has timeout running for each request which is not cancellable at the moment. + // Wait timeout amount of seconds to make sure all handles are closed. + await new Promise((resolve) => setTimeout(resolve, syncReqTimeout)) + + log.info("Prune Docker..."); + await pruneDockerAllIfGithubAction({ logLevel: testLogLevel }); + }, setupTimeout); + + ////////////////////////////////// + // Tests + ////////////////////////////////// + + /** + * Simple test to see if test sawtooth ledger is running correctly and required API is available. + * Will set and retrieve intkey value. + * Doesn't use apiClient or validator. + */ + test("Sanity check ledger connection", async () => { + const keyName = "sanityCheck"; + const keyValue = "42"; + + // Set key + const setResponse = JSON.parse(await ledger.runSawtoothShell(["intkey", "set", keyName, keyValue])); + log.debug("setResponse:", setResponse); + const setStatus = await ledger.waitOnTransaction(setResponse.link); + log.info("setStatus:", setStatus); + expect(setStatus).not.toEqual("PENDING"); // TX should be commited + + // Show key + const showResponse = await ledger.runSawtoothShell(["intkey", "show", keyName]); + log.info("showResponse:", showResponse); + expect(showResponse).toContain(keyName); + expect(showResponse).toContain(keyValue); + }); + + /** + * Test ServerMonitorPlugin startMonitor/stopMonitor functions. + */ + test("Monitoring returns new block", async () => { + // Create monitoring promise and subscription + let monitorSub: any; + const newBlockPromise = new Promise((resolve, reject) => { + monitorSub = apiClient.watchBlocksV1({ + filterKey: "intkey" + }).subscribe({ + next: block => resolve(block), + error: err => reject(err), + complete: () => reject("Unexpected watchBlocksV1 completion - reject."), + }); + }); + + // Keep adding new keys until block was received + try { + let keyId = 1; + while (keyId++) { // infinite loop + // Set new key + const keyName = "monitorTest" + keyId; + await ledger.runSawtoothShell(["intkey", "set", keyName, "42"]); + await ledger.runSawtoothShell(["intkey", "inc", keyName, "11"]); + const sleepPromise: Promise = new Promise((resolve) => setTimeout(resolve, 2000)) + + // Wait for 2 seconds or for new block to arrive + const resolvedValue = await Promise.race([newBlockPromise, sleepPromise]); + log.debug("Monitor: resolvedValue", resolvedValue); + if (resolvedValue && resolvedValue!.blockData) { + log.info("Resolved watchBlock promise"); + expect(resolvedValue.status).toEqual(200); + expect(resolvedValue.blockData).toBeTruthy(); + break; + } + } + } catch (error) { + throw error; + } finally { + if (monitorSub) { + monitorSub.unsubscribe(); + } else { + log.warn("monitorSub was not valid, could not unsubscribe"); + } + } + }, testTimeout); +}); diff --git a/packages/cactus-plugin-ledger-connector-sawtooth-socketio/tsconfig.json b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/tsconfig.json index 8a357213720..ea43e4c32e6 100644 --- a/packages/cactus-plugin-ledger-connector-sawtooth-socketio/tsconfig.json +++ b/packages/cactus-plugin-ledger-connector-sawtooth-socketio/tsconfig.json @@ -13,11 +13,21 @@ "./src/main/typescript/common/core/*.ts", "./src/main/typescript/common/core/bin/*.ts", "./src/main/typescript/common/core/config/*.ts", - "./src/main/typescript/connector/*.ts" + "./src/main/typescript/connector/*.ts", + "./src/main/typescript/*.ts" ], "references": [ { "path": "../cactus-cmd-socketio-server/tsconfig.json" + }, + { + "path": "../cactus-test-tooling/tsconfig.json" + }, + { + "path": "../cactus-common/tsconfig.json" + }, + { + "path": "../cactus-api-client/tsconfig.json" } ] }