From 57f6943d5c2b7fd7d1fcd792034c9a87b711a27e Mon Sep 17 00:00:00 2001 From: Hu Yueh-Wei Date: Sun, 29 Sep 2024 10:37:46 +0800 Subject: [PATCH] refactor: refine extension stop flow --- .../extension_thread/extension_thread.h | 3 + core/src/ten_runtime/extension/extension.c | 14 ++--- core/src/ten_runtime/extension/msg_handling.c | 28 +++++---- .../extension_thread/msg_interface/common.c | 62 +++++++------------ 4 files changed, 51 insertions(+), 56 deletions(-) diff --git a/core/include_internal/ten_runtime/extension_thread/extension_thread.h b/core/include_internal/ten_runtime/extension_thread/extension_thread.h index 90f5b80781..d32aa3fbe3 100644 --- a/core/include_internal/ten_runtime/extension_thread/extension_thread.h +++ b/core/include_internal/ten_runtime/extension_thread/extension_thread.h @@ -30,6 +30,9 @@ typedef enum TEN_EXTENSION_THREAD_STATE { TEN_EXTENSION_THREAD_STATE_CREATING_EXTENSIONS, TEN_EXTENSION_THREAD_STATE_NORMAL, TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE, + + // All extensions of this extension thread are closed, and removed from this + // extension thread. TEN_EXTENSION_THREAD_STATE_CLOSED, } TEN_EXTENSION_THREAD_STATE; diff --git a/core/src/ten_runtime/extension/extension.c b/core/src/ten_runtime/extension/extension.c index 6633ee2dc9..5f4d2184d5 100644 --- a/core/src/ten_runtime/extension/extension.c +++ b/core/src/ten_runtime/extension/extension.c @@ -797,13 +797,8 @@ static void ten_extension_flush_all_pending_msgs(ten_extension_t *self) { TEN_ASSERT(ten_extension_check_integrity(self, true), "Invalid use of extension %p.", self); - // The developer expects that on_start() will execute before all on_cmd() - // events. Therefore, after on_start() has been executed, there is no need - // to wait for on_start_done() before sending all previously buffered - // messages into the extension. - // Flush the previously got messages, which are received before - // on_start_done(), into the extension. + // on_init_done(), into the extension. ten_extension_thread_t *extension_thread = self->extension_thread; ten_list_foreach (&extension_thread->pending_msgs, iter) { ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node); @@ -819,7 +814,7 @@ static void ten_extension_flush_all_pending_msgs(ten_extension_t *self) { } // Flush the previously got messages, which are received before - // on_start_done(), into the extension. + // on_init_done(), into the extension. ten_list_foreach (&self->pending_msgs, iter) { ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node); TEN_ASSERT(msg, "Should not happen."); @@ -841,6 +836,11 @@ void ten_extension_on_start(ten_extension_t *self) { if (self->on_start) { self->on_start(self, self->ten_env); + // The developer expects that on_start() will execute before all on_cmd() + // events. Therefore, after on_start() has been executed, there is no need + // to wait for on_start_done() before sending all previously buffered + // messages into the extension. + ten_extension_flush_all_pending_msgs(self); } else { ten_extension_flush_all_pending_msgs(self); diff --git a/core/src/ten_runtime/extension/msg_handling.c b/core/src/ten_runtime/extension/msg_handling.c index 01ef276089..b6a8e6d6ee 100644 --- a/core/src/ten_runtime/extension/msg_handling.c +++ b/core/src/ten_runtime/extension/msg_handling.c @@ -70,26 +70,32 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) { bool delete_msg = false; - // - During on_init() and on_deinit(), there should be no interaction with - // other extensions. - // - During on_start() and on_stop(), it is possible to send messages to other - // extensions and receive cmd_result for that message. + // - During on_configure(), on_init() and on_deinit(), the extension should + // not receive any messages, because it is not ready to handle any messages. + // - In other time periods, it is possible to receive and send messages to + // other extensions and receive cmd result. // // The messages, from other extensions, sent to this extension will be - // delivered to this extension only after its on_start_done(). Therefore, all - // messages sent to this extension before on_start_done() will be queued until - // on_start_done() is triggered. On the other hand, the cmd result of the - // command sent by this extension in on_start() can be delivered to this - // extension before its on_start_done(). + // delivered to this extension only after its on_start(). Therefore, all + // messages sent to this extension before on_start() will be queued until + // on_start() is triggered. On the other hand, the cmd result of the + // command sent by this extension in any time can be delivered to this + // extension before its on_start(). if (self->state < TEN_EXTENSION_STATE_ON_START && !ten_msg_is_cmd_result(msg)) { - // The extension is not started, and the msg is not a cmd result, so + // The extension is not initialized, and the msg is not a cmd result, so // cache the msg to the pending list. ten_list_push_smart_ptr_back(&self->pending_msgs, msg); goto done; } + if (self->state >= TEN_EXTENSION_STATE_ON_DEINIT) { + // The extension is in its de-initialization phase, and is not ready to + // handle any messages, so drop any messages. + goto done; + } + bool msg_is_cmd_result = false; // Because 'commands' has 'results', TEN will perform some bookkeeping for @@ -147,7 +153,7 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) { // command (if the IN path still exists) when on_cmd_done(). // // TODO(Xilin): Currently, there is no mechanism for auto return, so the - // relevant logic code should be able to be disabled. + // relevant codes should be able to be disabled. ten_extension_cache_cmd_result_to_in_path_for_auto_return(self, msg); delete_msg = true; diff --git a/core/src/ten_runtime/extension_thread/msg_interface/common.c b/core/src/ten_runtime/extension_thread/msg_interface/common.c index 3d5f63d85e..33ca8c4c4f 100644 --- a/core/src/ten_runtime/extension_thread/msg_interface/common.c +++ b/core/src/ten_runtime/extension_thread/msg_interface/common.c @@ -110,50 +110,36 @@ static void ten_extension_thread_handle_in_msg_task(void *self_, void *arg) { TEN_ASSERT(msg && ten_msg_check_integrity(msg), "Invalid argument."); TEN_ASSERT(ten_msg_get_dest_cnt(msg) == 1, "Should not happen."); - if (ten_msg_get_type(msg) == TEN_MSG_TYPE_CMD_RESULT) { - if (ten_extension_thread_get_state(self) <= - TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE) { - // The receipt of a result is definitely because some extension of this - // extension thread previously sent out a command. As long as the - // extension can issue a command, the corresponding result must be - // delivered to and processed by the respective extension. - ten_extension_thread_handle_in_msg_sync(self, msg); - } else { - // Discard this cmd result. - } - } else { - switch (ten_extension_thread_get_state(self)) { - case TEN_EXTENSION_THREAD_STATE_INIT: - case TEN_EXTENSION_THREAD_STATE_CREATING_EXTENSIONS: { + switch (ten_extension_thread_get_state(self)) { + case TEN_EXTENSION_THREAD_STATE_INIT: + case TEN_EXTENSION_THREAD_STATE_CREATING_EXTENSIONS: #if defined(_DEBUG) - ten_msg_dump(msg, NULL, - "A message (^m) comes when extension thread (%p) is in " - "state (%d)", - self, ten_extension_thread_get_state(self)); + ten_msg_dump(msg, NULL, + "A message (^m) comes when extension thread (%p) is in " + "state (%d)", + self, ten_extension_thread_get_state(self)); #endif - // At this stage, the extensions have not been created yet, so any - // received messages are placed into a `pending_msgs` list. Once the - // extensions are created, the messages will be delivered to the - // corresponding extensions. - ten_list_push_smart_ptr_back(&self->pending_msgs, msg); - break; - } + // At this stage, the extensions have not been created yet, so any + // received messages are placed into a `pending_msgs` list. Once the + // extensions are created, the messages will be delivered to the + // corresponding extensions. + ten_list_push_smart_ptr_back(&self->pending_msgs, msg); + break; - case TEN_EXTENSION_THREAD_STATE_NORMAL: - case TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE: - ten_extension_thread_handle_in_msg_sync(self, msg); - break; + case TEN_EXTENSION_THREAD_STATE_NORMAL: + case TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE: + ten_extension_thread_handle_in_msg_sync(self, msg); + break; - case TEN_EXTENSION_THREAD_STATE_CLOSED: - // All the extensions of the extension thread have been closed, so - // discard all received messages directly. - break; + case TEN_EXTENSION_THREAD_STATE_CLOSED: + // All extensions are removed from this extension thread, so the only + // thing we can do is to discard this cmd result. + break; - default: - TEN_ASSERT(0, "Should not happen."); - break; - } + default: + TEN_ASSERT(0, "Should not happen."); + break; } ten_shared_ptr_destroy(msg);