Skip to content

Commit a4bc3fd

Browse files
committed
src: add batched support callback in AsyncTSQueue
Enable AsyncTSQueue to accept both single-element (T&&, const T&) and batch (std::vector<T>&&, const std::vector<T>&) callbacks, using if constexpr with type traits for compile-time dispatch. Ensure correct argument forwarding and update tests to cover all supported callback signatures. PR-URL: #312 Reviewed-By: Juan José Arboleda <[email protected]> PR-URL: #359 Reviewed-By: Rafael Gonzaga <[email protected]>
1 parent 5eb93c4 commit a4bc3fd

File tree

2 files changed

+113
-29
lines changed

2 files changed

+113
-29
lines changed

src/nsolid/async_ts_queue.h

Lines changed: 53 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#include <memory>
1111
#include <functional>
1212
#include <tuple>
13+
#include <type_traits>
1314

1415
namespace node {
1516
namespace nsolid {
@@ -26,7 +27,6 @@ class AsyncTSQueue : public std::enable_shared_from_this<AsyncTSQueue<T>> {
2627
public:
2728
using SharedAsyncTSQueue = std::shared_ptr<AsyncTSQueue<T>>;
2829
using WeakAsyncTSQueue = std::weak_ptr<AsyncTSQueue<T>>;
29-
using ProcessCallback = std::function<void(T&&)>;
3030

3131
/**
3232
* Factory method to create and initialize an AsyncTSQueue
@@ -37,11 +37,8 @@ class AsyncTSQueue : public std::enable_shared_from_this<AsyncTSQueue<T>> {
3737
*/
3838
template<typename Cb, typename... Args>
3939
static SharedAsyncTSQueue create(uv_loop_t* loop, Cb&& cb, Args&&... args) {
40-
// Create a shared_ptr with the private constructor
4140
SharedAsyncTSQueue queue(new AsyncTSQueue<T>(
4241
loop, std::forward<Cb>(cb), std::forward<Args>(args)...));
43-
44-
// Initialize the queue and return it
4542
queue->initialize();
4643
return queue;
4744
}
@@ -79,33 +76,70 @@ class AsyncTSQueue : public std::enable_shared_from_this<AsyncTSQueue<T>> {
7976

8077
/**
8178
* Process all items in the queue
79+
*
80+
* Calls the appropriate callback based on the callback type (single or batch)
81+
* determined at compile time using if constexpr.
8282
*/
8383
void process() {
84-
process_single_items();
84+
process_callback_();
8585
}
8686

8787
private:
88+
// Callback support for both single-item and batch processing
89+
using ProcessCallback = std::function<void()>;
90+
// --- Type traits for Callback Type Detection ---
91+
template <typename Cb, typename... Extra>
92+
using is_batch_callback = std::disjunction<
93+
std::is_invocable<Cb, std::vector<T>&&, Extra...>,
94+
std::is_invocable<Cb, const std::vector<T>&, Extra...>
95+
>;
96+
template <typename Cb, typename... Extra>
97+
using is_single_callback = std::disjunction<
98+
std::is_invocable<Cb, T&&, Extra...>,
99+
std::is_invocable<Cb, const T&, Extra...>
100+
>;
101+
88102
/**
89103
* Constructor for AsyncTSQueue
90104
*
91-
* @param loop The UV loop to use for async notifications
92-
* @param callback The callback to process items
105+
* Uses if constexpr with type traits to select between single-item and batch
106+
* callback logic at compile time.
93107
*/
94108
template<typename Cb, typename... Args>
95109
AsyncTSQueue(uv_loop_t* loop, Cb&& cb, Args&&... args)
96-
: loop_(loop),
97-
async_handle_(new nsuv::ns_async()) {
98-
// Create a lambda that captures the callback and arguments by value
99-
// and forwards them when called
100-
process_callback_ = [cb = std::forward<Cb>(cb),
101-
args_tuple = std::make_tuple(
102-
std::forward<Args>(args)...)]
103-
(T&& item) mutable {
104-
// Apply the callback with the item and stored arguments
105-
std::apply([&cb, &item](auto&&... args) {
106-
cb(std::forward<T>(item), std::forward<decltype(args)>(args)...);
107-
}, args_tuple);
110+
: loop_(loop), async_handle_(new nsuv::ns_async()) {
111+
// Create a bound callback function
112+
auto bound_cb = [cb = std::forward<Cb>(cb),
113+
...args = std::forward<Args>(args)](auto&& first) mutable {
114+
std::invoke(cb, std::forward<decltype(first)>(first), args...);
108115
};
116+
if constexpr (is_batch_callback<Cb, Args...>::value) {
117+
// Batch callback: process all items at once
118+
process_callback_ = [this, bound_cb = bound_cb]() mutable {
119+
T item;
120+
size_t size = queue_.dequeue(item);
121+
if (size > 0) {
122+
std::vector<T> batch;
123+
batch.reserve(size + 1);
124+
batch.push_back(std::move(item));
125+
while (queue_.dequeue(item)) {
126+
batch.push_back(std::move(item));
127+
}
128+
bound_cb(std::move(batch));
129+
}
130+
};
131+
} else if constexpr (is_single_callback<Cb, Args...>::value) {
132+
process_callback_ = [this, bound_cb = bound_cb]() mutable {
133+
T item;
134+
while (queue_.dequeue(item)) {
135+
bound_cb(std::move(item));
136+
}
137+
};
138+
} else {
139+
static_assert(is_batch_callback<Cb, Args...>::value ||
140+
is_single_callback<Cb, Args...>::value,
141+
"AsyncTSQueue callback signature not supported");
142+
}
109143
}
110144

111145
/**
@@ -128,16 +162,6 @@ class AsyncTSQueue : public std::enable_shared_from_this<AsyncTSQueue<T>> {
128162
queue->process();
129163
}
130164

131-
/**
132-
* Process items one by one using the process_callback_
133-
*/
134-
void process_single_items() {
135-
T item;
136-
while (queue_.dequeue(item)) {
137-
process_callback_(std::move(item));
138-
}
139-
}
140-
141165
uv_loop_t* loop_;
142166
nsuv::ns_async* async_handle_;
143167
TSQueue<T> queue_;

test/cctest/test_nsolid_async_ts_queue.cc

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,66 @@ TEST_F(AsyncTSQueueTest, MultipleEnqueueOperations) {
234234
EXPECT_EQ(processed_items[3], 4);
235235
}
236236

237+
// Test batch callback with std::vector<T>&&
238+
TEST_F(AsyncTSQueueTest, BatchCallbackRvalueVector) {
239+
std::vector<int> batch_processed;
240+
int call_count = 0;
241+
auto queue = AsyncTSQueue<int>::create(
242+
loop_,
243+
[&batch_processed, &call_count](std::vector<int>&& batch) {
244+
++call_count;
245+
batch_processed = std::move(batch);
246+
});
247+
queue->enqueue(10);
248+
queue->enqueue(20);
249+
queue->enqueue(30);
250+
ProcessEvents();
251+
EXPECT_EQ(call_count, 1);
252+
ASSERT_EQ(batch_processed.size(), 3u);
253+
EXPECT_EQ(batch_processed[0], 10);
254+
EXPECT_EQ(batch_processed[1], 20);
255+
EXPECT_EQ(batch_processed[2], 30);
256+
}
257+
258+
// Test batch callback with extra argument
259+
TEST_F(AsyncTSQueueTest, BatchCallbackWithExtraArg) {
260+
std::vector<std::string> batch_processed;
261+
std::string context = "CTX";
262+
auto queue = AsyncTSQueue<std::string>::create(
263+
loop_,
264+
[&batch_processed](std::vector<std::string>&& batch,
265+
const std::string& ctx) {
266+
for (auto& item : batch) batch_processed.push_back(ctx + ":" + item);
267+
},
268+
std::cref(context));
269+
queue->enqueue("a");
270+
queue->enqueue("b");
271+
queue->enqueue("c");
272+
ProcessEvents();
273+
ASSERT_EQ(batch_processed.size(), 3u);
274+
EXPECT_EQ(batch_processed[0], "CTX:a");
275+
EXPECT_EQ(batch_processed[1], "CTX:b");
276+
EXPECT_EQ(batch_processed[2], "CTX:c");
277+
}
278+
279+
// Test batch callback with const std::vector<T>&
280+
TEST_F(AsyncTSQueueTest, BatchCallbackConstVector) {
281+
std::vector<int> batch_processed;
282+
auto queue = AsyncTSQueue<int>::create(
283+
loop_,
284+
[&batch_processed](const std::vector<int>& batch) {
285+
batch_processed = batch;
286+
});
287+
queue->enqueue(5);
288+
queue->enqueue(7);
289+
queue->enqueue(9);
290+
ProcessEvents();
291+
ASSERT_EQ(batch_processed.size(), 3u);
292+
EXPECT_EQ(batch_processed[0], 5);
293+
EXPECT_EQ(batch_processed[1], 7);
294+
EXPECT_EQ(batch_processed[2], 9);
295+
}
296+
237297
// Test with a complex data type
238298
struct TestData {
239299
int id;

0 commit comments

Comments
 (0)