Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add immediate return result #254

Merged
merged 3 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=ExtensionTest.MultiDestSendInStopPeriod"
"--gtest_filter=GraphTest.GroupNodeMissing2Apps"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ class cmd_result_t : public msg_t {
c_msg, err != nullptr ? err->get_internal_representation() : nullptr);
}

bool is_completed(error_t *err = nullptr) const {
return ten_cmd_result_is_completed(
c_msg, err != nullptr ? err->get_internal_representation() : nullptr);
}

bool set_final(bool final, error_t *err = nullptr) {
return ten_cmd_result_set_final(
c_msg, final,
Expand Down
3 changes: 2 additions & 1 deletion core/include/ten_runtime/binding/cpp/internal/ten_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "ten_runtime/binding/cpp/internal/msg/data.h"
#include "ten_runtime/binding/cpp/internal/msg/video_frame.h"
#include "ten_runtime/common/errno.h"
#include "ten_runtime/msg/cmd_result/cmd_result.h"
#include "ten_runtime/ten.h"
#include "ten_runtime/ten_env/internal/metadata.h"
#include "ten_runtime/ten_env/internal/on_xxx_done.h"
Expand Down Expand Up @@ -1184,7 +1185,7 @@ class ten_env_t {

(*result_handler)(*cpp_ten_env, std::move(cmd_result));

if (ten_cmd_result_is_final(c_cmd_result, nullptr)) {
if (ten_cmd_result_is_completed(c_cmd_result, nullptr)) {
// Only when is_final is true should the result handler be cleared.
// Otherwise, since more result handlers are expected, the result
// handler should not be cleared.
Expand Down
3 changes: 3 additions & 0 deletions core/include/ten_runtime/msg/cmd_result/cmd_result.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@ ten_cmd_result_get_status_code(ten_shared_ptr_t *self);
TEN_RUNTIME_API bool ten_cmd_result_is_final(ten_shared_ptr_t *self,
ten_error_t *err);

TEN_RUNTIME_API bool ten_cmd_result_is_completed(ten_shared_ptr_t *self,
ten_error_t *err);

TEN_RUNTIME_API bool ten_cmd_result_set_final(ten_shared_ptr_t *self,
bool is_final, ten_error_t *err);
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@ TEN_RUNTIME_PRIVATE_API PyObject *ten_py_cmd_result_set_final(PyObject *self,

TEN_RUNTIME_PRIVATE_API PyObject *ten_py_cmd_result_is_final(PyObject *self,
PyObject *args);

TEN_RUNTIME_PRIVATE_API PyObject *ten_py_cmd_result_is_completed(
PyObject *self, PyObject *args);
6 changes: 6 additions & 0 deletions core/include_internal/ten_runtime/common/constant_str.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,9 @@

#define TEN_STR_DEFAULT_EXTENSION_GROUP "default_extension_group"
#define TEN_STR_TEN_TEST_EXTENSION "ten:test_extension"

// Result return policy.
#define TEN_STR_RESULT_RETURN_POLICY "result_return_policy"
#define TEN_STR_FIRST_ERROR_OR_FIRST_OK "first_error_or_first_ok"
#define TEN_STR_FIRST_ERROR_OR_LAST_OK "first_error_or_last_ok"
#define TEN_STR_EACH_IMMEDIATELY "each_immediately"
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "ten_runtime/ten_config.h"

#include "include_internal/ten_runtime/path/path_group.h"
#include "ten_utils/container/list.h"
#include "ten_utils/lib/error.h"
#include "ten_utils/lib/signature.h"
Expand All @@ -20,7 +21,8 @@
typedef struct ten_msg_dest_info_t {
ten_signature_t signature;
ten_string_t name; // The name of a message or an interface.
ten_list_t dest; // ten_weak_ptr_t of ten_extension_info_t
TEN_RESULT_RETURN_POLICY policy;
ten_list_t dest; // ten_weak_ptr_t of ten_extension_info_t
} ten_msg_dest_info_t;

TEN_RUNTIME_PRIVATE_API bool ten_msg_dest_info_check_integrity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ typedef struct ten_cmd_result_t {

ten_value_t status_code; // int32 (TEN_STATUS_CODE)

ten_value_t is_final; // bool
ten_value_t is_final; // bool
ten_value_t is_completed; // bool
} ten_cmd_result_t;

TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_validate_schema(
Expand Down Expand Up @@ -103,9 +104,18 @@ TEN_RUNTIME_API ten_json_t *ten_cmd_result_to_json(ten_shared_ptr_t *self,
TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_is_final(ten_cmd_result_t *self,
ten_error_t *err);

TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_is_completed(
ten_cmd_result_t *self, ten_error_t *err);

TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_set_final(
ten_cmd_result_t *self, bool is_final, ten_error_t *err);

TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_set_completed(
ten_cmd_result_t *self, bool is_completed, ten_error_t *err);

TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_result_loop_all_fields(
ten_msg_t *self, ten_raw_msg_process_one_field_func_t cb, void *user_data,
ten_error_t *err);

TEN_RUNTIME_PRIVATE_API bool ten_cmd_result_set_completed(
ten_shared_ptr_t *self, bool is_completed, ten_error_t *err);
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//
// Copyright © 2024 Agora
// This file is part of TEN Framework, an open source project.
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
#pragma once

#include "ten_runtime/ten_config.h"

#include <stdbool.h>

#include "include_internal/ten_runtime/msg/loop_fields.h"
#include "ten_utils/container/list.h"

typedef struct ten_msg_t ten_msg_t;
typedef struct ten_http_t ten_http_t;
typedef struct ten_error_t ten_error_t;

TEN_RUNTIME_PRIVATE_API void ten_cmd_result_copy_is_completed(
ten_msg_t *self, ten_msg_t *src, ten_list_t *excluded_field_ids);

TEN_RUNTIME_PRIVATE_API bool ten_cmd_result_process_is_completed(
ten_msg_t *self, ten_raw_msg_process_one_field_func_t cb, void *user_data,
ten_error_t *err);
22 changes: 3 additions & 19 deletions core/include_internal/ten_runtime/path/path_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include "include_internal/ten_runtime/path/common.h"
#include "include_internal/ten_runtime/path/path_table.h"
#include "include_internal/ten_runtime/path/result_return_policy.h"
#include "ten_utils/container/list.h"

// There is a possible group relationship among ten_path_t, and that group
Expand Down Expand Up @@ -129,30 +130,13 @@
typedef struct ten_path_t ten_path_t;
typedef struct ten_msg_conversion_t ten_msg_conversion_t;

typedef enum TEN_PATH_GROUP_POLICY {
TEN_PATH_GROUP_POLICY_INVALID,

// If receive a fail result, return it, otherwise, when all OK results are
// received, return the first received one. Clear the group after returning
// the result.
TEN_PATH_GROUP_POLICY_RETURN_FIRST_OK_OR_FAIL,

// Similar to the above, except return the last received one.
TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL,

// Return each result immediately as it is received.
TEN_PATH_GROUP_POLICY_RETURN_EACH_IMMEDIATELY,

// More modes is allowed, and could be added here in case needed.
} TEN_PATH_GROUP_POLICY;

typedef struct ten_path_group_t {
ten_signature_t signature;
ten_sanitizer_thread_check_t thread_check;

ten_path_table_t *table;

TEN_PATH_GROUP_POLICY policy;
TEN_RESULT_RETURN_POLICY policy;

// Contain the members of the group.
ten_list_t members; // ten_path_t
Expand All @@ -169,7 +153,7 @@ TEN_RUNTIME_PRIVATE_API ten_list_t *ten_path_group_get_members(
ten_path_t *path);

TEN_RUNTIME_PRIVATE_API void ten_paths_create_group(
ten_list_t *paths, TEN_PATH_GROUP_POLICY policy);
ten_list_t *paths, TEN_RESULT_RETURN_POLICY policy);

TEN_RUNTIME_PRIVATE_API ten_path_t *ten_path_group_resolve(ten_path_t *path,
TEN_PATH_TYPE type);
35 changes: 35 additions & 0 deletions core/include_internal/ten_runtime/path/result_return_policy.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//
// Copyright © 2024 Agora
// This file is part of TEN Framework, an open source project.
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
#pragma once

#include "ten_runtime/ten_config.h"

#define TEN_DEFAULT_RESULT_RETURN_POLICY \
TEN_RESULT_RETURN_POLICY_FIRST_ERROR_OR_LAST_OK

typedef enum TEN_RESULT_RETURN_POLICY {
TEN_RESULT_RETURN_POLICY_INVALID,

// If receive a fail result, return it, otherwise, when all OK results are
// received, return the first received one. Clear the group after returning
// the result.
TEN_RESULT_RETURN_POLICY_FIRST_ERROR_OR_FIRST_OK,

// Similar to the above, except return the last received one.
TEN_RESULT_RETURN_POLICY_FIRST_ERROR_OR_LAST_OK,

// Return each result immediately as it is received.
TEN_RESULT_RETURN_POLICY_EACH_IMMEDIATELY,

// More modes is allowed, and could be added here in case needed.
} TEN_RESULT_RETURN_POLICY;

TEN_RUNTIME_PRIVATE_API TEN_RESULT_RETURN_POLICY
ten_result_return_policy_from_string(const char *policy_str);

TEN_RUNTIME_PRIVATE_API const char *ten_result_return_policy_to_string(
TEN_RESULT_RETURN_POLICY policy);
3 changes: 3 additions & 0 deletions core/src/ten_runtime/binding/go/interface/ten/cmd.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@ ten_go_status_t ten_go_cmd_result_set_final(uintptr_t bridge_addr,
ten_go_status_t ten_go_cmd_result_is_final(uintptr_t bridge_addr,
bool *is_final);

ten_go_status_t ten_go_cmd_result_is_completed(uintptr_t bridge_addr,
bool *is_completed);

ten_go_handle_t ten_go_cmd_result_get_detail(uintptr_t bridge_addr);

ten_go_status_t ten_go_cmd_result_get_detail_json_and_size(
Expand Down
18 changes: 18 additions & 0 deletions core/src/ten_runtime/binding/go/interface/ten/cmd_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type CmdResult interface {
GetStatusCode() (StatusCode, error)
SetFinal(isFinal bool) error
IsFinal() (bool, error)
IsCompleted() (bool, error)
}

type cmdResult struct {
Expand Down Expand Up @@ -117,3 +118,20 @@ func (p *cmdResult) IsFinal() (bool, error) {

return bool(isFinal), nil
}

func (p *cmdResult) IsCompleted() (bool, error) {
var isCompleted C.bool
err := withCGOLimiter(func() error {
apiStatus := C.ten_go_cmd_result_is_completed(
p.getCPtr(),
&isCompleted,
)
return withGoStatus(&apiStatus)
})

if err != nil {
return false, err
}

return bool(isCompleted), nil
}
30 changes: 30 additions & 0 deletions core/src/ten_runtime/binding/go/native/msg/cmd/cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -164,3 +164,33 @@ ten_go_status_t ten_go_cmd_result_is_final(uintptr_t bridge_addr,
ten_error_deinit(&err);
return status;
}

ten_go_status_t ten_go_cmd_result_is_completed(uintptr_t bridge_addr,
bool *is_completed) {
TEN_ASSERT(bridge_addr && is_completed, "Invalid argument.");

ten_go_msg_t *msg_bridge = ten_go_msg_reinterpret(bridge_addr);
TEN_ASSERT(msg_bridge && ten_go_msg_check_integrity(msg_bridge),
"Should not happen.");

ten_shared_ptr_t *c_cmd = ten_go_msg_c_msg(msg_bridge);
TEN_ASSERT(c_cmd, "Should not happen.");

ten_go_status_t status;
ten_go_status_init_with_errno(&status, TEN_ERRNO_OK);

ten_error_t err;
ten_error_init(&err);

bool is_completed_ =
ten_cmd_result_is_completed(ten_go_msg_c_msg(msg_bridge), &err);

if (!ten_error_is_success(&err)) {
ten_go_status_set(&status, ten_error_errno(&err), ten_error_errmsg(&err));
} else {
*is_completed = is_completed_;
}

ten_error_deinit(&err);
return status;
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,6 @@ def set_final(self, is_final: bool):

def is_final(self) -> bool:
return _CmdResult.is_final(self)

def is_completed(self) -> bool:
return _CmdResult.is_completed(self)
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class _CmdResult(_Msg):
def get_status_code(self) -> int: ...
def set_final(self, is_final_flag: int) -> None: ...
def is_final(self) -> bool: ...
def is_completed(self) -> bool: ...

class _Data(_Msg):
def alloc_buf(self, size: int) -> None: ...
Expand Down
23 changes: 23 additions & 0 deletions core/src/ten_runtime/binding/python/native/msg/cmd_result.c
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,29 @@ PyObject *ten_py_cmd_result_is_final(PyObject *self, PyObject *args) {
return PyBool_FromLong(is_final);
}

PyObject *ten_py_cmd_result_is_completed(PyObject *self, PyObject *args) {
ten_py_cmd_result_t *py_cmd_result = (ten_py_cmd_result_t *)self;

TEN_ASSERT(py_cmd_result &&
ten_py_msg_check_integrity((ten_py_msg_t *)py_cmd_result),
"Invalid argument.");

ten_error_t err;
ten_error_init(&err);

bool is_completed =
ten_cmd_result_is_completed(py_cmd_result->msg.c_msg, &err);

if (!ten_error_is_success(&err)) {
ten_error_deinit(&err);
return ten_py_raise_py_runtime_error_exception("Failed to is_completed.");
}

ten_error_deinit(&err);

return PyBool_FromLong(is_completed);
}

bool ten_py_cmd_result_init_for_module(PyObject *module) {
PyTypeObject *py_type = ten_py_cmd_result_py_type();
if (PyType_Ready(py_type) < 0) {
Expand Down
1 change: 1 addition & 0 deletions core/src/ten_runtime/binding/python/native/msg/type.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ PyTypeObject *ten_py_cmd_result_py_type(void) {
NULL},
{"set_final", ten_py_cmd_result_set_final, METH_VARARGS, NULL},
{"is_final", ten_py_cmd_result_is_final, METH_VARARGS, NULL},
{"is_completed", ten_py_cmd_result_is_completed, METH_VARARGS, NULL},
{NULL, NULL, 0, NULL},
};

Expand Down
4 changes: 2 additions & 2 deletions core/src/ten_runtime/engine/msg_interface/start_graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ void ten_engine_handle_cmd_start_graph(ten_engine_t *self,

if (ten_list_size(&new_works) > 1) {
// Create path group for these newly submitted 'start_graph' commands.
ten_paths_create_group(&new_works,
TEN_PATH_GROUP_POLICY_RETURN_LAST_OK_OR_FAIL);
ten_paths_create_group(
&new_works, TEN_RESULT_RETURN_POLICY_FIRST_ERROR_OR_LAST_OK);
}
ten_list_clear(&new_works);

Expand Down
Loading
Loading