Skip to content

Commit

Permalink
refactor!: refine extension stop flow (#56)
Browse files Browse the repository at this point in the history
refactor!: refine extension stop flow

In the past, all extensions under the same engine within the same app would wait until everyone reached the `on_stop_done` phase before proceeding to the `on_deinit` phase. Considering the upcoming cloud-edge integration, the stop flow for extensions is now designed to be fully decoupled from one another. Any stop sequence dependencies between extensions must be explicitly implemented in the extension code as needed.

BREAKING CHANGE: The stop flow for all extensions is now completely decoupled, and each extension independently completes its own stop flow. If there is a priority order between multiple extensions during the stop process, it must be handled separately.
  • Loading branch information
halajohn authored Sep 29, 2024
1 parent b96e154 commit e1a076e
Show file tree
Hide file tree
Showing 33 changed files with 238 additions and 989 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=InterfaceTest.OutResultError"
"--gtest_filter=ExtensionTest.MultiDestSendInStopPeriod"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
22 changes: 4 additions & 18 deletions core/include_internal/ten_runtime/extension/extension.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include "ten_utils/value/value.h"

#define TEN_EXTENSION_SIGNATURE 0xE1627776E09A723CU
#define TEN_EXTENSION_UNIQUE_NAME_IN_GRAPH_PATTERN "%s::%s"

typedef struct ten_env_t ten_env_t;
typedef struct ten_extension_t ten_extension_t;
Expand Down Expand Up @@ -56,12 +55,8 @@ typedef struct ten_timer_t ten_timer_t;
// - on_start ~ on_stop_done: Normal sending and receiving of all
// messages and results.
//
// TODO(WEi): Does it still needs to be considered?
// [ After everyone has completed on_stop_done, they will collectively move into
// on_deinit. ]
//
// - on_deinit ~ on_deinit_done: Handles its own de-initialization; cannot send
// or receive messages.
// - on_deinit ~ on_deinit_done: Handles its own de-initialization; cannot
// receive messages.
typedef enum TEN_EXTENSION_STATE {
TEN_EXTENSION_STATE_INIT,

Expand All @@ -77,8 +72,8 @@ typedef enum TEN_EXTENSION_STATE {
// on_start_done() is completed.
TEN_EXTENSION_STATE_ON_START_DONE,

// on_stop_done() is completed and could proceed to be closed.
TEN_EXTENSION_STATE_CLOSING,
// on_stop_done() is completed.
TEN_EXTENSION_STATE_ON_STOP_DONE,

// on_deinit() is called.
TEN_EXTENSION_STATE_ON_DEINIT,
Expand Down Expand Up @@ -159,12 +154,6 @@ struct ten_extension_t {
ten_addon_host_t *addon_host;
ten_string_t name;

// The extension name is unique in the extension group to which it belongs,
// and may not be unique in the graph to which it belongs. But in some
// contexts, a unique name in a graph is needed. The pattern of the unique
// extension name in a graph is '${extension_group_name}::${extension_name}'.
ten_string_t unique_name_in_graph;

ten_string_t base_dir;

ten_env_t *ten_env;
Expand Down Expand Up @@ -257,9 +246,6 @@ TEN_RUNTIME_PRIVATE_API ten_path_in_t *
ten_extension_get_cmd_return_path_from_itself(ten_extension_t *self,
const char *cmd_id);

TEN_RUNTIME_PRIVATE_API void ten_extension_set_unique_name_in_graph(
ten_extension_t *self);

TEN_RUNTIME_PRIVATE_API bool ten_extension_handle_out_msg(
ten_extension_t *extension, ten_shared_ptr_t *msg, ten_error_t *err);

Expand Down
14 changes: 0 additions & 14 deletions core/include_internal/ten_runtime/extension/on_xxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,6 @@

#include "ten_runtime/ten_env/ten_env.h"

/**
* @brief Indicate that extension on_start/on_stop/on_deinit is completed.
*/
typedef struct ten_extension_on_start_stop_deinit_done_t {
// Indicates which extension's on_start/on_stop/on_deinit ends.
ten_extension_t *extension;
} ten_extension_on_start_stop_deinit_done_t;

TEN_RUNTIME_PRIVATE_API ten_extension_on_start_stop_deinit_done_t *
ten_extension_on_start_stop_deinit_done_create(ten_extension_t *extension);

TEN_RUNTIME_PRIVATE_API void ten_extension_on_start_stop_deinit_done_destroy(
ten_extension_on_start_stop_deinit_done_t *self);

TEN_RUNTIME_PRIVATE_API void ten_extension_on_configure_done(ten_env_t *self);

TEN_RUNTIME_PRIVATE_API void ten_extension_on_init_done(ten_env_t *self);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,6 @@ struct ten_extension_context_t {
ten_list_t extension_threads;

size_t extension_threads_cnt_of_initted;
size_t extension_threads_cnt_of_all_extensions_stopped;
size_t extension_threads_cnt_of_closing_flag_is_set;
size_t extension_threads_cnt_of_closed;

ten_list_t extension_groups_info_from_graph;
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,4 @@ TEN_RUNTIME_PRIVATE_API void ten_extension_store_del_extension(
ten_extension_store_t *self, ten_extension_t *extension);

TEN_RUNTIME_PRIVATE_API ten_extension_t *ten_extension_store_find_extension(
ten_extension_store_t *self, const char *extension_group_name,
const char *extension_name, bool of_extension_thread, bool check_thread);
ten_extension_store_t *self, const char *extension_name, bool check_thread);
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,8 @@ typedef enum TEN_EXTENSION_THREAD_STATE {
TEN_EXTENSION_THREAD_STATE_NORMAL,
TEN_EXTENSION_THREAD_STATE_PREPARE_TO_CLOSE,

// All the extensions of the extension thread have been closed, so the
// extension thread can now proceed with its own closing flow. Additionally,
// since the extensions have been closed, any messages received by the
// extension thread from this point on will be directly dropped.
TEN_EXTENSION_THREAD_STATE_CLOSING,

// The closing procedure is completed, so the extension thread can be
// destroyed safely.
// All extensions of this extension thread are closed, and removed from this
// extension thread.
TEN_EXTENSION_THREAD_STATE_CLOSED,
} TEN_EXTENSION_THREAD_STATE;

Expand All @@ -60,9 +54,7 @@ typedef struct ten_extension_thread_t {
ten_list_t pending_msgs;

ten_list_t extensions; // ten_extension_t*
size_t extensions_cnt_of_deleted_from_engine;
size_t extensions_cnt_of_on_stop_done;
size_t extensions_cnt_of_set_closing_flag;
size_t extensions_cnt_of_deleted;

// Store all extensions (ten_extension_t*) belong to this extension thread.
ten_extension_store_t *extension_store;
Expand Down
12 changes: 0 additions & 12 deletions core/include_internal/ten_runtime/extension_thread/on_xxx.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,6 @@ TEN_RUNTIME_API void ten_extension_inherit_thread_ownership(
TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_on_extension_group_on_init_done(void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_extension_on_stop_done(
void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void ten_extension_thread_call_all_extensions_on_deinit(
void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void ten_extension_thread_pre_close(void *self_,
void *arg);

TEN_RUNTIME_PRIVATE_API void ten_extension_thread_on_extension_set_closing_flag(
void *self_, void *arg);

TEN_RUNTIME_PRIVATE_API void
ten_extension_thread_start_life_cycle_of_all_extensions(void *self_, void *arg);

Expand Down
32 changes: 7 additions & 25 deletions core/include_internal/ten_runtime/test/extension_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,39 +6,21 @@
//
#include "ten_runtime/ten_config.h"

#include "include_internal/ten_runtime/extension/extension.h"
#include "ten_runtime/ten_env_proxy/ten_env_proxy.h"

// =-=-= 改名
typedef struct ten_extension_test_t {
ten_extension_thread_t *test_extension_thread;
ten_extension_group_t *test_extension_group;
ten_list_t pre_created_extensions;
ten_thread_t *test_thread;
} ten_extension_test_t;

TEN_RUNTIME_API ten_extension_test_t *ten_extension_test_create(
ten_extension_t *test_extension, ten_extension_t *target_extension);

TEN_RUNTIME_API void ten_extension_test_start(ten_extension_test_t *self);

TEN_RUNTIME_API void ten_extension_test_wait(ten_extension_test_t *self);

TEN_RUNTIME_API void ten_extension_test_destroy(ten_extension_test_t *self);

typedef struct ten_extension_test_new_t {
ten_thread_t *test_app_thread;
ten_string_t test_extension_addon_name;
ten_env_proxy_t *test_app_ten_env_proxy;
ten_event_t *test_app_ten_env_proxy_create_completed;
} ten_extension_test_new_t;
} ten_extension_test_t;

TEN_RUNTIME_API ten_extension_test_new_t *ten_extension_test_create_new(void);
TEN_RUNTIME_API ten_extension_test_t *ten_extension_test_create(void);

TEN_RUNTIME_API void ten_extension_test_destroy_new(
ten_extension_test_new_t *self);
TEN_RUNTIME_API void ten_extension_test_destroy(ten_extension_test_t *self);

TEN_RUNTIME_API void ten_extension_test_start_new(
ten_extension_test_new_t *self);
TEN_RUNTIME_API void ten_extension_test_start(ten_extension_test_t *self);

TEN_RUNTIME_API void ten_extension_test_add_addon(
ten_extension_test_new_t *self, const char *addon_name);
TEN_RUNTIME_API void ten_extension_test_add_addon(ten_extension_test_t *self,
const char *addon_name);
30 changes: 10 additions & 20 deletions core/src/ten_runtime/engine/on_xxx.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,34 +39,24 @@ static void ten_engine_on_extension_thread_is_ready(
ten_extension_thread_check_integrity(extension_thread, false),
"Should not happen.");

self->extension_context->extension_threads_cnt_of_initted++;
if (self->extension_context->extension_threads_cnt_of_initted ==
ten_list_size(&self->extension_context->extension_threads)) {
ten_extension_context_t *extension_context = self->extension_context;
TEN_ASSERT(extension_context &&
ten_extension_context_check_integrity(extension_context, true),
"Should not happen.");

extension_context->extension_threads_cnt_of_initted++;
if (extension_context->extension_threads_cnt_of_initted ==
ten_list_size(&extension_context->extension_threads)) {
TEN_LOGD("[%s] All extension threads are initted.",
ten_engine_get_name(self));

// All the extension threads requested by this command have been completed,
// return the result for this command.
//
// We cannot notify the engine that this thread is started completely
// _before_ this line. Because the codes of previous states would modify
// some data structures (i.e., to add the extension information to the
// extension store, etc.), and when the main (I/O) thread (ex: the app
// thread or the engine thread) receives messages from remote apps, the main
// thread would 'search/read' those data structures to find the correct
// extension to dispatch those received messages.
//
// After notifying the engine, the engine would send a 'OK' status back
// to the previous graph stage, and finally notifying the client that
// the whole graph is built-up successfully, so that the client will
// start to send commands into the graph.
//
// So if we notify the engine too early (before this line), that would
// cause a very seldom timing issue that when the extension thread is
// still modifying those data structures, and the main (I/O) thread (ex: the
// app thread or the engine thread) is started to receive messages, and the
// main (I/O) thread will fail to find the correct destination extension
// (Because the extension doesn't register itself to the extension store).

ten_string_t *graph_name = &self->graph_name;

Expand All @@ -75,7 +65,7 @@ static void ten_engine_on_extension_thread_is_ready(
: ten_string_get_raw_str(graph_name);

ten_shared_ptr_t *state_requester_cmd =
extension_thread->extension_context->state_requester_cmd;
extension_context->state_requester_cmd;

ten_shared_ptr_t *returned_cmd =
ten_cmd_result_create_from_cmd(TEN_STATUS_CODE_OK, state_requester_cmd);
Expand All @@ -85,7 +75,7 @@ static void ten_engine_on_extension_thread_is_ready(
// We have sent the result for the original state_requester_cmd, so it is
// useless now, destroy it.
ten_shared_ptr_destroy(state_requester_cmd);
extension_thread->extension_context->state_requester_cmd = NULL;
extension_context->state_requester_cmd = NULL;

#if defined(_DEBUG)
ten_msg_dump(
Expand Down
51 changes: 7 additions & 44 deletions core/src/ten_runtime/extension/extension.c
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ ten_extension_t *ten_extension_create(
self->addon_host = NULL;
ten_string_init_formatted(&self->name, "%s", name);

ten_string_init(&self->unique_name_in_graph);
ten_string_init(&self->base_dir);

self->ten_env = NULL;
Expand Down Expand Up @@ -170,7 +169,6 @@ void ten_extension_destroy(ten_extension_t *self) {

ten_env_destroy(self->ten_env);
ten_string_deinit(&self->name);
ten_string_deinit(&self->unique_name_in_graph);

ten_value_deinit(&self->manifest);
ten_value_deinit(&self->property);
Expand Down Expand Up @@ -541,14 +539,6 @@ static TEN_EXTENSION_DETERMINE_OUT_MSGS_RESULT ten_extension_determine_out_msgs(
TEN_ASSERT(msg && ten_msg_check_integrity(msg), "Should not happen.");
TEN_ASSERT(result_msgs, "Should not happen.");

{
TEN_UNUSED ten_extension_thread_t *extension_thread =
self->extension_thread;
TEN_ASSERT(extension_thread, "Invalid argument.");
TEN_ASSERT(ten_extension_thread_check_integrity(extension_thread, true),
"Invalid use of extension_thread %p.", extension_thread);
}

if (ten_msg_get_dest_cnt(msg) > 0) {
// Because the messages has already had destinations, no matter it is a
// backward path or a forward path, dispatch the message according to the
Expand Down Expand Up @@ -626,16 +616,6 @@ bool ten_extension_handle_out_msg(ten_extension_t *self, ten_shared_ptr_t *msg,
TEN_ASSERT(msg && ten_msg_check_integrity(msg), "Should not happen.");
TEN_ASSERT(err && ten_error_check_integrity(err), "Invalid argument.");

if (ten_extension_thread_get_state(self->extension_thread) >=
TEN_EXTENSION_THREAD_STATE_CLOSING) {
// We should not handle anymore messages, because when the extension thread
// enters its 'closing' stage, it means the graph relevant
// resources/structures (i.e., ten_all_msg_type_dest_info_t) might
// have already been destroyed. Therefore, it's unsafe to continue to handle
// messages.
return false;
}

// The source of the out message is the current extension.
ten_msg_set_src_to_extension(msg, self);

Expand Down Expand Up @@ -817,13 +797,8 @@ static void ten_extension_flush_all_pending_msgs(ten_extension_t *self) {
TEN_ASSERT(ten_extension_check_integrity(self, true),
"Invalid use of extension %p.", self);

// The developer expects that on_start() will execute before all on_cmd()
// events. Therefore, after on_start() has been executed, there is no need
// to wait for on_start_done() before sending all previously buffered
// messages into the extension.

// Flush the previously got messages, which are received before
// on_start_done(), into the extension.
// on_init_done(), into the extension.
ten_extension_thread_t *extension_thread = self->extension_thread;
ten_list_foreach (&extension_thread->pending_msgs, iter) {
ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node);
Expand All @@ -839,7 +814,7 @@ static void ten_extension_flush_all_pending_msgs(ten_extension_t *self) {
}

// Flush the previously got messages, which are received before
// on_start_done(), into the extension.
// on_init_done(), into the extension.
ten_list_foreach (&self->pending_msgs, iter) {
ten_shared_ptr_t *msg = ten_smart_ptr_listnode_get(iter.node);
TEN_ASSERT(msg, "Should not happen.");
Expand All @@ -861,6 +836,11 @@ void ten_extension_on_start(ten_extension_t *self) {
if (self->on_start) {
self->on_start(self, self->ten_env);

// The developer expects that on_start() will execute before all on_cmd()
// events. Therefore, after on_start() has been executed, there is no need
// to wait for on_start_done() before sending all previously buffered
// messages into the extension.

ten_extension_flush_all_pending_msgs(self);
} else {
ten_extension_flush_all_pending_msgs(self);
Expand Down Expand Up @@ -1067,23 +1047,6 @@ ten_path_in_t *ten_extension_get_cmd_return_path_from_itself(
return ten_ptr_listnode_get(returned_node);
}

void ten_extension_set_unique_name_in_graph(ten_extension_t *self) {
TEN_ASSERT(self && ten_extension_check_integrity(self, true) &&
self->extension_thread,
"Should not happen.");

ten_extension_group_t *extension_group =
self->extension_thread->extension_group;
TEN_ASSERT(extension_group &&
ten_extension_group_check_integrity(extension_group, true),
"Should not happen.");

ten_string_set_formatted(&self->unique_name_in_graph,
TEN_EXTENSION_UNIQUE_NAME_IN_GRAPH_PATTERN,
ten_string_get_raw_str(&extension_group->name),
ten_string_get_raw_str(&self->name));
}

ten_string_t *ten_extension_get_base_dir(ten_extension_t *self) {
TEN_ASSERT(self && ten_extension_check_integrity(self, true),
"Invalid argument.");
Expand Down
Loading

0 comments on commit e1a076e

Please sign in to comment.