Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for intra frame multithreading #1

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 24 additions & 8 deletions src/main/cpp/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ std::vector<char> read_file(const std::string &path) {
return out;
}

class error_message_handler : public kdu_core::kdu_message {
class error_message_handler : public kdu_core::kdu_thread_safe_message {
public:
void put_text(const char *msg) { std::cout << msg; }

Expand All @@ -52,7 +52,7 @@ class error_message_handler : public kdu_core::kdu_message {

static error_message_handler error_handler;

void run(int repetitions, const std::vector<char> &cs_buf, double &avg_time) {
void run(int repetitions, int num_fibers, const std::vector<char> &cs_buf, double &avg_time) {
kdu_compressed_source_buffered buffer((kdu_byte *)cs_buf.data(),
cs_buf.size());

Expand Down Expand Up @@ -116,19 +116,27 @@ void run(int repetitions, const std::vector<char> &cs_buf, double &avg_time) {
is_signed[0] = false;
}

// Construct multi-threaded processing environment
kdu_thread_env env;
env.create();
for (int t = 1; t < num_fibers; t++)
if (! env.add_thread())
throw std::runtime_error("Cannot allocate the requested number of fibers");

// Now for the decompression loop
kdu_stripe_decompressor d;
int stripe_heights[COMP_COUNT];

auto start = std::chrono::high_resolution_clock::now();

for (int i = 0; i < repetitions; i++) {
d.start(c, /* force_precise */ false,
/* want_fastest */ true);
/* want_fastest */ true,
/* multi-threading environment */ &env);
d.get_recommended_stripe_heights(8, max_stripe_height, stripe_heights, NULL);

bool more_samples = true;

while (more_samples) {
d.get_recommended_stripe_heights(8, max_stripe_height, stripe_heights, NULL);

if (is_planar) {
if (component_sz > 1) {
Expand All @@ -149,7 +157,7 @@ void run(int repetitions, const std::vector<char> &cs_buf, double &avg_time) {
}
}

d.reset();
d.finish(); // use this function because `env' has not been destroyed

buffer.seek(0);
c.restart(&buffer);
Expand All @@ -166,7 +174,9 @@ int main(int argc, char *argv[]) {

options.add_options()("r,repetitions", "Number of repetitions per thread",
cxxopts::value<int>()->default_value("100"))(
"t,threads", "Number of threads",
"t,threads", "Number of frames to process in parallel",
cxxopts::value<int>()->default_value("1"))(
"f,fibers", "Number of threads per frame",
cxxopts::value<int>()->default_value("1"))(
"codestream", "Path to input codestream", cxxopts::value<std::string>());

Expand All @@ -190,6 +200,12 @@ int main(int argc, char *argv[]) {
int repetitions = result["repetitions"].as<int>();

int num_threads = result["threads"].as<int>();
if (num_threads < 1)
throw std::runtime_error("Invalid number of threads");

int num_fibers = result["fibers"].as<int>();
if (num_fibers < 1)
throw std::runtime_error("Invalid number of fibers");

std::vector<double> avg_times(num_threads);

Expand All @@ -199,7 +215,7 @@ int main(int argc, char *argv[]) {

for (int i = 0; i < num_threads; i++) {
threads.push_back(
std::thread(run, repetitions, cs_buf, std::ref(avg_times[i])));
std::thread(run, repetitions, num_fibers, cs_buf, std::ref(avg_times[i])));
}

for (int i = 0; i < avg_times.size(); i++) {
Expand Down