Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: modify the internal implementation logic of path_group #30

Merged
merged 3 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/include_internal/ten_runtime/path/path.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ typedef struct ten_path_t {
ten_loc_t src_loc;
ten_loc_t dest_loc;

ten_path_group_t *group;
ten_shared_ptr_t *group; // a shared_ptr of ten_path_group_t

// We will cache the returned cmd result here. If someone does not call
// return_result() before Extension::onCmd is actually completed
Expand Down
44 changes: 12 additions & 32 deletions core/include_internal/ten_runtime/path/path_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,6 @@
typedef struct ten_path_t ten_path_t;
typedef struct ten_msg_conversion_operation_t ten_msg_conversion_operation_t;

typedef enum TEN_PATH_GROUP_ROLE {
TEN_PATH_GROUP_ROLE_INVALID,

TEN_PATH_GROUP_ROLE_MASTER,
TEN_PATH_GROUP_ROLE_SLAVE,
} TEN_PATH_GROUP_ROLE;

typedef enum TEN_PATH_GROUP_POLICY {
TEN_PATH_GROUP_POLICY_INVALID,

Expand All @@ -155,29 +148,18 @@ typedef struct ten_path_group_t {
ten_sanitizer_thread_check_t thread_check;

ten_path_table_t *table;
TEN_PATH_GROUP_ROLE role;

union {
// There should be only 1 master in a group.
struct {
TEN_PATH_GROUP_POLICY policy;
ten_list_t members; // Contain the members of the group.

// If this flag is set, none of the paths in the path_group can be used to
// trace back cmd results anymore.
//
// For example, if the policy is ONE_FAIL_RETURN_AND_ALL_OK_RETURN_FIRST
// and one of the paths in the group has received a fail cmd result, then
// the 'has_been_processed' flag will be set to true to prevent the left
// paths in the group from transmitting cmd results.
bool has_been_processed;
} master;

// There might be multiple slaves in a group.
struct {
ten_path_t *master; // Point to the master of the group.
} slave;
};

TEN_PATH_GROUP_POLICY policy;
ten_list_t members; // Contain the members of the group.

// If this flag is set, none of the paths in the path_group can be used to
// trace back cmd results anymore.
//
// For example, if the policy is ONE_FAIL_RETURN_AND_ALL_OK_RETURN_FIRST
// and one of the paths in the group has received a fail cmd result, then
// the 'has_been_processed' flag will be set to true to prevent the left
// paths in the group from transmitting cmd results.
bool has_been_processed;
} ten_path_group_t;

TEN_RUNTIME_PRIVATE_API bool ten_path_group_check_integrity(
Expand All @@ -195,5 +177,3 @@ TEN_RUNTIME_PRIVATE_API void ten_paths_create_group(

TEN_RUNTIME_PRIVATE_API ten_path_t *ten_path_group_resolve(ten_path_t *path,
TEN_PATH_TYPE type);

TEN_RUNTIME_PRIVATE_API ten_path_t *ten_path_group_get_master(ten_path_t *path);
13 changes: 7 additions & 6 deletions core/src/ten_runtime/path/path.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
#include "include_internal/ten_runtime/msg/msg.h"
#include "include_internal/ten_runtime/msg_conversion/msg_conversion_operation/base.h"
#include "include_internal/ten_runtime/path/path_group.h"
#include "ten_utils/macro/check.h"
#include "ten_utils/lib/signature.h"
#include "ten_utils/lib/smart_ptr.h"
#include "ten_utils/lib/string.h"
#include "ten_utils/macro/check.h"
#include "ten_utils/sanitizer/thread_check.h"

bool ten_path_check_integrity(ten_path_t *self, bool check_thread) {
Expand Down Expand Up @@ -86,7 +86,7 @@ void ten_path_deinit(ten_path_t *self) {
ten_loc_deinit(&self->dest_loc);

if (self->group) {
ten_path_group_destroy(self->group);
ten_shared_ptr_destroy(self->group);
self->group = NULL;
}

Expand Down Expand Up @@ -164,11 +164,12 @@ void ten_path_set_result(ten_path_t *path, ten_shared_ptr_t *cmd_result) {
if (ten_path_is_in_a_group(path)) {
// Move the current path to the last of the members of the group, so that we
// can know which one should be returned in different policies.
ten_path_group_t *path_group =
(ten_path_group_t *)ten_shared_ptr_get_data(path->group);
TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true),
"Invalid argument.");

ten_path_t *master = ten_path_group_get_master(path);
TEN_ASSERT(master, "Should not happen.");

ten_list_t *members = &(master->group->master.members);
ten_list_t *members = &path_group->members;
TEN_ASSERT(members, "Should not happen.");

ten_listnode_t *path_node = ten_list_find_ptr(members, path);
Expand Down
103 changes: 30 additions & 73 deletions core/src/ten_runtime/path/path_group.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,14 @@
#include "include_internal/ten_runtime/extension_thread/extension_thread.h"
#include "include_internal/ten_runtime/msg/msg.h"
#include "include_internal/ten_runtime/path/path.h"
#include "ten_utils/macro/check.h"
#include "ten_runtime/common/status_code.h"
#include "ten_runtime/msg/cmd_result/cmd_result.h"
#include "ten_utils/container/list.h"
#include "ten_utils/container/list_ptr.h"
#include "ten_utils/lib/alloc.h"
#include "ten_utils/lib/signature.h"
#include "ten_utils/lib/smart_ptr.h"
#include "ten_utils/macro/check.h"
#include "ten_utils/macro/mark.h"

bool ten_path_group_check_integrity(ten_path_group_t *self, bool check_thread) {
Expand Down Expand Up @@ -60,43 +60,18 @@ bool ten_path_is_in_a_group(ten_path_t *self) {
* group, and return the group.
*/
static ten_path_group_t *ten_path_group_create(ten_path_table_t *table,
TEN_PATH_GROUP_ROLE role) {
TEN_ASSERT(role != TEN_PATH_GROUP_ROLE_INVALID, "Invalid argument.");

TEN_PATH_GROUP_POLICY policy) {
ten_path_group_t *self =
(ten_path_group_t *)TEN_MALLOC(sizeof(ten_path_group_t));
TEN_ASSERT(self, "Failed to allocate memory.");

ten_signature_set(&self->signature, TEN_PATH_GROUP_SIGNATURE);
ten_sanitizer_thread_check_init_with_current_thread(&self->thread_check);

self->table = table;
self->role = role;

return self;
}

static ten_path_group_t *ten_path_group_create_master(
ten_path_table_t *table, TEN_PATH_GROUP_POLICY policy) {
TEN_ASSERT(policy != TEN_PATH_GROUP_POLICY_INVALID, "Invalid argument.");

ten_path_group_t *self =
ten_path_group_create(table, TEN_PATH_GROUP_ROLE_MASTER);

self->master.policy = policy;
ten_list_init(&self->master.members);
self->master.has_been_processed = false;

return self;
}

static ten_path_group_t *ten_path_group_create_slave(ten_path_table_t *table,
ten_path_t *master) {
TEN_ASSERT(master, "Invalid argument.");

ten_path_group_t *self =
ten_path_group_create(table, TEN_PATH_GROUP_ROLE_SLAVE);

self->slave.master = master;
self->policy = policy;
self->has_been_processed = false;
ten_list_init(&self->members);

return self;
}
Expand All @@ -107,9 +82,10 @@ static ten_path_group_t *ten_path_group_create_slave(ten_path_table_t *table,
void ten_path_group_destroy(ten_path_group_t *self) {
TEN_ASSERT(self, "Invalid argument.");

if (self->role == TEN_PATH_GROUP_ROLE_MASTER) {
ten_list_clear(&self->master.members);
}
ten_sanitizer_thread_check_deinit(&self->thread_check);
ten_signature_set(&self->signature, 0);

ten_list_clear(&self->members);

TEN_FREE(self);
}
Expand All @@ -124,23 +100,24 @@ void ten_paths_create_group(ten_list_t *paths, TEN_PATH_GROUP_POLICY policy) {
TEN_ASSERT(paths, "Invalid argument.");
TEN_ASSERT(ten_list_size(paths) > 1, "Invalid argument.");

ten_path_t *master = NULL;
ten_path_group_t *path_group = NULL;
ten_shared_ptr_t *path_group_sp = NULL;

ten_list_foreach (paths, iter) {
ten_path_t *path = ten_ptr_listnode_get(iter.node);
TEN_ASSERT(path && ten_path_check_integrity(path, true),
"Invalid argument.");
TEN_ASSERT(path->table, "Invalid argument.");

if (iter.index == 0) {
// It's a master.
master = path;
path->group = ten_path_group_create_master(path->table, policy);
if (!path_group_sp) {
path_group = ten_path_group_create(path->table, policy);
path_group_sp = ten_shared_ptr_create(path_group, ten_path_group_destroy);
path->group = path_group_sp;
} else {
path->group = ten_path_group_create_slave(path->table, master);
path->group = ten_shared_ptr_clone(path_group_sp);
}

// The 'master' is one of the members, too.
ten_list_push_ptr_back(&master->group->master.members, path, NULL);
ten_list_push_ptr_back(&path_group->members, path, NULL);
}
}

Expand Down Expand Up @@ -193,30 +170,6 @@ static ten_path_t *ten_path_group_resolve_in_one_fail_and_all_ok_return(
return NULL;
}

/**
* @brief Takes a path as an argument and returns the master path of the group
* that the path belongs to.
*/
ten_path_t *ten_path_group_get_master(ten_path_t *path) {
TEN_ASSERT(path && ten_path_check_integrity(path, true), "Invalid argument.");
TEN_ASSERT(ten_path_is_in_a_group(path), "Invalid argument.");

ten_path_t *master = NULL;
switch (path->group->role) {
case TEN_PATH_GROUP_ROLE_MASTER:
master = path;
break;
case TEN_PATH_GROUP_ROLE_SLAVE:
master = path->group->slave.master;
break;
default:
TEN_ASSERT(0, "Should not happen.");
break;
}

return master;
}

/**
* @brief Takes a path as an argument and returns a list of all the paths that
* belong to the same group as the given path.
Expand All @@ -225,10 +178,12 @@ ten_list_t *ten_path_group_get_members(ten_path_t *path) {
TEN_ASSERT(path && ten_path_check_integrity(path, true), "Invalid argument.");
TEN_ASSERT(ten_path_is_in_a_group(path), "Invalid argument.");

ten_path_t *master = ten_path_group_get_master(path);
TEN_ASSERT(master, "Should not happen.");
ten_path_group_t *path_group =
(ten_path_group_t *)ten_shared_ptr_get_data(path->group);
TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true),
"Invalid argument.");

ten_list_t *members = &master->group->master.members;
ten_list_t *members = &path_group->members;
TEN_ASSERT(members && ten_list_check_integrity(members),
"Should not happen.");

Expand All @@ -251,14 +206,16 @@ ten_path_t *ten_path_group_resolve(ten_path_t *path, TEN_PATH_TYPE type) {
TEN_ASSERT(path && ten_path_check_integrity(path, true), "Invalid argument.");
TEN_ASSERT(ten_path_is_in_a_group(path), "Invalid argument.");

ten_path_t *master = ten_path_group_get_master(path);
TEN_ASSERT(master, "Should not happen.");
ten_path_group_t *path_group =
(ten_path_group_t *)ten_shared_ptr_get_data(path->group);
TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true),
"Invalid argument.");

ten_list_t *members = &(master->group->master.members);
ten_list_t *members = &path_group->members;
TEN_ASSERT(members && ten_list_check_integrity(members),
"Should not happen.");

switch (master->group->master.policy) {
switch (path_group->policy) {
case TEN_PATH_GROUP_POLICY_ONE_FAIL_RETURN_AND_ALL_OK_RETURN_FIRST:
return ten_path_group_resolve_in_one_fail_and_all_ok_return(members, type,
false);
Expand Down
33 changes: 14 additions & 19 deletions core/src/ten_runtime/path/path_table.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@
#include "include_internal/ten_runtime/path/path_group.h"
#include "include_internal/ten_runtime/path/path_in.h"
#include "include_internal/ten_runtime/path/path_out.h"
#include "ten_utils/macro/check.h"
#include "ten_utils/container/list.h"
#include "ten_utils/container/list_node.h"
#include "ten_utils/lib/alloc.h"
#include "ten_utils/lib/signature.h"
#include "ten_utils/lib/smart_ptr.h"
#include "ten_utils/lib/string.h"
#include "ten_utils/lib/time.h"
#include "ten_utils/macro/check.h"
#include "ten_utils/sanitizer/thread_check.h"

#define PATH_TABLE_REASONABLE_MAX_CNT 1000
Expand Down Expand Up @@ -387,11 +387,12 @@ static void ten_path_mark_belonging_group_processed(ten_path_t *path) {
TEN_ASSERT(path && ten_path_check_integrity(path, true), "Invalid argument.");
TEN_ASSERT(ten_path_is_in_a_group(path), "Invalid argument.");

ten_path_t *master = ten_path_group_get_master(path);
TEN_ASSERT(master && ten_path_check_integrity(path, true),
"Should not happen.");
ten_path_group_t *group =
(ten_path_group_t *)ten_shared_ptr_get_data(path->group);
TEN_ASSERT(group && ten_path_group_check_integrity(group, true),
"Invalid argument.");

master->group->master.has_been_processed = true;
group->has_been_processed = true;
}

static bool ten_path_table_remove_group_and_all_its_paths_if_needed(
Expand All @@ -403,11 +404,12 @@ static bool ten_path_table_remove_group_and_all_its_paths_if_needed(

ten_list_t *list = type == TEN_PATH_IN ? &self->in_paths : &self->out_paths;

ten_path_t *master = ten_path_group_get_master(path);
TEN_ASSERT(master && ten_path_check_integrity(path, true),
"Should not happen.");
ten_path_group_t *path_group =
(ten_path_group_t *)ten_shared_ptr_get_data(path->group);
TEN_ASSERT(path_group && ten_path_group_check_integrity(path_group, true),
"Invalid argument.");

if (!master->group->master.has_been_processed) {
if (!path_group->has_been_processed) {
// This path group has not yet completed its task, so it cannot be removed.
return false;
}
Expand All @@ -434,19 +436,12 @@ static bool ten_path_table_remove_group_and_all_its_paths_if_needed(
TEN_ASSERT(group_path && ten_path_check_integrity(group_path, true),
"Invalid argument.");

if (group_path->group->role == TEN_PATH_GROUP_ROLE_SLAVE) {
ten_listnode_t *group_path_node = ten_list_find_ptr(list, group_path);
TEN_ASSERT(group_path_node, "Should not happen.");
ten_listnode_t *group_path_node = ten_list_find_ptr(list, group_path);
TEN_ASSERT(group_path_node, "Should not happen.");

ten_list_remove_node(list, group_path_node);
}
ten_list_remove_node(list, group_path_node);
}

// Remove the master node.
ten_listnode_t *master_node = ten_list_find_ptr(list, master);
TEN_ASSERT(master_node, "Should not happen.");
ten_list_remove_node(list, master_node);

return true;
}

Expand Down
Loading