Skip to content

Commit

Permalink
refactor: refine extension stop flow
Browse files Browse the repository at this point in the history
  • Loading branch information
halajohn committed Sep 29, 2024
1 parent 65f12aa commit 57f6943
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ typedef enum TEN_EXTENSION_THREAD_STATE {
TEN_EXTENSION_THREAD_STATE_CREATING_EXTENSIONS,
TEN_EXTENSION_THREAD_STATE_NORMAL,
TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE,

// All extensions of this extension thread are closed, and removed from this
// extension thread.
TEN_EXTENSION_THREAD_STATE_CLOSED,
} TEN_EXTENSION_THREAD_STATE;

Expand Down
14 changes: 7 additions & 7 deletions core/src/ten_runtime/extension/extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -797,13 +797,8 @@ static void ten_extension_flush_all_pending_msgs(ten_extension_t *self) {
TEN_ASSERT(ten_extension_check_integrity(self, true),
"Invalid use of extension %p.", self);

// The developer expects that on_start() will execute before all on_cmd()
// events. Therefore, after on_start() has been executed, there is no need
// to wait for on_start_done() before sending all previously buffered
// messages into the extension.

// Flush the previously got messages, which are received before
// on_start_done(), into the extension.
// on_init_done(), into the extension.
ten_extension_thread_t *extension_thread = self->extension_thread;
ten_list_foreach (&extension_thread->pending_msgs, iter) {
ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node);
Expand All @@ -819,7 +814,7 @@ static void ten_extension_flush_all_pending_msgs(ten_extension_t *self) {
}

// Flush the previously got messages, which are received before
// on_start_done(), into the extension.
// on_init_done(), into the extension.
ten_list_foreach (&self->pending_msgs, iter) {
ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node);
TEN_ASSERT(msg, "Should not happen.");
Expand All @@ -841,6 +836,11 @@ void ten_extension_on_start(ten_extension_t *self) {
if (self->on_start) {
self->on_start(self, self->ten_env);

// The developer expects that on_start() will execute before all on_cmd()
// events. Therefore, after on_start() has been executed, there is no need
// to wait for on_start_done() before sending all previously buffered
// messages into the extension.

ten_extension_flush_all_pending_msgs(self);
} else {
ten_extension_flush_all_pending_msgs(self);
Expand Down
28 changes: 17 additions & 11 deletions core/src/ten_runtime/extension/msg_handling.c
Original file line number Diff line number Diff line change
Expand Up @@ -70,26 +70,32 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) {

bool delete_msg = false;

// - During on_init() and on_deinit(), there should be no interaction with
// other extensions.
// - During on_start() and on_stop(), it is possible to send messages to other
// extensions and receive cmd_result for that message.
// - During on_configure(), on_init() and on_deinit(), the extension should
// not receive any messages, because it is not ready to handle any messages.
// - In other time periods, it is possible to receive and send messages to
// other extensions and receive cmd result.
//
// The messages, from other extensions, sent to this extension will be
// delivered to this extension only after its on_start_done(). Therefore, all
// messages sent to this extension before on_start_done() will be queued until
// on_start_done() is triggered. On the other hand, the cmd result of the
// command sent by this extension in on_start() can be delivered to this
// extension before its on_start_done().
// delivered to this extension only after its on_start(). Therefore, all
// messages sent to this extension before on_start() will be queued until
// on_start() is triggered. On the other hand, the cmd result of the
// command sent by this extension in any time can be delivered to this
// extension before its on_start().

if (self->state < TEN_EXTENSION_STATE_ON_START &&
!ten_msg_is_cmd_result(msg)) {
// The extension is not started, and the msg is not a cmd result, so
// The extension is not initialized, and the msg is not a cmd result, so
// cache the msg to the pending list.
ten_list_push_smart_ptr_back(&self->pending_msgs, msg);
goto done;
}

if (self->state >= TEN_EXTENSION_STATE_ON_DEINIT) {
// The extension is in its de-initialization phase, and is not ready to
// handle any messages, so drop any messages.
goto done;
}

bool msg_is_cmd_result = false;

// Because 'commands' has 'results', TEN will perform some bookkeeping for
Expand Down Expand Up @@ -147,7 +153,7 @@ void ten_extension_handle_in_msg(ten_extension_t *self, ten_shared_ptr_t *msg) {
// command (if the IN path still exists) when on_cmd_done().
//
// TODO(Xilin): Currently, there is no mechanism for auto return, so the
// relevant logic code should be able to be disabled.
// relevant codes should be able to be disabled.
ten_extension_cache_cmd_result_to_in_path_for_auto_return(self, msg);

delete_msg = true;
Expand Down
62 changes: 24 additions & 38 deletions core/src/ten_runtime/extension_thread/msg_interface/common.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,50 +110,36 @@ static void ten_extension_thread_handle_in_msg_task(void *self_, void *arg) {
TEN_ASSERT(msg && ten_msg_check_integrity(msg), "Invalid argument.");
TEN_ASSERT(ten_msg_get_dest_cnt(msg) == 1, "Should not happen.");

if (ten_msg_get_type(msg) == TEN_MSG_TYPE_CMD_RESULT) {
if (ten_extension_thread_get_state(self) <=
TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE) {
// The receipt of a result is definitely because some extension of this
// extension thread previously sent out a command. As long as the
// extension can issue a command, the corresponding result must be
// delivered to and processed by the respective extension.
ten_extension_thread_handle_in_msg_sync(self, msg);
} else {
// Discard this cmd result.
}
} else {
switch (ten_extension_thread_get_state(self)) {
case TEN_EXTENSION_THREAD_STATE_INIT:
case TEN_EXTENSION_THREAD_STATE_CREATING_EXTENSIONS: {
switch (ten_extension_thread_get_state(self)) {
case TEN_EXTENSION_THREAD_STATE_INIT:
case TEN_EXTENSION_THREAD_STATE_CREATING_EXTENSIONS:
#if defined(_DEBUG)
ten_msg_dump(msg, NULL,
"A message (^m) comes when extension thread (%p) is in "
"state (%d)",
self, ten_extension_thread_get_state(self));
ten_msg_dump(msg, NULL,
"A message (^m) comes when extension thread (%p) is in "
"state (%d)",
self, ten_extension_thread_get_state(self));
#endif

// At this stage, the extensions have not been created yet, so any
// received messages are placed into a `pending_msgs` list. Once the
// extensions are created, the messages will be delivered to the
// corresponding extensions.
ten_list_push_smart_ptr_back(&self->pending_msgs, msg);
break;
}
// At this stage, the extensions have not been created yet, so any
// received messages are placed into a `pending_msgs` list. Once the
// extensions are created, the messages will be delivered to the
// corresponding extensions.
ten_list_push_smart_ptr_back(&self->pending_msgs, msg);
break;

case TEN_EXTENSION_THREAD_STATE_NORMAL:
case TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE:
ten_extension_thread_handle_in_msg_sync(self, msg);
break;
case TEN_EXTENSION_THREAD_STATE_NORMAL:
case TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE:
ten_extension_thread_handle_in_msg_sync(self, msg);
break;

case TEN_EXTENSION_THREAD_STATE_CLOSED:
// All the extensions of the extension thread have been closed, so
// discard all received messages directly.
break;
case TEN_EXTENSION_THREAD_STATE_CLOSED:
// All extensions are removed from this extension thread, so the only
// thing we can do is to discard this cmd result.
break;

default:
TEN_ASSERT(0, "Should not happen.");
break;
}
default:
TEN_ASSERT(0, "Should not happen.");
break;
}

ten_shared_ptr_destroy(msg);
Expand Down

0 comments on commit 57f6943

Please sign in to comment.