diff --git a/packages/qvac-lib-inference-addon-cpp/CHANGELOG.md b/packages/qvac-lib-inference-addon-cpp/CHANGELOG.md index c504c8cf00..25e0c04a8e 100644 --- a/packages/qvac-lib-inference-addon-cpp/CHANGELOG.md +++ b/packages/qvac-lib-inference-addon-cpp/CHANGELOG.md @@ -1,5 +1,11 @@ # Changelog +## [1.1.3] - 2026-03-18 +- Add native job IDs to queued addon events so JS callbacks can distinguish late cancel/error delivery from newer accepted jobs. +- Extend JS callback delivery with a trailing native `jobId` argument while keeping existing 4-argument handlers compatible. +- Make shared `cancel(handle, jobId)` honor the requested job ID while remaining backward compatible for existing callers that omit it. +- Add addon-cpp regression coverage for late cancel ownership and stale cancel isolation. + ## [1.1.2] - 2026-02-20 Reduce noise from logs, macro for compile-time enabling of debug logs. diff --git a/packages/qvac-lib-inference-addon-cpp/CMakeLists.txt b/packages/qvac-lib-inference-addon-cpp/CMakeLists.txt index ff54cec513..7c50e8aebb 100644 --- a/packages/qvac-lib-inference-addon-cpp/CMakeLists.txt +++ b/packages/qvac-lib-inference-addon-cpp/CMakeLists.txt @@ -6,7 +6,7 @@ if(BUILD_TESTING) endif() project(qvac-lib-inference-addon-cpp - VERSION 1.1.2 + VERSION 1.1.3 LANGUAGES CXX) find_path(VCPKG_INSTALLED_PATH share/qvac-lint-cpp/.clang-format REQUIRED) diff --git a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/JobRunner.hpp b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/JobRunner.hpp index 854f8de5c6..078e079319 100644 --- a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/JobRunner.hpp +++ b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/JobRunner.hpp @@ -43,12 +43,18 @@ class ProcessingSync { }; class JobRunner { + struct PendingJob { + JobId jobId; + std::any input; + }; + std::shared_ptr outputQueue_; model::IModel* const model_; model::IModelCancel* const modelCancel_; mutable std::timed_mutex mtx_; mutable std::condition_variable_any processCv_; - std::optional job_; + std::optional job_; + JobId nextJobId_{1}; mutable std::thread processingThread_; mutable std::atomic_bool running_ = false; mutable std::atomic_bool ready_ = false; @@ -65,6 +71,7 @@ class JobRunner { void process() { while (running_) { std::unique_lock lock(mtx_); + std::optional currentJob; try { // Signal that thread is ready for a new job @@ -79,25 +86,31 @@ class JobRunner { // Acquire processing while holding the main `lock` for atomicity. ready_ = false; processingSync_.setActive(true); + currentJob = std::move(*job_); // Unlock main lock to ensure cancel() can acquire without blocking lock.unlock(); - std::any output = model_->process(job_.value()); + std::any output = model_->process(currentJob->input); // Make sure to reset job before queue result. Client might // be waiting to queue a new job as soon as current is ended. finalizeJob(lock); - outputQueue_->queueResult(std::move(output)); - outputQueue_->queueJobEnded(); + outputQueue_->queueResult(currentJob->jobId, std::move(output)); + outputQueue_->queueJobEnded(currentJob->jobId); } catch (const std::exception& e) { finalizeJob(lock); - outputQueue_->queueException(e); + if (currentJob.has_value()) { + outputQueue_->queueException(currentJob->jobId, e); + } } catch (...) { finalizeJob(lock); - outputQueue_->queueException( - std::runtime_error("Unknown exception in processing loop")); + if (currentJob.has_value()) { + outputQueue_->queueException( + currentJob->jobId, + std::runtime_error("Unknown exception in processing loop")); + } } } } @@ -144,26 +157,29 @@ class JobRunner { // Return a boolean instead. return false; } - job_ = std::move(input); + job_ = PendingJob{nextJobId_++, std::move(input)}; lock.unlock(); processCv_.notify_one(); return true; } - void cancel() { + void cancel(std::optional jobId = std::nullopt) { std::scoped_lock lock{mtx_}; if (modelCancel_ == nullptr) { QLOG(logger::Priority::WARNING, "Model does not support cancellation"); return; } - if (job_.has_value()) { + if (job_.has_value() && + (!jobId.has_value() || job_->jobId == jobId.value())) { + const auto activeJobId = job_->jobId; modelCancel_->cancel(); processingSync_.waitInactive(); job_.reset(); if (ready_.load()) { // If the worker has not taken the job yet (ready_ == true, still in // wait), it will never run queueJobEnded. Signal finished now. - outputQueue_->queueException(std::runtime_error("Job cancelled")); + outputQueue_->queueException( + activeJobId, std::runtime_error("Job cancelled")); } } } diff --git a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/JsInterface.hpp b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/JsInterface.hpp index ba0ff92eb5..613f18ccb9 100644 --- a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/JsInterface.hpp +++ b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/JsInterface.hpp @@ -279,7 +279,7 @@ class JsInterface { -> js_value_t* try { JsArgsParser argsParser(env, info); auto& instance = getInstance(env, argsParser.get(0, "instance")); - return instance.cancelJob(); + return instance.cancelJob(argsParser.getIntegralOptional(1)); } JSCATCH }; diff --git a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/addon/AddonCpp.hpp b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/addon/AddonCpp.hpp index 938b6a1c73..bf83487650 100644 --- a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/addon/AddonCpp.hpp +++ b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/addon/AddonCpp.hpp @@ -59,7 +59,9 @@ class AddonCpp { /// @returns False if the job cannot be run (e.g. because a job is already set or being processed) bool runJob(std::any input) { return jobRunner_.runJob(std::move(input)); } - void cancelJob() { jobRunner_.cancel(); } + void cancelJob(std::optional jobId = std::nullopt) { + jobRunner_.cancel(jobId); + } const std::reference_wrapper model; model::IModelAsyncLoad* const asyncLoad; diff --git a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/addon/AddonJs.hpp b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/addon/AddonJs.hpp index bd2f7255ec..2640bfffb2 100644 --- a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/addon/AddonJs.hpp +++ b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/addon/AddonJs.hpp @@ -42,9 +42,10 @@ class AddonJs { * @return JavaScript Promise that resolves when cancellation completes * @note This is a non-blocking operation that returns a future/promise */ - js_value_t* cancelJob() { + js_value_t* cancelJob(std::optional jobId = std::nullopt) { return js::JsAsyncTask::run( - env_, [addonCppRef = addonCpp]() { addonCppRef->cancelJob(); }); + env_, + [addonCppRef = addonCpp, jobId]() { addonCppRef->cancelJob(jobId); }); } /** diff --git a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputCallbackCpp.hpp b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputCallbackCpp.hpp index 1ee2416a12..4129fb241b 100644 --- a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputCallbackCpp.hpp +++ b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputCallbackCpp.hpp @@ -64,16 +64,16 @@ class OutputCallBackCpp : public OutputCallBackInterface { /** * @brief Process output events using handlers */ - void processEvent(const std::any& output) { - if (!output.has_value()) { + void processEvent(const QueuedOutputEvent& outputEvent) { + if (!outputEvent.payload.has_value()) { // e.g. JobStarted events don't have data return; } try { out_handl::OutputHandlerInterface& handler = - outputHandlers_.get(output); - handler.handleOutput(output); + outputHandlers_.get(outputEvent.payload); + handler.handleOutput(outputEvent.payload); } catch (const std::exception& e) { QLOG( logger::Priority::ERROR, @@ -97,7 +97,8 @@ class OutputCallBackCpp : public OutputCallBackInterface { } while (outputQueue_ != nullptr && !shouldStop_.load()) { - std::vector outputQueue = std::move(outputQueue_->clear()); + std::vector outputQueue = + std::move(outputQueue_->clear()); lock.unlock(); for (size_t i = 0; !shouldStop_.load() && i < outputQueue.size(); i++) { diff --git a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputCallbackJs.hpp b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputCallbackJs.hpp index f078560b77..4d7a127584 100644 --- a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputCallbackJs.hpp +++ b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputCallbackJs.hpp @@ -86,24 +86,38 @@ class OutputCallBackJs : public OutputCallBackInterface { void stop() final { stopped_ = true; } private: + js_value_t* createEventName(const QueuedOutputEvent& outputEvent) { + switch (outputEvent.eventKind) { + case OutputEventKind::Output: + return js::String::create(env_, "Output"); + case OutputEventKind::JobEnded: + return js::String::create(env_, "JobEnded"); + case OutputEventKind::Error: + return js::String::create(env_, "Error"); + } + + return js::String::create(env_, "Output"); + } + /** * @brief Creates JavaScript parameters for output events using handlers * @returns Pair of JavaScript values for output data and error */ std::pair - createEventParams(const std::any& output) { - if (!output.has_value()) { + createEventParams(const QueuedOutputEvent& outputEvent) { + if (!outputEvent.payload.has_value()) { // e.g. JobStarted events don't have data return {js::Undefined::create(env_), js::Undefined::create(env_)}; } - out_handl::JsOutputHandlerInterface& handler = outputHandlers_.get(output); + out_handl::JsOutputHandlerInterface& handler = + outputHandlers_.get(outputEvent.payload); handler.setEnv(env_); - js_value_t* handlerResult = handler.handleOutput(output); + js_value_t* handlerResult = handler.handleOutput(outputEvent.payload); // For Error events, put handler result in error parameter (second) // For other events, put handler result in output parameter (first) - if (output.type() == typeid(Output::Error)) { + if (outputEvent.eventKind == OutputEventKind::Error) { return {js::Undefined::create(env_), handlerResult}; } else { return {handlerResult, js::Undefined::create(env_)}; @@ -116,15 +130,18 @@ class OutputCallBackJs : public OutputCallBackInterface { * outputCbParameters[1] = Event string * outputCbParameters[2] = Output data * outputCbParameters[3] = Error data + * outputCbParameters[4] = Native job ID */ void createOutputCbParams( - js_value_t* jsHandle, const std::any& output, + js_value_t* jsHandle, const QueuedOutputEvent& outputEvent, js_value_t** outputCbParameters) { outputCbParameters[0] = jsHandle; - outputCbParameters[1] = js::String::create(env_, output.type().name()); + outputCbParameters[1] = createEventName(outputEvent); std::tie(outputCbParameters[2], outputCbParameters[3]) = - createEventParams(output); + createEventParams(outputEvent); + outputCbParameters[4] = + js::Number::create(env_, static_cast(outputEvent.jobId)); } /** @@ -146,7 +163,7 @@ class OutputCallBackJs : public OutputCallBackInterface { js_value_t* jsHandle; JS(js_get_reference_value( outputCallBackJs.env_, outputCallBackJs.jsHandle_, &jsHandle)); - std::vector outputQueue; + std::vector outputQueue; { std::scoped_lock lk{outputCallBackJs.mtx_}; outputQueue = std::move(outputCallBackJs.outputQueue_->clear()); @@ -159,7 +176,7 @@ class OutputCallBackJs : public OutputCallBackInterface { utils::onExit([env = outputCallBackJs.env_, innerScope]() { js_close_handle_scope(env, innerScope); }); - static constexpr auto outputCbParametersCount = 4; + static constexpr auto outputCbParametersCount = 5; js_value_t* outputCbParameters[outputCbParametersCount]; outputCallBackJs.createOutputCbParams( jsHandle, outputQueue[i], outputCbParameters); diff --git a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputQueue.hpp b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputQueue.hpp index 6422424076..48374712ae 100644 --- a/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputQueue.hpp +++ b/packages/qvac-lib-inference-addon-cpp/src/qvac-lib-inference-addon-cpp/queue/OutputQueue.hpp @@ -15,6 +15,20 @@ namespace qvac_lib_inference_addon_cpp { +using JobId = uint64_t; + +enum class OutputEventKind { + Output, + JobEnded, + Error +}; + +struct QueuedOutputEvent { + JobId jobId; + OutputEventKind eventKind; + std::any payload; +}; + namespace Output { struct LogMsg : std::string { using std::string::string; @@ -27,14 +41,18 @@ struct Error : std::string { class OutputQueue { std::mutex mtx_; - std::vector outputQueue_; + std::vector outputQueue_; const model::IModel& model_; OutputCallBackInterface& outputCallback_; - void queueOutput(std::any&& output) { + void queueOutput(JobId jobId, OutputEventKind eventKind, std::any&& output) { std::scoped_lock lk{mtx_}; - outputQueue_.emplace_back(std::move(output)); + outputQueue_.emplace_back(QueuedOutputEvent{ + jobId, + eventKind, + std::move(output), + }); outputCallback_.notify(); } @@ -46,24 +64,26 @@ class OutputQueue { ~OutputQueue() = default; /// @brief Returns the current output queue and clears the internal queue. - std::vector clear() { + std::vector clear() { std::scoped_lock lk{mtx_}; auto result = std::move(outputQueue_); - outputQueue_ = std::vector(); + outputQueue_ = std::vector(); return result; } - void queueJobEnded() { return queueOutput(model_.runtimeStats()); } + void queueJobEnded(JobId jobId) { + return queueOutput(jobId, OutputEventKind::JobEnded, model_.runtimeStats()); + } - void queueResult(std::any&& output) { + void queueResult(JobId jobId, std::any&& output) { QLOG_DEBUG( std::string("[OutputQueue] queueResult called with type: ") + output.type().name()); - queueOutput(std::move(output)); + queueOutput(jobId, OutputEventKind::Output, std::move(output)); } - void queueException(const std::exception& exception) { - queueOutput(Output::Error{exception}); + void queueException(JobId jobId, const std::exception& exception) { + queueOutput(jobId, OutputEventKind::Error, Output::Error{exception}); } }; } // namespace qvac_lib_inference_addon_cpp diff --git a/packages/qvac-lib-inference-addon-cpp/tests/cpp_output_handler_test.cpp b/packages/qvac-lib-inference-addon-cpp/tests/cpp_output_handler_test.cpp index 694b82f20a..943af9fdf0 100644 --- a/packages/qvac-lib-inference-addon-cpp/tests/cpp_output_handler_test.cpp +++ b/packages/qvac-lib-inference-addon-cpp/tests/cpp_output_handler_test.cpp @@ -98,7 +98,7 @@ TEST(CppOutputHandlerTest, OutputCallbackCppWithCustomStringHandler) { "Hello from OutputCallbackCpp!", "Second message", "Third message"}; for (size_t i = 0; i < testStrings.size(); ++i) { - outputQueue->queueResult(std::any(testStrings[i])); + outputQueue->queueResult(static_cast(i + 1), std::any(testStrings[i])); } // Pop items from the queue with timeout - no need for manual sleep diff --git a/packages/qvac-lib-inference-addon-cpp/tests/job_runner_test.cpp b/packages/qvac-lib-inference-addon-cpp/tests/job_runner_test.cpp index 177f7586b4..2840345834 100644 --- a/packages/qvac-lib-inference-addon-cpp/tests/job_runner_test.cpp +++ b/packages/qvac-lib-inference-addon-cpp/tests/job_runner_test.cpp @@ -207,7 +207,7 @@ TEST_F(JobRunnerTest, CancelBeforeJobThenRunNormally) { // Verify we got a result (not an error) bool found_result = false; for (const auto& output : outputs) { - if (output.type() == typeid(std::string)) { + if (output.payload.type() == typeid(std::string)) { found_result = true; } } @@ -420,8 +420,8 @@ TEST_F( // Check output queue for bad_optional_access errors auto outputs = outputQueue_->clear(); for (const auto& output : outputs) { - if (output.type() == typeid(Output::Error)) { - auto error = std::any_cast(output); + if (output.payload.type() == typeid(Output::Error)) { + auto error = std::any_cast(output.payload); if (error.find("bad_optional_access") != std::string::npos || error.find("optional") != std::string::npos) { FAIL() << "BUG DETECTED: bad_optional_access in output after " @@ -462,4 +462,78 @@ TEST_F(JobRunnerTest, MultipleCancelsInSequence) { SUCCEED(); } +TEST_F(JobRunnerTest, LateCancelEventsStayBoundToCancelledJob) { + model_ = std::make_unique(std::chrono::milliseconds{500}); + outputQueue_ = std::make_shared(*callback_, *model_); + jobRunner_ = + std::make_unique(outputQueue_, model_.get(), model_.get()); + jobRunner_->start(); + + EXPECT_TRUE(jobRunner_->runJob(std::string("job-1"))); + jobRunner_->cancel(1); + + EXPECT_TRUE(jobRunner_->runJob(std::string("job-2"))); + std::this_thread::sleep_for(std::chrono::milliseconds{700}); + + const auto outputs = outputQueue_->clear(); + ASSERT_GE(outputs.size(), 3U); + + bool sawJob1Cancel = false; + bool sawJob2Result = false; + bool sawJob2Ended = false; + for (const auto& output : outputs) { + if (output.jobId == 1 && output.payload.type() == typeid(Output::Error)) { + sawJob1Cancel = + std::any_cast(output.payload) == "Job cancelled"; + } + if (output.jobId == 2 && output.payload.type() == typeid(std::string)) { + sawJob2Result = std::any_cast(output.payload) == "result"; + } + if (output.jobId == 2 && output.payload.type() == typeid(RuntimeStats)) { + sawJob2Ended = true; + } + } + + EXPECT_TRUE(sawJob1Cancel); + EXPECT_TRUE(sawJob2Result); + EXPECT_TRUE(sawJob2Ended); +} + +TEST_F(JobRunnerTest, StaleCancelDoesNotClearNewerAcceptedJob) { + model_ = std::make_unique(std::chrono::milliseconds{150}); + outputQueue_ = std::make_shared(*callback_, *model_); + jobRunner_ = + std::make_unique(outputQueue_, model_.get(), model_.get()); + jobRunner_->start(); + + EXPECT_TRUE(jobRunner_->runJob(std::string("job-1"))); + std::this_thread::sleep_for(std::chrono::milliseconds{250}); + + EXPECT_TRUE(jobRunner_->runJob(std::string("job-2"))); + jobRunner_->cancel(1); + std::this_thread::sleep_for(std::chrono::milliseconds{250}); + + const auto outputs = outputQueue_->clear(); + + bool sawJob2Result = false; + bool sawJob2Ended = false; + bool sawWrongJob2Cancel = false; + for (const auto& output : outputs) { + if (output.jobId == 2 && output.payload.type() == typeid(std::string)) { + sawJob2Result = true; + } + if (output.jobId == 2 && output.payload.type() == typeid(RuntimeStats)) { + sawJob2Ended = true; + } + if (output.jobId == 2 && output.payload.type() == typeid(Output::Error)) { + sawWrongJob2Cancel = + std::any_cast(output.payload) == "Job cancelled"; + } + } + + EXPECT_TRUE(sawJob2Result); + EXPECT_TRUE(sawJob2Ended); + EXPECT_FALSE(sawWrongJob2Cancel); +} + } // namespace qvac_lib_inference_addon_cpp diff --git a/packages/qvac-lib-inference-addon-cpp/vcpkg.json b/packages/qvac-lib-inference-addon-cpp/vcpkg.json index 2d5f3aefcd..1f37c7ce78 100644 --- a/packages/qvac-lib-inference-addon-cpp/vcpkg.json +++ b/packages/qvac-lib-inference-addon-cpp/vcpkg.json @@ -1,6 +1,6 @@ { "name": "qvac-lib-inference-addon-cpp", - "version": "1.1.2", + "version": "1.1.3", "dependencies": [ { "name": "qvac-lint-cpp",