Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/qvac-lib-inference-addon-cpp/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## [1.1.3] - 2026-03-18
- Add native job IDs to queued addon events so JS callbacks can distinguish late cancel/error delivery from newer accepted jobs.
- Extend JS callback delivery with a trailing native `jobId` argument while keeping existing 4-argument handlers compatible.
- Make shared `cancel(handle, jobId)` honor the requested job ID while remaining backward compatible for existing callers that omit it.
- Add addon-cpp regression coverage for late cancel ownership and stale cancel isolation.

## [1.1.2] - 2026-02-20
Reduce noise from logs, macro for compile-time enabling of debug logs.

Expand Down
2 changes: 1 addition & 1 deletion packages/qvac-lib-inference-addon-cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if(BUILD_TESTING)
endif()

project(qvac-lib-inference-addon-cpp
VERSION 1.1.2
VERSION 1.1.3
LANGUAGES CXX)

find_path(VCPKG_INSTALLED_PATH share/qvac-lint-cpp/.clang-format REQUIRED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,18 @@ class ProcessingSync {
};

class JobRunner {
struct PendingJob {
JobId jobId;
std::any input;
};

std::shared_ptr<OutputQueue> outputQueue_;
model::IModel* const model_;
model::IModelCancel* const modelCancel_;
mutable std::timed_mutex mtx_;
mutable std::condition_variable_any processCv_;
std::optional<std::any> job_;
std::optional<PendingJob> job_;
JobId nextJobId_{1};
mutable std::thread processingThread_;
mutable std::atomic_bool running_ = false;
mutable std::atomic_bool ready_ = false;
Expand All @@ -65,6 +71,7 @@ class JobRunner {
void process() {
while (running_) {
std::unique_lock lock(mtx_);
std::optional<PendingJob> currentJob;

try {
// Signal that thread is ready for a new job
Expand All @@ -79,25 +86,31 @@ class JobRunner {
// Acquire processing while holding the main `lock` for atomicity.
ready_ = false;
processingSync_.setActive(true);
currentJob = std::move(*job_);

// Unlock main lock to ensure cancel() can acquire without blocking
lock.unlock();

std::any output = model_->process(job_.value());
std::any output = model_->process(currentJob->input);

// Make sure to reset job before queue result. Client might
// be waiting to queue a new job as soon as current is ended.
finalizeJob(lock);

outputQueue_->queueResult(std::move(output));
outputQueue_->queueJobEnded();
outputQueue_->queueResult(currentJob->jobId, std::move(output));
outputQueue_->queueJobEnded(currentJob->jobId);
} catch (const std::exception& e) {
finalizeJob(lock);
outputQueue_->queueException(e);
if (currentJob.has_value()) {
outputQueue_->queueException(currentJob->jobId, e);
}
} catch (...) {
finalizeJob(lock);
outputQueue_->queueException(
std::runtime_error("Unknown exception in processing loop"));
if (currentJob.has_value()) {
outputQueue_->queueException(
currentJob->jobId,
std::runtime_error("Unknown exception in processing loop"));
}
}
}
}
Expand Down Expand Up @@ -144,26 +157,29 @@ class JobRunner {
// Return a boolean instead.
return false;
}
job_ = std::move(input);
job_ = PendingJob{nextJobId_++, std::move(input)};
lock.unlock();
processCv_.notify_one();
return true;
}

void cancel() {
void cancel(std::optional<JobId> jobId = std::nullopt) {
std::scoped_lock lock{mtx_};
if (modelCancel_ == nullptr) {
QLOG(logger::Priority::WARNING, "Model does not support cancellation");
return;
}
if (job_.has_value()) {
if (job_.has_value() &&
(!jobId.has_value() || job_->jobId == jobId.value())) {
const auto activeJobId = job_->jobId;
modelCancel_->cancel();
processingSync_.waitInactive();
job_.reset();
if (ready_.load()) {
// If the worker has not taken the job yet (ready_ == true, still in
// wait), it will never run queueJobEnded. Signal finished now.
outputQueue_->queueException(std::runtime_error("Job cancelled"));
outputQueue_->queueException(
activeJobId, std::runtime_error("Job cancelled"));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class JsInterface {
-> js_value_t* try {
JsArgsParser argsParser(env, info);
auto& instance = getInstance(env, argsParser.get(0, "instance"));
return instance.cancelJob();
return instance.cancelJob(argsParser.getIntegralOptional<JobId>(1));
}
JSCATCH
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ class AddonCpp {

/// @returns False if the job cannot be run (e.g. because a job is already set or being processed)
bool runJob(std::any input) { return jobRunner_.runJob(std::move(input)); }
void cancelJob() { jobRunner_.cancel(); }
void cancelJob(std::optional<JobId> jobId = std::nullopt) {
jobRunner_.cancel(jobId);
}

const std::reference_wrapper<model::IModel> model;
model::IModelAsyncLoad* const asyncLoad;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ class AddonJs {
* @return JavaScript Promise that resolves when cancellation completes
* @note This is a non-blocking operation that returns a future/promise
*/
js_value_t* cancelJob() {
js_value_t* cancelJob(std::optional<JobId> jobId = std::nullopt) {
return js::JsAsyncTask::run(
env_, [addonCppRef = addonCpp]() { addonCppRef->cancelJob(); });
env_,
[addonCppRef = addonCpp, jobId]() { addonCppRef->cancelJob(jobId); });
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ class OutputCallBackCpp : public OutputCallBackInterface {
/**
* @brief Process output events using handlers
*/
void processEvent(const std::any& output) {
if (!output.has_value()) {
void processEvent(const QueuedOutputEvent& outputEvent) {
if (!outputEvent.payload.has_value()) {
// e.g. JobStarted events don't have data
return;
}

try {
out_handl::OutputHandlerInterface<void>& handler =
outputHandlers_.get(output);
handler.handleOutput(output);
outputHandlers_.get(outputEvent.payload);
handler.handleOutput(outputEvent.payload);
} catch (const std::exception& e) {
QLOG(
logger::Priority::ERROR,
Expand All @@ -97,7 +97,8 @@ class OutputCallBackCpp : public OutputCallBackInterface {
}

while (outputQueue_ != nullptr && !shouldStop_.load()) {
std::vector<std::any> outputQueue = std::move(outputQueue_->clear());
std::vector<QueuedOutputEvent> outputQueue =
std::move(outputQueue_->clear());
lock.unlock();

for (size_t i = 0; !shouldStop_.load() && i < outputQueue.size(); i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,24 +86,38 @@ class OutputCallBackJs : public OutputCallBackInterface {
void stop() final { stopped_ = true; }

private:
js_value_t* createEventName(const QueuedOutputEvent& outputEvent) {
switch (outputEvent.eventKind) {
case OutputEventKind::Output:
return js::String::create(env_, "Output");
case OutputEventKind::JobEnded:
return js::String::create(env_, "JobEnded");
case OutputEventKind::Error:
return js::String::create(env_, "Error");
}

return js::String::create(env_, "Output");
}

/**
* @brief Creates JavaScript parameters for output events using handlers
* @returns Pair of JavaScript values for output data and error
*/
std::pair<js_value_t*, js_value_t*>
createEventParams(const std::any& output) {
if (!output.has_value()) {
createEventParams(const QueuedOutputEvent& outputEvent) {
if (!outputEvent.payload.has_value()) {
// e.g. JobStarted events don't have data
return {js::Undefined::create(env_), js::Undefined::create(env_)};
}

out_handl::JsOutputHandlerInterface& handler = outputHandlers_.get(output);
out_handl::JsOutputHandlerInterface& handler =
outputHandlers_.get(outputEvent.payload);
handler.setEnv(env_);
js_value_t* handlerResult = handler.handleOutput(output);
js_value_t* handlerResult = handler.handleOutput(outputEvent.payload);

// For Error events, put handler result in error parameter (second)
// For other events, put handler result in output parameter (first)
if (output.type() == typeid(Output::Error)) {
if (outputEvent.eventKind == OutputEventKind::Error) {
return {js::Undefined::create(env_), handlerResult};
} else {
return {handlerResult, js::Undefined::create(env_)};
Expand All @@ -116,15 +130,18 @@ class OutputCallBackJs : public OutputCallBackInterface {
* outputCbParameters[1] = Event string
* outputCbParameters[2] = Output data
* outputCbParameters[3] = Error data
* outputCbParameters[4] = Native job ID
*/
void createOutputCbParams(
js_value_t* jsHandle, const std::any& output,
js_value_t* jsHandle, const QueuedOutputEvent& outputEvent,
js_value_t** outputCbParameters) {
outputCbParameters[0] = jsHandle;
outputCbParameters[1] = js::String::create(env_, output.type().name());
outputCbParameters[1] = createEventName(outputEvent);

std::tie(outputCbParameters[2], outputCbParameters[3]) =
createEventParams(output);
createEventParams(outputEvent);
outputCbParameters[4] =
js::Number::create(env_, static_cast<double>(outputEvent.jobId));
}

/**
Expand All @@ -146,7 +163,7 @@ class OutputCallBackJs : public OutputCallBackInterface {
js_value_t* jsHandle;
JS(js_get_reference_value(
outputCallBackJs.env_, outputCallBackJs.jsHandle_, &jsHandle));
std::vector<std::any> outputQueue;
std::vector<QueuedOutputEvent> outputQueue;
{
std::scoped_lock lk{outputCallBackJs.mtx_};
outputQueue = std::move(outputCallBackJs.outputQueue_->clear());
Expand All @@ -159,7 +176,7 @@ class OutputCallBackJs : public OutputCallBackInterface {
utils::onExit([env = outputCallBackJs.env_, innerScope]() {
js_close_handle_scope(env, innerScope);
});
static constexpr auto outputCbParametersCount = 4;
static constexpr auto outputCbParametersCount = 5;
js_value_t* outputCbParameters[outputCbParametersCount];
outputCallBackJs.createOutputCbParams(
jsHandle, outputQueue[i], outputCbParameters);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,20 @@

namespace qvac_lib_inference_addon_cpp {

using JobId = uint64_t;

enum class OutputEventKind {
Output,
JobEnded,
Error
};

struct QueuedOutputEvent {
JobId jobId;
OutputEventKind eventKind;
std::any payload;
};

namespace Output {
struct LogMsg : std::string {
using std::string::string;
Expand All @@ -27,14 +41,18 @@ struct Error : std::string {

class OutputQueue {
std::mutex mtx_;
std::vector<std::any> outputQueue_;
std::vector<QueuedOutputEvent> outputQueue_;

const model::IModel& model_;
OutputCallBackInterface& outputCallback_;

void queueOutput(std::any&& output) {
void queueOutput(JobId jobId, OutputEventKind eventKind, std::any&& output) {
std::scoped_lock lk{mtx_};
outputQueue_.emplace_back(std::move(output));
outputQueue_.emplace_back(QueuedOutputEvent{
jobId,
eventKind,
std::move(output),
});
outputCallback_.notify();
}

Expand All @@ -46,24 +64,26 @@ class OutputQueue {
~OutputQueue() = default;

/// @brief Returns the current output queue and clears the internal queue.
std::vector<std::any> clear() {
std::vector<QueuedOutputEvent> clear() {
std::scoped_lock lk{mtx_};
auto result = std::move(outputQueue_);
outputQueue_ = std::vector<std::any>();
outputQueue_ = std::vector<QueuedOutputEvent>();
return result;
}

void queueJobEnded() { return queueOutput(model_.runtimeStats()); }
void queueJobEnded(JobId jobId) {
return queueOutput(jobId, OutputEventKind::JobEnded, model_.runtimeStats());
}

void queueResult(std::any&& output) {
void queueResult(JobId jobId, std::any&& output) {
QLOG_DEBUG(
std::string("[OutputQueue] queueResult called with type: ") +
output.type().name());
queueOutput(std::move(output));
queueOutput(jobId, OutputEventKind::Output, std::move(output));
}

void queueException(const std::exception& exception) {
queueOutput(Output::Error{exception});
void queueException(JobId jobId, const std::exception& exception) {
queueOutput(jobId, OutputEventKind::Error, Output::Error{exception});
}
};
} // namespace qvac_lib_inference_addon_cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ TEST(CppOutputHandlerTest, OutputCallbackCppWithCustomStringHandler) {
"Hello from OutputCallbackCpp!", "Second message", "Third message"};

for (size_t i = 0; i < testStrings.size(); ++i) {
outputQueue->queueResult(std::any(testStrings[i]));
outputQueue->queueResult(static_cast<JobId>(i + 1), std::any(testStrings[i]));
}

// Pop items from the queue with timeout - no need for manual sleep
Expand Down
Loading
Loading