Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: refine extension start and close flow #55

Merged
merged 1 commit into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions core/include_internal/ten_runtime/engine/on_xxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@
#include "ten_runtime/ten_config.h"

typedef struct ten_env_t ten_env_t;
typedef struct ten_engine_t ten_engine_t;
typedef struct ten_extension_thread_t ten_extension_thread_t;

TEN_RUNTIME_PRIVATE_API void ten_engine_on_all_extensions_added(void *self_,
void *arg);
TEN_RUNTIME_PRIVATE_API void
ten_engine_find_extension_info_for_all_extensions_of_extension_thread(
void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void ten_engine_on_extension_thread_closed(void *self_,
void *arg);

TEN_RUNTIME_PRIVATE_API void ten_engine_on_extension_thread_initted(void *self_,
void *arg);

TEN_RUNTIME_PRIVATE_API void ten_engine_on_addon_create_extension_group_done(
void *self_, void *arg);

Expand Down
6 changes: 1 addition & 5 deletions core/include_internal/ten_runtime/extension/extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,10 +170,9 @@ struct ten_extension_t {
ten_env_t *ten_env;

ten_extension_thread_t *extension_thread;
ten_hashhandle_t hh_in_extension_thread_extension_store;
ten_hashhandle_t hh_in_extension_store;

ten_extension_context_t *extension_context;
ten_hashhandle_t hh_in_extension_context_extension_store;

// TODO(Wei): The current situation is if an extension is generated by an
// extension_group addon and the extension is not an addon, then this
Expand Down Expand Up @@ -229,9 +228,6 @@ TEN_RUNTIME_PRIVATE_API bool
ten_extension_determine_and_merge_all_interface_dest_extension(
ten_extension_t *self);

TEN_RUNTIME_PRIVATE_API void ten_extension_link_its_ten_to_extension_context(
ten_extension_t *self, ten_extension_context_t *extension_context);

TEN_RUNTIME_PRIVATE_API void ten_extension_on_init(ten_env_t *ten_env);

TEN_RUNTIME_PRIVATE_API void ten_extension_on_start(ten_extension_t *self);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,10 @@ struct ten_extension_context_t {
ten_list_t extension_threads;

size_t extension_threads_cnt_of_initted;
size_t extension_threads_cnt_of_all_extensions_added_to_engine;
size_t extension_threads_cnt_of_all_extensions_stopped;
size_t extension_threads_cnt_of_closing_flag_is_set;
size_t extension_threads_cnt_of_closed;

ten_extension_store_t *extension_store;

ten_list_t extension_groups_info_from_graph;
ten_list_t extensions_info_from_graph; // ten_extension_info_t*

Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ TEN_RUNTIME_PRIVATE_API void ten_extension_store_destroy(
ten_extension_store_t *self);

TEN_RUNTIME_PRIVATE_API bool ten_extension_store_add_extension(
ten_extension_store_t *self, ten_extension_t *extension,
bool of_extension_thread);
ten_extension_store_t *self, ten_extension_t *extension);

TEN_RUNTIME_PRIVATE_API void ten_extension_store_del_extension(
ten_extension_store_t *self, ten_extension_t *extension,
bool of_extension_thread);
ten_extension_store_t *self, ten_extension_t *extension);

TEN_RUNTIME_PRIVATE_API ten_extension_t *ten_extension_store_find_extension(
ten_extension_store_t *self, const char *extension_group_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ typedef struct ten_extension_thread_t {
ten_list_t pending_msgs;

ten_list_t extensions; // ten_extension_t*
size_t extensions_cnt_of_added_to_engine;
size_t extensions_cnt_of_deleted_from_engine;
size_t extensions_cnt_of_on_stop_done;
size_t extensions_cnt_of_set_closing_flag;
Expand Down Expand Up @@ -117,15 +116,10 @@ ten_extension_thread_get_state(ten_extension_thread_t *self);
TEN_RUNTIME_PRIVATE_API void ten_extension_thread_set_state(
ten_extension_thread_t *self, TEN_EXTENSION_THREAD_STATE state);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_determine_all_extension_dest_from_graph(
ten_extension_thread_t *self);

TEN_RUNTIME_PRIVATE_API void ten_extension_thread_call_all_extension_on_start(
ten_extension_thread_t *self);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_start_to_add_all_created_extension_to_engine(
TEN_RUNTIME_PRIVATE_API void ten_extension_thread_add_all_created_extensions(
ten_extension_thread_t *self);

TEN_RUNTIME_PRIVATE_API ten_runloop_t *
Expand Down
12 changes: 1 addition & 11 deletions core/include_internal/ten_runtime/extension_thread/on_xxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,6 @@ ten_extension_thread_on_addon_create_extension_done_info_destroy(
TEN_RUNTIME_API void ten_extension_inherit_thread_ownership(
ten_extension_t *self, ten_extension_thread_t *extension_thread);

TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_extension_added_to_engine(
void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_on_extension_deleted_from_engine(void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_on_extension_group_on_init_done(void *self_, void *arg);

Expand All @@ -47,12 +41,8 @@ TEN_RUNTIME_PRIVATE_API void ten_extension_thread_pre_close(void *self_,
TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_extension_set_closing_flag(
void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_extension_on_deinit_done(
void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_on_all_extensions_in_all_extension_threads_added_to_engine(
void *self_, void *arg);
ten_extension_thread_start_life_cycle_of_all_extensions(void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_on_extension_group_on_deinit_done(void *self_, void *arg);
Expand Down
179 changes: 90 additions & 89 deletions core/src/ten_runtime/engine/on_xxx.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "include_internal/ten_runtime/engine/on_xxx.h"

#include "include_internal/ten_runtime/addon/addon.h"
#include "include_internal/ten_runtime/app/app.h"
#include "include_internal/ten_runtime/engine/engine.h"
#include "include_internal/ten_runtime/engine/msg_interface/common.h"
#include "include_internal/ten_runtime/extension/extension.h"
Expand All @@ -26,88 +27,10 @@
#include "ten_utils/macro/mark.h"
#include "ten_utils/sanitizer/thread_check.h"

void ten_engine_on_all_extensions_added(void *self_, void *arg) {
ten_engine_t *self = self_;
static void ten_engine_on_extension_thread_is_ready(
ten_engine_t *self, ten_extension_thread_t *extension_thread) {
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");

TEN_UNUSED ten_extension_thread_t *extension_thread = arg;
TEN_ASSERT(
extension_thread &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: this function does not access this extension_thread,
// we just check if the arg is an ten_extension_thread_t.
ten_extension_thread_check_integrity(extension_thread, false),
"Should not happen.");

self->extension_context
->extension_threads_cnt_of_all_extensions_added_to_engine++;
if (self->extension_context
->extension_threads_cnt_of_all_extensions_added_to_engine ==
ten_list_size(&self->extension_context->extension_threads)) {
TEN_LOGD("[%s] All extension threads has added all extensions to engine.",
ten_engine_get_name(self));

ten_list_foreach (&self->extension_context->extension_threads, iter) {
ten_extension_thread_t *extension_thread =
ten_ptr_listnode_get(iter.node);
TEN_ASSERT(extension_thread && ten_extension_thread_check_integrity(
extension_thread, false),
"Should not happen.");

ten_runloop_post_task_tail(
ten_extension_thread_get_attached_runloop(extension_thread),
ten_extension_thread_on_all_extensions_in_all_extension_threads_added_to_engine,
extension_thread, NULL);
}
}
}

void ten_engine_on_extension_thread_closed(void *self_, void *arg) {
ten_engine_t *self = self_;
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");

ten_extension_thread_t *extension_thread = arg;
TEN_ASSERT(
extension_thread &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: this function does not access this extension_thread,
// we just check if the arg is an ten_extension_thread_t.
ten_extension_thread_check_integrity(extension_thread, false),
"Should not happen.");

TEN_LOGD("[%s] Waiting for extension thread (%p) be reclaimed.",
ten_engine_get_name(self), extension_thread);
TEN_UNUSED int rc =
ten_thread_join(ten_sanitizer_thread_check_get_belonging_thread(
&extension_thread->thread_check),
-1);
TEN_ASSERT(!rc, "Should not happen.");
TEN_LOGD("[%s] Extension thread (%p) is reclaimed.",
ten_engine_get_name(self), extension_thread);

// Extension thread is disappear, so we migrate the extension_group and
// extension_thread to the engine thread now.
ten_sanitizer_thread_check_inherit_from(&extension_thread->thread_check,
&self->thread_check);
ten_sanitizer_thread_check_inherit_from(
&extension_thread->extension_group->thread_check, &self->thread_check);
ten_sanitizer_thread_check_inherit_from(
&extension_thread->extension_group->ten_env->thread_check,
&self->thread_check);

self->extension_context->extension_threads_cnt_of_closed++;

ten_extension_context_on_close(self->extension_context);
}

void ten_engine_on_extension_thread_initted(void *self_, void *arg) {
ten_engine_t *self = self_;
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");

ten_extension_thread_t *extension_thread = arg;
TEN_ASSERT(
extension_thread &&
// TEN_NOLINTNEXTLINE(thread-check)
Expand All @@ -122,15 +45,6 @@ void ten_engine_on_extension_thread_initted(void *self_, void *arg) {
TEN_LOGD("[%s] All extension threads are initted.",
ten_engine_get_name(self));

ten_list_foreach (&self->extension_context->extension_threads, iter) {
ten_extension_thread_t *extension_thread =
ten_ptr_listnode_get(iter.node);
TEN_ASSERT(extension_thread && ten_extension_thread_check_integrity(
extension_thread, false),
"Should not happen.");
}

// =-=-= 还需要嘛?
// All the extension threads requested by this command have been completed,
// return the result for this command.
//
Expand Down Expand Up @@ -195,6 +109,93 @@ void ten_engine_on_extension_thread_initted(void *self_, void *arg) {
}
}

void ten_engine_find_extension_info_for_all_extensions_of_extension_thread(
void *self_, void *arg) {
ten_engine_t *self = self_;
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");

ten_extension_context_t *extension_context = self->extension_context;
TEN_ASSERT(extension_context &&
ten_extension_context_check_integrity(extension_context, true),
"Should not happen.");

TEN_UNUSED ten_extension_thread_t *extension_thread = arg;
TEN_ASSERT(
extension_thread &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: this function does not access this extension_thread,
// we just check if the arg is an ten_extension_thread_t.
ten_extension_thread_check_integrity(extension_thread, false),
"Should not happen.");

ten_list_foreach (&extension_thread->extensions, iter) {
ten_extension_t *extension = ten_ptr_listnode_get(iter.node);
TEN_ASSERT(ten_extension_check_integrity(extension, false),
"Should not happen.");

// Setup 'extension_context' field, this is the most important field when
// extension is initiating.
extension->extension_context = extension_context;

// Find the extension_info of the specified 'extension'.
extension->extension_info =
ten_extension_context_get_extension_info_by_name(
extension_context,
ten_string_get_raw_str(
ten_app_get_uri(extension_context->engine->app)),
ten_string_get_raw_str(&extension_context->engine->graph_name),
ten_string_get_raw_str(&extension_thread->extension_group->name),
ten_string_get_raw_str(&extension->name));
}

ten_engine_on_extension_thread_is_ready(self, extension_thread);

ten_runloop_post_task_tail(
ten_extension_thread_get_attached_runloop(extension_thread),
ten_extension_thread_start_life_cycle_of_all_extensions, extension_thread,
NULL);
}

void ten_engine_on_extension_thread_closed(void *self_, void *arg) {
ten_engine_t *self = self_;
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
"Should not happen.");

ten_extension_thread_t *extension_thread = arg;
TEN_ASSERT(
extension_thread &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: this function does not access this extension_thread,
// we just check if the arg is an ten_extension_thread_t.
ten_extension_thread_check_integrity(extension_thread, false),
"Should not happen.");

TEN_LOGD("[%s] Waiting for extension thread (%p) be reclaimed.",
ten_engine_get_name(self), extension_thread);
TEN_UNUSED int rc =
ten_thread_join(ten_sanitizer_thread_check_get_belonging_thread(
&extension_thread->thread_check),
-1);
TEN_ASSERT(!rc, "Should not happen.");
TEN_LOGD("[%s] Extension thread (%p) is reclaimed.",
ten_engine_get_name(self), extension_thread);

// Extension thread is disappear, so we migrate the extension_group and
// extension_thread to the engine thread now.
ten_sanitizer_thread_check_inherit_from(&extension_thread->thread_check,
&self->thread_check);
ten_sanitizer_thread_check_inherit_from(
&extension_thread->extension_group->thread_check, &self->thread_check);
ten_sanitizer_thread_check_inherit_from(
&extension_thread->extension_group->ten_env->thread_check,
&self->thread_check);

self->extension_context->extension_threads_cnt_of_closed++;

ten_extension_context_on_close(self->extension_context);
}

void ten_engine_on_addon_create_extension_group_done(void *self_, void *arg) {
ten_engine_t *self = self_;
TEN_ASSERT(self && ten_engine_check_integrity(self, true),
Expand Down
21 changes: 0 additions & 21 deletions core/src/ten_runtime/extension/extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -767,27 +767,6 @@ ten_runloop_t *ten_extension_get_attached_runloop(ten_extension_t *self) {
return self->extension_thread->runloop;
}

void ten_extension_link_its_ten_to_extension_context(
ten_extension_t *self, ten_extension_context_t *extension_context) {
TEN_ASSERT(self, "Invalid argument.");
TEN_ASSERT(ten_extension_check_integrity(self, true),
"Invalid use of extension %p.", self);

TEN_ASSERT(
extension_context &&
// TEN_NOLINTNEXTLINE(thread-check)
// thread-check: We are in the extension thread, and throughout the
// entire lifecycle of the extension, the extension_context where the
// extension resides remains unchanged. Even in the closing flow, the
// extension_context is closed later than the extension itself.
// Therefore, using a pointer to the extension_context within the
// extension thread is thread-safe.
ten_extension_context_check_integrity(extension_context, false),
"Should not happen.");

self->extension_context = extension_context;
}

static void ten_extension_on_configure(ten_env_t *ten_env) {
TEN_ASSERT(ten_env && ten_env_check_integrity(ten_env, true),
"Should not happen.");
Expand Down
Loading
Loading