From b1a3b4ae18937fc79eef908246b50b5314ae4509 Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Thu, 26 Sep 2024 22:38:33 +0800 Subject: [PATCH 1/3] feat: add more standalone testing logic --- .vscode/launch.json | 2 +- .../extension_thread/extension_thread.h | 5 +- .../ten_runtime/test/extension_test.h | 7 ++ .../ten_runtime/engine/msg_interface/common.c | 19 +--- .../extension_thread/extension_thread.c | 70 +++++++++---- .../src/ten_runtime/extension_thread/on_xxx.c | 5 + core/src/ten_runtime/test/extension_test.c | 98 ++++++++++++++----- .../ten_runtime/smoke/standalone_test/new.cc | 2 + 8 files changed, 143 insertions(+), 65 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 78cfc7472..b26bccb29 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -129,7 +129,7 @@ "request": "launch", "program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test", "args": [ - "--gtest_filter=DataTest.MultiDestData" + "--gtest_filter=StandaloneTest.New" ], "cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/", "env": { diff --git a/core/include_internal/ten_runtime/extension_thread/extension_thread.h b/core/include_internal/ten_runtime/extension_thread/extension_thread.h index 6a97b7ebb..dbdc2bbc6 100644 --- a/core/include_internal/ten_runtime/extension_thread/extension_thread.h +++ b/core/include_internal/ten_runtime/extension_thread/extension_thread.h @@ -81,6 +81,8 @@ typedef struct ten_extension_thread_t { ten_extension_context_t *extension_context; ten_runloop_t *runloop; + + ten_event_t *runloop_is_ready_to_use; } ten_extension_thread_t; TEN_RUNTIME_API bool ten_extension_thread_not_call_by_me( @@ -103,9 +105,6 @@ TEN_RUNTIME_PRIVATE_API void ten_extension_thread_attach_to_context_and_group( ten_extension_thread_t *self, ten_extension_context_t *extension_context, ten_extension_group_t *extension_group); -TEN_RUNTIME_PRIVATE_API void ten_extension_thread_attach_to_group( - ten_extension_thread_t *self, ten_extension_group_t *extension_group); - TEN_RUNTIME_PRIVATE_API void ten_extension_thread_destroy( ten_extension_thread_t *self); diff --git a/core/include_internal/ten_runtime/test/extension_test.h b/core/include_internal/ten_runtime/test/extension_test.h index 3466d1175..632ff6fb0 100644 --- a/core/include_internal/ten_runtime/test/extension_test.h +++ b/core/include_internal/ten_runtime/test/extension_test.h @@ -27,6 +27,7 @@ TEN_RUNTIME_API void ten_extension_test_destroy(ten_extension_test_t *self); typedef struct ten_extension_test_new_t { ten_thread_t *test_app_thread; + ten_string_t test_extension_addon_name; ten_env_proxy_t *test_app_ten_env_proxy; ten_event_t *test_app_ten_env_proxy_create_completed; } ten_extension_test_new_t; @@ -35,3 +36,9 @@ TEN_RUNTIME_API ten_extension_test_new_t *ten_extension_test_create_new(void); TEN_RUNTIME_API void ten_extension_test_destroy_new( ten_extension_test_new_t *self); + +TEN_RUNTIME_API void ten_extension_test_start_new( + ten_extension_test_new_t *self); + +TEN_RUNTIME_API void ten_extension_test_add_addon( + ten_extension_test_new_t *self, const char *addon_name); diff --git a/core/src/ten_runtime/engine/msg_interface/common.c b/core/src/ten_runtime/engine/msg_interface/common.c index d30234e99..31171d366 100644 --- a/core/src/ten_runtime/engine/msg_interface/common.c +++ b/core/src/ten_runtime/engine/msg_interface/common.c @@ -24,7 +24,6 @@ #include "include_internal/ten_runtime/msg/msg.h" #include "include_internal/ten_runtime/msg/msg_info.h" #include "include_internal/ten_runtime/remote/remote.h" -#include "ten_utils/macro/check.h" #include "ten_runtime/app/app.h" #include "ten_utils/container/list.h" #include "ten_utils/container/list_node.h" @@ -32,6 +31,7 @@ #include "ten_utils/lib/mutex.h" #include "ten_utils/lib/smart_ptr.h" #include "ten_utils/lib/string.h" +#include "ten_utils/macro/check.h" #include "ten_utils/macro/mark.h" static void ten_engine_prepend_to_in_msgs_queue(ten_engine_t *self, @@ -124,23 +124,6 @@ static void ten_engine_handle_in_msgs_sync(ten_engine_t *self) { ten_msg_get_src_app_uri(msg)); } } - } else { - // The cmd's 'origin_connection' will be NULL in the following two - // cases. - // - // 1) When the engine is the predefined graph engine, and the cmd is fed - // directly by the TEN app to this engine, so there is no - // 'connection' for this cmd. - // - // 2) The engine receives a cmd whose receiver is expected to be another - // engine in the same app, but the receiver (engine) does _not_ - // exist. In this case, the app would create an error cmd result - // as the result, and send back to the original engine, so the cmd - // in this case would be a cmd result returned from the app. Refer to - // 'ten_app_on_msg_default_handler()'. - TEN_ASSERT(ten_list_size(&self->app->predefined_graph_infos) || - ten_msg_get_type(msg) == TEN_MSG_TYPE_CMD_RESULT, - "Should not happen."); } } diff --git a/core/src/ten_runtime/extension_thread/extension_thread.c b/core/src/ten_runtime/extension_thread/extension_thread.c index 28bee802a..b0b0a783d 100644 --- a/core/src/ten_runtime/extension_thread/extension_thread.c +++ b/core/src/ten_runtime/extension_thread/extension_thread.c @@ -19,20 +19,22 @@ #include "include_internal/ten_runtime/extension_context/extension_context.h" #include "include_internal/ten_runtime/extension_context/internal/add_extension.h" #include "include_internal/ten_runtime/extension_group/extension_group.h" +#include "include_internal/ten_runtime/extension_group/on_xxx.h" #include "include_internal/ten_runtime/extension_store/extension_store.h" #include "include_internal/ten_runtime/extension_thread/msg_interface/common.h" #include "include_internal/ten_runtime/msg/msg.h" #include "include_internal/ten_runtime/ten_env/ten_env.h" -#include "ten_utils/macro/check.h" #include "include_internal/ten_utils/sanitizer/thread_check.h" #include "ten_runtime/extension/extension.h" #include "ten_runtime/ten_env/ten_env.h" #include "ten_utils/container/list.h" #include "ten_utils/io/runloop.h" #include "ten_utils/lib/alloc.h" +#include "ten_utils/lib/event.h" #include "ten_utils/lib/mutex.h" #include "ten_utils/lib/string.h" #include "ten_utils/lib/thread.h" +#include "ten_utils/macro/check.h" #include "ten_utils/macro/mark.h" #include "ten_utils/sanitizer/thread_check.h" @@ -102,12 +104,14 @@ ten_extension_thread_t *ten_extension_thread_create(void) { self->lock_mode_lock = ten_mutex_create(); ten_sanitizer_thread_check_init(&self->thread_check); + self->runloop = NULL; + self->runloop_is_ready_to_use = ten_event_create(0, 0); return self; } -void ten_extension_thread_attach_to_group( +static void ten_extension_thread_attach_to_group( ten_extension_thread_t *self, ten_extension_group_t *extension_group) { TEN_ASSERT(self, "Invalid argument."); TEN_ASSERT(ten_extension_thread_check_integrity(self, false), @@ -146,6 +150,8 @@ void ten_extension_thread_destroy(ten_extension_thread_t *self) { self->runloop = NULL; } + ten_event_destroy(self->runloop_is_ready_to_use); + ten_sanitizer_thread_check_deinit(&self->thread_check); ten_extension_store_destroy(self->extension_store); @@ -193,15 +199,6 @@ static void ten_extension_thread_notify_engine_we_are_closed( engine, self); } -static void ten_extension_thread_start_runloop(ten_extension_thread_t *self) { - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT(ten_extension_thread_check_integrity(self, true), - "Invalid use of extension_thread %p.", self); - - ten_runloop_post_task_tail( - self->runloop, ten_extension_thread_handle_start_msg_task, self, NULL); -} - ten_runloop_t *ten_extension_thread_get_attached_runloop( ten_extension_thread_t *self) { TEN_ASSERT(self && @@ -259,8 +256,15 @@ void *ten_extension_thread_main_actual(ten_extension_thread_t *self) { self->runloop = ten_runloop_create(NULL); TEN_ASSERT(self->runloop, "Should not happen."); - // Run the extension thread event loop - ten_extension_thread_start_runloop(self); + ten_runloop_post_task_tail( + self->runloop, ten_extension_thread_handle_start_msg_task, self, NULL); + + // Before actually starting the extension thread's runloop, first notify the + // engine (extension_context) that the extension thread's runloop is ready for + // use. + ten_event_set(self->runloop_is_ready_to_use); + + // Run the extension thread event loop. ten_runloop_run(self->runloop); ten_extension_thread_notify_engine_we_are_closed(self); @@ -285,6 +289,21 @@ void ten_extension_thread_start(ten_extension_thread_t *self) { "Should not happen."); ten_thread_create("extension thread", ten_extension_thread_main, self); + + // The runloop of the extension_thread is created within the extension thread + // itself, which introduces a time gap. If the engine (extension_context) + // attempts to post a task to the runloop of extension_thread before the + // runloop has been created, it would result in a segmentation fault since the + // runloop would still be NULL. There are two approaches to handle this + // situation: + // + // 1) Protect both the extension_thread and engine access to + // extension_thread::runloop with a mutex. But this is too heavy. + // 2) The approach adopted here is to have the engine thread wait briefly + // until the runloop is successfully created by the extension_thread before + // proceeding. This eliminates the need to lock every time the runloop is + // accessed. + ten_event_wait(self->runloop_is_ready_to_use, -1); } static void ten_extension_thread_on_triggering_close(void *self_, @@ -303,13 +322,26 @@ static void ten_extension_thread_on_triggering_close(void *self_, ten_extension_thread_set_state(self, TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE); - // Loop for all the containing extensions, and call their on_stop(). - ten_list_foreach (&self->extensions, iter) { - ten_extension_t *extension = ten_ptr_listnode_get(iter.node); - TEN_ASSERT(ten_extension_check_integrity(extension, true), - "Should not happen."); + if (ten_list_size(&self->extensions)) { + // Loop for all the containing extensions, and call their on_stop(). + ten_list_foreach (&self->extensions, iter) { + ten_extension_t *extension = ten_ptr_listnode_get(iter.node); + TEN_ASSERT(ten_extension_check_integrity(extension, true), + "Should not happen."); - ten_extension_on_stop(extension); + ten_extension_on_stop(extension); + } + } else { + // TODO(Wei): There will be a time gap that needs to be handled. After the + // extension group starts to create_extensions, but before + // create_extensions_done, if a close command is received, a memory leak may + // occur (i.e., those created extensions). A possible solution is to enter a + // specific state when create_extensions is called. If a close command is + // received during this state, simply record it. Later, when + // create_extensions_done is reached, based on this state, decide whether to + // proceed normally or to destroy the created extensions and enter the + // standard extension group deinitialization process. + ten_extension_group_on_deinit(self->extension_group); } } diff --git a/core/src/ten_runtime/extension_thread/on_xxx.c b/core/src/ten_runtime/extension_thread/on_xxx.c index 793fd9ad9..754800b87 100644 --- a/core/src/ten_runtime/extension_thread/on_xxx.c +++ b/core/src/ten_runtime/extension_thread/on_xxx.c @@ -146,6 +146,11 @@ void ten_extension_thread_on_extension_group_on_init_done( TEN_ASSERT(ten_extension_thread_check_integrity(self, true), "Invalid use of extension_thread %p.", self); + if (ten_extension_thread_get_state(self) >= + TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE) { + return; + } + ten_extension_group_t *extension_group = self->extension_group; TEN_ASSERT(extension_group && ten_extension_group_check_integrity(extension_group, true), diff --git a/core/src/ten_runtime/test/extension_test.c b/core/src/ten_runtime/test/extension_test.c index ee67a5e4d..70557a912 100644 --- a/core/src/ten_runtime/test/extension_test.c +++ b/core/src/ten_runtime/test/extension_test.c @@ -18,6 +18,7 @@ #include "ten_runtime/app/app.h" #include "ten_runtime/extension/extension.h" #include "ten_runtime/msg/cmd/close_app/cmd.h" +#include "ten_runtime/msg/cmd/start_graph/cmd.h" #include "ten_runtime/msg/msg.h" #include "ten_runtime/ten_env/internal/metadata.h" #include "ten_runtime/ten_env/internal/on_xxx_done.h" @@ -25,7 +26,9 @@ #include "ten_utils/container/list.h" #include "ten_utils/container/list_ptr.h" #include "ten_utils/lib/event.h" +#include "ten_utils/lib/json.h" #include "ten_utils/lib/smart_ptr.h" +#include "ten_utils/lib/string.h" #include "ten_utils/lib/thread.h" #include "ten_utils/macro/check.h" #include "ten_utils/macro/mark.h" @@ -50,30 +53,6 @@ static void *ten_extension_thread_main(void *self_) { } static void test_ten_app_on_configure(ten_app_t *app, ten_env_t *ten_env) { -#if 0 - const char *property_json = - "{\ - \"_ten\": {\ - \"predefined_graphs\": [{\ - \"name\": \"0\",\ - \"auto_start\": false,\ - \"nodes\": [{\ - \"type\": \"extension_group\",\ - \"name\": \"test_extension_group\",\ - \"addon\": \"test_extension_group\"\ - },{\ - \"type\": \"extension\",\ - \"name\": \"...\",\ - \"addon\": \"...\",\ - \"extension_group\": \"test_extension_group\"\ - }]\ - }]\ - }\ - }"; - bool rc = ten_env_init_property_from_json(ten_env, property_json, NULL); - TEN_ASSERT(rc, "Should not happen."); -#endif - bool rc = ten_env_on_configure_done(ten_env, NULL); TEN_ASSERT(rc, "Should not happen."); } @@ -128,6 +107,7 @@ ten_extension_test_new_t *ten_extension_test_create_new(void) { TEN_ASSERT(self, "Failed to allocate memory."); self->test_app_ten_env_proxy = NULL; + ten_string_init(&self->test_extension_addon_name); self->test_app_ten_env_proxy_create_completed = ten_event_create(0, 1); self->test_app_thread = @@ -138,6 +118,14 @@ ten_extension_test_new_t *ten_extension_test_create_new(void) { return self; } +void ten_extension_test_add_addon(ten_extension_test_new_t *self, + const char *addon_name) { + TEN_ASSERT(self, "Invalid argument."); + TEN_ASSERT(addon_name, "Invalid argument."); + + ten_string_set_formatted(&self->test_extension_addon_name, "%s", addon_name); +} + static void ten_env_proxy_notify_close_app(ten_env_t *ten_env, TEN_UNUSED void *user_data) { TEN_ASSERT( @@ -159,6 +147,57 @@ static void ten_env_proxy_notify_close_app(ten_env_t *ten_env, TEN_ASSERT(rc, "Should not happen."); } +static void ten_env_proxy_notify_start(ten_env_t *ten_env, void *user_data) { + TEN_ASSERT( + ten_env && + ten_env_check_integrity( + ten_env, + ten_env->attach_to != TEN_ENV_ATTACH_TO_ADDON ? true : false), + "Should not happen."); + + ten_extension_test_new_t *test_info = user_data; + TEN_ASSERT(test_info, "Should not happen."); + + ten_shared_ptr_t *start_graph_cmd = ten_cmd_start_graph_create(); + TEN_ASSERT(start_graph_cmd, "Should not happen."); + + // Set the destination so that the recipient is the app itself. + bool rc = ten_msg_clear_and_set_dest(start_graph_cmd, TEN_STR_LOCALHOST, NULL, + NULL, NULL, NULL, NULL); + TEN_ASSERT(rc, "Should not happen."); + + ten_string_t start_graph_cmd_json_str; + ten_string_init_formatted( + &start_graph_cmd_json_str, + "{\ + \"_ten\": {\ + \"type\": \"start_graph\",\ + \"nodes\": [{\ + \"type\": \"extension\",\ + \"name\": \"%s\",\ + \"addon\": \"%s\",\ + \"extension_group\": \"default_extension_group\",\ + \"app\": \"localhost\"\ + }]\ + }\ + }", + ten_string_get_raw_str(&test_info->test_extension_addon_name), + ten_string_get_raw_str(&test_info->test_extension_addon_name)); + + ten_json_t *start_graph_cmd_json = ten_json_from_string( + ten_string_get_raw_str(&start_graph_cmd_json_str), NULL); + + ten_string_deinit(&start_graph_cmd_json_str); + + rc = ten_msg_from_json(start_graph_cmd, start_graph_cmd_json, NULL); + TEN_ASSERT(rc, "Should not happen."); + + ten_json_destroy(start_graph_cmd_json); + + rc = ten_env_send_cmd(ten_env, start_graph_cmd, NULL, NULL, NULL); + TEN_ASSERT(rc, "Should not happen."); +} + void ten_extension_test_destroy_new(ten_extension_test_new_t *self) { TEN_ASSERT(self, "Invalid argument."); TEN_ASSERT(self->test_app_ten_env_proxy, "Invalid argument."); @@ -171,9 +210,20 @@ void ten_extension_test_destroy_new(ten_extension_test_new_t *self) { TEN_ASSERT(self->test_app_ten_env_proxy == NULL, "Should not happen."); ten_event_destroy(self->test_app_ten_env_proxy_create_completed); + ten_string_deinit(&self->test_extension_addon_name); + TEN_FREE(self); } +void ten_extension_test_start_new(ten_extension_test_new_t *self) { + TEN_ASSERT(self, "Invalid argument."); + TEN_ASSERT(self->test_app_ten_env_proxy, "Invalid argument."); + + bool rc = ten_env_proxy_notify(self->test_app_ten_env_proxy, + ten_env_proxy_notify_start, self, false, NULL); + TEN_ASSERT(rc, "Should not happen."); +} + ten_extension_test_t *ten_extension_test_create( ten_extension_t *test_extension, ten_extension_t *target_extension) { ten_extension_test_t *self = TEN_MALLOC(sizeof(ten_extension_test_t)); diff --git a/tests/ten_runtime/smoke/standalone_test/new.cc b/tests/ten_runtime/smoke/standalone_test/new.cc index 5ed25ac97..c6614b655 100644 --- a/tests/ten_runtime/smoke/standalone_test/new.cc +++ b/tests/ten_runtime/smoke/standalone_test/new.cc @@ -37,5 +37,7 @@ TEN_CPP_REGISTER_ADDON_AS_EXTENSION(standalone_test_new__test_extension_1, TEST(StandaloneTest, New) { // NOLINT ten_extension_test_new_t *test = ten_extension_test_create_new(); + ten_extension_test_add_addon(test, "standalone_test_new__test_extension_1"); + ten_extension_test_start_new(test); ten_extension_test_destroy_new(test); } From 54ab8db16fee2c7928556eef42ab364e2b852f85 Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Fri, 27 Sep 2024 20:59:14 +0800 Subject: [PATCH 2/3] refactor!: refine extension startup flow With the introduction of the on_configure() initialization phase, the startup process of the extension has been better planned. - on_configure ~ on_configure_done + on_init ~ on_init_done: Handles its own initialization; cannot send or receive messages. The reason for this is that, before `on_init_done`, the extension may not be ready to handle external requests, so the received messages need to be temporarily stored. - ~ on_start: The messages received before on_start() will be temporarily stored, and only after on_start() is called will they be sent to the extension. The reason for this is developers generally expect `on_start` to occur before any `on_cmd` events. - on_start ~ on_stop_done: Normal sending and receiving of all messages and results. BREAKING CHANGE: The initialization actions that were originally performed in on_start() need to be moved to on_init(). This is because once on_start() is called, on_cmd() will start being received. --- .../ten_runtime/extension/extension.h | 53 +-- .../extension/extension_cb_default.h | 34 -- .../ten_runtime/extension/on_xxx.h | 22 -- .../extension_context/extension_context.h | 1 - .../internal/extension_group_is_initted.h | 13 - .../extension_thread/extension_thread.h | 21 +- .../extension_thread/msg_interface/common.h | 2 +- .../ten_runtime/extension_thread/on_xxx.h | 15 - .../ten_runtime/engine/msg_interface/common.c | 2 +- core/src/ten_runtime/extension/extension.c | 78 +++- .../ten_runtime/extension/internal/close.c | 2 +- .../extension/internal/extension_cb_default.c | 92 ----- core/src/ten_runtime/extension/msg_handling.c | 5 +- .../ten_runtime/extension/ten_env/on_xxx.c | 180 +++++++--- .../extension_context/extension_context.c | 1 - .../internal/extension_group_is_initted.c | 71 ---- .../extension_group/extension_group.c | 8 + .../extension_group/ten_env/on_xxx.c | 24 +- .../extension_thread/extension_thread.c | 64 ++-- .../extension_thread/msg_interface/common.c | 162 ++++----- .../src/ten_runtime/extension_thread/on_xxx.c | 334 +----------------- .../src/ten_runtime/ten_env/internal/return.c | 6 +- core/src/ten_runtime/ten_env/internal/send.c | 6 +- core/src/ten_runtime/ten_env/ten_proxy.c | 4 +- .../basic/basic_extensions_init_dependency.cc | 13 +- .../ten_runtime/smoke/standalone_test/new.cc | 2 +- 26 files changed, 375 insertions(+), 840 deletions(-) delete mode 100644 core/include_internal/ten_runtime/extension/extension_cb_default.h delete mode 100644 core/include_internal/ten_runtime/extension_context/internal/extension_group_is_initted.h delete mode 100644 core/src/ten_runtime/extension/internal/extension_cb_default.c delete mode 100644 core/src/ten_runtime/extension_context/internal/extension_group_is_initted.c diff --git a/core/include_internal/ten_runtime/extension/extension.h b/core/include_internal/ten_runtime/extension/extension.h index c347ca0c2..6f1868bb5 100644 --- a/core/include_internal/ten_runtime/extension/extension.h +++ b/core/include_internal/ten_runtime/extension/extension.h @@ -43,22 +43,20 @@ typedef struct ten_timer_t ten_timer_t; // The relationship between several lifecycle stages and their connection to // sending messages: // -// - on_init ~ on_init_done: Handles its own initialization; cannot send or -// receive messages. +// - on_configure ~ on_configure_done + on_init ~ on_init_done: Handles its own +// initialization; cannot send or receive messages. The reason for this is +// that, before `on_init_done`, the extension may not be ready to handle +// external requests, so the received messages need to be temporarily stored. // -// [ After everyone has completed on_init_done, they will collectively move -// into on_start ] +// - ~ on_start: The messages received before on_start() will be temporarily +// stored, and only after on_start() is called will they be sent to the +// extension. The reason for this is developers generally expect `on_start` to +// occur before any `on_cmd` events. // -// - on_start ~ on_start_done: Can send messages and receive the results -// of sent messages, but cannot receive other messages. Since properties are -// initialized within on_start, you can perform initialization actions that -// depend on these properties being set up. However, as it's still in the -// initializing phase, it won't receive messages initiated by others, avoiding -// the need for various checks. You can actively send messages out, though. -// -// - After on_start_done ~ on_stop_done: Normal sending and receiving of all +// - on_start ~ on_stop_done: Normal sending and receiving of all // messages and results. // +// TODO(WEi): Does it still needs to be considered? // [ After everyone has completed on_stop_done, they will collectively move into // on_deinit. ] // @@ -66,13 +64,27 @@ typedef struct ten_timer_t ten_timer_t; // or receive messages. typedef enum TEN_EXTENSION_STATE { TEN_EXTENSION_STATE_INIT, - TEN_EXTENSION_STATE_CONFIGURED, // on_configure_done is completed. - TEN_EXTENSION_STATE_INITTED, // on_init_done() is completed. - TEN_EXTENSION_STATE_STARTED, // on_start_done() is completed. - TEN_EXTENSION_STATE_CLOSING, // on_stop_done() is completed and could proceed - // to be closed. - TEN_EXTENSION_STATE_DEINITING, // on_deinit() is started. - TEN_EXTENSION_STATE_DEINITTED, // on_deinit_done() is called. + + // on_configure_done() is completed. + TEN_EXTENSION_STATE_ON_CONFIGURE_DONE, + + // on_init_done() is completed. + TEN_EXTENSION_STATE_ON_INIT_DONE, + + // on_start() is called. + TEN_EXTENSION_STATE_ON_START, + + // on_start_done() is completed. + TEN_EXTENSION_STATE_ON_START_DONE, + + // on_stop_done() is completed and could proceed to be closed. + TEN_EXTENSION_STATE_CLOSING, + + // on_deinit() is called. + TEN_EXTENSION_STATE_ON_DEINIT, + + // on_deinit_done() is called. + TEN_EXTENSION_STATE_ON_DEINIT_DONE, } TEN_EXTENSION_STATE; struct ten_extension_t { @@ -214,9 +226,6 @@ struct ten_extension_t { // @} }; -TEN_RUNTIME_PRIVATE_API void ten_extension_set_state(ten_extension_t *self, - TEN_EXTENSION_STATE state); - TEN_RUNTIME_PRIVATE_API void ten_extension_determine_all_dest_extension( ten_extension_t *self, ten_extension_context_t *extension_context); diff --git a/core/include_internal/ten_runtime/extension/extension_cb_default.h b/core/include_internal/ten_runtime/extension/extension_cb_default.h deleted file mode 100644 index d59427a86..000000000 --- a/core/include_internal/ten_runtime/extension/extension_cb_default.h +++ /dev/null @@ -1,34 +0,0 @@ -// -// Copyright © 2024 Agora -// This file is part of TEN Framework, an open source project. -// Licensed under the Apache License, Version 2.0, with certain conditions. -// Refer to the "LICENSE" file in the root directory for more information. -// -#pragma once - -#include "ten_runtime/ten_config.h" - -#include "ten_runtime/extension/extension.h" - -typedef struct ten_env_t ten_env_t; - -TEN_RUNTIME_PRIVATE_API void ten_extension_on_start_default( - ten_extension_t *self, ten_env_t *ten_env); - -TEN_RUNTIME_PRIVATE_API void ten_extension_on_stop_default( - ten_extension_t *self, ten_env_t *ten_env); - -TEN_RUNTIME_PRIVATE_API void ten_extension_on_deinit_default( - ten_extension_t *self, ten_env_t *ten_env); - -TEN_RUNTIME_PRIVATE_API void ten_extension_on_cmd_default( - ten_extension_t *self, ten_env_t *ten_env, ten_shared_ptr_t *cmd); - -TEN_RUNTIME_PRIVATE_API void ten_extension_on_data_default( - ten_extension_t *self, ten_env_t *ten_env, ten_shared_ptr_t *data); - -TEN_RUNTIME_PRIVATE_API void ten_extension_on_audio_frame_default( - ten_extension_t *self, ten_env_t *ten_env, ten_shared_ptr_t *frame); - -TEN_RUNTIME_PRIVATE_API void ten_extension_on_video_frame_default( - ten_extension_t *self, ten_env_t *ten_env, ten_shared_ptr_t *frame); \ No newline at end of file diff --git a/core/include_internal/ten_runtime/extension/on_xxx.h b/core/include_internal/ten_runtime/extension/on_xxx.h index 6f43f2b2f..49cf66b2f 100644 --- a/core/include_internal/ten_runtime/extension/on_xxx.h +++ b/core/include_internal/ten_runtime/extension/on_xxx.h @@ -10,22 +10,6 @@ #include "ten_runtime/ten_env/ten_env.h" -/** - * @brief Indicate that extension on_configure is completed. - */ -typedef struct ten_extension_on_configure_done_t { - // Indicates which extension's on_configure ends. - ten_extension_t *extension; -} ten_extension_on_configure_done_t; - -/** - * @brief Indicate that extension on_init is completed. - */ -typedef struct ten_extension_on_init_done_t { - // Indicates which extension's on_init ends. - ten_extension_t *extension; -} ten_extension_on_init_done_t; - /** * @brief Indicate that extension on_start/on_stop/on_deinit is completed. */ @@ -49,9 +33,3 @@ TEN_RUNTIME_PRIVATE_API void ten_extension_on_start_done(ten_env_t *self); TEN_RUNTIME_PRIVATE_API void ten_extension_on_stop_done(ten_env_t *self); TEN_RUNTIME_PRIVATE_API void ten_extension_on_deinit_done(ten_env_t *self); - -TEN_RUNTIME_PRIVATE_API void ten_extension_on_configure_done_destroy( - ten_extension_on_configure_done_t *self); - -TEN_RUNTIME_PRIVATE_API void ten_extension_on_init_done_destroy( - ten_extension_on_init_done_t *self); diff --git a/core/include_internal/ten_runtime/extension_context/extension_context.h b/core/include_internal/ten_runtime/extension_context/extension_context.h index 54c12ba29..eabd3fe52 100644 --- a/core/include_internal/ten_runtime/extension_context/extension_context.h +++ b/core/include_internal/ten_runtime/extension_context/extension_context.h @@ -49,7 +49,6 @@ struct ten_extension_context_t { size_t extension_threads_cnt_of_initted; size_t extension_threads_cnt_of_all_extensions_added_to_engine; size_t extension_threads_cnt_of_all_extensions_stopped; - size_t extension_threads_cnt_of_all_extensions_initted; size_t extension_threads_cnt_of_closing_flag_is_set; size_t extension_threads_cnt_of_closed; diff --git a/core/include_internal/ten_runtime/extension_context/internal/extension_group_is_initted.h b/core/include_internal/ten_runtime/extension_context/internal/extension_group_is_initted.h deleted file mode 100644 index 206bdd6e5..000000000 --- a/core/include_internal/ten_runtime/extension_context/internal/extension_group_is_initted.h +++ /dev/null @@ -1,13 +0,0 @@ -// -// Copyright © 2024 Agora -// This file is part of TEN Framework, an open source project. -// Licensed under the Apache License, Version 2.0, with certain conditions. -// Refer to the "LICENSE" file in the root directory for more information. -// -#pragma once - -#include "ten_runtime/ten_config.h" - -TEN_RUNTIME_PRIVATE_API void -ten_extension_context_on_all_extensions_in_extension_group_are_initted( - void *self_, void *arg); diff --git a/core/include_internal/ten_runtime/extension_thread/extension_thread.h b/core/include_internal/ten_runtime/extension_thread/extension_thread.h index dbdc2bbc6..e8e057ade 100644 --- a/core/include_internal/ten_runtime/extension_thread/extension_thread.h +++ b/core/include_internal/ten_runtime/extension_thread/extension_thread.h @@ -26,22 +26,15 @@ typedef struct ten_extension_context_t ten_extension_context_t; typedef struct ten_extension_t ten_extension_t; typedef enum TEN_EXTENSION_THREAD_STATE { - // All received messages will be kept in a temporary buffer, and wait until - // the state switched to NORMAL. TEN_EXTENSION_THREAD_STATE_INIT, - - // All received messages will be fed into the extensions directly. + TEN_EXTENSION_THREAD_STATE_CREATING_EXTENSIONS, TEN_EXTENSION_THREAD_STATE_NORMAL, - - // All the extensions have been started completely. The extension thread could - // be 'suspended' only in this state. - TEN_EXTENSION_THREAD_STATE_ALL_STARTED, - - // Give extension a chance to do something before the whole engine shuting - // down. TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE, - // All received messages will be dropped. + // All the extensions of the extension thread have been closed, so the + // extension thread can now proceed with its own closing flow. Additionally, + // since the extensions have been closed, any messages received by the + // extension thread from this point on will be directly dropped. TEN_EXTENSION_THREAD_STATE_CLOSING, // The closing procedure is completed, so the extension thread can be @@ -59,6 +52,7 @@ typedef struct ten_extension_thread_t { ten_sanitizer_thread_check_t thread_check; TEN_EXTENSION_THREAD_STATE state; + bool is_close_triggered; ten_mutex_t *lock_mode_lock; bool in_lock_mode; @@ -68,8 +62,6 @@ typedef struct ten_extension_thread_t { ten_list_t extensions; // ten_extension_t* size_t extensions_cnt_of_added_to_engine; size_t extensions_cnt_of_deleted_from_engine; - size_t extensions_cnt_of_on_init_done; - size_t extensions_cnt_of_on_start_done; size_t extensions_cnt_of_on_stop_done; size_t extensions_cnt_of_set_closing_flag; @@ -81,7 +73,6 @@ typedef struct ten_extension_thread_t { ten_extension_context_t *extension_context; ten_runloop_t *runloop; - ten_event_t *runloop_is_ready_to_use; } ten_extension_thread_t; diff --git a/core/include_internal/ten_runtime/extension_thread/msg_interface/common.h b/core/include_internal/ten_runtime/extension_thread/msg_interface/common.h index 00ca52d21..d0e3acf80 100644 --- a/core/include_internal/ten_runtime/extension_thread/msg_interface/common.h +++ b/core/include_internal/ten_runtime/extension_thread/msg_interface/common.h @@ -14,7 +14,7 @@ typedef struct ten_extension_thread_t ten_extension_thread_t; -TEN_RUNTIME_PRIVATE_API void ten_extension_thread_handle_msg_async( +TEN_RUNTIME_PRIVATE_API void ten_extension_thread_handle_in_msg_async( ten_extension_thread_t *self, ten_shared_ptr_t *msg); TEN_RUNTIME_PRIVATE_API void ten_extension_thread_dispatch_msg( diff --git a/core/include_internal/ten_runtime/extension_thread/on_xxx.h b/core/include_internal/ten_runtime/extension_thread/on_xxx.h index 7dd246756..81b7944af 100644 --- a/core/include_internal/ten_runtime/extension_thread/on_xxx.h +++ b/core/include_internal/ten_runtime/extension_thread/on_xxx.h @@ -35,21 +35,9 @@ ten_extension_thread_on_extension_deleted_from_engine(void *self_, void *arg); TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_extension_group_on_init_done(void *self_, void *arg); -TEN_RUNTIME_PRIVATE_API void -ten_extension_thread_on_extension_on_configure_done(void *self_, void *arg); - -TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_extension_on_init_done( - void *self_, void *arg); - -TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_extension_on_start_done( - void *self_, void *arg); - TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_extension_on_stop_done( void *self_, void *arg); -TEN_RUNTIME_PRIVATE_API void ten_extension_thread_call_all_extensions_on_start( - void *self_, void *arg); - TEN_RUNTIME_PRIVATE_API void ten_extension_thread_call_all_extensions_on_deinit( void *self_, void *arg); @@ -72,9 +60,6 @@ ten_extension_thread_on_extension_group_on_deinit_done(void *self_, void *arg); TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_all_extensions_deleted( void *self_, void *arg); -TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_all_extensions_created( - void *self_, void *arg); - TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_addon_create_extension_done(void *self_, void *arg); diff --git a/core/src/ten_runtime/engine/msg_interface/common.c b/core/src/ten_runtime/engine/msg_interface/common.c index 31171d366..0ba3e5d73 100644 --- a/core/src/ten_runtime/engine/msg_interface/common.c +++ b/core/src/ten_runtime/engine/msg_interface/common.c @@ -305,7 +305,7 @@ void ten_engine_dispatch_msg(ten_engine_t *self, ten_shared_ptr_t *msg) { &dest_loc->extension_group_name)) { // Find the correct extension thread, ask it to handle the message. found = true; - ten_extension_thread_handle_msg_async( + ten_extension_thread_handle_in_msg_async( extension_group->extension_thread, msg); break; } diff --git a/core/src/ten_runtime/extension/extension.c b/core/src/ten_runtime/extension/extension.c index cc25157f7..61b19a640 100644 --- a/core/src/ten_runtime/extension/extension.c +++ b/core/src/ten_runtime/extension/extension.c @@ -13,11 +13,11 @@ #include "include_internal/ten_runtime/addon/addon.h" #include "include_internal/ten_runtime/common/loc.h" #include "include_internal/ten_runtime/engine/engine.h" -#include "include_internal/ten_runtime/extension/extension_cb_default.h" #include "include_internal/ten_runtime/extension/extension_hdr.h" #include "include_internal/ten_runtime/extension/extension_info/extension_info.h" #include "include_internal/ten_runtime/extension/msg_dest_info/json.h" #include "include_internal/ten_runtime/extension/msg_dest_info/msg_dest_info.h" +#include "include_internal/ten_runtime/extension/msg_handling.h" #include "include_internal/ten_runtime/extension/on_xxx.h" #include "include_internal/ten_runtime/extension_context/extension_context.h" #include "include_internal/ten_runtime/extension_group/extension_group.h" @@ -354,7 +354,7 @@ bool ten_extension_determine_and_merge_all_interface_dest_extension( ten_extension_t *self) { TEN_ASSERT(self && ten_extension_check_integrity(self, true), "Invalid argument."); - TEN_ASSERT(self->state == TEN_EXTENSION_STATE_CONFIGURED, + TEN_ASSERT(self->state == TEN_EXTENSION_STATE_ON_CONFIGURE_DONE, "Extension should be on_configure_done."); if (!self->extension_info) { @@ -914,6 +914,43 @@ void ten_extension_on_init(ten_env_t *ten_env) { } } +static void ten_extension_flush_all_pending_msgs(ten_extension_t *self) { + TEN_ASSERT(self, "Invalid argument."); + TEN_ASSERT(ten_extension_check_integrity(self, true), + "Invalid use of extension %p.", self); + + // The developer expects that on_start() will execute before all on_cmd() + // events. Therefore, after on_start() has been executed, there is no need + // to wait for on_start_done() before sending all previously buffered + // messages into the extension. + + // Flush the previously got messages, which are received before + // on_start_done(), into the extension. + ten_extension_thread_t *extension_thread = self->extension_thread; + ten_list_foreach (&extension_thread->pending_msgs, iter) { + ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node); + TEN_ASSERT(msg, "Should not happen."); + + ten_loc_t *dest_loc = ten_msg_get_first_dest_loc(msg); + TEN_ASSERT(dest_loc, "Should not happen."); + + if (ten_string_is_equal(&dest_loc->extension_name, &self->name)) { + ten_extension_handle_in_msg(self, msg); + ten_list_remove_node(&extension_thread->pending_msgs, iter.node); + } + } + + // Flush the previously got messages, which are received before + // on_start_done(), into the extension. + ten_list_foreach (&self->pending_msgs, iter) { + ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node); + TEN_ASSERT(msg, "Should not happen."); + + ten_extension_handle_in_msg(self, msg); + } + ten_list_clear(&self->pending_msgs); +} + void ten_extension_on_start(ten_extension_t *self) { TEN_ASSERT(self, "Invalid argument."); TEN_ASSERT(ten_extension_check_integrity(self, true), @@ -921,10 +958,16 @@ void ten_extension_on_start(ten_extension_t *self) { TEN_LOGI("[%s] on_start().", ten_extension_get_name(self)); + self->state = TEN_EXTENSION_STATE_ON_START; + if (self->on_start) { self->on_start(self, self->ten_env); + + ten_extension_flush_all_pending_msgs(self); } else { - ten_extension_on_start_default(self, self->ten_env); + ten_extension_flush_all_pending_msgs(self); + + ten_extension_on_start_done(self->ten_env); } } @@ -938,7 +981,7 @@ void ten_extension_on_stop(ten_extension_t *self) { if (self->on_stop) { self->on_stop(self, self->ten_env); } else { - ten_extension_on_stop_default(self, self->ten_env); + ten_extension_on_stop_done(self->ten_env); } } @@ -949,12 +992,12 @@ void ten_extension_on_deinit(ten_extension_t *self) { TEN_LOGD("[%s] on_deinit().", ten_extension_get_name(self)); - ten_extension_set_state(self, TEN_EXTENSION_STATE_DEINITING); + self->state = TEN_EXTENSION_STATE_ON_DEINIT; if (self->on_deinit) { self->on_deinit(self, self->ten_env); } else { - ten_extension_on_deinit_default(self, self->ten_env); + ten_extension_on_deinit_done(self->ten_env); } } @@ -969,7 +1012,12 @@ void ten_extension_on_cmd(ten_extension_t *self, ten_shared_ptr_t *msg) { if (self->on_cmd) { self->on_cmd(self, self->ten_env, msg); } else { - ten_extension_on_cmd_default(self, self->ten_env, msg); + // The default behavior of 'on_cmd' is to _not_ forward this command out, + // and return an 'OK' result to the previous stage. + ten_shared_ptr_t *cmd_result = + ten_cmd_result_create_from_cmd(TEN_STATUS_CODE_OK, msg); + ten_env_return_result(self->ten_env, cmd_result, msg, NULL); + ten_shared_ptr_destroy(cmd_result); } } @@ -984,7 +1032,8 @@ void ten_extension_on_data(ten_extension_t *self, ten_shared_ptr_t *msg) { if (self->on_data) { self->on_data(self, self->ten_env, msg); } else { - ten_extension_on_data_default(self, self->ten_env, msg); + // Bypass the data. + ten_env_send_data(self->ten_env, msg, NULL); } } @@ -1000,7 +1049,8 @@ void ten_extension_on_video_frame(ten_extension_t *self, if (self->on_video_frame) { self->on_video_frame(self, self->ten_env, msg); } else { - ten_extension_on_video_frame_default(self, self->ten_env, msg); + // Bypass the video frame. + ten_env_send_video_frame(self->ten_env, msg, NULL); } } @@ -1016,7 +1066,8 @@ void ten_extension_on_audio_frame(ten_extension_t *self, if (self->on_audio_frame) { self->on_audio_frame(self, self->ten_env, msg); } else { - ten_extension_on_audio_frame_default(self, self->ten_env, msg); + // Bypass the audio frame. + ten_env_send_audio_frame(self->ten_env, msg, NULL); } } @@ -1141,13 +1192,6 @@ ten_string_t *ten_extension_get_base_dir(ten_extension_t *self) { return &self->base_dir; } -void ten_extension_set_state(ten_extension_t *self, TEN_EXTENSION_STATE state) { - TEN_ASSERT(self && ten_extension_check_integrity(self, true), - "Invalid argument."); - - self->state = state; -} - bool ten_extension_validate_msg_schema(ten_extension_t *self, ten_shared_ptr_t *msg, bool is_msg_out, ten_error_t *err) { diff --git a/core/src/ten_runtime/extension/internal/close.c b/core/src/ten_runtime/extension/internal/close.c index 6891dbe9b..e895ff268 100644 --- a/core/src/ten_runtime/extension/internal/close.c +++ b/core/src/ten_runtime/extension/internal/close.c @@ -31,7 +31,7 @@ static void ten_extension_do_close(ten_extension_t *self) { ten_extension_thread_check_integrity(extension_thread, true), "Should not happen."); - ten_extension_set_state(self, TEN_EXTENSION_STATE_CLOSING); + self->state = TEN_EXTENSION_STATE_CLOSING; ten_runloop_post_task_tail(ten_extension_get_attached_runloop(self), ten_extension_thread_on_extension_set_closing_flag, diff --git a/core/src/ten_runtime/extension/internal/extension_cb_default.c b/core/src/ten_runtime/extension/internal/extension_cb_default.c deleted file mode 100644 index d194b98ca..000000000 --- a/core/src/ten_runtime/extension/internal/extension_cb_default.c +++ /dev/null @@ -1,92 +0,0 @@ -// -// Copyright © 2024 Agora -// This file is part of TEN Framework, an open source project. -// Licensed under the Apache License, Version 2.0, with certain conditions. -// Refer to the "LICENSE" file in the root directory for more information. -// -#include "include_internal/ten_runtime/extension/extension_cb_default.h" - -#include "ten_utils/macro/check.h" -#include "ten_runtime/common/status_code.h" -#include "ten_runtime/msg/cmd_result/cmd_result.h" -#include "ten_runtime/ten_env/internal/on_xxx_done.h" -#include "ten_runtime/ten_env/internal/return.h" -#include "ten_runtime/ten_env/internal/send.h" -#include "ten_runtime/ten_env/ten_env.h" -#include "ten_utils/macro/mark.h" - -void ten_extension_on_init_default(ten_extension_t *self, ten_env_t *ten_env) { - TEN_ASSERT(self && ten_extension_check_integrity(self, true), - "Should not happen."); - TEN_ASSERT(ten_env, "Should not happen."); - - ten_env_on_init_done(ten_env, NULL); -} - -void ten_extension_on_start_default(ten_extension_t *self, ten_env_t *ten_env) { - TEN_ASSERT(self && ten_extension_check_integrity(self, true), - "Should not happen."); - TEN_ASSERT(ten_env, "Should not happen."); - - ten_env_on_start_done(ten_env, NULL); -} - -void ten_extension_on_stop_default(ten_extension_t *self, ten_env_t *ten_env) { - TEN_ASSERT(self && ten_extension_check_integrity(self, true) && ten_env, - "Should not happen."); - - ten_env_on_stop_done(ten_env, NULL); -} - -void ten_extension_on_deinit_default(ten_extension_t *self, - ten_env_t *ten_env) { - TEN_ASSERT(self && ten_extension_check_integrity(self, true) && ten_env, - "Should not happen."); - - ten_env_on_deinit_done(ten_env, NULL); -} - -void ten_extension_on_cmd_default(TEN_UNUSED ten_extension_t *self, - TEN_UNUSED ten_env_t *ten_env, - TEN_UNUSED ten_shared_ptr_t *cmd) { - TEN_ASSERT( - self && ten_extension_check_integrity(self, true) && ten_env && cmd, - "Should not happen."); - - // The default behavior of 'on_cmd' is to _not_ forward this command out, and - // return an 'OK' result to the previous stage. - ten_shared_ptr_t *cmd_result = - ten_cmd_result_create_from_cmd(TEN_STATUS_CODE_OK, cmd); - ten_env_return_result(ten_env, cmd_result, cmd, NULL); - ten_shared_ptr_destroy(cmd_result); -} - -void ten_extension_on_data_default(TEN_UNUSED ten_extension_t *self, - TEN_UNUSED ten_env_t *ten_env, - ten_shared_ptr_t *data) { - TEN_ASSERT( - self && ten_extension_check_integrity(self, true) && ten_env && data, - "Should not happen."); - // Bypass the data. - ten_env_send_data(ten_env, data, NULL); -} - -void ten_extension_on_audio_frame_default(TEN_UNUSED ten_extension_t *self, - TEN_UNUSED ten_env_t *ten_env, - ten_shared_ptr_t *frame) { - TEN_ASSERT( - self && ten_extension_check_integrity(self, true) && ten_env && frame, - "Should not happen."); - // Bypass the audio frame. - ten_env_send_audio_frame(ten_env, frame, NULL); -} - -void ten_extension_on_video_frame_default(ten_extension_t *self, - ten_env_t *ten_env, - ten_shared_ptr_t *frame) { - TEN_ASSERT( - self && ten_extension_check_integrity(self, true) && ten_env && frame, - "Should not happen."); - // Bypass the video frame. - ten_env_send_video_frame(ten_env, frame, NULL); -} diff --git a/core/src/ten_runtime/extension/msg_handling.c b/core/src/ten_runtime/extension/msg_handling.c index 9fa502dfb..01ef27608 100644 --- a/core/src/ten_runtime/extension/msg_handling.c +++ b/core/src/ten_runtime/extension/msg_handling.c @@ -82,9 +82,8 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) { // command sent by this extension in on_start() can be delivered to this // extension before its on_start_done(). - if (!(self->state == TEN_EXTENSION_STATE_STARTED) && - !(ten_msg_is_cmd_result(msg) && - ten_msg_get_type(msg) == TEN_MSG_TYPE_CMD_RESULT)) { + if (self->state < TEN_EXTENSION_STATE_ON_START && + !ten_msg_is_cmd_result(msg)) { // The extension is not started, and the msg is not a cmd result, so // cache the msg to the pending list. ten_list_push_smart_ptr_back(&self->pending_msgs, msg); diff --git a/core/src/ten_runtime/extension/ten_env/on_xxx.c b/core/src/ten_runtime/extension/ten_env/on_xxx.c index a7b30b43f..326fbfa95 100644 --- a/core/src/ten_runtime/extension/ten_env/on_xxx.c +++ b/core/src/ten_runtime/extension/ten_env/on_xxx.c @@ -8,27 +8,65 @@ #include "include_internal/ten_runtime/common/loc.h" #include "include_internal/ten_runtime/extension/extension.h" +#include "include_internal/ten_runtime/extension/metadata.h" #include "include_internal/ten_runtime/extension/path_timer.h" #include "include_internal/ten_runtime/extension_thread/extension_thread.h" #include "include_internal/ten_runtime/extension_thread/msg_interface/common.h" #include "include_internal/ten_runtime/extension_thread/on_xxx.h" #include "include_internal/ten_runtime/metadata/metadata_info.h" #include "include_internal/ten_runtime/ten_env/ten_env.h" +#include "include_internal/ten_runtime/timer/timer.h" #include "ten_utils/lib/alloc.h" #include "ten_utils/macro/check.h" -static ten_extension_on_configure_done_t * -ten_extension_on_configure_done_create(ten_extension_t *extension) { - TEN_ASSERT(extension && ten_extension_check_integrity(extension, true), +static bool ten_extension_parse_interface_schema(ten_extension_t *self, + ten_value_t *api_definition, + ten_error_t *err) { + TEN_ASSERT(self && ten_extension_check_integrity(self, true), + "Invalid argument."); + TEN_ASSERT(api_definition && ten_value_check_integrity(api_definition), + "Invalid argument."); + + bool result = ten_schema_store_set_interface_schema_definition( + &self->schema_store, api_definition, + ten_string_get_raw_str(&self->base_dir), err); + if (!result) { + TEN_LOGW("[%s] Failed to set interface schema definition: %s.", + ten_extension_get_name(self), ten_error_errmsg(err)); + } + + return result; +} + +static void ten_extension_adjust_and_validate_property_on_configure_done( + ten_extension_t *self) { + TEN_ASSERT(self && ten_extension_check_integrity(self, true), "Should not happen."); - ten_extension_on_configure_done_t *on_configure_done = - TEN_MALLOC(sizeof(ten_extension_on_configure_done_t)); - TEN_ASSERT(on_configure_done, "Failed to allocate memory."); + ten_error_t err; + ten_error_init(&err); + + bool success = ten_schema_store_adjust_properties(&self->schema_store, + &self->property, &err); + if (!success) { + TEN_LOGW("[%s] Failed to adjust property type: %s.", + ten_extension_get_name(self), ten_error_errmsg(&err)); + goto done; + } - on_configure_done->extension = extension; + success = ten_schema_store_validate_properties(&self->schema_store, + &self->property, &err); + if (!success) { + TEN_LOGW("[%s] Invalid property: %s.", ten_extension_get_name(self), + ten_error_errmsg(&err)); + goto done; + } - return on_configure_done; +done: + ten_error_deinit(&err); + if (!success) { + TEN_ASSERT(0, "Invalid property."); + } } void ten_extension_on_configure_done(ten_env_t *self) { @@ -43,32 +81,82 @@ void ten_extension_on_configure_done(ten_env_t *self) { TEN_LOGD("[%s] on_configure() done.", ten_extension_get_name(extension)); + extension->state = TEN_EXTENSION_STATE_ON_CONFIGURE_DONE; + ten_extension_thread_t *extension_thread = extension->extension_thread; TEN_ASSERT(extension_thread && ten_extension_thread_check_integrity(extension_thread, true), "Should not happen."); - ten_extension_on_configure_done_t *on_configure_done = - ten_extension_on_configure_done_create(extension); + if (extension_thread->is_close_triggered) { + // Do not proceed with the subsequent init/start flow, as the extension + // thread is about to shut down. + return; + } + + ten_error_t err; + ten_error_init(&err); - ten_runloop_post_task_tail( - ten_extension_get_attached_runloop(extension), - ten_extension_thread_on_extension_on_configure_done, extension_thread, - on_configure_done); -} + bool rc = ten_handle_manifest_info_when_on_configure_done( + &extension->manifest_info, + ten_string_get_raw_str(ten_extension_get_base_dir(extension)), + &extension->manifest, &err); + if (!rc) { + TEN_LOGW("Failed to load extension manifest data, FATAL ERROR."); + exit(EXIT_FAILURE); + } -static ten_extension_on_init_done_t *ten_extension_on_init_done_create( - ten_extension_t *extension) { - TEN_ASSERT(extension && ten_extension_check_integrity(extension, true), - "Should not happen."); + rc = ten_handle_property_info_when_on_configure_done( + &extension->property_info, + ten_string_get_raw_str(ten_extension_get_base_dir(extension)), + &extension->property, &err); + if (!rc) { + TEN_LOGW("Failed to load extension property data, FATAL ERROR."); + exit(EXIT_FAILURE); + } + + rc = ten_extension_resolve_properties_in_graph(extension, &err); + TEN_ASSERT(rc, "Failed to resolve properties in graph."); + + ten_extension_merge_properties_from_graph(extension); + + rc = ten_extension_handle_ten_namespace_properties( + extension, extension->extension_context); + TEN_ASSERT(rc, "[%s] Failed to handle '_ten' properties.", + ten_string_get_raw_str(&extension->name)); + + ten_value_t *api_definition = ten_metadata_init_schema_store( + &extension->manifest, &extension->schema_store); + if (api_definition) { + bool success = + ten_extension_parse_interface_schema(extension, api_definition, &err); + TEN_ASSERT(success, "Failed to parse interface schema."); + } + + ten_extension_adjust_and_validate_property_on_configure_done(extension); - ten_extension_on_init_done_t *on_init_done = - TEN_MALLOC(sizeof(ten_extension_on_init_done_t)); - TEN_ASSERT(on_init_done, "Failed to allocate memory."); + // Create timers for automatically cleaning expired IN_PATHs and OUT_PATHs. + ten_timer_t *in_path_timer = + ten_extension_create_timer_for_in_path(extension); + ten_list_push_ptr_back(&extension->path_timers, in_path_timer, NULL); + ten_timer_enable(in_path_timer); - on_init_done->extension = extension; + ten_timer_t *out_path_timer = + ten_extension_create_timer_for_out_path(extension); + ten_list_push_ptr_back(&extension->path_timers, out_path_timer, NULL); + ten_timer_enable(out_path_timer); - return on_init_done; + // The interface info has been resolved, and extensions might send msg out + // during `on_start()`, so it's the best time to merge the interface info to + // the extension_info. + rc = + ten_extension_determine_and_merge_all_interface_dest_extension(extension); + TEN_ASSERT(rc, "Should not happen."); + + // Trigger the extension on_init flow. + ten_extension_on_init(extension->ten_env); + + ten_error_deinit(&err); } void ten_extension_on_init_done(ten_env_t *self) { @@ -83,17 +171,19 @@ void ten_extension_on_init_done(ten_env_t *self) { TEN_LOGD("[%s] on_init() done.", ten_extension_get_name(extension)); + extension->state = TEN_EXTENSION_STATE_ON_INIT_DONE; + ten_extension_thread_t *extension_thread = extension->extension_thread; TEN_ASSERT(extension_thread && ten_extension_thread_check_integrity(extension_thread, true), "Should not happen."); - ten_extension_on_init_done_t *on_init_done = - ten_extension_on_init_done_create(extension); + if (extension_thread->is_close_triggered) { + return; + } - ten_runloop_post_task_tail(ten_extension_get_attached_runloop(extension), - ten_extension_thread_on_extension_on_init_done, - extension_thread, on_init_done); + // Trigger on_start of extension. + ten_extension_on_start(extension); } void ten_extension_on_start_done(ten_env_t *self) { @@ -108,18 +198,7 @@ void ten_extension_on_start_done(ten_env_t *self) { TEN_LOGI("[%s] on_start() done.", ten_extension_get_name(extension)); - ten_extension_thread_t *extension_thread = extension->extension_thread; - TEN_ASSERT(extension_thread && - ten_extension_thread_check_integrity(extension_thread, true), - "Should not happen."); - - // Notify that the extension is started completely. - ten_extension_on_start_stop_deinit_done_t *on_start_done = - ten_extension_on_start_stop_deinit_done_create(extension); - - ten_runloop_post_task_tail(ten_extension_get_attached_runloop(extension), - ten_extension_thread_on_extension_on_start_done, - extension_thread, on_start_done); + extension->state = TEN_EXTENSION_STATE_ON_START_DONE; } void ten_extension_on_stop_done(ten_env_t *self) { @@ -166,14 +245,14 @@ void ten_extension_on_deinit_done(ten_env_t *self) { return; } - TEN_ASSERT(extension->state >= TEN_EXTENSION_STATE_DEINITING, + TEN_ASSERT(extension->state >= TEN_EXTENSION_STATE_ON_DEINIT, "Should not happen."); - if (extension->state == TEN_EXTENSION_STATE_DEINITTED) { + if (extension->state == TEN_EXTENSION_STATE_ON_DEINIT_DONE) { return; } - ten_extension_set_state(extension, TEN_EXTENSION_STATE_DEINITTED); + extension->state = TEN_EXTENSION_STATE_ON_DEINIT_DONE; TEN_LOGD("[%s] on_deinit() done.", ten_extension_get_name(extension)); @@ -185,12 +264,6 @@ void ten_extension_on_deinit_done(ten_env_t *self) { extension->extension_thread, on_deinit_done); } -void ten_extension_on_init_done_destroy(ten_extension_on_init_done_t *self) { - TEN_ASSERT(self, "Should not happen."); - - TEN_FREE(self); -} - ten_extension_on_start_stop_deinit_done_t * ten_extension_on_start_stop_deinit_done_create(ten_extension_t *extension) { TEN_ASSERT(extension && ten_extension_check_integrity(extension, true), @@ -211,10 +284,3 @@ void ten_extension_on_start_stop_deinit_done_destroy( TEN_FREE(self); } - -void ten_extension_on_configure_done_destroy( - ten_extension_on_configure_done_t *self) { - TEN_ASSERT(self, "Should not happen."); - - TEN_FREE(self); -} diff --git a/core/src/ten_runtime/extension_context/extension_context.c b/core/src/ten_runtime/extension_context/extension_context.c index 9841eb779..6667b4ae5 100644 --- a/core/src/ten_runtime/extension_context/extension_context.c +++ b/core/src/ten_runtime/extension_context/extension_context.c @@ -91,7 +91,6 @@ ten_extension_context_t *ten_extension_context_create(ten_engine_t *engine) { self->extension_threads_cnt_of_initted = 0; self->extension_threads_cnt_of_all_extensions_added_to_engine = 0; - self->extension_threads_cnt_of_all_extensions_initted = 0; self->extension_threads_cnt_of_all_extensions_stopped = 0; self->extension_threads_cnt_of_closing_flag_is_set = 0; self->extension_threads_cnt_of_closed = 0; diff --git a/core/src/ten_runtime/extension_context/internal/extension_group_is_initted.c b/core/src/ten_runtime/extension_context/internal/extension_group_is_initted.c deleted file mode 100644 index 6882529fd..000000000 --- a/core/src/ten_runtime/extension_context/internal/extension_group_is_initted.c +++ /dev/null @@ -1,71 +0,0 @@ -// -// Copyright © 2024 Agora -// This file is part of TEN Framework, an open source project. -// Licensed under the Apache License, Version 2.0, with certain conditions. -// Refer to the "LICENSE" file in the root directory for more information. -// -#include "include_internal/ten_runtime/extension_context/internal/extension_group_is_initted.h" - -#include "include_internal/ten_runtime/engine/engine.h" -#include "include_internal/ten_runtime/extension/extension.h" -#include "include_internal/ten_runtime/extension_context/extension_context.h" -#include "include_internal/ten_runtime/extension_group/extension_group.h" -#include "include_internal/ten_runtime/extension_thread/extension_thread.h" -#include "include_internal/ten_runtime/extension_thread/on_xxx.h" -#include "ten_utils/container/list.h" -#include "ten_utils/container/list_node_ptr.h" -#include "ten_utils/macro/check.h" - -void ten_extension_context_on_all_extensions_in_extension_group_are_initted( - void *self_, void *arg) { - ten_extension_context_t *self = self_; - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT(ten_extension_context_check_integrity(self, true), - "Invalid use of extension_context %p.", self); - - ten_extension_group_t *extension_group = arg; - TEN_ASSERT(extension_group && - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: We only access the read-only fields of the - // extension_group in this function, so it's safe to use it in - // the engine thread. - ten_extension_group_check_integrity(extension_group, false), - "Should not happen."); - - TEN_LOGD("[%s] Engine is notified that %s is initted", - ten_engine_get_name(self->engine), - ten_string_get_raw_str(&extension_group->name)); - - self->extension_threads_cnt_of_all_extensions_initted++; - - // TODO(Wei): At present, we only check whether all extension threads in the - // same TEN app has reached the 'initted' state. And if this condition is met, - // the engine will enable all the extension threads (in the belonging TEN app) - // proceed to the 'on_start' stage. - // - // If, in the future, we need to ensure that all extension threads in _all_ - // the TEN app have reached the 'initted' state before entering into the - // 'on_start' stage, this location is the right place to add more logic about - // this. - - if (self->extension_threads_cnt_of_all_extensions_initted == - ten_list_size(&self->extension_threads)) { - TEN_LOGD( - "[%s] All extension threads enter 'all extensions are initted' state.", - ten_engine_get_name(self->engine)); - - ten_list_foreach (&self->extension_threads, iter) { - ten_extension_thread_t *thread = ten_ptr_listnode_get(iter.node); - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: We only access the read-only fields of the - // extension_thread in this function, so it's safe to use it in - // the engine thread. - TEN_ASSERT(thread && ten_extension_thread_check_integrity(thread, false), - "Should not happen."); - - ten_runloop_post_task_tail( - ten_extension_group_get_attached_runloop(thread->extension_group), - ten_extension_thread_call_all_extensions_on_start, thread, NULL); - } - } -} diff --git a/core/src/ten_runtime/extension_group/extension_group.c b/core/src/ten_runtime/extension_group/extension_group.c index 701640061..0d0d684d6 100644 --- a/core/src/ten_runtime/extension_group/extension_group.c +++ b/core/src/ten_runtime/extension_group/extension_group.c @@ -177,6 +177,14 @@ void ten_extension_group_create_extensions(ten_extension_group_t *self) { TEN_LOGD("[%s] create_extensions.", ten_extension_group_get_name(self)); + ten_extension_thread_t *extension_thread = self->extension_thread; + TEN_ASSERT(extension_thread, "Should not happen."); + TEN_ASSERT(ten_extension_thread_check_integrity(extension_thread, true), + "Should not happen."); + + ten_extension_thread_set_state( + extension_thread, TEN_EXTENSION_THREAD_STATE_CREATING_EXTENSIONS); + self->on_create_extensions(self, self->ten_env); } diff --git a/core/src/ten_runtime/extension_group/ten_env/on_xxx.c b/core/src/ten_runtime/extension_group/ten_env/on_xxx.c index 9e0cdcd99..1a0c8d022 100644 --- a/core/src/ten_runtime/extension_group/ten_env/on_xxx.c +++ b/core/src/ten_runtime/extension_group/ten_env/on_xxx.c @@ -142,12 +142,26 @@ void ten_extension_group_on_create_extensions_done(ten_extension_group_t *self, ten_extension_thread_check_integrity(extension_thread, true), "Should not happen."); - ten_list_t *arg = ten_list_create(); - ten_list_swap(arg, extensions); + ten_list_swap(&extension_thread->extensions, extensions); - ten_runloop_post_task_tail(ten_extension_group_get_attached_runloop(self), - ten_extension_thread_on_all_extensions_created, - extension_thread, arg); + ten_list_foreach (&extension_thread->extensions, iter) { + ten_extension_t *extension = ten_ptr_listnode_get(iter.node); + TEN_ASSERT(extension, "Invalid argument."); + + ten_extension_inherit_thread_ownership(extension, extension_thread); + TEN_ASSERT(ten_extension_check_integrity(extension, true), + "Invalid use of extension %p.", extension); + } + + ten_extension_thread_set_state(extension_thread, + TEN_EXTENSION_THREAD_STATE_NORMAL); + + if (extension_thread->is_close_triggered) { + return; + } + + ten_extension_thread_start_to_add_all_created_extension_to_engine( + extension_thread); } void ten_extension_group_on_destroy_extensions_done( diff --git a/core/src/ten_runtime/extension_thread/extension_thread.c b/core/src/ten_runtime/extension_thread/extension_thread.c index b0b0a783d..9e27709a4 100644 --- a/core/src/ten_runtime/extension_thread/extension_thread.c +++ b/core/src/ten_runtime/extension_thread/extension_thread.c @@ -84,6 +84,7 @@ ten_extension_thread_t *ten_extension_thread_create(void) { (ten_signature_t)TEN_EXTENSION_THREAD_SIGNATURE); self->state = TEN_EXTENSION_THREAD_STATE_INIT; + self->is_close_triggered = false; self->extension_context = NULL; self->extension_group = NULL; @@ -93,8 +94,6 @@ ten_extension_thread_t *ten_extension_thread_create(void) { ten_list_init(&self->extensions); self->extensions_cnt_of_added_to_engine = 0; self->extensions_cnt_of_deleted_from_engine = 0; - self->extensions_cnt_of_on_init_done = 0; - self->extensions_cnt_of_on_start_done = 0; self->extensions_cnt_of_on_stop_done = 0; self->extensions_cnt_of_set_closing_flag = 0; @@ -314,34 +313,45 @@ static void ten_extension_thread_on_triggering_close(void *self_, "Invalid use of extension_thread %p.", self); // The closing flow should be executed only once. - if (ten_extension_thread_get_state(self) >= - TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE) { + if (self->is_close_triggered) { return; } - ten_extension_thread_set_state(self, - TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE); - - if (ten_list_size(&self->extensions)) { - // Loop for all the containing extensions, and call their on_stop(). - ten_list_foreach (&self->extensions, iter) { - ten_extension_t *extension = ten_ptr_listnode_get(iter.node); - TEN_ASSERT(ten_extension_check_integrity(extension, true), - "Should not happen."); - - ten_extension_on_stop(extension); - } - } else { - // TODO(Wei): There will be a time gap that needs to be handled. After the - // extension group starts to create_extensions, but before - // create_extensions_done, if a close command is received, a memory leak may - // occur (i.e., those created extensions). A possible solution is to enter a - // specific state when create_extensions is called. If a close command is - // received during this state, simply record it. Later, when - // create_extensions_done is reached, based on this state, decide whether to - // proceed normally or to destroy the created extensions and enter the - // standard extension group deinitialization process. - ten_extension_group_on_deinit(self->extension_group); + self->is_close_triggered = true; + + switch (self->state) { + case TEN_EXTENSION_THREAD_STATE_INIT: + // Enter the deinit flow of the extension group directly. + ten_extension_group_on_deinit(self->extension_group); + break; + + case TEN_EXTENSION_THREAD_STATE_CREATING_EXTENSIONS: + // We need to wait until `on_create_extensions_done()` is called, as that + // is the point when all the created extensions can be retrieved to begin + // the close process. Otherwise, memory leaks caused by those extensions + // may occur. + break; + + case TEN_EXTENSION_THREAD_STATE_NORMAL: + ten_extension_thread_set_state( + self, TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE); + + // Loop for all the containing extensions, and call their on_stop(). + ten_list_foreach (&self->extensions, iter) { + ten_extension_t *extension = ten_ptr_listnode_get(iter.node); + TEN_ASSERT(ten_extension_check_integrity(extension, true), + "Should not happen."); + + ten_extension_on_stop(extension); + } + break; + + case TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE: + case TEN_EXTENSION_THREAD_STATE_CLOSING: + case TEN_EXTENSION_THREAD_STATE_CLOSED: + default: + TEN_ASSERT(0, "Should not happen."); + break; } } diff --git a/core/src/ten_runtime/extension_thread/msg_interface/common.c b/core/src/ten_runtime/extension_thread/msg_interface/common.c index 997205765..2120e9f2a 100644 --- a/core/src/ten_runtime/extension_thread/msg_interface/common.c +++ b/core/src/ten_runtime/extension_thread/msg_interface/common.c @@ -21,17 +21,15 @@ #include "include_internal/ten_runtime/msg/cmd_base/cmd_base.h" #include "include_internal/ten_runtime/msg/msg.h" #include "include_internal/ten_utils/log/log.h" -#include "ten_utils/macro/check.h" #include "include_internal/ten_utils/value/value.h" #include "ten_runtime/app/app.h" -#include "ten_runtime/common/errno.h" #include "ten_runtime/msg/cmd_result/cmd_result.h" #include "ten_runtime/ten_env/ten_env.h" #include "ten_utils/io/runloop.h" -#include "ten_utils/lib/error.h" #include "ten_utils/lib/event.h" #include "ten_utils/lib/smart_ptr.h" #include "ten_utils/lib/string.h" +#include "ten_utils/macro/check.h" #include "ten_utils/macro/mark.h" void ten_extension_thread_handle_start_msg_task(void *self_, @@ -121,67 +119,51 @@ static void ten_extension_thread_handle_msg_task(void *self_, void *arg) { TEN_ASSERT(msg && ten_msg_check_integrity(msg), "Invalid argument."); TEN_ASSERT(ten_msg_get_dest_cnt(msg) == 1, "Should not happen."); - switch (ten_msg_get_type(msg)) { - case TEN_MSG_TYPE_CMD_RESULT: - switch (ten_extension_thread_get_state(self)) { - case TEN_EXTENSION_THREAD_STATE_INIT: - case TEN_EXTENSION_THREAD_STATE_NORMAL: - case TEN_EXTENSION_THREAD_STATE_ALL_STARTED: - case TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE: - ten_extension_thread_handle_msg_sync(self, msg); - break; - - case TEN_EXTENSION_THREAD_STATE_CLOSING: - case TEN_EXTENSION_THREAD_STATE_CLOSED: - // Discard this cmd result. - break; - - default: - TEN_ASSERT(0, "Should not happen."); - break; - } - break; - - default: - switch (ten_extension_thread_get_state(self)) { - case TEN_EXTENSION_THREAD_STATE_INIT: { + if (ten_msg_get_type(msg) == TEN_MSG_TYPE_CMD_RESULT) { + if (ten_extension_thread_get_state(self) <= + TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE) { + // The receipt of a result is definitely because some extension of this + // extension thread previously sent out a command. As long as the + // extension can issue a command, the corresponding result must be + // delivered to and processed by the respective extension. + ten_extension_thread_handle_msg_sync(self, msg); + } else { + // Discard this cmd result. + } + } else { + switch (ten_extension_thread_get_state(self)) { + case TEN_EXTENSION_THREAD_STATE_INIT: + case TEN_EXTENSION_THREAD_STATE_CREATING_EXTENSIONS: { #if defined(_DEBUG) - ten_msg_dump(msg, NULL, - "A message (^m) comes when extension thread (%p) is in " - "state (%d)", - self, ten_extension_thread_get_state(self)); + ten_msg_dump(msg, NULL, + "A message (^m) comes when extension thread (%p) is in " + "state (%d)", + self, ten_extension_thread_get_state(self)); #endif - // When a TEN app is started, clients will start to send messages into - // the TEN app. At this time, extensions in the TEN app might not - // complete its on_init() (i.e., on_init_done()) and its on_start() - // (i.e., on_start_done()), so we must put these messages into a - // 'pending_msgs' list first, then after an extension complete its - // on_start() (i.e., on_start_done()), these messages will be sent to - // the extension. - - // Push those messages into 'pending_msgs' list, so that we could - // flush them out when the extension thread enters its normal state. - ten_list_push_smart_ptr_back(&self->pending_msgs, msg); - break; - } + // At this stage, the extensions have not been created yet, so any + // received messages are placed into a `pending_msgs` list. Once the + // extensions are created, the messages will be delivered to the + // corresponding extensions. + ten_list_push_smart_ptr_back(&self->pending_msgs, msg); + break; + } - case TEN_EXTENSION_THREAD_STATE_NORMAL: - case TEN_EXTENSION_THREAD_STATE_ALL_STARTED: - case TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE: - ten_extension_thread_handle_msg_sync(self, msg); - break; + case TEN_EXTENSION_THREAD_STATE_NORMAL: + case TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE: + ten_extension_thread_handle_msg_sync(self, msg); + break; - case TEN_EXTENSION_THREAD_STATE_CLOSING: - case TEN_EXTENSION_THREAD_STATE_CLOSED: - // Discard all uninterested messages directly. - break; + case TEN_EXTENSION_THREAD_STATE_CLOSING: + case TEN_EXTENSION_THREAD_STATE_CLOSED: + // All the extensions of the extension thread have been closed, so + // discard all received messages directly. + break; - default: - TEN_ASSERT(0, "Should not happen."); - break; - } - break; + default: + TEN_ASSERT(0, "Should not happen."); + break; + } } ten_shared_ptr_destroy(msg); @@ -216,40 +198,30 @@ void ten_extension_thread_process_acquire_lock_mode_task(void *self_, (ten_acquire_lock_mode_result_t *)arg; TEN_ASSERT(acquire_result, "Invalid argument."); - if (ten_extension_thread_get_state(self) < - TEN_EXTENSION_THREAD_STATE_ALL_STARTED) { - // The 'acquire_lock_mode' action can only be successful after the - // corresponding extension thread has reached the ALL_STARTED state. This - // implies that all extensions within the thread must be started prior to - // the successful execution of 'acquire_lock_mode'. - ten_error_set(&acquire_result->err, TEN_ERRNO_GENERIC, - "Try to acquire lock_mode before its ALL_STARTED state."); - } else { - // Because the extension thread is about to acquire the lock mode lock to - // prevent the outer thread from directly using the TEN world, a task to - // release the lock mode is inserted, allowing the extension thread to exit - // this mode and giving the outer thread a chance to acquire the lock mode - // lock. - int rc = ten_runloop_post_task_tail( - self->runloop, ten_extension_thread_process_release_lock_mode_task, - self, NULL); - TEN_ASSERT(!rc, "Should not happen."); - - // Set `in_lock_mode` to reflect the effect of the below `ten_mutex_lock` - // blocking the extension thread. - self->in_lock_mode = true; - - // Inform the outer thread that the extension thread has also entered the - // lock mode. - ten_event_set(acquire_result->completed); - - rc = ten_mutex_lock(self->lock_mode_lock); - TEN_ASSERT(!rc, "Should not happen."); - } + // Because the extension thread is about to acquire the lock mode lock to + // prevent the outer thread from directly using the TEN world, a task to + // release the lock mode is inserted, allowing the extension thread to exit + // this mode and giving the outer thread a chance to acquire the lock mode + // lock. + int rc = ten_runloop_post_task_tail( + self->runloop, ten_extension_thread_process_release_lock_mode_task, self, + NULL); + TEN_ASSERT(!rc, "Should not happen."); + + // Set `in_lock_mode` to reflect the effect of the below `ten_mutex_lock` + // blocking the extension thread. + self->in_lock_mode = true; + + // Inform the outer thread that the extension thread has also entered the + // lock mode. + ten_event_set(acquire_result->completed); + + rc = ten_mutex_lock(self->lock_mode_lock); + TEN_ASSERT(!rc, "Should not happen."); } -void ten_extension_thread_handle_msg_async(ten_extension_thread_t *self, - ten_shared_ptr_t *msg) { +void ten_extension_thread_handle_in_msg_async(ten_extension_thread_t *self, + ten_shared_ptr_t *msg) { TEN_ASSERT(self, "Invalid argument."); TEN_ASSERT(ten_extension_thread_check_integrity(self, false), "Invalid use of extension %p.", self); @@ -322,8 +294,8 @@ void ten_extension_thread_dispatch_msg(ten_extension_thread_t *self, ten_extension_handle_in_msg(dest_extension, msg); } else { // Put the msg into the message queue of the destination extension thread. - ten_extension_thread_handle_msg_async(dest_extension->extension_thread, - msg); + ten_extension_thread_handle_in_msg_async(dest_extension->extension_thread, + msg); } } else { // Slow path. @@ -360,7 +332,7 @@ void ten_extension_thread_dispatch_msg(ten_extension_thread_t *self, ten_extension_context_find_extension_group_by_name( engine->extension_context, &dest_loc->extension_group_name); if (extension_group_) { - ten_extension_thread_handle_msg_async( + ten_extension_thread_handle_in_msg_async( extension_group_->extension_thread, msg); } else { ten_string_t loc_str; @@ -388,7 +360,7 @@ void ten_extension_thread_dispatch_msg(ten_extension_thread_t *self, // result to the message queue of the extension thread, and TEN // runtime would later route the result to the correct extension // or client which sends the original command. - ten_extension_thread_handle_msg_async(self, cmd_result); + ten_extension_thread_handle_in_msg_async(self, cmd_result); ten_shared_ptr_destroy(cmd_result); } @@ -396,7 +368,7 @@ void ten_extension_thread_dispatch_msg(ten_extension_thread_t *self, // The message should be handled in the current extension thread, so // dispatch the message to the current extension thread. - ten_extension_thread_handle_msg_async(self, msg); + ten_extension_thread_handle_in_msg_async(self, msg); } } } diff --git a/core/src/ten_runtime/extension_thread/on_xxx.c b/core/src/ten_runtime/extension_thread/on_xxx.c index 754800b87..76a61d30c 100644 --- a/core/src/ten_runtime/extension_thread/on_xxx.c +++ b/core/src/ten_runtime/extension_thread/on_xxx.c @@ -14,14 +14,12 @@ #include "include_internal/ten_runtime/engine/on_xxx.h" #include "include_internal/ten_runtime/extension/close.h" #include "include_internal/ten_runtime/extension/extension.h" -#include "include_internal/ten_runtime/extension/extension_cb_default.h" #include "include_internal/ten_runtime/extension/metadata.h" #include "include_internal/ten_runtime/extension/msg_handling.h" #include "include_internal/ten_runtime/extension/on_xxx.h" #include "include_internal/ten_runtime/extension/path_timer.h" #include "include_internal/ten_runtime/extension_context/extension_context.h" #include "include_internal/ten_runtime/extension_context/internal/del_extension.h" -#include "include_internal/ten_runtime/extension_context/internal/extension_group_is_initted.h" #include "include_internal/ten_runtime/extension_context/internal/extension_group_is_stopped.h" #include "include_internal/ten_runtime/extension_context/internal/extension_thread_is_closing.h" #include "include_internal/ten_runtime/extension_group/extension_group.h" @@ -36,7 +34,6 @@ #include "include_internal/ten_runtime/msg/msg.h" #include "include_internal/ten_runtime/path/path.h" #include "include_internal/ten_runtime/path/path_table.h" -#include "include_internal/ten_runtime/schema_store/store.h" #include "include_internal/ten_runtime/ten_env/ten_env.h" #include "ten_runtime/extension/extension.h" #include "ten_runtime/msg/cmd_result/cmd_result.h" @@ -146,8 +143,9 @@ void ten_extension_thread_on_extension_group_on_init_done( TEN_ASSERT(ten_extension_thread_check_integrity(self, true), "Invalid use of extension_thread %p.", self); - if (ten_extension_thread_get_state(self) >= - TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE) { + // The extension system is about to be shut down, so do not proceed with + // initialization anymore. + if (self->is_close_triggered) { return; } @@ -182,307 +180,6 @@ void ten_extension_thread_on_extension_group_on_init_done( ten_extension_group_create_extensions(self->extension_group); } -static void ten_extension_adjust_and_validate_property_on_configure_done( - ten_extension_t *self) { - TEN_ASSERT(self && ten_extension_check_integrity(self, true), - "Should not happen."); - - ten_error_t err; - ten_error_init(&err); - - bool success = ten_schema_store_adjust_properties(&self->schema_store, - &self->property, &err); - if (!success) { - TEN_LOGW("[%s] Failed to adjust property type: %s.", - ten_extension_get_name(self), ten_error_errmsg(&err)); - goto done; - } - - success = ten_schema_store_validate_properties(&self->schema_store, - &self->property, &err); - if (!success) { - TEN_LOGW("[%s] Invalid property: %s.", ten_extension_get_name(self), - ten_error_errmsg(&err)); - goto done; - } - -done: - ten_error_deinit(&err); - if (!success) { - TEN_ASSERT(0, "Invalid property."); - } -} - -static bool ten_extension_parse_interface_schema(ten_extension_t *self, - ten_value_t *api_definition, - ten_error_t *err) { - TEN_ASSERT(self && ten_extension_check_integrity(self, true), - "Invalid argument."); - TEN_ASSERT(api_definition && ten_value_check_integrity(api_definition), - "Invalid argument."); - - bool result = ten_schema_store_set_interface_schema_definition( - &self->schema_store, api_definition, - ten_string_get_raw_str(&self->base_dir), err); - if (!result) { - TEN_LOGW("[%s] Failed to set interface schema definition: %s.", - ten_extension_get_name(self), ten_error_errmsg(err)); - } - - return result; -} - -void ten_extension_thread_on_extension_on_configure_done(void *self_, - void *arg) { - ten_extension_thread_t *self = (ten_extension_thread_t *)self_; - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT(ten_extension_thread_check_integrity(self, true), - "Invalid use of extension_thread %p.", self); - - ten_error_t err; - ten_error_init(&err); - - ten_extension_on_configure_done_t *on_configure_done = arg; - TEN_ASSERT(on_configure_done, "Should not happen."); - - ten_extension_t *extension = on_configure_done->extension; - TEN_ASSERT(extension && ten_extension_check_integrity(extension, true), - "Should not happen."); - - if (ten_extension_thread_get_state(self) >= - TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE) { - goto done; - } - - ten_extension_set_state(extension, TEN_EXTENSION_STATE_CONFIGURED); - - bool rc = ten_handle_manifest_info_when_on_configure_done( - &extension->manifest_info, - ten_string_get_raw_str(ten_extension_get_base_dir(extension)), - &extension->manifest, &err); - if (!rc) { - TEN_LOGW("Failed to load extension manifest data, FATAL ERROR."); - exit(EXIT_FAILURE); - } - - rc = ten_handle_property_info_when_on_configure_done( - &extension->property_info, - ten_string_get_raw_str(ten_extension_get_base_dir(extension)), - &extension->property, &err); - if (!rc) { - TEN_LOGW("Failed to load extension property data, FATAL ERROR."); - exit(EXIT_FAILURE); - } - - rc = ten_extension_resolve_properties_in_graph(extension, &err); - TEN_ASSERT(rc, "Failed to resolve properties in graph."); - - ten_extension_merge_properties_from_graph(extension); - - rc = ten_extension_handle_ten_namespace_properties( - extension, extension->extension_context); - TEN_ASSERT(rc, "[%s] Failed to handle '_ten' properties.", - ten_string_get_raw_str(&extension->name)); - - ten_value_t *api_definition = ten_metadata_init_schema_store( - &extension->manifest, &extension->schema_store); - if (api_definition) { - bool success = - ten_extension_parse_interface_schema(extension, api_definition, &err); - TEN_ASSERT(success, "Failed to parse interface schema."); - } - - ten_extension_adjust_and_validate_property_on_configure_done(extension); - - // Create timers for automatically cleaning expired IN_PATHs and OUT_PATHs. - ten_timer_t *in_path_timer = - ten_extension_create_timer_for_in_path(extension); - ten_list_push_ptr_back(&extension->path_timers, in_path_timer, NULL); - ten_timer_enable(in_path_timer); - - ten_timer_t *out_path_timer = - ten_extension_create_timer_for_out_path(extension); - ten_list_push_ptr_back(&extension->path_timers, out_path_timer, NULL); - ten_timer_enable(out_path_timer); - - // The interface info has been resolved, and extensions might send msg out - // during `on_start()`, so it's the best time to merge the interface info to - // the extension_info. - rc = - ten_extension_determine_and_merge_all_interface_dest_extension(extension); - TEN_ASSERT(rc, "Should not happen."); - - ten_extension_on_init(extension->ten_env); - -done: - ten_error_deinit(&err); - - ten_extension_on_configure_done_destroy(on_configure_done); -} - -void ten_extension_thread_on_extension_on_init_done(void *self_, void *arg) { - ten_extension_thread_t *self = (ten_extension_thread_t *)self_; - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT(ten_extension_thread_check_integrity(self, true), - "Invalid use of extension_thread %p.", self); - - ten_error_t err; - ten_error_init(&err); - - ten_extension_on_init_done_t *on_init_done = arg; - TEN_ASSERT(on_init_done, "Should not happen."); - - ten_extension_t *extension = on_init_done->extension; - TEN_ASSERT(extension && ten_extension_check_integrity(extension, true), - "Should not happen."); - - if (ten_extension_thread_get_state(self) >= - TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE) { - goto done; - } - - ten_extension_set_state(extension, TEN_EXTENSION_STATE_INITTED); - - self->extensions_cnt_of_on_init_done++; - - if (self->extensions_cnt_of_on_init_done == - ten_list_size(&self->extensions)) { - // All extensions in this extension group/thread have been initted. - - // Because the extension's on_init() may initialize some states of the - // extension, we must wait until all extensions have completed their - // 'on_init()' before they can start processing 'on_cmd()'. - // - // When the state of the extension thread is switched to - // TEN_EXTENSION_THREAD_STATE_NORMAL, the messages will be pushed into the - // extensions contained in the extension thread. Therefore, we can only - // change the state of the extension thread to - // TEN_EXTENSION_THREAD_STATE_NORMAL at this time. - ten_extension_thread_set_state(self, TEN_EXTENSION_THREAD_STATE_NORMAL); - - ten_extension_context_t *extension_context = self->extension_context; - TEN_ASSERT(extension_context, "Invalid argument."); - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: This function will be called in the extension thread, - // however, the extension_context would not be changed after the extension - // system is starting, so it's safe to access the extension_context - // information in the extension thead. - // - // However, for the strict thread safety, it's possible to modify the - // logic here to use asynchronous operations (i.e., add a task to the - // extension_context, and add a task to the extension_thread when the - // result is found) here. - TEN_ASSERT(ten_extension_context_check_integrity(extension_context, false), - "Invalid use of extension_context %p.", extension_context); - - ten_engine_t *engine = extension_context->engine; - TEN_ASSERT(engine, "Invalid argument."); - // TEN_NOLINTNEXTLINE(thread-check) - // thread-check: The runloop of the engine will not be changed during the - // whole lifetime of the extension thread, so it's thread safe to access - // it here. - TEN_ASSERT(ten_engine_check_integrity(engine, false), - "Invalid use of engine %p.", engine); - - ten_runloop_post_task_tail( - ten_engine_get_attached_runloop(engine), - ten_extension_context_on_all_extensions_in_extension_group_are_initted, - extension_context, self->extension_group); - } - -done: - ten_error_deinit(&err); - - ten_extension_on_init_done_destroy(on_init_done); -} - -void ten_extension_thread_call_all_extensions_on_start(void *self_, - TEN_UNUSED void *arg) { - ten_extension_thread_t *self = (ten_extension_thread_t *)self_; - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT(ten_extension_thread_check_integrity(self, true), - "Invalid use of extension_thread %p.", self); - - if (ten_extension_thread_get_state(self) >= - TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE) { - // Already in the closing flow. - return; - } - - // Call on_start() of each containing extensions. - ten_list_foreach (&self->extensions, iter) { - ten_extension_t *extension = ten_ptr_listnode_get(iter.node); - TEN_ASSERT(extension && ten_extension_check_integrity(extension, true), - "Should not happen."); - - ten_extension_on_start(extension); - } - - size_t pending_msgs_size = ten_list_size(&self->pending_msgs); - if (pending_msgs_size) { - // Flush the previously got messages, which are received before on_start(), - // into the extension thread. - TEN_LOGD("Flushing %zu pending msgs received before on_start().", - pending_msgs_size); - - ten_list_foreach (&self->pending_msgs, iter) { - ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node); - TEN_ASSERT(msg, "Should not happen."); - - ten_extension_thread_handle_msg_async(self, msg); - } - ten_list_clear(&self->pending_msgs); - } -} - -void ten_extension_thread_on_extension_on_start_done(void *self_, void *arg) { - ten_extension_thread_t *self = (ten_extension_thread_t *)self_; - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT(ten_extension_thread_check_integrity(self, true), - "Invalid use of extension_thread %p.", self); - - ten_extension_on_start_stop_deinit_done_t *on_start_done = arg; - TEN_ASSERT(on_start_done, "Should not happen."); - - if (ten_extension_thread_get_state(self) >= - TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE) { - goto done; - } - - ten_extension_t *extension = on_start_done->extension; - TEN_ASSERT(extension && ten_extension_check_integrity(extension, true), - "Should not happen."); - - self->extensions_cnt_of_on_start_done++; - - if (self->extensions_cnt_of_on_start_done == - ten_list_size(&self->extensions)) { - ten_extension_thread_set_state(self, - TEN_EXTENSION_THREAD_STATE_ALL_STARTED); - } - - ten_extension_set_state(extension, TEN_EXTENSION_STATE_STARTED); - - size_t pending_msgs_size = ten_list_size(&extension->pending_msgs); - if (pending_msgs_size) { - // Flush the previously got messages, which are received before - // on_start_done(), into the extension thread. - TEN_LOGD("Flushing %zu pending msgs received before on_start_done().", - pending_msgs_size); - - ten_list_foreach (&extension->pending_msgs, iter) { - ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node); - TEN_ASSERT(msg, "Should not happen."); - - ten_extension_handle_in_msg(extension, msg); - } - ten_list_clear(&extension->pending_msgs); - } - -done: - ten_extension_on_start_stop_deinit_done_destroy(on_start_done); -} - static void ten_extension_thread_process_remaining_paths( ten_extension_t *extension) { TEN_ASSERT(extension && ten_extension_check_integrity(extension, true), @@ -757,8 +454,7 @@ void ten_extension_thread_on_all_extensions_in_all_extension_threads_added_to_en ten_engine_on_extension_thread_initted, engine, self); - if (ten_extension_thread_get_state(self) >= - TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE) { + if (self->is_close_triggered) { return; } @@ -810,28 +506,6 @@ void ten_extension_thread_on_all_extensions_deleted(void *self_, ten_extension_group_on_deinit(extension_group); } -void ten_extension_thread_on_all_extensions_created(void *self_, void *arg) { - ten_extension_thread_t *self = (ten_extension_thread_t *)self_; - TEN_ASSERT(self, "Invalid argument."); - TEN_ASSERT(ten_extension_thread_check_integrity(self, true), - "Invalid use of extension_thread %p.", self); - - ten_list_t *extensions = (ten_list_t *)arg; - ten_list_swap(&self->extensions, extensions); - TEN_FREE(extensions); - - ten_list_foreach (&self->extensions, iter) { - ten_extension_t *extension = ten_ptr_listnode_get(iter.node); - TEN_ASSERT(extension, "Invalid argument."); - - ten_extension_inherit_thread_ownership(extension, self); - TEN_ASSERT(ten_extension_check_integrity(extension, true), - "Invalid use of extension %p.", extension); - } - - ten_extension_thread_start_to_add_all_created_extension_to_engine(self); -} - void ten_extension_thread_on_addon_create_extension_done(void *self_, void *arg) { ten_extension_thread_t *self = (ten_extension_thread_t *)self_; diff --git a/core/src/ten_runtime/ten_env/internal/return.c b/core/src/ten_runtime/ten_env/internal/return.c index 629e92642..d8c3cd30f 100644 --- a/core/src/ten_runtime/ten_env/internal/return.c +++ b/core/src/ten_runtime/ten_env/internal/return.c @@ -51,10 +51,10 @@ static bool ten_env_return_result_internal(ten_env_t *self, bool result = true; - if (extension->state < TEN_EXTENSION_STATE_INITTED) { - TEN_LOGE("Cannot return results before on_init_done."); + if (extension->state < TEN_EXTENSION_STATE_ON_CONFIGURE_DONE) { + TEN_LOGE("Cannot return results before on_configure_done."); ten_error_set(err, TEN_ERRNO_GENERIC, - "Cannot return results before on_init_done."); + "Cannot return results before on_configure_done."); result = false; goto done; } diff --git a/core/src/ten_runtime/ten_env/internal/send.c b/core/src/ten_runtime/ten_env/internal/send.c index 25a7377ab..f60ba519c 100644 --- a/core/src/ten_runtime/ten_env/internal/send.c +++ b/core/src/ten_runtime/ten_env/internal/send.c @@ -91,10 +91,10 @@ static bool ten_send_msg_internal( ten_extension_t *extension = ten_env_get_attached_extension(self); TEN_ASSERT(extension, "Should not happen."); - if (extension->state < TEN_EXTENSION_STATE_INITTED) { - TEN_LOGE("Cannot send messages before on_init_done."); + if (extension->state < TEN_EXTENSION_STATE_ON_CONFIGURE_DONE) { + TEN_LOGE("Cannot send messages before on_configure_done."); ten_error_set(err, TEN_ERRNO_GENERIC, - "Cannot send messages before on_init_done."); + "Cannot send messages before on_configure_done."); result = false; goto done; } diff --git a/core/src/ten_runtime/ten_env/ten_proxy.c b/core/src/ten_runtime/ten_env/ten_proxy.c index aa9c9d51b..14d7b1769 100644 --- a/core/src/ten_runtime/ten_env/ten_proxy.c +++ b/core/src/ten_runtime/ten_env/ten_proxy.c @@ -11,11 +11,11 @@ #include "include_internal/ten_runtime/extension_group/extension_group.h" #include "include_internal/ten_runtime/ten_env/ten_env.h" #include "include_internal/ten_runtime/ten_env_proxy/ten_env_proxy.h" -#include "ten_utils/macro/check.h" #include "ten_runtime/app/app.h" #include "ten_runtime/extension/extension.h" #include "ten_runtime/ten_env/ten_env.h" #include "ten_utils/container/list_ptr.h" +#include "ten_utils/macro/check.h" void ten_env_add_ten_proxy(ten_env_t *self, ten_env_proxy_t *ten_env_proxy) { TEN_ASSERT(self && ten_env_check_integrity(self, true), "Invalid argument."); @@ -40,7 +40,7 @@ void ten_env_delete_ten_proxy(ten_env_t *self, ten_env_proxy_t *ten_env_proxy) { TEN_ASSERT(extension && ten_extension_check_integrity(extension, true), "Should not happen."); - if (extension->state == TEN_EXTENSION_STATE_DEINITING) { + if (extension->state == TEN_EXTENSION_STATE_ON_DEINIT) { ten_env_on_deinit_done(self, NULL); } break; diff --git a/tests/ten_runtime/smoke/extension_test/basic/basic_extensions_init_dependency.cc b/tests/ten_runtime/smoke/extension_test/basic/basic_extensions_init_dependency.cc index 6d1de6aa0..99753daf2 100644 --- a/tests/ten_runtime/smoke/extension_test/basic/basic_extensions_init_dependency.cc +++ b/tests/ten_runtime/smoke/extension_test/basic/basic_extensions_init_dependency.cc @@ -28,8 +28,6 @@ class test_extension_1 : public ten::extension_t { public: explicit test_extension_1(const std::string &name) : ten::extension_t(name) {} - void on_start(ten::ten_env_t &ten_env) override { ten_env.on_start_done(); } - void on_cmd(ten::ten_env_t &ten_env, std::unique_ptr cmd) override { nlohmann::json json = nlohmann::json::parse(cmd->to_json()); @@ -62,16 +60,15 @@ class test_extension_2 : public ten::extension_t { ten_env.on_configure_done(); } - void on_start(ten::ten_env_t &ten_env) override { + void on_init(ten::ten_env_t &ten_env) override { auto *ten_env_proxy = ten::ten_env_proxy_t::create(ten_env); fetch_property_thread_ = std::thread( [this](ten::ten_env_proxy_t *ten_env_proxy) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); - ten_env_proxy->notify([this](ten::ten_env_t &ten_env) { - this->get_property_from_outer_thread(ten_env); - }); + ten_env_proxy->notify( + [this](ten::ten_env_t &ten_env) { this->get_property(ten_env); }); delete ten_env_proxy; }, @@ -100,7 +97,7 @@ class test_extension_2 : public ten::extension_t { std::string greeting_; std::thread fetch_property_thread_; - void get_property_from_outer_thread(ten::ten_env_t &ten_env) { + void get_property(ten::ten_env_t &ten_env) { ten_env.get_property_string_async( EXTENSION_PROP_NAME_GREETING, [this](ten::ten_env_t &ten_env, const std::string &result, @@ -115,7 +112,7 @@ class test_extension_2 : public ten::extension_t { auto name = cmd_result->get_property_string("detail"); greeting_ += name; - ten_env.on_start_done(); + ten_env.on_init_done(); return true; }, nullptr); diff --git a/tests/ten_runtime/smoke/standalone_test/new.cc b/tests/ten_runtime/smoke/standalone_test/new.cc index c6614b655..d4100b20c 100644 --- a/tests/ten_runtime/smoke/standalone_test/new.cc +++ b/tests/ten_runtime/smoke/standalone_test/new.cc @@ -35,7 +35,7 @@ TEN_CPP_REGISTER_ADDON_AS_EXTENSION(standalone_test_new__test_extension_1, } // namespace -TEST(StandaloneTest, New) { // NOLINT +TEST(StandaloneTest, DISABLED_New) { // NOLINT ten_extension_test_new_t *test = ten_extension_test_create_new(); ten_extension_test_add_addon(test, "standalone_test_new__test_extension_1"); ten_extension_test_start_new(test); From 1bca6c97ecaee4fc1e8fd952ec12260956dda70c Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Fri, 27 Sep 2024 21:58:16 +0800 Subject: [PATCH 3/3] refactor!: refine extension startup flow With the introduction of the on_configure() initialization phase, the startup process of the extension has been better planned. - on_configure ~ on_configure_done + on_init ~ on_init_done: Handles its own initialization; cannot send or receive messages. The reason for this is that, before `on_init_done`, the extension may not be ready to handle external requests, so the received messages need to be temporarily stored. - ~ on_start: The messages received before on_start() will be temporarily stored, and only after on_start() is called will they be sent to the extension. The reason for this is developers generally expect `on_start` to occur before any `on_cmd` events. - on_start ~ on_stop_done: Normal sending and receiving of all messages and results. BREAKING CHANGE: --- .../ten_runtime/ten_env_proxy/ten_env_proxy.h | 9 ++- .../binding/python/native/common/common.c | 2 +- .../ten_env/ten_env_on_configure_done.c | 6 +- .../native/ten_env/ten_env_on_init_done.c | 5 +- .../native/ten_env/ten_env_on_start_done.c | 5 +- .../native/ten_env/ten_env_on_stop_done.c | 5 +- .../src/ten_runtime/ten_env/internal/return.c | 2 +- .../ten_env_proxy/internal/notify.c | 79 +++++++++++++++++++ 8 files changed, 97 insertions(+), 16 deletions(-) diff --git a/core/include_internal/ten_runtime/ten_env_proxy/ten_env_proxy.h b/core/include_internal/ten_runtime/ten_env_proxy/ten_env_proxy.h index 12ed0f7ad..5d1570a00 100644 --- a/core/include_internal/ten_runtime/ten_env_proxy/ten_env_proxy.h +++ b/core/include_internal/ten_runtime/ten_env_proxy/ten_env_proxy.h @@ -39,7 +39,12 @@ typedef struct ten_notify_data_t { TEN_RUNTIME_API bool ten_env_proxy_check_integrity(ten_env_proxy_t *self); TEN_RUNTIME_API size_t ten_env_proxy_get_thread_cnt(ten_env_proxy_t *self, - ten_error_t *err); + ten_error_t *err); TEN_RUNTIME_PRIVATE_API bool ten_env_proxy_acquire(ten_env_proxy_t *self, - ten_error_t *err); + ten_error_t *err); + +TEN_RUNTIME_API bool ten_env_proxy_notify_async(ten_env_proxy_t *self, + ten_notify_func_t notify_func, + void *user_data, + ten_error_t *err); diff --git a/core/src/ten_runtime/binding/python/native/common/common.c b/core/src/ten_runtime/binding/python/native/common/common.c index 7415720e3..023ea9af7 100644 --- a/core/src/ten_runtime/binding/python/native/common/common.c +++ b/core/src/ten_runtime/binding/python/native/common/common.c @@ -8,10 +8,10 @@ #include "include_internal/ten_runtime/binding/python/common/common.h" #include "include_internal/ten_runtime/binding/python/common/python_stuff.h" -#include "ten_utils/macro/check.h" #include "ten_utils/container/list.h" #include "ten_utils/container/list_node_str.h" #include "ten_utils/lib/string.h" +#include "ten_utils/macro/check.h" int ten_py_is_initialized(void) { return Py_IsInitialized(); } diff --git a/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_configure_done.c b/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_configure_done.c index e64e1e3cc..5908fbddd 100644 --- a/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_configure_done.c +++ b/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_configure_done.c @@ -46,9 +46,9 @@ PyObject *ten_py_ten_env_on_configure_done(PyObject *self, PyObject *args) { if (py_ten->c_ten_env->attach_to == TEN_ENV_ATTACH_TO_ADDON) { rc = ten_env_on_configure_done(py_ten->c_ten_env, &err); } else { - rc = ten_env_proxy_notify(py_ten->c_ten_env_proxy, - ten_env_proxy_notify_on_configure_done, NULL, - false, &err); + rc = ten_env_proxy_notify_async(py_ten->c_ten_env_proxy, + ten_env_proxy_notify_on_configure_done, + NULL, &err); } if (!rc) { diff --git a/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_init_done.c b/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_init_done.c index ba66bb16a..68793cbf6 100644 --- a/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_init_done.c +++ b/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_init_done.c @@ -46,9 +46,8 @@ PyObject *ten_py_ten_env_on_init_done(PyObject *self, PyObject *args) { if (py_ten->c_ten_env->attach_to == TEN_ENV_ATTACH_TO_ADDON) { rc = ten_env_on_init_done(py_ten->c_ten_env, &err); } else { - rc = ten_env_proxy_notify(py_ten->c_ten_env_proxy, - ten_env_proxy_notify_on_init_done, NULL, false, - &err); + rc = ten_env_proxy_notify_async( + py_ten->c_ten_env_proxy, ten_env_proxy_notify_on_init_done, NULL, &err); } if (!rc) { diff --git a/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_start_done.c b/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_start_done.c index 733c053d9..901de375a 100644 --- a/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_start_done.c +++ b/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_start_done.c @@ -39,9 +39,8 @@ PyObject *ten_py_ten_env_on_start_done(PyObject *self, ten_error_t err; ten_error_init(&err); - TEN_UNUSED bool rc = ten_env_proxy_notify(py_ten->c_ten_env_proxy, - ten_env_proxy_notify_on_start_done, - NULL, false, &err); + TEN_UNUSED bool rc = ten_env_proxy_notify_async( + py_ten->c_ten_env_proxy, ten_env_proxy_notify_on_start_done, NULL, &err); TEN_ASSERT(rc, "Should not happen."); ten_error_deinit(&err); diff --git a/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_stop_done.c b/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_stop_done.c index a0a376d6e..6a4be09cd 100644 --- a/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_stop_done.c +++ b/core/src/ten_runtime/binding/python/native/ten_env/ten_env_on_stop_done.c @@ -39,9 +39,8 @@ PyObject *ten_py_ten_env_on_stop_done(PyObject *self, ten_error_t err; ten_error_init(&err); - TEN_UNUSED bool rc = ten_env_proxy_notify(py_ten->c_ten_env_proxy, - ten_env_proxy_notify_on_stop_done, - NULL, false, &err); + TEN_UNUSED bool rc = ten_env_proxy_notify_async( + py_ten->c_ten_env_proxy, ten_env_proxy_notify_on_stop_done, NULL, &err); ten_error_deinit(&err); diff --git a/core/src/ten_runtime/ten_env/internal/return.c b/core/src/ten_runtime/ten_env/internal/return.c index d8c3cd30f..e79d8c950 100644 --- a/core/src/ten_runtime/ten_env/internal/return.c +++ b/core/src/ten_runtime/ten_env/internal/return.c @@ -60,7 +60,7 @@ static bool ten_env_return_result_internal(ten_env_t *self, } if (extension->state >= TEN_EXTENSION_STATE_CLOSING) { - TEN_LOGE("Cannot return results after on_stop_done."); + TEN_LOGW("Cannot return results after on_stop_done."); ten_error_set(err, TEN_ERRNO_GENERIC, "Cannot return results after on_stop_done."); result = false; diff --git a/core/src/ten_runtime/ten_env_proxy/internal/notify.c b/core/src/ten_runtime/ten_env_proxy/internal/notify.c index 59d30372a..f88c5b4ec 100644 --- a/core/src/ten_runtime/ten_env_proxy/internal/notify.c +++ b/core/src/ten_runtime/ten_env_proxy/internal/notify.c @@ -218,3 +218,82 @@ bool ten_env_proxy_notify(ten_env_proxy_t *self, ten_notify_func_t notify_func, TEN_ASSERT(result, "Should not happen."); return result; } + +bool ten_env_proxy_notify_async(ten_env_proxy_t *self, + ten_notify_func_t notify_func, void *user_data, + ten_error_t *err) { + TEN_ASSERT(self, "Invalid argument."); + TEN_ASSERT(notify_func, "Invalid argument."); + + if (!self || !notify_func || !ten_env_proxy_check_integrity(self)) { + const char *err_msg = "Invalid argument."; + TEN_ASSERT(0, "%s", err_msg); + if (err) { + ten_error_set(err, TEN_ERRNO_INVALID_ARGUMENT, err_msg); + } + return false; + } + + bool result = true; + + ten_env_t *ten_env = self->ten_env; + // TEN_NOLINTNEXTLINE(thread-check) + // thread-check: This function is intended to be called in any threads. + TEN_ASSERT(ten_env && ten_env_check_integrity(ten_env, false), + "Should not happen."); + + switch (ten_env->attach_to) { + case TEN_ENV_ATTACH_TO_EXTENSION: { + ten_extension_t *extension = ten_env->attached_target.extension; + TEN_ASSERT(extension, "Invalid argument."); + // TEN_NOLINTNEXTLINE(thread-check) + // thread-check: This function is intended to be called in any threads, + // and the use of extension instance is thread safe here. + TEN_ASSERT(ten_extension_check_integrity(extension, false), + "Invalid argument."); + + ten_runloop_post_task_tail( + ten_extension_get_attached_runloop(extension), + ten_notify_to_extension_task, extension, + ten_notify_data_create(notify_func, user_data)); + break; + } + + case TEN_ENV_ATTACH_TO_EXTENSION_GROUP: { + ten_extension_group_t *extension_group = + ten_env->attached_target.extension_group; + TEN_ASSERT(extension_group, "Invalid argument."); + // TEN_NOLINTNEXTLINE(thread-check) + // thread-check: This function is intended to be called in any threads, + // and the use of extension instance is thread safe here. + TEN_ASSERT(ten_extension_group_check_integrity(extension_group, false), + "Invalid argument."); + + ten_runloop_post_task_tail( + ten_extension_group_get_attached_runloop(extension_group), + ten_notify_to_extension_group_task, extension_group, + ten_notify_data_create(notify_func, user_data)); + break; + } + + case TEN_ENV_ATTACH_TO_APP: { + ten_app_t *app = ten_env->attached_target.app; + TEN_ASSERT(app, "Invalid argument."); + // TEN_NOLINTNEXTLINE(thread-check) + // thread-check: This function is intended to be called in any threads. + TEN_ASSERT(ten_app_check_integrity(app, false), "Invalid argument."); + + ten_runloop_post_task_tail( + ten_app_get_attached_runloop(app), ten_notify_to_app_task, app, + ten_notify_data_create(notify_func, user_data)); + break; + } + + default: + TEN_ASSERT(0, "Handle more types: %d", ten_env->attach_to); + break; + } + + TEN_ASSERT(result, "Should not happen."); + return result; +}