diff --git a/TickerQ.slnx b/TickerQ.slnx index 2b854dcc..f6e73832 100644 --- a/TickerQ.slnx +++ b/TickerQ.slnx @@ -24,8 +24,8 @@ - - + + diff --git a/hub/Directory.Build.props b/hub/Directory.Build.props new file mode 100644 index 00000000..5085f4ee --- /dev/null +++ b/hub/Directory.Build.props @@ -0,0 +1,10 @@ + + + + + + + + + diff --git a/src/TickerQ.RemoteExecutor/Models/RegisteredFunctionsResponse.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/Models/RegisteredFunctionsResponse.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/Models/RegisteredFunctionsResponse.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/Models/RegisteredFunctionsResponse.cs diff --git a/src/TickerQ.RemoteExecutor/Models/RemoteTickerFunctionDescriptor.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/Models/RemoteTickerFunctionDescriptor.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/Models/RemoteTickerFunctionDescriptor.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/Models/RemoteTickerFunctionDescriptor.cs diff --git a/src/TickerQ.RemoteExecutor/RemoteExecutionDelegateFactory.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/RemoteExecutionDelegateFactory.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/RemoteExecutionDelegateFactory.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/RemoteExecutionDelegateFactory.cs diff --git a/src/TickerQ.RemoteExecutor/RemoteExecutionEndpoints.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/RemoteExecutionEndpoints.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/RemoteExecutionEndpoints.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/RemoteExecutionEndpoints.cs diff --git a/src/TickerQ.RemoteExecutor/RemoteExecutionServiceExtension.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/RemoteExecutionServiceExtension.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/RemoteExecutionServiceExtension.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/RemoteExecutionServiceExtension.cs diff --git a/src/TickerQ.RemoteExecutor/RemoteFunctionRegistry.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/RemoteFunctionRegistry.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/RemoteFunctionRegistry.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/RemoteFunctionRegistry.cs diff --git a/src/TickerQ.RemoteExecutor/RemoteFunctionsSyncService.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/RemoteFunctionsSyncService.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/RemoteFunctionsSyncService.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/RemoteFunctionsSyncService.cs diff --git a/src/TickerQ.RemoteExecutor/TickerExecutionTaskHandlerRouter.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/TickerExecutionTaskHandlerRouter.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/TickerExecutionTaskHandlerRouter.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/TickerExecutionTaskHandlerRouter.cs diff --git a/src/TickerQ.RemoteExecutor/TickerQ.RemoteExecutor.csproj b/hub/remoteExecutor/TickerQ.RemoteExecutor/TickerQ.RemoteExecutor.csproj similarity index 100% rename from src/TickerQ.RemoteExecutor/TickerQ.RemoteExecutor.csproj rename to hub/remoteExecutor/TickerQ.RemoteExecutor/TickerQ.RemoteExecutor.csproj diff --git a/src/TickerQ.RemoteExecutor/TickerQRemoteExecutionOptions.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/TickerQRemoteExecutionOptions.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/TickerQRemoteExecutionOptions.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/TickerQRemoteExecutionOptions.cs diff --git a/src/TickerQ.RemoteExecutor/TickerQRemoteExecutorConstants.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/TickerQRemoteExecutorConstants.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/TickerQRemoteExecutorConstants.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/TickerQRemoteExecutorConstants.cs diff --git a/src/TickerQ.RemoteExecutor/TickerQRemoteSignatureFilter.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/TickerQRemoteSignatureFilter.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/TickerQRemoteSignatureFilter.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/TickerQRemoteSignatureFilter.cs diff --git a/src/TickerQ.RemoteExecutor/TickerRemoteExecutionTaskHandler.cs b/hub/remoteExecutor/TickerQ.RemoteExecutor/TickerRemoteExecutionTaskHandler.cs similarity index 100% rename from src/TickerQ.RemoteExecutor/TickerRemoteExecutionTaskHandler.cs rename to hub/remoteExecutor/TickerQ.RemoteExecutor/TickerRemoteExecutionTaskHandler.cs diff --git a/src/TickerQ.SDK/Client/TickerQSdkHttpClient.cs b/hub/sdks/dotnet/TickerQ.SDK/Client/TickerQSdkHttpClient.cs similarity index 100% rename from src/TickerQ.SDK/Client/TickerQSdkHttpClient.cs rename to hub/sdks/dotnet/TickerQ.SDK/Client/TickerQSdkHttpClient.cs diff --git a/src/TickerQ.SDK/DependencyInjection/TickerQSdkDependencyInjection.cs b/hub/sdks/dotnet/TickerQ.SDK/DependencyInjection/TickerQSdkDependencyInjection.cs similarity index 100% rename from src/TickerQ.SDK/DependencyInjection/TickerQSdkDependencyInjection.cs rename to hub/sdks/dotnet/TickerQ.SDK/DependencyInjection/TickerQSdkDependencyInjection.cs diff --git a/src/TickerQ.SDK/HostedServices/TickerQFunctionRegistrationHostedService.cs b/hub/sdks/dotnet/TickerQ.SDK/HostedServices/TickerQFunctionRegistrationHostedService.cs similarity index 100% rename from src/TickerQ.SDK/HostedServices/TickerQFunctionRegistrationHostedService.cs rename to hub/sdks/dotnet/TickerQ.SDK/HostedServices/TickerQFunctionRegistrationHostedService.cs diff --git a/src/TickerQ.SDK/Infrastructure/JsonExampleGenerator.cs b/hub/sdks/dotnet/TickerQ.SDK/Infrastructure/JsonExampleGenerator.cs similarity index 100% rename from src/TickerQ.SDK/Infrastructure/JsonExampleGenerator.cs rename to hub/sdks/dotnet/TickerQ.SDK/Infrastructure/JsonExampleGenerator.cs diff --git a/src/TickerQ.SDK/Infrastructure/TickerQFunctionSyncService.cs b/hub/sdks/dotnet/TickerQ.SDK/Infrastructure/TickerQFunctionSyncService.cs similarity index 100% rename from src/TickerQ.SDK/Infrastructure/TickerQFunctionSyncService.cs rename to hub/sdks/dotnet/TickerQ.SDK/Infrastructure/TickerQFunctionSyncService.cs diff --git a/src/TickerQ.SDK/Models/Node.cs b/hub/sdks/dotnet/TickerQ.SDK/Models/Node.cs similarity index 100% rename from src/TickerQ.SDK/Models/Node.cs rename to hub/sdks/dotnet/TickerQ.SDK/Models/Node.cs diff --git a/src/TickerQ.SDK/Models/NodeFunction.cs b/hub/sdks/dotnet/TickerQ.SDK/Models/NodeFunction.cs similarity index 100% rename from src/TickerQ.SDK/Models/NodeFunction.cs rename to hub/sdks/dotnet/TickerQ.SDK/Models/NodeFunction.cs diff --git a/src/TickerQ.SDK/Models/RemoteExecutionContext.cs b/hub/sdks/dotnet/TickerQ.SDK/Models/RemoteExecutionContext.cs similarity index 100% rename from src/TickerQ.SDK/Models/RemoteExecutionContext.cs rename to hub/sdks/dotnet/TickerQ.SDK/Models/RemoteExecutionContext.cs diff --git a/src/TickerQ.SDK/Models/SyncNodesAndFunctionsResult.cs b/hub/sdks/dotnet/TickerQ.SDK/Models/SyncNodesAndFunctionsResult.cs similarity index 100% rename from src/TickerQ.SDK/Models/SyncNodesAndFunctionsResult.cs rename to hub/sdks/dotnet/TickerQ.SDK/Models/SyncNodesAndFunctionsResult.cs diff --git a/src/TickerQ.SDK/Persistence/TickerQRemotePersistenceProvider.cs b/hub/sdks/dotnet/TickerQ.SDK/Persistence/TickerQRemotePersistenceProvider.cs similarity index 100% rename from src/TickerQ.SDK/Persistence/TickerQRemotePersistenceProvider.cs rename to hub/sdks/dotnet/TickerQ.SDK/Persistence/TickerQRemotePersistenceProvider.cs diff --git a/src/TickerQ.SDK/SdkExecutionEndpoint.cs b/hub/sdks/dotnet/TickerQ.SDK/SdkExecutionEndpoint.cs similarity index 100% rename from src/TickerQ.SDK/SdkExecutionEndpoint.cs rename to hub/sdks/dotnet/TickerQ.SDK/SdkExecutionEndpoint.cs diff --git a/src/TickerQ.SDK/TickerQ.SDK.csproj b/hub/sdks/dotnet/TickerQ.SDK/TickerQ.SDK.csproj similarity index 100% rename from src/TickerQ.SDK/TickerQ.SDK.csproj rename to hub/sdks/dotnet/TickerQ.SDK/TickerQ.SDK.csproj diff --git a/src/TickerQ.SDK/TickerQSdkConstants.cs b/hub/sdks/dotnet/TickerQ.SDK/TickerQSdkConstants.cs similarity index 100% rename from src/TickerQ.SDK/TickerQSdkConstants.cs rename to hub/sdks/dotnet/TickerQ.SDK/TickerQSdkConstants.cs diff --git a/src/TickerQ.SDK/TickerQSignatureFilter.cs b/hub/sdks/dotnet/TickerQ.SDK/TickerQSignatureFilter.cs similarity index 100% rename from src/TickerQ.SDK/TickerQSignatureFilter.cs rename to hub/sdks/dotnet/TickerQ.SDK/TickerQSignatureFilter.cs diff --git a/src/TickerQ.SDK/TickerSdkOptions.cs b/hub/sdks/dotnet/TickerQ.SDK/TickerSdkOptions.cs similarity index 100% rename from src/TickerQ.SDK/TickerSdkOptions.cs rename to hub/sdks/dotnet/TickerQ.SDK/TickerSdkOptions.cs diff --git a/hub/sdks/node/.gitignore b/hub/sdks/node/.gitignore new file mode 100644 index 00000000..76707a62 --- /dev/null +++ b/hub/sdks/node/.gitignore @@ -0,0 +1,6 @@ +node_modules/ +dist/ +*.tsbuildinfo +.npm +.env +.env.* diff --git a/hub/sdks/node/LICENSE b/hub/sdks/node/LICENSE new file mode 100644 index 00000000..77ec7e41 --- /dev/null +++ b/hub/sdks/node/LICENSE @@ -0,0 +1,213 @@ +TickerQ is dual-licensed under the Apache License 2.0 and the MIT License. + +You may choose either license to use this software. + +--- + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + +TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + +1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all other + entities that control, are controlled by, or are under common control + with that entity. For the purposes of this definition, "control" + means (i) the power, direct or indirect, to cause the direction or + management of such entity, whether by contract or otherwise, or + (ii) ownership of fifty percent (50%) or more of the outstanding + shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity exercising + permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + +2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + +3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + +4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + +5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + +6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + +7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + +8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + +9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + +END OF TERMS AND CONDITIONS + +Copyright 2025 Arcenox + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +--- + + MIT License + +Copyright (c) 2025 Arcenox + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. \ No newline at end of file diff --git a/hub/sdks/node/README.md b/hub/sdks/node/README.md new file mode 100644 index 00000000..10b72f1d --- /dev/null +++ b/hub/sdks/node/README.md @@ -0,0 +1,215 @@ +# @tickerq/sdk + +Node.js SDK for [TickerQ](https://tickerq.net) — connect your Node.js application to TickerQ Hub for distributed job scheduling. + +## Installation + +```bash +npm install @tickerq/sdk +``` + +**Requirements:** Node.js >= 18 + +## Quick Start + +```ts +import express from 'express'; +import { TickerQSdk, TickerTaskPriority } from '@tickerq/sdk'; + +const app = express(); +app.use(express.raw({ type: 'application/json' })); + +// 1. Initialize SDK +const sdk = new TickerQSdk((opts) => + opts + .setApiKey('your-api-key') + .setApiSecret('your-api-secret') + .setCallbackUri('https://your-app.com') + .setNodeName('my-node'), +); + +// 2. Register functions +sdk.function('SendEmail', { priority: TickerTaskPriority.High }) + .withRequest({ to: '', subject: '', body: '' }) + .handle(async (ctx, signal) => { + console.log(`Sending email to ${ctx.request.to}`); + }); + +// 3. Mount endpoints & start +sdk.expressHandlers().mount(app); + +await sdk.start(); +app.listen(3000); +``` + +## Registering Functions + +### With typed request + +The default value provides both **type inference** and the **example JSON** sent to the Hub. + +```ts +sdk.function('ProcessOrder', { + priority: TickerTaskPriority.High, + maxConcurrency: 3, + requestType: 'OrderRequest', +}) + .withRequest({ orderId: 0, customerId: '', items: [''], total: 0 }) + .handle(async (ctx, signal) => { + ctx.request.orderId; // number + ctx.request.customerId; // string + ctx.request.items; // string[] + }); +``` + +### Without request + +```ts +sdk.function('DatabaseCleanup', { + cronExpression: '0 0 3 * * *', + priority: TickerTaskPriority.LongRunning, +}) + .handle(async (ctx, signal) => { + console.log(`Running cleanup for ${ctx.functionName}`); + }); +``` + +### With primitive request + +```ts +sdk.function('ResizeImage') + .withRequest('default-url') + .handle(async (ctx, signal) => { + console.log(ctx.request); // string + }); +``` + +## Function Options + +| Option | Type | Default | Description | +|--------|------|---------|-------------| +| `cronExpression` | `string` | — | Cron schedule (6-field, second precision) | +| `priority` | `TickerTaskPriority` | `Normal` | `High`, `Normal`, `Low`, or `LongRunning` | +| `maxConcurrency` | `number` | `0` (unlimited) | Max parallel executions for this function | +| `requestType` | `string` | auto-detected | Type name sent to Hub for documentation | + +## SDK Configuration + +```ts +const sdk = new TickerQSdk((opts) => + opts + .setApiKey('your-api-key') // Required — Hub API key + .setApiSecret('your-api-secret') // Required — Hub API secret + .setCallbackUri('https://...') // Required — URL where Hub sends execution callbacks + .setNodeName('my-node') // Required — Unique node identifier + .setTimeoutMs(30000) // Optional — HTTP timeout (default: 30s) + .setAllowSelfSignedCerts(true), // Optional — Skip TLS verification (dev only) +); +``` + +## Mounting Endpoints + +The SDK exposes two HTTP endpoints that the Hub calls: + +- `POST /execute` — Receives function execution requests +- `POST /resync` — Re-syncs function registry with the Hub + +### Express + +```ts +sdk.expressHandlers().mount(app); + +// Or with a prefix +sdk.expressHandlers('/tickerq').mount(app); +``` + +### Raw Node.js HTTP + +```ts +import { createServer } from 'node:http'; + +const handler = sdk.createHandler(); +const server = createServer(handler); +server.listen(3000); +``` + +## Lifecycle + +```ts +// Start — freezes function registry, syncs with Hub +await sdk.start(); + +// Check status +console.log(sdk.isStarted); + +// Graceful shutdown — waits for running tasks to complete +await sdk.stop(); // default 30s timeout +await sdk.stop(60_000); // custom timeout +``` + +## Handler Context + +Every handler receives a `TickerFunctionContext` and an `AbortSignal`: + +```ts +sdk.function('MyJob') + .handle(async (ctx, signal) => { + ctx.id; // string — unique execution ID + ctx.functionName; // string — registered function name + ctx.type; // TickerType — TimeTicker or CronTickerOccurrence + ctx.retryCount; // number — current retry attempt + ctx.scheduledFor; // Date — when this execution was scheduled + ctx.isDue; // boolean + + // Use signal for cancellation + if (signal.aborted) return; + }); +``` + +With a typed request: + +```ts +sdk.function('SendEmail') + .withRequest({ to: '', subject: '' }) + .handle(async (ctx, signal) => { + ctx.request.to; // string — fully typed + ctx.request.subject; // string + }); +``` + +## Priority Levels + +| Priority | Behavior | +|----------|----------| +| `TickerTaskPriority.High` | Executed first | +| `TickerTaskPriority.Normal` | Default priority | +| `TickerTaskPriority.Low` | Executed when no higher priority tasks are queued | +| `TickerTaskPriority.LongRunning` | Bypasses worker concurrency limit | + +## Custom Logger + +```ts +import type { TickerQLogger } from '@tickerq/sdk'; + +const logger: TickerQLogger = { + info: (msg, ...args) => console.log(msg, ...args), + warn: (msg, ...args) => console.warn(msg, ...args), + error: (msg, ...args) => console.error(msg, ...args), +}; + +const sdk = new TickerQSdk((opts) => opts + .setApiKey('...') + .setApiSecret('...') + .setCallbackUri('...') + .setNodeName('...'), + logger, +); +``` + +## Zero Dependencies + +The SDK has **no runtime dependencies**. It uses only Node.js built-in modules (`node:http`, `node:https`, `node:crypto`). Express is an optional peer dependency for the `expressHandlers()` convenience method. + +## License + +Dual-licensed under [MIT](LICENSE) and [Apache 2.0](LICENSE). Choose whichever you prefer. diff --git a/hub/sdks/node/package.json b/hub/sdks/node/package.json new file mode 100644 index 00000000..6d311200 --- /dev/null +++ b/hub/sdks/node/package.json @@ -0,0 +1,62 @@ +{ + "name": "@tickerq/sdk", + "version": "1.0.0", + "description": "TickerQ Node.js SDK — Connect your Node.js application to TickerQ Hub for distributed job scheduling.", + "main": "dist/index.js", + "module": "dist/index.js", + "types": "dist/index.d.ts", + "exports": { + ".": { + "types": "./dist/index.d.ts", + "require": "./dist/index.js", + "default": "./dist/index.js" + } + }, + "scripts": { + "build": "tsc", + "watch": "tsc --watch", + "clean": "rm -rf dist", + "prepublishOnly": "npm run build" + }, + "keywords": [ + "tickerq", + "scheduler", + "background-jobs", + "cron", + "distributed", + "hub", + "sdk" + ], + "author": "Arcenox", + "license": "(MIT OR Apache-2.0)", + "repository": { + "type": "git", + "url": "https://github.com/Arcenox-co/TickerQ", + "directory": "sdks/node" + }, + "homepage": "https://tickerq.net", + "bugs": { + "url": "https://github.com/Arcenox-co/TickerQ/issues" + }, + "engines": { + "node": ">=18.0.0" + }, + "files": [ + "dist", + "LICENSE", + "README.md" + ], + "peerDependencies": { + "express": ">=4.0.0" + }, + "peerDependenciesMeta": { + "express": { + "optional": true + } + }, + "devDependencies": { + "@types/express": "^4.17.21", + "@types/node": "^20.11.0", + "typescript": "^5.3.0" + } +} diff --git a/hub/sdks/node/src/TickerQSdk.ts b/hub/sdks/node/src/TickerQSdk.ts new file mode 100644 index 00000000..0fb827d8 --- /dev/null +++ b/hub/sdks/node/src/TickerQSdk.ts @@ -0,0 +1,214 @@ +import { TickerSdkOptions } from './TickerSdkOptions'; +import { TickerQSdkHttpClient, TickerQLogger } from './client/TickerQSdkHttpClient'; +import { + TickerFunctionProvider, + type TickerFunctionHandler, + type TickerFunctionHandlerNoRequest, +} from './infrastructure/TickerFunctionProvider'; +import { TickerFunctionBuilder, type FunctionOptions } from './infrastructure/TickerFunctionBuilder'; +import { TickerQFunctionSyncService } from './infrastructure/TickerQFunctionSyncService'; +import { TickerQRemotePersistenceProvider } from './persistence/TickerQRemotePersistenceProvider'; +import { TickerQTaskScheduler } from './worker/TickerQTaskScheduler'; +import { TickerFunctionConcurrencyGate } from './worker/TickerFunctionConcurrencyGate'; +import { SdkExecutionEndpoint } from './middleware/SdkExecutionEndpoint'; + +/** + * Main entry point for the TickerQ Node.js SDK. + * + * Usage: + * ```ts + * const sdk = new TickerQSdk(opts => opts + * .setApiKey('your-key') + * .setApiSecret('your-secret') + * .setCallbackUri('https://your-app.com') + * .setNodeName('node-1') + * ); + * + * // With typed request + * sdk.function('SendEmail', { priority: TickerTaskPriority.High }) + * .withRequest({ to: '', subject: '', body: '' }) + * .handle(async (ctx, signal) => { + * ctx.request.to; // fully typed + * }); + * + * // Without request + * sdk.function('Cleanup', { cronExpression: '0 0 3 * * *' }) + * .handle(async (ctx, signal) => { + * console.log(ctx.functionName); + * }); + * + * await sdk.start(); + * sdk.expressHandlers().mount(app); + * ``` + */ +export class TickerQSdk { + readonly options: TickerSdkOptions; + readonly httpClient: TickerQSdkHttpClient; + readonly syncService: TickerQFunctionSyncService; + readonly persistenceProvider: TickerQRemotePersistenceProvider; + readonly taskScheduler: TickerQTaskScheduler; + readonly concurrencyGate: TickerFunctionConcurrencyGate; + + private readonly endpoint: SdkExecutionEndpoint; + private readonly logger: TickerQLogger | null; + private _started = false; + + constructor( + configure: (options: TickerSdkOptions) => void, + logger?: TickerQLogger, + ) { + this.options = new TickerSdkOptions(); + configure(this.options); + this.options.validate(); + + this.logger = logger ?? null; + this.httpClient = new TickerQSdkHttpClient(this.options, this.logger ?? undefined); + this.syncService = new TickerQFunctionSyncService(this.httpClient, this.options); + this.persistenceProvider = new TickerQRemotePersistenceProvider(this.httpClient); + this.taskScheduler = new TickerQTaskScheduler(); + this.concurrencyGate = new TickerFunctionConcurrencyGate(); + + this.endpoint = new SdkExecutionEndpoint( + this.options, + this.syncService, + this.taskScheduler, + this.concurrencyGate, + this.persistenceProvider, + this.logger ?? undefined, + ); + } + + /** + * Register a function WITH a typed request payload. + * The default instance provides both the type inference AND the example JSON for the Hub. + * + * ```ts + * sdk.registerFunction('SendEmail', + * { to: '', subject: '', body: '' }, // ← default instance + * async (ctx, signal) => { + * ctx.request.to; // ← string, fully typed + * }, + * ); + * ``` + */ + registerFunction( + functionName: string, + requestDefault: TRequest, + handler: TickerFunctionHandler, + options?: FunctionOptions, + ): this; + + /** + * Register a function WITHOUT a request payload. + * + * ```ts + * sdk.registerFunction('Cleanup', async (ctx, signal) => { + * console.log(ctx.functionName); + * }); + * ``` + */ + registerFunction( + functionName: string, + handler: TickerFunctionHandlerNoRequest, + options?: FunctionOptions, + ): this; + + // ─── Implementation ───────────────────────────────────────────────── + + registerFunction( + functionName: string, + requestDefaultOrHandler: Record | TickerFunctionHandlerNoRequest, + handlerOrOptions?: TickerFunctionHandler | FunctionOptions, + maybeOptions?: FunctionOptions, + ): this { + if (typeof requestDefaultOrHandler === 'function') { + TickerFunctionProvider.registerFunction( + functionName, + requestDefaultOrHandler as TickerFunctionHandlerNoRequest, + handlerOrOptions as FunctionOptions | undefined, + ); + } else { + TickerFunctionProvider.registerFunction( + functionName, + requestDefaultOrHandler, + handlerOrOptions as TickerFunctionHandler, + maybeOptions, + ); + } + return this; + } + + /** + * Fluent builder for registering a function. + * + * ```ts + * // With typed request + * sdk.function('SendEmail', { priority: TickerTaskPriority.High }) + * .withRequest({ to: '', subject: '', body: '' }) + * .handle(async (ctx, signal) => { + * ctx.request.to; // fully typed + * }); + * + * // Without request + * sdk.function('Cleanup', { cronExpression: '0 0 3 * * *' }) + * .handle(async (ctx, signal) => { }); + * ``` + */ + function(functionName: string, options?: FunctionOptions): TickerFunctionBuilder { + return new TickerFunctionBuilder(functionName, options); + } + + /** + * Start the SDK: freeze function registry and sync with Hub. + */ + async start(): Promise { + if (this._started) return; + + TickerFunctionProvider.build(); + + this.logger?.info( + `TickerQ SDK: Starting with ${TickerFunctionProvider.tickerFunctions.size} registered function(s)...`, + ); + + const result = await this.syncService.syncAsync(); + + if (result) { + this.logger?.info( + `TickerQ SDK: Synced with Hub. Scheduler URL: ${result.applicationUrl}`, + ); + } else { + this.logger?.warn('TickerQ SDK: Hub sync returned null. Functions may not be scheduled.'); + } + + this._started = true; + } + + /** + * Graceful shutdown: wait for running tasks and dispose the scheduler. + */ + async stop(timeoutMs = 30_000): Promise { + this.logger?.info('TickerQ SDK: Stopping...'); + this.taskScheduler.freeze(); + await this.taskScheduler.waitForRunningTasks(timeoutMs); + this.taskScheduler.dispose(); + this.logger?.info('TickerQ SDK: Stopped.'); + } + + /** + * Returns a framework-agnostic HTTP handler for /execute and /resync. + */ + createHandler(prefix = ''): (req: import('http').IncomingMessage, res: import('http').ServerResponse) => void { + return this.endpoint.createHandler(prefix); + } + + /** + * Returns Express-compatible route handlers for /execute and /resync. + */ + expressHandlers(prefix = '') { + return this.endpoint.expressHandlers(prefix); + } + + get isStarted(): boolean { + return this._started; + } +} diff --git a/hub/sdks/node/src/TickerSdkOptions.ts b/hub/sdks/node/src/TickerSdkOptions.ts new file mode 100644 index 00000000..6d3ca9cc --- /dev/null +++ b/hub/sdks/node/src/TickerSdkOptions.ts @@ -0,0 +1,78 @@ +export const TICKERQ_SDK_CONSTANTS = { + HubBaseUrl: 'https://hub.tickerq.net/', + HubHostname: 'hub.tickerq.net', +} as const; + +export class TickerSdkOptions { + /** Scheduler URL — updated after sync with Hub. */ + apiUri: string | null = null; + + /** Fixed Hub URL. */ + readonly hubUri: string = TICKERQ_SDK_CONSTANTS.HubBaseUrl; + + /** HMAC-SHA256 webhook signature key — set after Hub sync. */ + webhookSignature: string | null = null; + + /** Public URL where the Hub sends execution callbacks. */ + callbackUri: string | null = null; + + /** Hub API key for authentication. */ + apiKey: string | null = null; + + /** Hub API secret for authentication. */ + apiSecret: string | null = null; + + /** Identifier for this application node. */ + nodeName: string | null = null; + + /** HTTP request timeout in milliseconds (default: 30000). */ + timeoutMs: number = 30_000; + + /** Allow self-signed SSL certificates (dev/local Scheduler). Default: false. */ + allowSelfSignedCerts: boolean = false; + + setApiKey(apiKey: string): this { + this.apiKey = apiKey; + return this; + } + + setApiSecret(apiSecret: string): this { + this.apiSecret = apiSecret; + return this; + } + + setCallbackUri(callbackUri: string): this { + this.callbackUri = callbackUri; + return this; + } + + setNodeName(nodeName: string): this { + this.nodeName = nodeName; + return this; + } + + setTimeoutMs(timeoutMs: number): this { + this.timeoutMs = timeoutMs; + return this; + } + + setAllowSelfSignedCerts(allow: boolean): this { + this.allowSelfSignedCerts = allow; + return this; + } + + validate(): void { + if (!this.apiKey) { + throw new Error('TickerQ SDK: ApiKey is required. Call setApiKey().'); + } + if (!this.apiSecret) { + throw new Error('TickerQ SDK: ApiSecret is required. Call setApiSecret().'); + } + if (!this.callbackUri) { + throw new Error('TickerQ SDK: CallbackUri is required. Call setCallbackUri().'); + } + if (!this.nodeName) { + throw new Error('TickerQ SDK: NodeName is required. Call setNodeName().'); + } + } +} diff --git a/hub/sdks/node/src/client/TickerQSdkHttpClient.ts b/hub/sdks/node/src/client/TickerQSdkHttpClient.ts new file mode 100644 index 00000000..d129d4cd --- /dev/null +++ b/hub/sdks/node/src/client/TickerQSdkHttpClient.ts @@ -0,0 +1,208 @@ +import * as https from 'node:https'; +import * as http from 'node:http'; +import { TickerSdkOptions, TICKERQ_SDK_CONSTANTS } from '../TickerSdkOptions'; +import { generateSignature } from '../utils/TickerQSignature'; + +/** + * HTTP client for communicating with TickerQ Hub and Scheduler. + * + * - Hub requests get X-Api-Key / X-Api-Secret headers. + * - Scheduler requests get X-Timestamp / X-TickerQ-Signature headers. + */ +export class TickerQSdkHttpClient { + private readonly options: TickerSdkOptions; + private readonly logger: TickerQLogger | null; + private readonly insecureAgent: https.Agent | undefined; + + constructor(options: TickerSdkOptions, logger?: TickerQLogger) { + this.options = options; + this.logger = logger ?? null; + + if (options.allowSelfSignedCerts) { + this.insecureAgent = new https.Agent({ rejectUnauthorized: false }); + } + } + + async getAsync(path: string, signal?: AbortSignal): Promise { + return this.sendAsync('GET', path, undefined, signal); + } + + async postAsync(path: string, request: TRequest, signal?: AbortSignal): Promise { + return this.sendAsync('POST', path, request, signal); + } + + async putAsync(path: string, request: TRequest, signal?: AbortSignal): Promise { + return this.sendAsync('PUT', path, request, signal); + } + + /** + * PUT that throws on failure instead of swallowing errors. + * Used for critical operations like status reporting. + */ + async putAsyncOrThrow(path: string, request: TRequest, signal?: AbortSignal): Promise { + const url = this.buildUrl(path); + const body = JSON.stringify(request); + const headers = this.buildHeaders(url, 'PUT', body); + headers['Content-Type'] = 'application/json'; + + const responseBody = await this.rawRequest(url, 'PUT', headers, body, signal); + + if (responseBody === null) { + throw new Error(`TickerQ HTTP PUT ${path} failed: no response`); + } + } + + async deleteAsync(path: string, signal?: AbortSignal): Promise { + await this.sendAsync('DELETE', path, undefined, signal); + } + + async getBytesAsync(path: string, signal?: AbortSignal): Promise { + const url = this.buildUrl(path); + const headers = this.buildHeaders(url, 'GET', ''); + + try { + const result = await this.rawRequest(url, 'GET', headers, undefined, signal); + if (result === null) return null; + return Buffer.from(result, 'utf-8'); + } catch (err) { + this.logger?.error(`TickerQ HTTP GET ${path} error:`, err); + return null; + } + } + + private async sendAsync( + method: string, + path: string, + request?: TRequest, + signal?: AbortSignal, + ): Promise { + const url = this.buildUrl(path); + const body = request !== undefined ? JSON.stringify(request) : ''; + const headers = this.buildHeaders(url, method, body); + + if (body) { + headers['Content-Type'] = 'application/json'; + } + + try { + const responseBody = await this.rawRequest(url, method, headers, body || undefined, signal); + if (!responseBody) return null; + return JSON.parse(responseBody) as TResponse; + } catch (err) { + this.logger?.error(`TickerQ HTTP ${method} ${path} error:`, err); + return null; + } + } + + /** + * Low-level HTTP request using node:http / node:https. + * This bypasses fetch entirely so we can use https.Agent + * with rejectUnauthorized: false for self-signed certs. + */ + private rawRequest( + url: URL, + method: string, + headers: Record, + body?: string, + signal?: AbortSignal, + ): Promise { + return new Promise((resolve, reject) => { + const isHttps = url.protocol === 'https:'; + const transport = isHttps ? https : http; + + const reqOptions: https.RequestOptions = { + hostname: url.hostname, + port: url.port || (isHttps ? 443 : 80), + path: url.pathname + (url.search || ''), + method, + headers, + timeout: this.options.timeoutMs, + }; + + // Apply insecure agent for self-signed certs + if (isHttps && this.insecureAgent) { + reqOptions.agent = this.insecureAgent; + } + + const req = transport.request(reqOptions, (res) => { + const chunks: Buffer[] = []; + res.on('data', (chunk: Buffer) => chunks.push(chunk)); + res.on('end', () => { + const responseBody = Buffer.concat(chunks).toString('utf-8'); + + if (!res.statusCode || res.statusCode >= 400) { + const errMsg = `TickerQ HTTP ${method} ${url.pathname} failed: ${res.statusCode} ${responseBody}`; + this.logger?.error(errMsg); + reject(new Error(errMsg)); + return; + } + + resolve(responseBody || null); + }); + }); + + req.on('error', reject); + req.on('timeout', () => { + req.destroy(new Error(`TickerQ HTTP ${method} ${url.pathname} timed out after ${this.options.timeoutMs}ms`)); + }); + + // Abort support + if (signal) { + if (signal.aborted) { + req.destroy(new Error('Aborted')); + return; + } + signal.addEventListener('abort', () => req.destroy(new Error('Aborted')), { once: true }); + } + + if (body) { + req.write(body, 'utf-8'); + } + req.end(); + }); + } + + private buildUrl(path: string): URL { + const baseUri = this.isHubPath(path) + ? this.options.hubUri + : (this.options.apiUri ?? this.options.hubUri); + return new URL(path, baseUri); + } + + private isHubPath(path: string): boolean { + return path.startsWith('/api/apps/'); + } + + private isHubRequest(url: URL): boolean { + return url.hostname === TICKERQ_SDK_CONSTANTS.HubHostname; + } + + private buildHeaders(url: URL, method: string, body: string): Record { + const headers: Record = {}; + + if (this.isHubRequest(url)) { + if (this.options.apiKey) headers['X-Api-Key'] = this.options.apiKey; + if (this.options.apiSecret) headers['X-Api-Secret'] = this.options.apiSecret; + } else if (this.options.webhookSignature) { + const timestamp = Math.floor(Date.now() / 1000); + const pathAndQuery = url.pathname + (url.search || ''); + const signature = generateSignature( + this.options.webhookSignature, + method, + pathAndQuery, + timestamp, + body, + ); + headers['X-Timestamp'] = String(timestamp); + headers['X-TickerQ-Signature'] = signature; + } + + return headers; + } +} + +export interface TickerQLogger { + info(message: string, ...args: unknown[]): void; + warn(message: string, ...args: unknown[]): void; + error(message: string, ...args: unknown[]): void; +} diff --git a/hub/sdks/node/src/enums/RunCondition.ts b/hub/sdks/node/src/enums/RunCondition.ts new file mode 100644 index 00000000..2a223872 --- /dev/null +++ b/hub/sdks/node/src/enums/RunCondition.ts @@ -0,0 +1,8 @@ +export enum RunCondition { + OnSuccess = 0, + OnFailure = 1, + OnCancelled = 2, + OnFailureOrCancelled = 3, + OnAnyCompletedStatus = 4, + InProgress = 5, +} diff --git a/hub/sdks/node/src/enums/TickerStatus.ts b/hub/sdks/node/src/enums/TickerStatus.ts new file mode 100644 index 00000000..8f6d0302 --- /dev/null +++ b/hub/sdks/node/src/enums/TickerStatus.ts @@ -0,0 +1,10 @@ +export enum TickerStatus { + Idle = 0, + Queued = 1, + InProgress = 2, + Done = 3, + DueDone = 4, + Failed = 5, + Cancelled = 6, + Skipped = 7, +} diff --git a/hub/sdks/node/src/enums/TickerTaskPriority.ts b/hub/sdks/node/src/enums/TickerTaskPriority.ts new file mode 100644 index 00000000..bfdb1f26 --- /dev/null +++ b/hub/sdks/node/src/enums/TickerTaskPriority.ts @@ -0,0 +1,6 @@ +export enum TickerTaskPriority { + LongRunning = 0, + High = 1, + Normal = 2, + Low = 3, +} diff --git a/hub/sdks/node/src/enums/TickerType.ts b/hub/sdks/node/src/enums/TickerType.ts new file mode 100644 index 00000000..1fbf9ff5 --- /dev/null +++ b/hub/sdks/node/src/enums/TickerType.ts @@ -0,0 +1,4 @@ +export enum TickerType { + CronTickerOccurrence = 0, + TimeTicker = 1, +} diff --git a/hub/sdks/node/src/enums/index.ts b/hub/sdks/node/src/enums/index.ts new file mode 100644 index 00000000..e39457c7 --- /dev/null +++ b/hub/sdks/node/src/enums/index.ts @@ -0,0 +1,4 @@ +export { TickerType } from './TickerType'; +export { TickerStatus } from './TickerStatus'; +export { TickerTaskPriority } from './TickerTaskPriority'; +export { RunCondition } from './RunCondition'; diff --git a/hub/sdks/node/src/index.ts b/hub/sdks/node/src/index.ts new file mode 100644 index 00000000..7a584f0a --- /dev/null +++ b/hub/sdks/node/src/index.ts @@ -0,0 +1,50 @@ +// ─── Main SDK Entry Point ─────────────────────────────────────────────── +export { TickerQSdk } from './TickerQSdk'; + +// ─── Configuration ────────────────────────────────────────────────────── +export { TickerSdkOptions, TICKERQ_SDK_CONSTANTS } from './TickerSdkOptions'; + +// ─── Enums ────────────────────────────────────────────────────────────── +export { TickerType } from './enums/TickerType'; +export { TickerStatus } from './enums/TickerStatus'; +export { TickerTaskPriority } from './enums/TickerTaskPriority'; +export { RunCondition } from './enums/RunCondition'; + +// ─── Models ───────────────────────────────────────────────────────────── +export type { RemoteExecutionContext } from './models/RemoteExecutionContext'; +export type { SyncNodesAndFunctionsResult } from './models/SyncNodesAndFunctionsResult'; +export type { NodeFunction } from './models/NodeFunction'; +export type { Node } from './models/Node'; +export type { TickerFunctionContext } from './models/TickerFunctionContext'; +export type { InternalFunctionContext } from './models/InternalFunctionContext'; +export type { TimeTickerEntity } from './models/TimeTickerEntity'; +export type { CronTickerEntity } from './models/CronTickerEntity'; +export type { PaginationResult } from './models/PaginationResult'; + +// ─── Infrastructure ───────────────────────────────────────────────────── +export { + TickerFunctionProvider, + type TickerFunctionDelegate, + type TickerFunctionHandler, + type TickerFunctionHandlerNoRequest, + type TickerFunctionRegistration, + type TickerFunctionRequestInfo, +} from './infrastructure/TickerFunctionProvider'; +export { TickerFunctionBuilder, type FunctionOptions } from './infrastructure/TickerFunctionBuilder'; +export { TickerQFunctionSyncService } from './infrastructure/TickerQFunctionSyncService'; + +// ─── Client ───────────────────────────────────────────────────────────── +export { TickerQSdkHttpClient, type TickerQLogger } from './client/TickerQSdkHttpClient'; + +// ─── Persistence ──────────────────────────────────────────────────────── +export { TickerQRemotePersistenceProvider } from './persistence/TickerQRemotePersistenceProvider'; + +// ─── Worker / Task Scheduler ──────────────────────────────────────────── +export { TickerQTaskScheduler } from './worker/TickerQTaskScheduler'; +export { TickerFunctionConcurrencyGate, Semaphore } from './worker/TickerFunctionConcurrencyGate'; + +// ─── Middleware / Endpoints ───────────────────────────────────────────── +export { SdkExecutionEndpoint } from './middleware/SdkExecutionEndpoint'; + +// ─── Utilities ────────────────────────────────────────────────────────── +export { generateSignature, validateSignature } from './utils/TickerQSignature'; diff --git a/hub/sdks/node/src/infrastructure/TickerFunctionBuilder.ts b/hub/sdks/node/src/infrastructure/TickerFunctionBuilder.ts new file mode 100644 index 00000000..40f53c00 --- /dev/null +++ b/hub/sdks/node/src/infrastructure/TickerFunctionBuilder.ts @@ -0,0 +1,80 @@ +import { TickerTaskPriority } from '../enums'; +import type { TickerFunctionContext } from '../models/TickerFunctionContext'; +import { TickerFunctionProvider, type TickerFunctionHandler, type TickerFunctionHandlerNoRequest } from './TickerFunctionProvider'; + +export interface FunctionOptions { + cronExpression?: string; + priority?: TickerTaskPriority; + maxConcurrency?: number; + requestType?: string; +} + +/** + * Fluent builder for registering a TickerQ function. + * + * ```ts + * sdk.function('SendEmail', { priority: TickerTaskPriority.High }) + * .withRequest({ to: '', subject: '', body: '' }) + * .handle(async (ctx, signal) => { + * ctx.request.to; // fully typed + * }); + * + * sdk.function('Cleanup') + * .handle(async (ctx, signal) => { }); + * ``` + */ +export class TickerFunctionBuilder { + private readonly functionName: string; + private readonly options: FunctionOptions; + private requestDefault: unknown = undefined; + private hasRequest = false; + + constructor(functionName: string, options?: FunctionOptions) { + this.functionName = functionName; + this.options = options ?? {}; + } + + /** + * Define a typed request payload for this function. + * The default instance provides type inference AND the example JSON for the Hub. + * + * ```ts + * sdk.function('SendEmail') + * .withRequest({ to: '', subject: '', body: '' }) + * .handle(async (ctx, signal) => { + * ctx.request.to; // string + * }); + * ``` + */ + withRequest(requestDefault: T): TickerFunctionBuilder { + const builder = this as unknown as TickerFunctionBuilder; + builder.requestDefault = requestDefault; + builder.hasRequest = true; + return builder; + } + + /** + * Register the handler for this function. + * Ends the builder chain and registers with TickerFunctionProvider. + */ + handle( + handler: [TRequest] extends [never] + ? TickerFunctionHandlerNoRequest + : TickerFunctionHandler, + ): void { + if (this.hasRequest) { + TickerFunctionProvider.registerFunction( + this.functionName, + this.requestDefault, + handler as TickerFunctionHandler, + this.options, + ); + } else { + TickerFunctionProvider.registerFunction( + this.functionName, + handler as TickerFunctionHandlerNoRequest, + this.options, + ); + } + } +} diff --git a/hub/sdks/node/src/infrastructure/TickerFunctionProvider.ts b/hub/sdks/node/src/infrastructure/TickerFunctionProvider.ts new file mode 100644 index 00000000..be984ce6 --- /dev/null +++ b/hub/sdks/node/src/infrastructure/TickerFunctionProvider.ts @@ -0,0 +1,176 @@ +import { TickerTaskPriority } from '../enums'; +import { TickerFunctionContext } from '../models/TickerFunctionContext'; + +/** + * Handler for a function WITH a typed request payload. + */ +export type TickerFunctionHandler = ( + context: TickerFunctionContext, + signal: AbortSignal, +) => Promise; + +/** + * Handler for a function WITHOUT a request payload. + */ +export type TickerFunctionHandlerNoRequest = ( + context: TickerFunctionContext, + signal: AbortSignal, +) => Promise; + +/** Internal delegate stored in the registry (always receives unknown request). */ +export type TickerFunctionDelegate = ( + context: TickerFunctionContext, + signal: AbortSignal, +) => Promise; + +export interface TickerFunctionRegistration { + cronExpression: string | null; + priority: TickerTaskPriority; + delegate: TickerFunctionDelegate; + maxConcurrency: number; +} + +export interface TickerFunctionRequestInfo { + requestType: string; + requestExampleJson: string; +} + +interface FunctionOptionsBase { + cronExpression?: string; + priority?: TickerTaskPriority; + maxConcurrency?: number; +} + +/** + * Central registry for all ticker functions. + */ +class TickerFunctionProviderImpl { + private _functions: Map = new Map(); + private _requestInfos: Map = new Map(); + private _requestDefaults: Map = new Map(); + private _frozen = false; + + get tickerFunctions(): ReadonlyMap { + return this._functions; + } + + get tickerFunctionRequestInfos(): ReadonlyMap { + return this._requestInfos; + } + + /** + * Register a function WITH a typed request. + * The default instance serves as both the type source AND the example JSON for the Hub. + * + * Usage: + * ```ts + * provider.registerFunction('SendEmail', + * { to: '', subject: '', body: '' }, // ← default instance (infers TRequest) + * async (ctx, signal) => { + * ctx.request.to; // ← fully typed + * }, + * ); + * ``` + */ + registerFunction( + functionName: string, + requestDefault: TRequest, + handler: TickerFunctionHandler, + options?: FunctionOptionsBase & { requestType?: string }, + ): void; + + /** + * Register a function WITHOUT a request payload. + * + * Usage: + * ```ts + * provider.registerFunction('Cleanup', async (ctx, signal) => { + * // no ctx.request + * }); + * ``` + */ + registerFunction( + functionName: string, + handler: TickerFunctionHandlerNoRequest, + options?: FunctionOptionsBase, + ): void; + + // ─── Implementation ───────────────────────────────────────────────── + + registerFunction( + functionName: string, + requestDefaultOrHandler: unknown | TickerFunctionHandlerNoRequest, + handlerOrOptions?: TickerFunctionHandler | FunctionOptionsBase, + maybeOptions?: FunctionOptionsBase & { requestType?: string }, + ): void { + if (this._frozen) { + throw new Error(`TickerFunctionProvider is frozen. Cannot register function '${functionName}' after build().`); + } + if (this._functions.has(functionName)) { + throw new Error(`TickerQ: Duplicate function name '${functionName}'. Each function must have a unique name.`); + } + + let delegate: TickerFunctionDelegate; + let options: (FunctionOptionsBase & { requestType?: string }) | undefined; + let requestDefault: unknown = undefined; + + if (typeof requestDefaultOrHandler === 'function') { + // Overload 2: registerFunction(name, handler, options?) + delegate = requestDefaultOrHandler as TickerFunctionDelegate; + options = handlerOrOptions as FunctionOptionsBase | undefined; + } else { + // Overload 1: registerFunction(name, requestDefault, handler, options?) + requestDefault = requestDefaultOrHandler; + delegate = handlerOrOptions as TickerFunctionDelegate; + options = maybeOptions; + } + + this._functions.set(functionName, { + cronExpression: options?.cronExpression ?? null, + priority: options?.priority ?? TickerTaskPriority.Normal, + delegate, + maxConcurrency: options?.maxConcurrency ?? 0, + }); + + if (requestDefault !== undefined) { + this._requestDefaults.set(functionName, requestDefault); + const typeName = options?.requestType + ?? (typeof requestDefault === 'object' && requestDefault !== null + ? requestDefault.constructor?.name ?? 'Object' + : typeof requestDefault); + this._requestInfos.set(functionName, { + requestType: typeName, + requestExampleJson: JSON.stringify(requestDefault, null, 2), + }); + } + } + + /** + * Get the stored request default for a function (used to populate ctx.request from raw bytes). + */ + getRequestDefault(functionName: string): unknown | undefined { + return this._requestDefaults.get(functionName); + } + + build(): void { + this._frozen = true; + } + + getFunction(functionName: string): TickerFunctionRegistration | undefined { + return this._functions.get(functionName); + } + + hasFunction(functionName: string): boolean { + return this._functions.has(functionName); + } + + reset(): void { + this._functions.clear(); + this._requestInfos.clear(); + this._requestDefaults.clear(); + this._frozen = false; + } +} + +/** Singleton instance. */ +export const TickerFunctionProvider = new TickerFunctionProviderImpl(); diff --git a/hub/sdks/node/src/infrastructure/TickerQFunctionSyncService.ts b/hub/sdks/node/src/infrastructure/TickerQFunctionSyncService.ts new file mode 100644 index 00000000..cc7a1bb2 --- /dev/null +++ b/hub/sdks/node/src/infrastructure/TickerQFunctionSyncService.ts @@ -0,0 +1,73 @@ +import { TickerQSdkHttpClient } from '../client/TickerQSdkHttpClient'; +import { TickerSdkOptions } from '../TickerSdkOptions'; +import { TickerFunctionProvider } from './TickerFunctionProvider'; +import type { Node } from '../models/Node'; +import type { NodeFunction } from '../models/NodeFunction'; +import type { SyncNodesAndFunctionsResult } from '../models/SyncNodesAndFunctionsResult'; + +/** + * Synchronizes registered functions with the TickerQ Hub. + * + * On startup, sends all registered functions to Hub and receives: + * - ApplicationUrl (Scheduler endpoint for persistence calls) + * - WebhookSignature (HMAC key for signing/validating requests) + */ +export class TickerQFunctionSyncService { + private readonly client: TickerQSdkHttpClient; + private readonly options: TickerSdkOptions; + + constructor(client: TickerQSdkHttpClient, options: TickerSdkOptions) { + this.client = client; + this.options = options; + } + + /** + * Sync all registered functions with the Hub. + * + * POST /api/apps/sync/nodes-functions/batch + */ + async syncAsync(signal?: AbortSignal): Promise { + const functions = TickerFunctionProvider.tickerFunctions; + const requestInfos = TickerFunctionProvider.tickerFunctionRequestInfos; + + const nodeFunctions: NodeFunction[] = []; + + for (const [name, reg] of functions) { + const requestInfo = requestInfos.get(name); + + const nodeFunction: NodeFunction = { + functionName: name, + expression: reg.cronExpression ?? '', + taskPriority: reg.priority, + requestType: requestInfo?.requestType ?? '', + requestExampleJson: requestInfo?.requestExampleJson ?? '', + }; + + nodeFunctions.push(nodeFunction); + } + + const node: Node = { + nodeName: this.options.nodeName!, + callbackUrl: this.options.callbackUri!, + isProduction: process.env.NODE_ENV === 'production', + functions: nodeFunctions, + }; + + const result = await this.client.postAsync( + '/api/apps/sync/nodes-functions/batch', + node, + signal, + ); + + if (result) { + if (result.applicationUrl) { + this.options.apiUri = result.applicationUrl; + } + if (result.webhookSignature) { + this.options.webhookSignature = result.webhookSignature; + } + } + + return result; + } +} diff --git a/hub/sdks/node/src/middleware/SdkExecutionEndpoint.ts b/hub/sdks/node/src/middleware/SdkExecutionEndpoint.ts new file mode 100644 index 00000000..ee615661 --- /dev/null +++ b/hub/sdks/node/src/middleware/SdkExecutionEndpoint.ts @@ -0,0 +1,450 @@ +import type { IncomingMessage, ServerResponse } from 'http'; +import { validateSignature } from '../utils/TickerQSignature'; +import { TickerSdkOptions } from '../TickerSdkOptions'; +import { TickerFunctionProvider } from '../infrastructure/TickerFunctionProvider'; +import { TickerQFunctionSyncService } from '../infrastructure/TickerQFunctionSyncService'; +import { TickerQTaskScheduler } from '../worker/TickerQTaskScheduler'; +import { TickerFunctionConcurrencyGate } from '../worker/TickerFunctionConcurrencyGate'; +import { TickerQRemotePersistenceProvider } from '../persistence/TickerQRemotePersistenceProvider'; +import { normalizeExecutionContext, type RemoteExecutionContext } from '../models/RemoteExecutionContext'; +import type { TickerFunctionContext } from '../models/TickerFunctionContext'; +import type { InternalFunctionContext } from '../models/InternalFunctionContext'; +import { TickerType, TickerStatus, TickerTaskPriority, RunCondition } from '../enums'; +import type { TickerQLogger } from '../client/TickerQSdkHttpClient'; + +function buildFunctionContext(context: RemoteExecutionContext): TickerFunctionContext { + return { + id: context.id, + type: context.type, + retryCount: context.retryCount, + isDue: context.isDue, + scheduledFor: new Date(context.scheduledFor), + functionName: context.functionName, + request: TickerFunctionProvider.getRequestDefault(context.functionName), + }; +} + +function buildInternalContext( + context: RemoteExecutionContext, + registration: { priority: TickerTaskPriority; maxConcurrency: number }, +): InternalFunctionContext { + return { + parametersToUpdate: [], + cachedPriority: registration.priority, + cachedMaxConcurrency: registration.maxConcurrency, + functionName: context.functionName, + tickerId: context.id, + parentId: null, + type: context.type, + retries: 0, + retryCount: context.retryCount, + status: TickerStatus.InProgress, + elapsedTime: 0, + exceptionDetails: null, + executedAt: new Date().toISOString(), + retryIntervals: [], + releaseLock: false, + executionTime: context.scheduledFor, + runCondition: RunCondition.OnSuccess, + timeTickerChildren: [], + }; +} + +function serializeException(err: unknown): string { + if (err instanceof Error) { + return JSON.stringify({ + type: err.constructor.name, + message: err.message, + stackTrace: err.stack ?? null, + }); + } + return JSON.stringify({ type: 'Unknown', message: String(err), stackTrace: null }); +} + +/** + * HTTP request handler for the /execute and /resync endpoints. + * Framework-agnostic — works with raw Node.js http, Express, Fastify, etc. + */ +export class SdkExecutionEndpoint { + private readonly options: TickerSdkOptions; + private readonly syncService: TickerQFunctionSyncService; + private readonly scheduler: TickerQTaskScheduler; + private readonly concurrencyGate: TickerFunctionConcurrencyGate; + private readonly persistenceProvider: TickerQRemotePersistenceProvider; + private readonly logger: TickerQLogger | null; + + constructor( + options: TickerSdkOptions, + syncService: TickerQFunctionSyncService, + scheduler: TickerQTaskScheduler, + concurrencyGate: TickerFunctionConcurrencyGate, + persistenceProvider: TickerQRemotePersistenceProvider, + logger?: TickerQLogger, + ) { + this.options = options; + this.syncService = syncService; + this.scheduler = scheduler; + this.concurrencyGate = concurrencyGate; + this.persistenceProvider = persistenceProvider; + this.logger = logger ?? null; + } + + /** + * Returns an Express-compatible middleware router. + * Mounts POST /execute and POST /resync under the given prefix. + */ + createHandler(prefix = ''): (req: IncomingMessage, res: ServerResponse) => void { + const executePath = `${prefix}/execute`; + const resyncPath = `${prefix}/resync`; + + return async (req: IncomingMessage, res: ServerResponse) => { + const url = req.url ?? ''; + const method = req.method?.toUpperCase() ?? ''; + + if (method !== 'POST') { + res.writeHead(405); + res.end('Method Not Allowed'); + return; + } + + if (url === executePath) { + await this.handleExecute(req, res); + } else if (url === resyncPath) { + await this.handleResync(req, res); + } else { + res.writeHead(404); + res.end('Not Found'); + } + }; + } + + /** + * Returns Express-compatible route handlers. + * Call with your express app or router instance: + * + * ```ts + * const { execute, resync } = sdk.getEndpoint().expressHandlers(); + * app.post('/execute', execute); + * app.post('/resync', resync); + * ``` + */ + expressHandlers(prefix = ''): { + execute: (req: any, res: any) => Promise; + resync: (req: any, res: any) => Promise; + mount: (app: { post: (path: string, handler: (req: any, res: any) => Promise) => void }) => void; + } { + const execute = async (req: any, res: any) => { + await this.handleExecuteExpress(req, res); + }; + const resync = async (req: any, res: any) => { + await this.handleResync(req, res); + }; + const mount = (app: { post: (path: string, handler: (req: any, res: any) => Promise) => void }) => { + app.post(`${prefix}/execute`, execute); + app.post(`${prefix}/resync`, resync); + }; + return { execute, resync, mount }; + } + + // ─── /execute ─────────────────────────────────────────────────────── + + private async handleExecute(req: IncomingMessage, res: ServerResponse): Promise { + const bodyBytes = await readBody(req); + + // Validate signature + const pathAndQuery = req.url ?? '/execute'; + const validationError = validateSignature( + this.options.webhookSignature, + 'POST', + pathAndQuery, + getHeader(req, 'x-timestamp'), + getHeader(req, 'x-tickerq-signature'), + bodyBytes, + ); + + if (validationError) { + this.logger?.warn(`TickerQ signature validation failed: ${validationError}`); + res.writeHead(401); + res.end('Unauthorized'); + return; + } + + let context: RemoteExecutionContext; + try { + const raw = JSON.parse(bodyBytes.toString('utf-8')); + context = normalizeExecutionContext(raw); + } catch { + res.writeHead(400); + res.end('Invalid JSON body'); + return; + } + + if (!context.functionName) { + res.writeHead(400); + res.end('Missing functionName'); + return; + } + + this.logger?.info( + `TickerQ: Received /execute for '${context.functionName}' (id: ${context.id}, type: ${context.type})`, + ); + + // Look up the function + const registration = TickerFunctionProvider.getFunction(context.functionName); + if (!registration) { + this.logger?.error(`TickerQ: Function '${context.functionName}' not found. Ensure it is registered.`); + res.writeHead(404); + res.end(`Function '${context.functionName}' not found`); + return; + } + + const functionContext = buildFunctionContext(context); + + // Queue execution with priority and concurrency gate + const semaphore = this.concurrencyGate.getSemaphore( + context.functionName, + registration.maxConcurrency, + ); + + // Respond immediately — execution happens async (fire-and-forget from Hub's perspective) + res.writeHead(200); + res.end('OK'); + + // Execute in the task scheduler + this.scheduler.queueAsync(async (signal) => { + await this.executeAndReportStatus(context, registration, functionContext, semaphore, signal); + }, registration.priority).catch((err) => { + this.logger?.error(`TickerQ: Failed to queue '${context.functionName}':`, err); + }); + } + + /** + * Express-specific handler that reads body from req.body if already parsed. + */ + private async handleExecuteExpress(req: any, res: any): Promise { + let bodyBytes: Buffer; + let bodyStr: string; + + if (req.body && typeof req.body === 'object') { + bodyStr = JSON.stringify(req.body); + bodyBytes = Buffer.from(bodyStr, 'utf-8'); + } else if (req.rawBody) { + bodyBytes = Buffer.isBuffer(req.rawBody) ? req.rawBody : Buffer.from(req.rawBody); + bodyStr = bodyBytes.toString('utf-8'); + } else { + bodyBytes = await readBody(req); + bodyStr = bodyBytes.toString('utf-8'); + } + + // Validate signature + const pathAndQuery = req.originalUrl ?? req.url ?? '/execute'; + const validationError = validateSignature( + this.options.webhookSignature, + 'POST', + pathAndQuery, + req.headers['x-timestamp'] as string | undefined, + req.headers['x-tickerq-signature'] as string | undefined, + bodyBytes, + ); + + if (validationError) { + this.logger?.warn(`TickerQ signature validation failed: ${validationError}`); + res.status(401).send('Unauthorized'); + return; + } + + let context: RemoteExecutionContext; + try { + const raw = typeof req.body === 'object' ? req.body : JSON.parse(bodyStr); + context = normalizeExecutionContext(raw); + } catch { + res.status(400).send('Invalid JSON body'); + return; + } + + if (!context.functionName) { + res.status(400).send('Missing functionName'); + return; + } + + this.logger?.info( + `TickerQ: Received /execute for '${context.functionName}' (id: ${context.id}, type: ${context.type})`, + ); + + const registration = TickerFunctionProvider.getFunction(context.functionName); + if (!registration) { + this.logger?.error(`TickerQ: Function '${context.functionName}' not found.`); + res.status(404).send(`Function '${context.functionName}' not found`); + return; + } + + const functionContext = buildFunctionContext(context); + + const semaphore = this.concurrencyGate.getSemaphore( + context.functionName, + registration.maxConcurrency, + ); + + res.status(200).send('OK'); + + this.scheduler.queueAsync(async (signal) => { + await this.executeAndReportStatus(context, registration, functionContext, semaphore, signal); + }, registration.priority).catch((err) => { + this.logger?.error(`TickerQ: Failed to queue '${context.functionName}':`, err); + }); + } + + // ─── Execution lifecycle ────────────────────────────────────────────── + + private async executeAndReportStatus( + context: RemoteExecutionContext, + registration: { delegate: (ctx: any, signal: AbortSignal) => Promise; priority: TickerTaskPriority; maxConcurrency: number }, + functionContext: TickerFunctionContext, + semaphore: { acquire: () => Promise<() => void> } | null, + signal: AbortSignal, + ): Promise { + const internalCtx = buildInternalContext(context, registration); + const startTime = performance.now(); + let release: (() => void) | null = null; + const typeName = context.type === TickerType.CronTickerOccurrence ? 'CronTicker' : 'TimeTicker'; + + this.logger?.info( + `TickerQ [${typeName}] Executing '${context.functionName}' (id: ${context.id}, retry: ${context.retryCount}, isDue: ${context.isDue})`, + ); + + try { + if (semaphore) { + this.logger?.info(`TickerQ [${typeName}] '${context.functionName}' waiting for concurrency semaphore...`); + release = await semaphore.acquire(); + this.logger?.info(`TickerQ [${typeName}] '${context.functionName}' semaphore acquired.`); + } + + internalCtx.status = TickerStatus.InProgress; + this.logger?.info(`TickerQ [${typeName}] '${context.functionName}' status -> InProgress`); + + await registration.delegate(functionContext, signal); + + // Success — set Done or DueDone based on isDue flag + const elapsed = Math.round(performance.now() - startTime); + internalCtx.status = context.isDue ? TickerStatus.DueDone : TickerStatus.Done; + internalCtx.elapsedTime = elapsed; + internalCtx.executedAt = new Date().toISOString(); + internalCtx.parametersToUpdate = ['Status', 'ElapsedTime', 'ExecutedAt']; + + this.logger?.info( + `TickerQ [${typeName}] '${context.functionName}' status -> ${TickerStatus[internalCtx.status]} (${elapsed}ms)`, + ); + } catch (err) { + const elapsed = Math.round(performance.now() - startTime); + + if (signal.aborted || (err instanceof Error && err.name === 'AbortError')) { + internalCtx.status = TickerStatus.Cancelled; + this.logger?.warn( + `TickerQ [${typeName}] '${context.functionName}' status -> Cancelled after ${elapsed}ms`, + ); + } else { + internalCtx.status = TickerStatus.Failed; + this.logger?.error( + `TickerQ [${typeName}] '${context.functionName}' status -> Failed after ${elapsed}ms:`, + err, + ); + } + + internalCtx.elapsedTime = elapsed; + internalCtx.executedAt = new Date().toISOString(); + internalCtx.exceptionDetails = serializeException(err); + internalCtx.parametersToUpdate = ['Status', 'ElapsedTime', 'ExecutedAt', 'ExceptionDetails']; + } finally { + if (release) { + release(); + this.logger?.info(`TickerQ [${typeName}] '${context.functionName}' semaphore released.`); + } + } + + // Report status back to the Scheduler/Hub + const endpoint = context.type === TickerType.CronTickerOccurrence + ? 'cron-ticker-occurrences/context' + : 'time-tickers/context'; + + this.logger?.info( + `TickerQ [${typeName}] '${context.functionName}' reporting status ${TickerStatus[internalCtx.status]} to Scheduler (PUT /${endpoint})...`, + ); + + try { + if (context.type === TickerType.CronTickerOccurrence) { + await this.persistenceProvider.updateCronTickerOccurrence(internalCtx); + } else { + await this.persistenceProvider.updateTimeTicker(internalCtx); + } + this.logger?.info( + `TickerQ [${typeName}] '${context.functionName}' status reported successfully.`, + ); + } catch (err) { + this.logger?.error( + `TickerQ [${typeName}] '${context.functionName}' failed to report status ${TickerStatus[internalCtx.status]} to Scheduler:`, + err, + ); + } + } + + // ─── /resync ──────────────────────────────────────────────────────── + + private async handleResync(req: IncomingMessage | any, res: ServerResponse | any): Promise { + // Validate signature on resync too + const bodyBytes = await readBody(req); + const pathAndQuery = req.originalUrl ?? req.url ?? '/resync'; + const validationError = validateSignature( + this.options.webhookSignature, + 'POST', + pathAndQuery, + getHeader(req, 'x-timestamp'), + getHeader(req, 'x-tickerq-signature'), + bodyBytes, + ); + + if (validationError) { + this.logger?.warn(`TickerQ resync signature validation failed: ${validationError}`); + if (typeof res.status === 'function') { + res.status(401).send('Unauthorized'); + } else { + res.writeHead(401); + res.end('Unauthorized'); + } + return; + } + + try { + await this.syncService.syncAsync(); + if (typeof res.status === 'function') { + res.status(200).send('OK'); + } else { + res.writeHead(200); + res.end('OK'); + } + } catch (err) { + this.logger?.error('TickerQ: Resync failed:', err); + if (typeof res.status === 'function') { + res.status(500).send('Resync failed'); + } else { + res.writeHead(500); + res.end('Resync failed'); + } + } + } +} + +// ─── Helpers ──────────────────────────────────────────────────────────── + +function readBody(req: IncomingMessage): Promise { + return new Promise((resolve, reject) => { + const chunks: Buffer[] = []; + req.on('data', (chunk: Buffer) => chunks.push(chunk)); + req.on('end', () => resolve(Buffer.concat(chunks))); + req.on('error', reject); + }); +} + +function getHeader(req: IncomingMessage, name: string): string | undefined { + const val = req.headers[name]; + return Array.isArray(val) ? val[0] : val; +} diff --git a/hub/sdks/node/src/models/CronTickerEntity.ts b/hub/sdks/node/src/models/CronTickerEntity.ts new file mode 100644 index 00000000..802c9fd8 --- /dev/null +++ b/hub/sdks/node/src/models/CronTickerEntity.ts @@ -0,0 +1,13 @@ +export interface CronTickerEntity { + id: string; + function: string; + description: string | null; + initIdentifier: string | null; + createdAt: string; + updatedAt: string; + expression: string; + request: string | null; + retries: number; + retryIntervals: number[] | null; + isEnabled: boolean; +} diff --git a/hub/sdks/node/src/models/InternalFunctionContext.ts b/hub/sdks/node/src/models/InternalFunctionContext.ts new file mode 100644 index 00000000..bf46e43f --- /dev/null +++ b/hub/sdks/node/src/models/InternalFunctionContext.ts @@ -0,0 +1,22 @@ +import { TickerType, TickerStatus, TickerTaskPriority, RunCondition } from '../enums'; + +export interface InternalFunctionContext { + parametersToUpdate: string[]; + cachedPriority: TickerTaskPriority; + cachedMaxConcurrency: number; + functionName: string; + tickerId: string; + parentId: string | null; + type: TickerType; + retries: number; + retryCount: number; + status: TickerStatus; + elapsedTime: number; + exceptionDetails: string | null; + executedAt: string; + retryIntervals: number[]; + releaseLock: boolean; + executionTime: string; + runCondition: RunCondition; + timeTickerChildren: InternalFunctionContext[]; +} diff --git a/hub/sdks/node/src/models/Node.ts b/hub/sdks/node/src/models/Node.ts new file mode 100644 index 00000000..14fb80fb --- /dev/null +++ b/hub/sdks/node/src/models/Node.ts @@ -0,0 +1,8 @@ +import { NodeFunction } from './NodeFunction'; + +export interface Node { + nodeName: string; + callbackUrl: string; + isProduction: boolean; + functions: NodeFunction[]; +} diff --git a/hub/sdks/node/src/models/NodeFunction.ts b/hub/sdks/node/src/models/NodeFunction.ts new file mode 100644 index 00000000..199766ed --- /dev/null +++ b/hub/sdks/node/src/models/NodeFunction.ts @@ -0,0 +1,9 @@ +import { TickerTaskPriority } from '../enums'; + +export interface NodeFunction { + functionName: string; + requestType: string; + requestExampleJson: string; + taskPriority: TickerTaskPriority; + expression: string; +} diff --git a/hub/sdks/node/src/models/PaginationResult.ts b/hub/sdks/node/src/models/PaginationResult.ts new file mode 100644 index 00000000..61479f28 --- /dev/null +++ b/hub/sdks/node/src/models/PaginationResult.ts @@ -0,0 +1,11 @@ +export interface PaginationResult { + items: T[]; + totalCount: number; + pageNumber: number; + pageSize: number; + totalPages: number; + hasPreviousPage: boolean; + hasNextPage: boolean; + firstItemIndex: number; + lastItemIndex: number; +} diff --git a/hub/sdks/node/src/models/RemoteExecutionContext.ts b/hub/sdks/node/src/models/RemoteExecutionContext.ts new file mode 100644 index 00000000..e921d97f --- /dev/null +++ b/hub/sdks/node/src/models/RemoteExecutionContext.ts @@ -0,0 +1,33 @@ +import { TickerType } from '../enums'; + +/** + * Raw execution context as sent by the TickerQ Scheduler/RemoteExecutor. + * The Hub serializes with PascalCase. + * We accept both PascalCase and camelCase via normalization. + */ +export interface RemoteExecutionContext { + id: string; + type: TickerType; + retryCount: number; + isDue: boolean; + scheduledFor: string; + functionName: string; +} + +/** + * Normalizes a parsed JSON object to camelCase keys (one level deep). + * Handles both PascalCase and camelCase property names. + */ +export function normalizeExecutionContext(raw: Record): RemoteExecutionContext { + const get = (camel: string, pascal: string): unknown => + raw[camel] !== undefined ? raw[camel] : raw[pascal]; + + return { + id: (get('id', 'Id') as string) ?? '', + type: (get('type', 'Type') as TickerType) ?? 0, + retryCount: (get('retryCount', 'RetryCount') as number) ?? 0, + isDue: (get('isDue', 'IsDue') as boolean) ?? false, + scheduledFor: (get('scheduledFor', 'ScheduledFor') as string) ?? new Date().toISOString(), + functionName: (get('functionName', 'FunctionName') as string) ?? '', + }; +} diff --git a/hub/sdks/node/src/models/SyncNodesAndFunctionsResult.ts b/hub/sdks/node/src/models/SyncNodesAndFunctionsResult.ts new file mode 100644 index 00000000..a63e971f --- /dev/null +++ b/hub/sdks/node/src/models/SyncNodesAndFunctionsResult.ts @@ -0,0 +1,4 @@ +export interface SyncNodesAndFunctionsResult { + applicationUrl: string; + webhookSignature: string; +} diff --git a/hub/sdks/node/src/models/TickerFunctionContext.ts b/hub/sdks/node/src/models/TickerFunctionContext.ts new file mode 100644 index 00000000..9dc7bb47 --- /dev/null +++ b/hub/sdks/node/src/models/TickerFunctionContext.ts @@ -0,0 +1,17 @@ +import { TickerType } from '../enums'; + +/** + * Base context passed to every ticker function handler. + * + * When TRequest is provided, the `request` property carries the deserialized payload. + * When omitted (defaults to `never`), `request` is not present. + */ +export interface TickerFunctionContext { + id: string; + type: TickerType; + retryCount: number; + isDue: boolean; + scheduledFor: Date; + functionName: string; + request: TRequest; +} diff --git a/hub/sdks/node/src/models/TimeTickerEntity.ts b/hub/sdks/node/src/models/TimeTickerEntity.ts new file mode 100644 index 00000000..c95c1d5b --- /dev/null +++ b/hub/sdks/node/src/models/TimeTickerEntity.ts @@ -0,0 +1,25 @@ +import { TickerStatus, RunCondition } from '../enums'; + +export interface TimeTickerEntity { + id: string; + function: string; + description: string | null; + initIdentifier: string | null; + createdAt: string; + updatedAt: string; + status: TickerStatus; + lockHolder: string | null; + request: string | null; + executionTime: string | null; + lockedAt: string | null; + executedAt: string | null; + exceptionMessage: string | null; + skippedReason: string | null; + elapsedTime: number; + retries: number; + retryCount: number; + retryIntervals: number[] | null; + parentId: string | null; + children: TimeTickerEntity[]; + runCondition: RunCondition | null; +} diff --git a/hub/sdks/node/src/models/index.ts b/hub/sdks/node/src/models/index.ts new file mode 100644 index 00000000..68d7d4aa --- /dev/null +++ b/hub/sdks/node/src/models/index.ts @@ -0,0 +1,9 @@ +export type { RemoteExecutionContext } from './RemoteExecutionContext'; +export type { SyncNodesAndFunctionsResult } from './SyncNodesAndFunctionsResult'; +export type { NodeFunction } from './NodeFunction'; +export type { Node } from './Node'; +export type { TickerFunctionContext } from './TickerFunctionContext'; +export type { InternalFunctionContext } from './InternalFunctionContext'; +export type { TimeTickerEntity } from './TimeTickerEntity'; +export type { CronTickerEntity } from './CronTickerEntity'; +export type { PaginationResult } from './PaginationResult'; diff --git a/hub/sdks/node/src/persistence/TickerQRemotePersistenceProvider.ts b/hub/sdks/node/src/persistence/TickerQRemotePersistenceProvider.ts new file mode 100644 index 00000000..cff9acb4 --- /dev/null +++ b/hub/sdks/node/src/persistence/TickerQRemotePersistenceProvider.ts @@ -0,0 +1,186 @@ +import { TickerQSdkHttpClient } from '../client/TickerQSdkHttpClient'; +import type { InternalFunctionContext } from '../models/InternalFunctionContext'; +import type { TimeTickerEntity } from '../models/TimeTickerEntity'; +import type { CronTickerEntity } from '../models/CronTickerEntity'; + +const TIME_TICKERS_PATH = 'time-tickers'; +const CRON_TICKERS_PATH = 'cron-tickers'; + +/** + * Remote persistence provider that communicates with the TickerQ Scheduler via HTTP. + * + * Only CRUD operations are implemented. Query/queue operations throw NotSupportedError. + */ +export class TickerQRemotePersistenceProvider { + private readonly client: TickerQSdkHttpClient; + + constructor(client: TickerQSdkHttpClient) { + this.client = client; + } + + // ─── Time Ticker CRUD ─────────────────────────────────────────────── + + async addTimeTickers(tickers: TimeTickerEntity[], signal?: AbortSignal): Promise { + await this.client.postAsync(`/${TIME_TICKERS_PATH}`, tickers, signal); + return tickers.length; + } + + async updateTimeTickers(tickers: TimeTickerEntity[], signal?: AbortSignal): Promise { + await this.client.putAsync(`/${TIME_TICKERS_PATH}`, tickers, signal); + return tickers.length; + } + + async removeTimeTickers(tickerIds: string[], signal?: AbortSignal): Promise { + await this.client.postAsync(`/${TIME_TICKERS_PATH}/delete`, tickerIds, signal); + return tickerIds.length; + } + + async updateTimeTicker(functionContext: InternalFunctionContext, signal?: AbortSignal): Promise { + await this.client.putAsyncOrThrow(`/${TIME_TICKERS_PATH}/context`, functionContext, signal); + } + + async updateTimeTickersWithUnifiedContext( + timeTickerIds: string[], + functionContext: InternalFunctionContext, + signal?: AbortSignal, + ): Promise { + await this.client.postAsync( + `/${TIME_TICKERS_PATH}/unified-context`, + { ids: timeTickerIds, context: functionContext }, + signal, + ); + } + + async getTimeTickerRequest(id: string, signal?: AbortSignal): Promise { + return this.client.getBytesAsync(`/${TIME_TICKERS_PATH}/request/${id}`, signal); + } + + // ─── Cron Ticker CRUD ─────────────────────────────────────────────── + + async insertCronTickers(tickers: CronTickerEntity[], signal?: AbortSignal): Promise { + await this.client.postAsync(`/${CRON_TICKERS_PATH}`, tickers, signal); + return tickers.length; + } + + async updateCronTickers(tickers: CronTickerEntity[], signal?: AbortSignal): Promise { + await this.client.putAsync(`/${CRON_TICKERS_PATH}`, tickers, signal); + return tickers.length; + } + + async removeCronTickers(cronTickerIds: string[], signal?: AbortSignal): Promise { + await this.client.postAsync(`/${CRON_TICKERS_PATH}/delete`, cronTickerIds, signal); + return cronTickerIds.length; + } + + // ─── Cron Ticker Occurrence ───────────────────────────────────────── + + async updateCronTickerOccurrence(functionContext: InternalFunctionContext, signal?: AbortSignal): Promise { + await this.client.putAsyncOrThrow('/cron-ticker-occurrences/context', functionContext, signal); + } + + async getCronTickerOccurrenceRequest(tickerId: string, signal?: AbortSignal): Promise { + return this.client.getBytesAsync(`/cron-ticker-occurrences/request/${tickerId}`, signal); + } + + // ─── Not Supported (server-side only) ─────────────────────────────── + + queueTimeTickers(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + queueTimedOutTimeTickers(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + releaseAcquiredTimeTickers(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + getEarliestTimeTickers(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + migrateDefinedCronTickers(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + getAllCronTickerExpressions(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + releaseDeadNodeTimeTickerResources(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + getEarliestAvailableCronOccurrence(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + queueCronTickerOccurrences(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + queueTimedOutCronTickerOccurrences(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + releaseAcquiredCronTickerOccurrences(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + updateCronTickerOccurrencesWithUnifiedContext(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + releaseDeadNodeOccurrenceResources(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + getTimeTickerById(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + getTimeTickers(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + getTimeTickersPaginated(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + getCronTickerById(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + getCronTickers(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + getCronTickersPaginated(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + getAllCronTickerOccurrences(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + getAllCronTickerOccurrencesPaginated(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + insertCronTickerOccurrences(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + removeCronTickerOccurrences(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + acquireImmediateTimeTickersAsync(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } + + acquireImmediateCronOccurrencesAsync(): never { + throw new Error('NotSupported: This operation requires direct database access. Use the Hub dashboard or the local persistence provider.'); + } +} diff --git a/hub/sdks/node/src/utils/TickerQSignature.ts b/hub/sdks/node/src/utils/TickerQSignature.ts new file mode 100644 index 00000000..32ce8dc7 --- /dev/null +++ b/hub/sdks/node/src/utils/TickerQSignature.ts @@ -0,0 +1,90 @@ +import { createHmac, timingSafeEqual } from 'crypto'; + +const MAX_TIMESTAMP_SKEW_SECONDS = 300; + +/** + * Generates an HMAC-SHA256 signature for outgoing requests to the Scheduler. + * + * Payload = UTF-8("{METHOD}\n{PATH}?{QUERY}\n{TIMESTAMP}\n") + UTF-8(body) + * Key = UTF-8(webhookSignature) + * Output = Base64(HMAC-SHA256(key, payload)) + */ +export function generateSignature( + webhookSignature: string, + method: string, + pathAndQuery: string, + timestamp: number, + body: string, +): string { + const header = `${method}\n${pathAndQuery}\n${timestamp}\n`; + const headerBytes = Buffer.from(header, 'utf-8'); + const bodyBytes = Buffer.from(body || '', 'utf-8'); + const payload = Buffer.concat([headerBytes, bodyBytes]); + + const key = Buffer.from(webhookSignature, 'utf-8'); + const hmac = createHmac('sha256', key); + hmac.update(payload); + return hmac.digest('base64'); +} + +/** + * Validates an incoming HMAC-SHA256 signature on webhook requests. + * + * Returns null on success, or an error message string on failure. + */ +export function validateSignature( + webhookSignature: string | null, + method: string, + pathAndQuery: string, + timestampHeader: string | undefined, + signatureHeader: string | undefined, + bodyBytes: Buffer, +): string | null { + if (!webhookSignature) { + return 'WebhookSignature is not configured. Cannot validate request.'; + } + + if (!signatureHeader) { + return 'Missing X-TickerQ-Signature header.'; + } + + if (!timestampHeader) { + return 'Missing X-Timestamp header.'; + } + + const timestamp = parseInt(timestampHeader, 10); + if (isNaN(timestamp)) { + return 'Invalid X-Timestamp format.'; + } + + const nowSeconds = Math.floor(Date.now() / 1000); + if (Math.abs(nowSeconds - timestamp) > MAX_TIMESTAMP_SKEW_SECONDS) { + return `Timestamp skew exceeds ${MAX_TIMESTAMP_SKEW_SECONDS} seconds.`; + } + + let receivedBytes: Buffer; + try { + receivedBytes = Buffer.from(signatureHeader, 'base64'); + } catch { + return 'Invalid Base64 in X-TickerQ-Signature header.'; + } + + const header = `${method}\n${pathAndQuery}\n${timestamp}\n`; + const headerBytes = Buffer.from(header, 'utf-8'); + const payload = Buffer.concat([headerBytes, bodyBytes]); + + const key = Buffer.from(webhookSignature, 'utf-8'); + const hmac = createHmac('sha256', key); + hmac.update(payload); + const expectedBytes = hmac.digest(); + + if (expectedBytes.length !== receivedBytes.length) { + return 'Signature mismatch.'; + } + + if (!timingSafeEqual(expectedBytes, receivedBytes)) { + return 'Signature mismatch.'; + } + + return null; +} diff --git a/hub/sdks/node/src/worker/TickerFunctionConcurrencyGate.ts b/hub/sdks/node/src/worker/TickerFunctionConcurrencyGate.ts new file mode 100644 index 00000000..5f0491f8 --- /dev/null +++ b/hub/sdks/node/src/worker/TickerFunctionConcurrencyGate.ts @@ -0,0 +1,72 @@ +/** + * Per-function concurrency limiter. + * + * Uses a simple semaphore pattern: acquire() returns a release function. + * If maxConcurrency is 0, no limit is applied. + */ +export class TickerFunctionConcurrencyGate { + private readonly semaphores: Map = new Map(); + + /** + * Get or create a semaphore for the given function. + * Returns null if maxConcurrency is 0 (no limit). + */ + getSemaphore(functionName: string, maxConcurrency: number): Semaphore | null { + if (maxConcurrency <= 0) return null; + + let sem = this.semaphores.get(functionName); + if (!sem) { + sem = new Semaphore(maxConcurrency); + this.semaphores.set(functionName, sem); + } + return sem; + } +} + +/** + * Async counting semaphore. + */ +export class Semaphore { + private currentCount: number; + private readonly maxCount: number; + private readonly waiters: Array<() => void> = []; + + constructor(maxCount: number) { + this.maxCount = maxCount; + this.currentCount = maxCount; + } + + /** + * Acquire one slot. Resolves when a slot is available. + * Returns a release function that must be called when done. + */ + async acquire(): Promise<() => void> { + if (this.currentCount > 0) { + this.currentCount--; + return () => this.release(); + } + + return new Promise<() => void>((resolve) => { + this.waiters.push(() => { + this.currentCount--; + resolve(() => this.release()); + }); + }); + } + + private release(): void { + this.currentCount++; + if (this.waiters.length > 0 && this.currentCount > 0) { + const next = this.waiters.shift()!; + next(); + } + } + + get availableCount(): number { + return this.currentCount; + } + + get waitingCount(): number { + return this.waiters.length; + } +} diff --git a/hub/sdks/node/src/worker/TickerQTaskScheduler.ts b/hub/sdks/node/src/worker/TickerQTaskScheduler.ts new file mode 100644 index 00000000..1a3d7f71 --- /dev/null +++ b/hub/sdks/node/src/worker/TickerQTaskScheduler.ts @@ -0,0 +1,194 @@ +import { TickerTaskPriority } from '../enums'; + +interface QueuedTask { + work: (signal: AbortSignal) => Promise; + priority: TickerTaskPriority; + resolve: () => void; + reject: (err: unknown) => void; +} + +/** + * Priority-based async task scheduler for TickerQ function execution. + * + * Node.js is single-threaded, but we still benefit from: + * - Priority-ordered execution (High > Normal > Low) + * - Controlled concurrency (prevents unbounded parallel I/O) + * - LongRunning tasks run in a separate "lane" (no max concurrency) + * - Per-function concurrency gates via TickerFunctionConcurrencyGate + * + * Default worker concurrency = number of CPUs. + */ +export class TickerQTaskScheduler { + private readonly maxWorkers: number; + private activeWorkers = 0; + private _isFrozen = false; + private _isDisposed = false; + + /** Separate queues per priority level. */ + private readonly queues: Map = new Map([ + [TickerTaskPriority.High, []], + [TickerTaskPriority.Normal, []], + [TickerTaskPriority.Low, []], + ]); + + /** LongRunning tasks bypass the concurrency limit. */ + private longRunningCount = 0; + + /** Track running task promises for graceful shutdown. */ + private readonly runningTasks: Set> = new Set(); + + constructor(maxWorkers?: number) { + const cpus = typeof require !== 'undefined' + ? require('os').cpus()?.length ?? 4 + : 4; + this.maxWorkers = maxWorkers ?? cpus; + } + + get isFrozen(): boolean { + return this._isFrozen; + } + + get isDisposed(): boolean { + return this._isDisposed; + } + + get totalActiveWorkers(): number { + return this.activeWorkers + this.longRunningCount; + } + + get totalQueuedTasks(): number { + let total = 0; + for (const queue of this.queues.values()) { + total += queue.length; + } + return total; + } + + /** + * Queue an async task with priority. + */ + async queueAsync( + work: (signal: AbortSignal) => Promise, + priority: TickerTaskPriority, + ): Promise { + if (this._isDisposed) { + throw new Error('TickerQTaskScheduler is disposed.'); + } + + if (this._isFrozen) { + throw new Error('TickerQTaskScheduler is frozen. Call resume() first.'); + } + + // LongRunning tasks execute immediately without queuing. + if (priority === TickerTaskPriority.LongRunning) { + return this.executeLongRunning(work); + } + + return new Promise((resolve, reject) => { + const queue = this.queues.get(priority)!; + queue.push({ work, priority, resolve, reject }); + this.processNext(); + }); + } + + freeze(): void { + this._isFrozen = true; + } + + resume(): void { + this._isFrozen = false; + this.processNext(); + } + + /** + * Wait for all running tasks to complete. + */ + async waitForRunningTasks(timeoutMs?: number): Promise { + if (this.runningTasks.size === 0 && this.totalQueuedTasks === 0) { + return true; + } + + const allDone = Promise.all(this.runningTasks).then(() => true); + + if (timeoutMs == null) { + await allDone; + return true; + } + + const timeout = new Promise((resolve) => + setTimeout(() => resolve(false), timeoutMs), + ); + + return Promise.race([allDone, timeout]); + } + + dispose(): void { + this._isDisposed = true; + this._isFrozen = true; + // Clear queues and reject pending tasks. + for (const queue of this.queues.values()) { + for (const task of queue) { + task.reject(new Error('TickerQTaskScheduler disposed.')); + } + queue.length = 0; + } + } + + getDiagnostics(): string { + const lines: string[] = [ + `Workers: ${this.activeWorkers}/${this.maxWorkers} (LongRunning: ${this.longRunningCount})`, + `Queued: High=${this.queues.get(TickerTaskPriority.High)!.length} Normal=${this.queues.get(TickerTaskPriority.Normal)!.length} Low=${this.queues.get(TickerTaskPriority.Low)!.length}`, + `Frozen: ${this._isFrozen} Disposed: ${this._isDisposed}`, + ]; + return lines.join('\n'); + } + + private processNext(): void { + if (this._isFrozen || this._isDisposed) return; + if (this.activeWorkers >= this.maxWorkers) return; + + const task = this.dequeueHighestPriority(); + if (!task) return; + + this.activeWorkers++; + const ac = new AbortController(); + + const taskPromise = task.work(ac.signal) + .then(() => task.resolve()) + .catch((err) => task.reject(err)) + .finally(() => { + this.activeWorkers--; + this.runningTasks.delete(taskPromise); + this.processNext(); + }); + + this.runningTasks.add(taskPromise); + + // Check if we can start more tasks in parallel. + this.processNext(); + } + + private dequeueHighestPriority(): QueuedTask | null { + // Priority order: High > Normal > Low + for (const priority of [TickerTaskPriority.High, TickerTaskPriority.Normal, TickerTaskPriority.Low]) { + const queue = this.queues.get(priority)!; + if (queue.length > 0) { + return queue.shift()!; + } + } + return null; + } + + private async executeLongRunning(work: (signal: AbortSignal) => Promise): Promise { + this.longRunningCount++; + const ac = new AbortController(); + + const taskPromise = work(ac.signal).finally(() => { + this.longRunningCount--; + this.runningTasks.delete(taskPromise); + }); + + this.runningTasks.add(taskPromise); + return taskPromise; + } +} diff --git a/hub/sdks/node/tsconfig.json b/hub/sdks/node/tsconfig.json new file mode 100644 index 00000000..a5c9a513 --- /dev/null +++ b/hub/sdks/node/tsconfig.json @@ -0,0 +1,20 @@ +{ + "compilerOptions": { + "target": "ES2022", + "module": "commonjs", + "lib": ["ES2022"], + "outDir": "./dist", + "rootDir": "./src", + "strict": true, + "esModuleInterop": true, + "skipLibCheck": true, + "forceConsistentCasingInFileNames": true, + "resolveJsonModule": true, + "declaration": true, + "declarationMap": true, + "sourceMap": true, + "moduleResolution": "node" + }, + "include": ["src/**/*"], + "exclude": ["node_modules", "dist"] +} diff --git a/src/src.sln b/src/src.sln index faaf8a69..b79e8bd4 100644 --- a/src/src.sln +++ b/src/src.sln @@ -10,11 +10,11 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ", "TickerQ\TickerQ. EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.SourceGenerator", "TickerQ.SourceGenerator\TickerQ.SourceGenerator.csproj", "{A3A3A9E3-C853-8C16-87A7-2829FAC084DF}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.RemoteExecutor", "TickerQ.RemoteExecutor\TickerQ.RemoteExecutor.csproj", "{3467E6BF-D4A0-E969-6FC2-3113EA08E567}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.RemoteExecutor", "..\hub\remoteExecutor\TickerQ.RemoteExecutor\TickerQ.RemoteExecutor.csproj", "{3467E6BF-D4A0-E969-6FC2-3113EA08E567}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.EntityFrameworkCore", "TickerQ.EntityFrameworkCore\TickerQ.EntityFrameworkCore.csproj", "{63A97B66-4163-9B2B-9DB4-1CD235095817}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.SDK", "TickerQ.SDK\TickerQ.SDK.csproj", "{0ECE4EF0-96D0-4E9B-53FC-BBE86F65437F}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.SDK", "..\hub\sdks\dotnet\TickerQ.SDK\TickerQ.SDK.csproj", "{0ECE4EF0-96D0-4E9B-53FC-BBE86F65437F}" EndProject Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TickerQ.Caching.StackExchangeRedis", "TickerQ.Caching.StackExchangeRedis\TickerQ.Caching.StackExchangeRedis.csproj", "{E1B5AE18-2847-D83A-2F51-BC0E86571A9F}" EndProject