Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: ptr queue to generic queue #4140

Merged
merged 2 commits into from
Sep 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions utl/flatbuffer/vw_to_flat.cc
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,8 @@ void to_flat::convert_txt_to_flat(VW::workspace& all)
MultiExampleBuilder multi_ex_builder;
ExampleBuilder ex_builder;

VW::example* ae = all.example_parser->ready_parsed_examples.pop();
VW::example* ae = nullptr;
all.example_parser->ready_parsed_examples.try_pop(ae);

while (ae != nullptr && !ae->end_pass)
{
Expand Down Expand Up @@ -454,7 +455,8 @@ void to_flat::convert_txt_to_flat(VW::workspace& all)
ex_builder.clear();
_multi_ex_index++;
_examples++;
ae = all.example_parser->ready_parsed_examples.pop();
ae = nullptr;
all.example_parser->ready_parsed_examples.try_pop(ae);
continue;
}
else
Expand All @@ -472,7 +474,8 @@ void to_flat::convert_txt_to_flat(VW::workspace& all)

write_to_file(collection, all.l->is_multiline(), multi_ex_builder, ex_builder, outfile);

ae = all.example_parser->ready_parsed_examples.pop();
ae = nullptr;
all.example_parser->ready_parsed_examples.try_pop(ae);
}

if (collection && _collection_count > 0)
Expand Down
2 changes: 1 addition & 1 deletion vowpalwabbit/core/include/vw/core/parser.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ struct parser
std::vector<VW::string_view> words;

VW::object_pool<VW::example> example_pool;
VW::ptr_queue<VW::example> ready_parsed_examples;
VW::thread_safe_queue<VW::example*> ready_parsed_examples;

io_buf input; // Input source(s)

Expand Down
18 changes: 9 additions & 9 deletions vowpalwabbit/core/include/vw/core/queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,30 +23,30 @@
namespace VW
{
template <typename T>
class ptr_queue
class thread_safe_queue
{
public:
ptr_queue(size_t max_size) : max_size(max_size) {}
thread_safe_queue(size_t max_size) : max_size(max_size) {}

T* pop()
bool try_pop(T& item)
{
std::unique_lock<std::mutex> lock(mut);
while (object_queue.size() == 0 && !done) { is_not_empty.wait(lock); }

if (done && object_queue.size() == 0) { return nullptr; }
if (done && object_queue.size() == 0) { return false; }

auto item = object_queue.front();
item = std::move(object_queue.front());
object_queue.pop();

is_not_full.notify_all();
return item;
return true;
}

void push(T* item)
void push(T item)
{
std::unique_lock<std::mutex> lock(mut);
while (object_queue.size() == max_size) { is_not_full.wait(lock); }
object_queue.push(item);
object_queue.push(std::move(item));

is_not_empty.notify_all();
}
Expand All @@ -69,7 +69,7 @@ class ptr_queue

private:
size_t max_size;
std::queue<T*> object_queue;
std::queue<T> object_queue;
mutable std::mutex mut;

volatile bool done = false;
Expand Down
19 changes: 14 additions & 5 deletions vowpalwabbit/core/src/parser.cc
Original file line number Diff line number Diff line change
Expand Up @@ -913,14 +913,19 @@ void finish_example(VW::workspace& all, example& ec)

void thread_dispatch(VW::workspace& all, const VW::multi_ex& examples)
{
for (auto example : examples) { all.example_parser->ready_parsed_examples.push(example); }
for (auto* example : examples) { all.example_parser->ready_parsed_examples.push(example); }
}

void main_parse_loop(VW::workspace* all) { parse_dispatch(*all, thread_dispatch); }

namespace VW
{
example* get_example(parser* p) { return p->ready_parsed_examples.pop(); }
example* get_example(parser* p)
{
example* ex = nullptr;
p->ready_parsed_examples.try_pop(ex);
return ex;
}

float get_topic_prediction(example* ec, size_t i) { return ec->pred.scalars[i]; }

Expand Down Expand Up @@ -980,9 +985,13 @@ void free_parser(VW::workspace& all)

while (all.example_parser->ready_parsed_examples.size() > 0)
{
auto* current = all.example_parser->ready_parsed_examples.pop();
// this function also handles examples that were not from the pool.
VW::finish_example(all, *current);
VW::example* current = nullptr;
all.example_parser->ready_parsed_examples.try_pop(current);
if (current != nullptr)
{
// this function also handles examples that were not from the pool.
VW::finish_example(all, *current);
}
}

// There should be no examples in flight at this point.
Expand Down