From 6773fa4ccda2e404f5d6a7d0113002e84e0028e0 Mon Sep 17 00:00:00 2001 From: Balazs Scheidler Date: Sun, 3 Apr 2022 11:07:25 +0200 Subject: [PATCH] iv_work: add support for submitting work_items from within worker threads This patch adds iv_work_pool_submit_continuation() function that allows the submission of work items from any of the worker threads. The idea is to make it possible for work item handlers to decide if some of their tasks is best carried out by a separate work item, to be executed in parallel. Signed-off-by: Balazs Scheidler --- libivykis.posix.ver | 4 ++++ man3/iv_work.3 | 20 ++++++++++++++++- src/include/iv_work.h | 2 ++ src/iv_work.c | 51 ++++++++++++++++++++++++++++++++++++++++--- 4 files changed, 73 insertions(+), 4 deletions(-) diff --git a/libivykis.posix.ver b/libivykis.posix.ver index 2962cb60..9bb7891f 100644 --- a/libivykis.posix.ver +++ b/libivykis.posix.ver @@ -111,3 +111,7 @@ IVYKIS_0.40 { # iv_timer __iv_now_location_valid; } IVYKIS_0.33; + +IVYKIS_0.42 { + iv_work_pool_submit_continuation; +} IVYKIS_0.40; diff --git a/man3/iv_work.3 b/man3/iv_work.3 index 011fcf9a..4d7f5b8a 100644 --- a/man3/iv_work.3 +++ b/man3/iv_work.3 @@ -5,7 +5,7 @@ .\" of the modification is added to the header. .TH iv_work 3 2010-09-14 "ivykis" "ivykis programmer's manual" .SH NAME -IV_WORK_POOL_INIT, iv_work_pool_create, iv_work_pool_put, IV_WORK_ITEM_INIT, iv_work_pool_submit_work \- ivykis +IV_WORK_POOL_INIT, iv_work_pool_create, iv_work_pool_put, IV_WORK_ITEM_INIT, iv_work_pool_submit_work, iv_work_pool_submit_continuation \- ivykis worker thread management .SH SYNOPSIS .B #include @@ -35,6 +35,8 @@ struct iv_work_item { .br .BI "int iv_work_pool_submit_work(struct iv_work_pool *" this ", struct iv_work_item *" work ");" .br +.BI "int iv_work_pool_submit_continuation(struct iv_work_pool *" this ", struct iv_work_item *" work ");" +.br .SH DESCRIPTION Calling .B iv_work_pool_create @@ -81,6 +83,20 @@ as its sole argument, in the thread that .B iv_work_pool_create was called in for this pool object. .PP +Calling +.B iv_work_pool_submit_continuation +from a worker thread allows submitting a work item similarly to +.B iv_work_pool_submit_work. +But while +.B iv_work_pool_submit_work +can only be called from the thread owning +.B iv_work, +.B iv_work_pool_submit_continuation +can be called from any of the worker threads. The +.B ->completion +callback of these jobs will be executed from the thread owning +.B iv_work. +.PP As a special case, calling .B iv_work_pool_submit_work with a @@ -117,6 +133,8 @@ are also not explicitly serialised. can only be called from the thread that .B iv_work_pool_create for this pool object was called in. +.B iv_work_pool_submit_continuation +can called from any of the worker threads. .PP There is no way to cancel submitted work items. .PP diff --git a/src/include/iv_work.h b/src/include/iv_work.h index 5e90fc7e..840ddfa1 100644 --- a/src/include/iv_work.h +++ b/src/include/iv_work.h @@ -59,6 +59,8 @@ int iv_work_pool_create(struct iv_work_pool *this); void iv_work_pool_put(struct iv_work_pool *this); void iv_work_pool_submit_work(struct iv_work_pool *this, struct iv_work_item *work); +void iv_work_pool_submit_continuation(struct iv_work_pool *this, + struct iv_work_item *work); #ifdef __cplusplus } diff --git a/src/iv_work.c b/src/iv_work.c index 20af69d2..9ee74cf2 100644 --- a/src/iv_work.c +++ b/src/iv_work.c @@ -34,7 +34,9 @@ struct work_pool_priv { ___mutex_t lock; struct iv_event ev; + struct iv_event thread_needed; int shutting_down; + int max_threads; int started_threads; struct iv_list_head idle_threads; void *cookie; @@ -44,6 +46,7 @@ struct work_pool_priv { uint32_t seq_tail; struct iv_list_head work_items; struct iv_list_head work_done; + unsigned long tid; }; struct work_pool_thread { @@ -213,6 +216,7 @@ static void iv_work_event(void *_pool) ___mutex_unlock(&pool->lock); ___mutex_destroy(&pool->lock); iv_event_unregister(&pool->ev); + iv_event_unregister(&pool->thread_needed); free(pool); return; } @@ -220,6 +224,23 @@ static void iv_work_event(void *_pool) } } +static int iv_work_start_thread(struct work_pool_priv *pool); + +static void iv_work_thread_needed(void *_pool) +{ + struct work_pool_priv *pool = _pool; + + ___mutex_lock(&pool->lock); + + if (iv_list_empty(&pool->idle_threads) && + pool->started_threads < pool->max_threads) { + iv_work_start_thread(pool); + } + + ___mutex_unlock(&pool->lock); + +} + int iv_work_pool_create(struct iv_work_pool *this) { struct work_pool_priv *pool; @@ -240,6 +261,12 @@ int iv_work_pool_create(struct iv_work_pool *this) pool->ev.handler = iv_work_event; iv_event_register(&pool->ev); + IV_EVENT_INIT(&pool->thread_needed); + pool->thread_needed.cookie = pool; + pool->thread_needed.handler = iv_work_thread_needed; + iv_event_register(&pool->thread_needed); + + pool->max_threads = this->max_threads; pool->shutting_down = 0; pool->started_threads = 0; INIT_IV_LIST_HEAD(&pool->idle_threads); @@ -250,6 +277,8 @@ int iv_work_pool_create(struct iv_work_pool *this) pool->seq_tail = 0; INIT_IV_LIST_HEAD(&pool->work_items); INIT_IV_LIST_HEAD(&pool->work_done); + + pool->tid = iv_get_thread_id(); this->priv = pool; @@ -308,9 +337,13 @@ static int iv_work_start_thread(struct work_pool_priv *pool) } static void -iv_work_submit_pool(struct iv_work_pool *this, struct iv_work_item *work) +iv_work_submit_pool(struct iv_work_pool *this, struct iv_work_item *work, int continuation) { struct work_pool_priv *pool = this->priv; + int called_from_owner_thread = (pool->tid == iv_get_thread_id()); + + if (!continuation && !called_from_owner_thread) + iv_fatal("iv_work_submit_pool: work items can only be submitted from the owning thread"); ___mutex_lock(&pool->lock); @@ -325,7 +358,10 @@ iv_work_submit_pool(struct iv_work_pool *this, struct iv_work_item *work) thr->kicked = 1; iv_event_post(&thr->kick); } else if (pool->started_threads < this->max_threads) { - iv_work_start_thread(pool); + if (called_from_owner_thread) + iv_work_start_thread(pool); + else + iv_event_post(&pool->thread_needed); } ___mutex_unlock(&pool->lock); @@ -391,7 +427,16 @@ void iv_work_pool_submit_work(struct iv_work_pool *this, struct iv_work_item *work) { if (this != NULL) - iv_work_submit_pool(this, work); + iv_work_submit_pool(this, work, 0); + else + iv_work_submit_local(work); +} + +void +iv_work_pool_submit_continuation(struct iv_work_pool *this, struct iv_work_item *work) +{ + if (this != NULL) + iv_work_submit_pool(this, work, 1); else iv_work_submit_local(work); }