From 483d0f2c293e0cb0cbcd86280a9fbf4e897d5690 Mon Sep 17 00:00:00 2001 From: Chris Kuethe Date: Mon, 11 Oct 2021 20:27:02 -0700 Subject: [PATCH 1/7] fix `--help` --- src/capture.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/capture.cpp b/src/capture.cpp index a60bbc30f..dd6538697 100644 --- a/src/capture.cpp +++ b/src/capture.cpp @@ -977,7 +977,7 @@ const char *locale = DEFAULT_LOCALE; // The old names should be removed below in a future version. for (i=1 ; i <= argc - 1 ; i++) { - if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "-help") == 0 || strcmp(argv[i], "-help") == 0) + if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "-help") == 0 || strcmp(argv[i], "--help") == 0) { help = 1; } From 4a3d2cc646a5e012194dbc45a20288ee84477025 Mon Sep 17 00:00:00 2001 From: Chris Kuethe Date: Mon, 11 Oct 2021 18:23:14 -0700 Subject: [PATCH 2/7] Multithreaded startrail processing. Inspired by @sebmueller. This splits up the input images into batches of work more or less evenly divided across the number of processors. Two new flags are added: * -n / --nice to make the workers be nice. default = 10 * -m / --max-threads to control the number of worker threads. default = ncpus Allows me to process a night of startrails on my workstation in under 3.5s. On my pi3b+, it'll use between 350% - 395% CPU without making it feel slow ``` root@skycam:/home/allsky/src# time ./startrails -b 0.255 -d ../images/20210912/ -e jpg -o /tmp/junk.jpg -s 1280x960 Minimum: 0.0265446 maximum: 0.804403 mean: 0.143959 median: 0.137787 real 5m5.951s user 11m22.243s sys 1m5.347s root@skycam:/home/allsky/src# time ./startrails -b 0.255 -d ../images/20210912/ -e jpg -o /tmp/junk2.jpg -s 1280x960 --max-threads 1 Minimum: 0.0265446 maximum: 0.804403 mean: 0.143959 median: 0.137787 real 12m8.758s user 11m3.073s sys 0m54.200s ``` Fixes thomasjacquin/allsky#628 --- src/startrails.cpp | 291 ++++++++++++++++++++++++++++++++------------- 1 file changed, 208 insertions(+), 83 deletions(-) diff --git a/src/startrails.cpp b/src/startrails.cpp index 2ae8b1408..3464b3e48 100644 --- a/src/startrails.cpp +++ b/src/startrails.cpp @@ -3,17 +3,24 @@ // Based on script by Thomas Jacquin // SPDX-License-Identifier: MIT +using namespace std; + #include #include +#include +#include #include #include #include #include +#include #include +#include #include #include #include +#include #define KNRM "\x1B[0m" #define KRED "\x1B[31m" @@ -27,23 +34,155 @@ struct config_t { std::string img_src_dir, img_src_ext, dst_startrails; bool startrails_enabled; + int num_threads; + int nice_level; int img_width; int img_height; int verbose; double brightness_limit; } config; +std::mutex stdio_mutex; + void parse_args(int, char**, struct config_t*); void usage_and_exit(int); +void startrail_worker(int, // thread num + struct config_t*, // config + glob_t*, // file list + std::mutex*, // mutex + cv::Mat*, // statistics + cv::Mat* // accumulated +); + +void startrail_worker(int thread_num, + struct config_t* cf, + glob_t* files, + std::mutex* mtx, + cv::Mat* stats_ptr, + cv::Mat* main_accumulator) { + int start_num, end_num, batch_size, nchan = 0; + unsigned long nfiles = files->gl_pathc; + cv::Mat thread_accumulator; + + batch_size = nfiles / cf->num_threads; + start_num = thread_num * batch_size; + + // last thread has more work to do if the number of images isn't multiple of + // the number of threads + if ((thread_num + 1) == cf->num_threads) + end_num = nfiles - 1; + else + end_num = start_num + batch_size - 1; + + if (cf->verbose > 1) { + stdio_mutex.lock(); + fprintf(stderr, "thread %d/%d processing files %d-%d (%d/%lu)\n", + thread_num + 1, cf->num_threads, start_num, end_num, + end_num - start_num + 1, nfiles); + stdio_mutex.unlock(); + } + + for (int f = start_num; f <= end_num; f++) { + char* filename = files->gl_pathv[f]; + cv::Mat image = cv::imread(filename, cv::IMREAD_UNCHANGED); + filename = basename(filename); + if (!image.data) { + stdio_mutex.lock(); + fprintf(stderr, "Error reading file %s\n", filename); + stdio_mutex.unlock(); + continue; + } + + if (cf->img_height && cf->img_width && + (image.cols != cf->img_width || image.rows != cf->img_height)) { + if (cf->verbose) { + stdio_mutex.lock(); + fprintf(stderr, "skip %s size %dx%d != %dx%d\n", filename, image.cols, + image.cols, cf->img_width, cf->img_height); + stdio_mutex.unlock(); + } + continue; + } + + // first valid image sets the number of channels we expect + if (nchan == 0 && image.channels()) + nchan = image.channels(); + + cv::Scalar mean_scalar = cv::mean(image); + double image_mean; + switch (image.channels()) { + default: // mono case + image_mean = mean_scalar.val[0]; + break; + case 3: // for color choose maximum channel + case 4: + image_mean = + cv::max(mean_scalar[0], cv::max(mean_scalar[1], mean_scalar[2])); + break; + } + // Scale to 0-1 range + switch (image.depth()) { + case CV_8U: + image_mean /= 255.0; + break; + case CV_16U: + image_mean /= 65535.0; + break; + } + if (cf->verbose > 1) { + stdio_mutex.lock(); + fprintf(stderr, "[%d/%lu] %s %.3f\n", f + 1, nfiles, filename, + image_mean); + stdio_mutex.unlock(); + } + + // the matrix pointed to by stats_ptr has already been initialized to NAN + // so we just update the entry once the image is successfully loaded + stats_ptr->col(f) = image_mean; + + if (cf->startrails_enabled && image_mean <= cf->brightness_limit) { + if (image.channels() != nchan) { + if (cf->verbose) { + stdio_mutex.lock(); + fprintf(stderr, "repairing channel mismatch: %d != %d\n", + image.channels(), nchan); + stdio_mutex.unlock(); + } + if (image.channels() < nchan) + cv::cvtColor(image, image, cv::COLOR_GRAY2BGR, nchan); + else if (image.channels() > nchan) + cv::cvtColor(image, image, cv::COLOR_BGR2GRAY, nchan); + } + if (thread_accumulator.empty()) { + image.copyTo(thread_accumulator); + } else { + thread_accumulator = cv::max(thread_accumulator, image); + } + } + } + + if (cf->startrails_enabled) { + // skip unlucky threads that might have got only bad images + if (!thread_accumulator.empty()) { + mtx->lock(); + if (main_accumulator->empty()) { + thread_accumulator.copyTo(*main_accumulator); + } else { + *main_accumulator = cv::max(thread_accumulator, *main_accumulator); + } + mtx->unlock(); + } + } +} -//------------------------------------------------------------------------------------------------------- -//------------------------------------------------------------------------------------------------------- void parse_args(int argc, char** argv, struct config_t* cf) { - int c; + int c, tmp; cf->verbose = 0; cf->startrails_enabled = true; cf->img_height = cf->img_width = 0; cf->brightness_limit = 0.35; // not terrible in the city + cf->nice_level = 10; + cf->num_threads = std::thread::hardware_concurrency(); while (1) { // getopt loop int option_index = 0; @@ -51,6 +190,8 @@ void parse_args(int argc, char** argv, struct config_t* cf) { {"brightness", optional_argument, 0, 'b'}, {"directory", required_argument, 0, 'd'}, {"extension", required_argument, 0, 'e'}, + {"max-threads", required_argument, 0, 'm'}, + {"nice-level", required_argument, 0, 'n'}, {"output", required_argument, 0, 'o'}, {"image-size", required_argument, 0, 's'}, {"statistics", no_argument, 0, 'S'}, @@ -58,7 +199,8 @@ void parse_args(int argc, char** argv, struct config_t* cf) { {"help", no_argument, 0, 'h'}, {0, 0, 0, 0}}; - c = getopt_long(argc, argv, "hvsd:e:o:s:", long_options, &option_index); + c = getopt_long(argc, argv, "hvSb:d:e:m:n:o:s:", long_options, + &option_index); if (c == -1) break; switch (c) { // option switch @@ -93,6 +235,22 @@ void parse_args(int argc, char** argv, struct config_t* cf) { case 'e': cf->img_src_ext = optarg; break; + case 'm': + tmp = atoi(optarg); + if ((tmp >= 1) || (tmp < cf->num_threads)) + cf->num_threads = tmp; + break; + case 'n': + tmp = atoi(optarg); + if (PRIO_MIN > tmp) { + tmp = PRIO_MIN; + fprintf(stderr, "clamping scheduler priority to PRIO_MIN\n"); + } else if (PRIO_MAX < tmp) { + fprintf(stderr, "clamping scheduler priority to PRIO_MAX\n"); + tmp = PRIO_MAX; + } + cf->nice_level = atoi(optarg); + break; case 'o': cf->dst_startrails = optarg; break; @@ -104,7 +262,7 @@ void parse_args(int argc, char** argv, struct config_t* cf) { void usage_and_exit(int x) { std::cout << "Usage: startrails [-v] -d -e [-b -o " - " -s | -S]" + " | -s] [-m ] [-n ]" << std::endl; if (x) { std::cout << KRED @@ -125,6 +283,11 @@ void usage_and_exit(int x) { << std::endl; std::cout << "-e | --extension : filter images to just this extension" << std::endl; + std::cout << "-m | --max-threads : limit maximum number of processing " + "threads. (0 = nproc)" + << std::endl; + std::cout << "-n | --nice : nice(2) level of processing threads (10)" + << std::endl; std::cout << "-o | --output-file : output image filename" << std::endl; std::cout << "-s | --image-size x : restrict processed images to " "this size" @@ -143,6 +306,7 @@ void usage_and_exit(int x) { } int main(int argc, char* argv[]) { + int r; struct config_t config; int i; char* e; @@ -165,6 +329,12 @@ int main(int argc, char* argv[]) { if (!config.dst_startrails.empty() && config.brightness_limit < 0) usage_and_exit(3); + r = setpriority(PRIO_PROCESS, 0, config.nice_level); + if (r) { + config.nice_level = getpriority(PRIO_PROCESS, 0); + fprintf(stderr, "unable to set nice level: %s\n", strerror(errno)); + } + // Find files glob_t files; std::string wildcard = config.img_src_dir + "/*." + config.img_src_ext; @@ -175,101 +345,56 @@ int main(int argc, char* argv[]) { return 0; } + std::mutex accumulated_mutex; cv::Mat accumulated; - - // Create space for statistics cv::Mat stats; stats.create(1, files.gl_pathc, CV_64F); - int nchan = 0; - - for (size_t f = 0; f < files.gl_pathc; f++) { - cv::Mat image = cv::imread(files.gl_pathv[f], cv::IMREAD_UNCHANGED); - if (!image.data) { - if (config.verbose) - fprintf(stderr, "Error reading file %s\n", basename(files.gl_pathv[f])); - stats.col(f) = 1.0; // mark as invalid - continue; - } - - if (config.img_height && config.img_width && - (image.cols != config.img_width || image.rows != config.img_height)) { - if (config.verbose) - fprintf(stderr, "skipped %s - got size %dx%d, want %dx%d\n", - files.gl_pathv[f], image.cols, image.cols, config.img_width, - config.img_height); - continue; - } - - // first valid image sets the number of channels we expect - if (nchan == 0 && image.channels()) - nchan = image.channels(); - - cv::Scalar mean_scalar = cv::mean(image); - double mean; - switch (image.channels()) { - default: // mono case - mean = mean_scalar.val[0]; - break; - case 3: // for color choose maximum channel - case 4: - mean = cv::max(mean_scalar[0], cv::max(mean_scalar[1], mean_scalar[2])); - break; - } - // Scale to 0-1 range - switch (image.depth()) { - case CV_8U: - mean /= 255.0; - break; - case CV_16U: - mean /= 65535.0; - break; - } - if (config.verbose > 1) - std::cout << "[" << f + 1 << "/" << files.gl_pathc << "] " - << basename(files.gl_pathv[f]) << " " << mean << std::endl; + // initialize stats to NAN because some images might legitimately be 100% + // brightness if they're massively overexposed. They should be counted in + // the summary statistics. It is not entirely accurate to signal invalid + // images with 1.0 brightness since no image data was read. + stats = NAN; - stats.col(f) = mean; + std::vector threadpool; + for (int tid = 0; tid < config.num_threads; tid++) + threadpool.push_back(std::thread(startrail_worker, tid, &config, &files, + &accumulated_mutex, &stats, &accumulated)); - if (config.startrails_enabled && mean <= config.brightness_limit) { - if (image.channels() != nchan) { - if (config.verbose) - fprintf(stderr, "repairing %s channel mismatch: got %d, want %d\n", - files.gl_pathv[f], image.channels(), nchan); - if (image.channels() < nchan) - cv::cvtColor(image, image, cv::COLOR_GRAY2BGR, nchan); - else if (image.channels() > nchan) - cv::cvtColor(image, image, cv::COLOR_BGR2GRAY, nchan); - } - if (accumulated.empty()) { - image.copyTo(accumulated); - } else { - accumulated = cv::max(accumulated, image); - } - } - } + for (auto& t : threadpool) + t.join(); - // Calculate some statistics - double min_mean, max_mean; + // Calculate some descriptive statistics + double ds_min, ds_max, ds_mean, ds_median; cv::Point min_loc; - cv::minMaxLoc(stats, &min_mean, &max_mean, &min_loc); - double mean_mean = cv::mean(stats)[0]; + + // Each thread will have updated stats with the brightness of the images + // that were successfully processed. Invalid images will be left as NAN. + // In OpenCV, NAN is unequal to everything including NAN which means we can + // filter out bogus entries by checking stats for element-wise equality + // with itself. + cv::Mat nan_mask = cv::Mat(stats == stats); + cv::Mat filtered_stats; + stats.copyTo(filtered_stats, nan_mask); + cv::minMaxLoc(stats, &ds_min, &ds_max, &min_loc); + ds_mean = cv::mean(filtered_stats)[0]; // For median, do partial sort and take middle value std::vector vec; - stats.copyTo(vec); + filtered_stats.copyTo(vec); std::nth_element(vec.begin(), vec.begin() + (vec.size() / 2), vec.end()); - double median_mean = vec[vec.size() / 2]; + ds_median = vec[vec.size() / 2]; - std::cout << "Minimum: " << min_mean << " maximum: " << max_mean - << " mean: " << mean_mean << " median: " << median_mean - << std::endl; + std::cout << "Minimum: " << ds_min << " maximum: " << ds_max + << " mean: " << ds_mean << " median: " << ds_median << std::endl; // If we still don't have an image (no images below threshold), copy the // minimum mean image so we see why if (config.startrails_enabled) { if (accumulated.empty()) { - std::cout << "No images below threshold, writing the minimum image only" - << std::endl; + fprintf( + stderr, + "No images below threshold %.3f, writing the minimum image only\n", + config.brightness_limit); accumulated = cv::imread(files.gl_pathv[min_loc.x], cv::IMREAD_UNCHANGED); } From 9874b6d49f8c103d4547fb1ccd9abba4d9aaee5e Mon Sep 17 00:00:00 2001 From: Chris Kuethe Date: Wed, 13 Oct 2021 20:28:45 -0700 Subject: [PATCH 3/7] fix concurrency option --- src/startrails.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/startrails.cpp b/src/startrails.cpp index 3464b3e48..00d69a0ad 100644 --- a/src/startrails.cpp +++ b/src/startrails.cpp @@ -187,7 +187,7 @@ void parse_args(int argc, char** argv, struct config_t* cf) { while (1) { // getopt loop int option_index = 0; static struct option long_options[] = { - {"brightness", optional_argument, 0, 'b'}, + {"brightness", required_argument, 0, 'b'}, {"directory", required_argument, 0, 'd'}, {"extension", required_argument, 0, 'e'}, {"max-threads", required_argument, 0, 'm'}, @@ -237,8 +237,11 @@ void parse_args(int argc, char** argv, struct config_t* cf) { break; case 'm': tmp = atoi(optarg); - if ((tmp >= 1) || (tmp < cf->num_threads)) + if ((tmp >= 1) && (tmp < cf->num_threads)) cf->num_threads = tmp; + else + fprintf(stderr, "invalid number of threads %d; using %d\n", tmp, + cf->num_threads); break; case 'n': tmp = atoi(optarg); From d8148e3cd2c3159733713b85fafabf1135858470 Mon Sep 17 00:00:00 2001 From: Chris Kuethe Date: Wed, 13 Oct 2021 21:05:14 -0700 Subject: [PATCH 4/7] better concurrency limit check --- src/startrails.cpp | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/startrails.cpp b/src/startrails.cpp index 00d69a0ad..2920617cf 100644 --- a/src/startrails.cpp +++ b/src/startrails.cpp @@ -176,13 +176,13 @@ void startrail_worker(int thread_num, } void parse_args(int argc, char** argv, struct config_t* cf) { - int c, tmp; + int c, tmp, ncpu = std::thread::hardware_concurrency(); cf->verbose = 0; cf->startrails_enabled = true; cf->img_height = cf->img_width = 0; cf->brightness_limit = 0.35; // not terrible in the city cf->nice_level = 10; - cf->num_threads = std::thread::hardware_concurrency(); + cf->num_threads = ncpu; while (1) { // getopt loop int option_index = 0; @@ -237,7 +237,7 @@ void parse_args(int argc, char** argv, struct config_t* cf) { break; case 'm': tmp = atoi(optarg); - if ((tmp >= 1) && (tmp < cf->num_threads)) + if ((tmp >= 1) && (tmp <= ncpu)) cf->num_threads = tmp; else fprintf(stderr, "invalid number of threads %d; using %d\n", tmp, From cd5ee1edd138eb98d6cdfb3aa782e5df16841bd2 Mon Sep 17 00:00:00 2001 From: Chris Kuethe Date: Wed, 13 Oct 2021 21:07:48 -0700 Subject: [PATCH 5/7] clarify behavior of concurrency limit --- src/startrails.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/startrails.cpp b/src/startrails.cpp index 2920617cf..d098c8249 100644 --- a/src/startrails.cpp +++ b/src/startrails.cpp @@ -287,7 +287,7 @@ void usage_and_exit(int x) { std::cout << "-e | --extension : filter images to just this extension" << std::endl; std::cout << "-m | --max-threads : limit maximum number of processing " - "threads. (0 = nproc)" + "threads. (unspecified = use all cpus)" << std::endl; std::cout << "-n | --nice : nice(2) level of processing threads (10)" << std::endl; From ee4327a2985ed81e3dbc41dc91313cd00328bd27 Mon Sep 17 00:00:00 2001 From: Chris Kuethe Date: Fri, 15 Oct 2021 20:26:05 -0700 Subject: [PATCH 6/7] Multithreaded keogram processing. Inspired by @sebmueller. This splits up the input images into batches of work more or less evenly divided across the number of processors. Two new flags are added (-m and -n were already used): * -q / --level to make the workers be nice. default = 10 * -Q / --max-threads to control the number of worker threads. default = ncpus Generates a keogram on my laptop in under 3.5s, even after doing horrible string manipulations to compute time of exposure for label generation. Tested on rpi3b too. --- src/keogram.cpp | 373 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 256 insertions(+), 117 deletions(-) diff --git a/src/keogram.cpp b/src/keogram.cpp index a7122c70a..c80ad1d0e 100644 --- a/src/keogram.cpp +++ b/src/keogram.cpp @@ -4,13 +4,18 @@ // Rotation added by Agustin Nunez @agnunez // SPDX-License-Identifier: MIT +using namespace std; + #include #include #include +#include #include +#include #include #include #include +#include #include #include @@ -35,21 +40,219 @@ struct config_t { int fontType; int lineWidth; int verbose; + int num_threads; + int nice_level; uint8_t a, r, g, b; double fontScale; double rotation_angle; double brightness_limit; } config; +std::mutex stdio_mutex; + void parse_args(int, char**, struct config_t*); void usage_and_exit(int); int get_font_by_name(char*); +void keogram_worker(int, // thread num + struct config_t*, // config + glob_t*, // file list + std::mutex*, // mutex + cv::Mat*, // accumulated + cv::Mat* // annotations +); + +void keogram_worker(int thread_num, + struct config_t* cf, + glob_t* files, + std::mutex* mtx, + cv::Mat* acc, + cv::Mat* ann) { + int start_num, end_num, batch_size, prevHour = -1, nchan = 0; + unsigned long nfiles = files->gl_pathc; + cv::Mat thread_accumulator; + + batch_size = nfiles / cf->num_threads; + start_num = thread_num * batch_size; + + // last thread has more work to do if the number of images isn't multiple of + // the number of threads + if ((thread_num + 1) == cf->num_threads) + end_num = nfiles - 1; + else + end_num = start_num + batch_size - 1; + + if (cf->verbose > 1) { + stdio_mutex.lock(); + fprintf(stderr, "thread %d/%d processing files %d-%d (%d/%lu)\n", + thread_num + 1, cf->num_threads, start_num, end_num, + end_num - start_num + 1, nfiles); + stdio_mutex.unlock(); + } + + for (int f = start_num; f <= end_num; f++) { + char* filename = files->gl_pathv[f]; + if (cf->verbose > 1) { + stdio_mutex.lock(); + fprintf(stderr, "[%d/%lu] %s\n", f + 1, nfiles, filename); + stdio_mutex.unlock(); + } -//------------------------------------------------------------------------------------------------------- -//------------------------------------------------------------------------------------------------------- + cv::Mat imagesrc = cv::imread(filename, cv::IMREAD_UNCHANGED); + if (!imagesrc.data) { + if (cf->verbose) { + stdio_mutex.lock(); + std::cout << "Error reading file " << filename << std::endl; + stdio_mutex.unlock(); + } + continue; + } + + if (cf->img_height && cf->img_width && + (imagesrc.cols != cf->img_width || imagesrc.rows != cf->img_height)) { + if (cf->verbose) { + stdio_mutex.lock(); + fprintf(stderr, + "%s image size %dx%d does not match expected size %dx%d\n", + filename, imagesrc.cols, imagesrc.cols, cf->img_width, + cf->img_height); + stdio_mutex.unlock(); + } + continue; + } + + if (nchan == 0) + nchan = imagesrc.channels(); + + if (imagesrc.channels() != nchan) { + if (cf->verbose) { + stdio_mutex.lock(); + fprintf(stderr, "repairing channel mismatch: %d != %d\n", + imagesrc.channels(), nchan); + stdio_mutex.unlock(); + } + if (imagesrc.channels() < nchan) + cv::cvtColor(imagesrc, imagesrc, cv::COLOR_GRAY2BGR, nchan); + else if (imagesrc.channels() > nchan) + cv::cvtColor(imagesrc, imagesrc, cv::COLOR_BGR2GRAY, nchan); + } + + if (cf->rotation_angle) { + if (cf->verbose > 1) { + stdio_mutex.lock(); + fprintf(stderr, "rotating image by %.2f\n", cf->rotation_angle); + stdio_mutex.unlock(); + } + cv::Point2f center((imagesrc.cols - 1) / 2.0, (imagesrc.rows - 1) / 2.0); + cv::Mat rot = cv::getRotationMatrix2D(center, cf->rotation_angle, 1.0); + cv::Rect2f bbox = + cv::RotatedRect(cv::Point2f(), imagesrc.size(), cf->rotation_angle) + .boundingRect2f(); + rot.at(0, 2) += bbox.width / 2.0 - imagesrc.cols / 2.0; + rot.at(1, 2) += bbox.height / 2.0 - imagesrc.rows / 2.0; + // cv::Mat imagedst; + // cv::warpAffine(imagesrc, imagedst, rot, bbox.size()); + cv::warpAffine(imagesrc, imagesrc, rot, bbox.size()); + } + + /* This seemingly redundant check saves a bunch of locking and unlocking + later. Maybe all the threads will see the accumlator as empty, so they will + all try grab the lock... + + The winner of that race initializes the accumulator with its image, and + releases the lock. The rest of the threads will - in turn - get the lock, + and on checking the accumulator again, find it no longer in need of + initialization, so they skip the .create(). + + Future iterations will all see that the accumulator is non-empty. + */ + if (acc->empty()) { + mtx->lock(); + if (acc->empty()) { + acc->create(imagesrc.rows, nfiles, imagesrc.type()); + if (cf->verbose > 1) { + stdio_mutex.lock(); + fprintf(stderr, "thread %d initialized accumulator\n", thread_num); + stdio_mutex.unlock(); + } + } + mtx->unlock(); + } + + // Copy middle column to destination + // locking not required - we have absolute index into the accumulator + imagesrc.col(imagesrc.cols / 2).copyTo(acc->col(f)); + + if (cf->labels_enabled) { + struct tm ft; // the time of the file, by any means necessary + if (cf->parse_filename) { + // engage your safety squints! + char* s; + s = strrchr(filename, '-'); + s++; + sscanf(s, "%04d%02d%02d%02d%02d%02d.%*s", &ft.tm_year, &ft.tm_mon, + &ft.tm_mday, &ft.tm_hour, &ft.tm_min, &ft.tm_sec); + } else { + // sometimes you can believe the file time on disk + struct stat s; + stat(filename, &s); + struct tm* t = localtime(&s.st_mtime); + ft.tm_hour = t->tm_hour; + } + + // record the annotation + if (ft.tm_hour != prevHour) { + if (prevHour != -1) { + mtx->lock(); + cv::Mat a = (cv::Mat_(1, 2) << f, ft.tm_hour); + ann->push_back(a); + mtx->unlock(); + } + prevHour = ft.tm_hour; + } + } + } +} + +void annotate_image(cv::Mat* ann, cv::Mat* acc, struct config_t* cf) { + int baseline = 0; + char hour[3]; + + if (cf->labels_enabled && !ann->empty()) { + for (int r = 0; r < ann->rows; r++) { + // Draw a dashed line and label for hour + cv::LineIterator it(*acc, cv::Point(ann->at(r, 0), 0), + cv::Point(ann->at(r, 0), acc->rows)); + for (int i = 0; i < it.count; i++, ++it) { + // 4 pixel dashed line + if (i & 4) { + uchar* p = *it; + for (int c = 0; c < it.elemSize; c++) { + *p = ~(*p); + p++; + } + } + } + + // Draw text label to the left of the dash + snprintf(hour, 3, "%02d", ann->at(r, 1)); + std::string text(hour); + cv::Size textSize = cv::getTextSize(text, cf->fontFace, cf->fontScale, + cf->lineWidth, &baseline); + + if (ann->at(r, 0) - textSize.width >= 0) { + cv::putText(*acc, text, + cv::Point(ann->at(r, 0) - textSize.width, + acc->rows - textSize.height), + cf->fontFace, cf->fontScale, + cv::Scalar(cf->b, cf->g, cf->r), cf->lineWidth, + cf->fontType); + } + } + } +} void parse_args(int argc, char** argv, struct config_t* cf) { - int c; + int c, tmp, ncpu = std::thread::hardware_concurrency(); cf->labels_enabled = true; cf->parse_filename = false; @@ -61,6 +264,8 @@ void parse_args(int argc, char** argv, struct config_t* cf) { cf->b = 0xff; cf->rotation_angle = 0; cf->verbose = cf->img_width = cf->img_height = 0; + cf->num_threads = ncpu; + cf->nice_level = 10; while (1) { // getopt loop int option_index = 0; @@ -75,13 +280,15 @@ void parse_args(int argc, char** argv, struct config_t* cf) { {"font-size", required_argument, 0, 'S'}, {"font-type", required_argument, 0, 'T'}, {"rotate", required_argument, 0, 'r'}, + {"max-threads", required_argument, 0, 'Q'}, + {"nice-level", required_argument, 0, 'q'}, {"parse-filename", no_argument, 0, 'p'}, {"no-label", no_argument, 0, 'n'}, {"verbose", no_argument, 0, 'v'}, {"help", no_argument, 0, 'h'}, {0, 0, 0, 0}}; - c = getopt_long(argc, argv, "d:e:o:r:s:C:L:N:S:T:npvh", long_options, + c = getopt_long(argc, argv, "d:e:o:r:s:C:L:N:S:T:Q:q:npvh", long_options, &option_index); if (c == -1) break; @@ -120,7 +327,6 @@ void parse_args(int argc, char** argv, struct config_t* cf) { cf->verbose++; break; case 'C': - int tmp; if (strchr(optarg, ' ')) { int r, g, b; sscanf(optarg, "%d %d %d", &b, &g, &r); @@ -154,6 +360,25 @@ void parse_args(int argc, char** argv, struct config_t* cf) { else cf->fontType = cv::LINE_8; break; + case 'Q': + tmp = atoi(optarg); + if ((tmp >= 1) && (tmp <= ncpu)) + cf->num_threads = tmp; + else + fprintf(stderr, "invalid number of threads %d; using %d\n", tmp, + cf->num_threads); + break; + case 'q': + tmp = atoi(optarg); + if (PRIO_MIN > tmp) { + tmp = PRIO_MIN; + fprintf(stderr, "clamping scheduler priority to PRIO_MIN\n"); + } else if (PRIO_MAX < tmp) { + fprintf(stderr, "clamping scheduler priority to PRIO_MAX\n"); + tmp = PRIO_MAX; + } + cf->nice_level = atoi(optarg); + break; default: break; } // option switch @@ -195,6 +420,12 @@ void usage_and_exit(int x) { std::cout << "-N | --font-name : font name (simplex)" << std::endl; std::cout << "-S | --font-size : font size (2.0)" << std::endl; std::cout << "-T | --font-type : font line type (1)" << std::endl; + std::cout << "-Q | --max-threads : limit maximum number of processing " + "threads. (use all cpus)" + << std::endl; + std::cout + << "-q | --nice-level : nice(2) level of processing threads (10)" + << std::endl; std::cout << KNRM << std::endl; std::cout @@ -238,7 +469,7 @@ int get_font_by_name(char* s) { int main(int argc, char* argv[]) { struct config_t config; - int i; + int i, r; char* e; parse_args(argc, argv, &config); @@ -252,6 +483,12 @@ int main(int argc, char* argv[]) { config.dst_keogram.empty()) usage_and_exit(3); + r = setpriority(PRIO_PROCESS, 0, config.nice_level); + if (r) { + config.nice_level = getpriority(PRIO_PROCESS, 0); + fprintf(stderr, "unable to set nice level: %s\n", strerror(errno)); + } + glob_t files; std::string wildcard = config.img_src_dir + "/*." + config.img_src_ext; glob(wildcard.c_str(), 0, NULL, &files); @@ -261,121 +498,23 @@ int main(int argc, char* argv[]) { return 0; } + std::mutex accumulated_mutex; cv::Mat accumulated; + cv::Mat annotations; + annotations.create(0, 2, CV_32S); + annotations = -1; - int prevHour = -1; - int nchan = 0; - - for (size_t f = 0; f < files.gl_pathc; f++) { - cv::Mat imagesrc = cv::imread(files.gl_pathv[f], cv::IMREAD_UNCHANGED); - if (!imagesrc.data) { - if (config.verbose) - std::cout << "Error reading file " << basename(files.gl_pathv[f]) - << std::endl; - continue; - } - - if (config.verbose > 1) - std::cout << "[" << f + 1 << "/" << files.gl_pathc << "] " - << basename(files.gl_pathv[f]) << std::endl; - - if (config.img_height && config.img_width && - (imagesrc.cols != config.img_width || - imagesrc.rows != config.img_height)) { - if (config.verbose) { - fprintf(stderr, - "%s image size %dx%d does not match expected size %dx%d\n", - files.gl_pathv[f], imagesrc.cols, imagesrc.cols, - config.img_width, config.img_height); - } - continue; - } - - if (nchan == 0) - nchan = imagesrc.channels(); - - if (imagesrc.channels() != nchan) { - if (config.verbose) { - fprintf(stderr, "repairing channel mismatch: %d != %d\n", - imagesrc.channels(), nchan); - } - if (imagesrc.channels() < nchan) - cv::cvtColor(imagesrc, imagesrc, cv::COLOR_GRAY2BGR, nchan); - else if (imagesrc.channels() > nchan) - cv::cvtColor(imagesrc, imagesrc, cv::COLOR_BGR2GRAY, nchan); - } - - cv::Point2f center((imagesrc.cols - 1) / 2.0, (imagesrc.rows - 1) / 2.0); - cv::Mat rot = cv::getRotationMatrix2D(center, config.rotation_angle, 1.0); - cv::Rect2f bbox = - cv::RotatedRect(cv::Point2f(), imagesrc.size(), config.rotation_angle) - .boundingRect2f(); - rot.at(0, 2) += bbox.width / 2.0 - imagesrc.cols / 2.0; - rot.at(1, 2) += bbox.height / 2.0 - imagesrc.rows / 2.0; - cv::Mat imagedst; - cv::warpAffine(imagesrc, imagedst, rot, bbox.size()); - if (accumulated.empty()) { - accumulated.create(imagedst.rows, files.gl_pathc, imagesrc.type()); - } - - // Copy middle column to destination - imagedst.col(imagedst.cols / 2).copyTo(accumulated.col(f)); - - if (config.labels_enabled) { - struct tm ft; // the time of the file, by any means necessary - if (config.parse_filename) { - // engage your safety squints! - char* s; - s = strrchr(files.gl_pathv[f], '-'); - s++; - sscanf(s, "%04d%02d%02d%02d%02d%02d.%*s", &ft.tm_year, &ft.tm_mon, - &ft.tm_mday, &ft.tm_hour, &ft.tm_min, &ft.tm_sec); - } else { - // sometimes you can believe the file time on disk - struct stat s; - stat(files.gl_pathv[f], &s); - struct tm* t = localtime(&s.st_mtime); - ft.tm_hour = t->tm_hour; - } + std::vector threadpool; + for (int tid = 0; tid < config.num_threads; tid++) + threadpool.push_back(std::thread(keogram_worker, tid, &config, &files, + &accumulated_mutex, &accumulated, + &annotations)); - if (ft.tm_hour != prevHour) { - if (prevHour != -1) { - // Draw a dashed line and label for hour - cv::LineIterator it(accumulated, cv::Point(f, 0), - cv::Point(f, accumulated.rows)); - for (int i = 0; i < it.count; i++, ++it) { - // 4 pixel dashed line - if (i & 4) { - uchar* p = *it; - for (int c = 0; c < it.elemSize; c++) { - *p = ~(*p); - p++; - } - } - } + for (auto& t : threadpool) + t.join(); - // Draw text label to the left of the dash - char hour[3]; - snprintf(hour, 3, "%02d", ft.tm_hour); - std::string text(hour); - int baseline = 0; - cv::Size textSize = - cv::getTextSize(text, config.fontFace, config.fontScale, - config.lineWidth, &baseline); - - if (f - textSize.width >= 0) { - cv::putText(accumulated, text, - cv::Point(f - textSize.width, - accumulated.rows - textSize.height), - config.fontFace, config.fontScale, - cv::Scalar(config.b, config.g, config.r), - config.lineWidth, config.fontType); - } - } - prevHour = ft.tm_hour; - } - } - } + if (config.labels_enabled) + annotate_image(&annotations, &accumulated, &config); globfree(&files); std::vector compression_params; From fde886f498ea059fbe619d35792aa6a39374c925 Mon Sep 17 00:00:00 2001 From: Chris Kuethe Date: Fri, 15 Oct 2021 23:46:14 -0700 Subject: [PATCH 7/7] unify multiprocessor flags now both of these tools use -Q / --max-threads to control the number of worker threads, and -q / --nice-level to set the scheduler priority. Q is vaguely mnemonic for queue (job scheduling) and it's not likely that you will want to change the settings without a good reason. Use as much CPU as possible, without dragging the whole system down, right? --- src/startrails.cpp | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/startrails.cpp b/src/startrails.cpp index d098c8249..39b79d95b 100644 --- a/src/startrails.cpp +++ b/src/startrails.cpp @@ -190,8 +190,8 @@ void parse_args(int argc, char** argv, struct config_t* cf) { {"brightness", required_argument, 0, 'b'}, {"directory", required_argument, 0, 'd'}, {"extension", required_argument, 0, 'e'}, - {"max-threads", required_argument, 0, 'm'}, - {"nice-level", required_argument, 0, 'n'}, + {"max-threads", required_argument, 0, 'Q'}, + {"nice-level", required_argument, 0, 'q'}, {"output", required_argument, 0, 'o'}, {"image-size", required_argument, 0, 's'}, {"statistics", no_argument, 0, 'S'}, @@ -199,7 +199,7 @@ void parse_args(int argc, char** argv, struct config_t* cf) { {"help", no_argument, 0, 'h'}, {0, 0, 0, 0}}; - c = getopt_long(argc, argv, "hvSb:d:e:m:n:o:s:", long_options, + c = getopt_long(argc, argv, "hvSb:d:e:Q:q:o:s:", long_options, &option_index); if (c == -1) break; @@ -235,7 +235,7 @@ void parse_args(int argc, char** argv, struct config_t* cf) { case 'e': cf->img_src_ext = optarg; break; - case 'm': + case 'Q': tmp = atoi(optarg); if ((tmp >= 1) && (tmp <= ncpu)) cf->num_threads = tmp; @@ -243,7 +243,7 @@ void parse_args(int argc, char** argv, struct config_t* cf) { fprintf(stderr, "invalid number of threads %d; using %d\n", tmp, cf->num_threads); break; - case 'n': + case 'q': tmp = atoi(optarg); if (PRIO_MIN > tmp) { tmp = PRIO_MIN; @@ -265,7 +265,7 @@ void parse_args(int argc, char** argv, struct config_t* cf) { void usage_and_exit(int x) { std::cout << "Usage: startrails [-v] -d -e [-b -o " - " | -s] [-m ] [-n ]" + " | -s] [-Q ] [-q ]" << std::endl; if (x) { std::cout << KRED @@ -286,10 +286,10 @@ void usage_and_exit(int x) { << std::endl; std::cout << "-e | --extension : filter images to just this extension" << std::endl; - std::cout << "-m | --max-threads : limit maximum number of processing " + std::cout << "-Q | --max-threads : limit maximum number of processing " "threads. (unspecified = use all cpus)" << std::endl; - std::cout << "-n | --nice : nice(2) level of processing threads (10)" + std::cout << "-q | --nice : nice(2) level of processing threads (10)" << std::endl; std::cout << "-o | --output-file : output image filename" << std::endl; std::cout << "-s | --image-size x : restrict processed images to "