Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: provide error_handler in send_data/A/Vframe #341

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 99 additions & 17 deletions core/include/ten_runtime/binding/cpp/detail/ten_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "ten_utils/lib/error.h"
#include "ten_utils/log/log.h"
#include "ten_utils/macro/check.h"
#include "ten_utils/macro/mark.h"
#include "ten_utils/value/value.h"
#include "ten_utils/value/value_json.h"

Expand All @@ -47,7 +46,9 @@ class ten_env_proxy_t;
class ten_env_internal_accessor_t;

using result_handler_func_t =
std::function<void(ten_env_t &, std::unique_ptr<cmd_result_t>)>;
std::function<void(ten_env_t &, std::unique_ptr<cmd_result_t>, error_t *)>;

using error_handler_func_t = std::function<void(ten_env_t &, error_t *)>;

class ten_env_t {
public:
Expand All @@ -72,7 +73,9 @@ class ten_env_t {
err);
}

bool send_data(std::unique_ptr<data_t> &&data, error_t *err = nullptr) {
bool send_data(std::unique_ptr<data_t> &&data,
error_handler_func_t &&error_handler = nullptr,
error_t *err = nullptr) {
TEN_ASSERT(c_ten_env && data, "Should not happen.");

if (!data) {
Expand All @@ -88,20 +91,39 @@ class ten_env_t {
return false;
}

auto rc = ten_env_send_data(
c_ten_env, data->get_underlying_msg(),
err != nullptr ? err->get_internal_representation() : nullptr);
auto rc = false;

if (error_handler == nullptr) {
rc = ten_env_send_data(
c_ten_env, data->get_underlying_msg(), nullptr, nullptr,
err != nullptr ? err->get_internal_representation() : nullptr);
} else {
auto *error_handler_ptr =
new error_handler_func_t(std::move(error_handler));

rc = ten_env_send_data(
c_ten_env, data->get_underlying_msg(), proxy_handle_error,
error_handler_ptr,
err != nullptr ? err->get_internal_representation() : nullptr);
if (!rc) {
delete error_handler_ptr;
}
}

if (rc) {
// Only when the data has been sent successfully, we should give back
// the ownership of the data message to the TEN runtime.
auto *cpp_data_ptr = data.release();
delete cpp_data_ptr;
} else {
TEN_LOGE("Failed to send_data: %s.", data->get_name());
}

return rc;
}

bool send_video_frame(std::unique_ptr<video_frame_t> &&frame,
error_handler_func_t &&error_handler = nullptr,
error_t *err = nullptr) {
TEN_ASSERT(c_ten_env, "Should not happen.");

Expand All @@ -110,21 +132,39 @@ class ten_env_t {
return false;
}

auto rc = ten_env_send_video_frame(
c_ten_env, frame->get_underlying_msg(),
err != nullptr ? err->get_internal_representation() : nullptr);
auto rc = false;

if (error_handler == nullptr) {
rc = ten_env_send_video_frame(
c_ten_env, frame->get_underlying_msg(), nullptr, nullptr,
err != nullptr ? err->get_internal_representation() : nullptr);
} else {
auto *error_handler_ptr =
new error_handler_func_t(std::move(error_handler));

rc = ten_env_send_video_frame(
c_ten_env, frame->get_underlying_msg(), proxy_handle_error,
error_handler_ptr,
err != nullptr ? err->get_internal_representation() : nullptr);
if (!rc) {
delete error_handler_ptr;
}
}

if (rc) {
// Only when the message has been sent successfully, we should give back
// the ownership of the message to the TEN runtime.
auto *cpp_frame_ptr = frame.release();
delete cpp_frame_ptr;
} else {
TEN_LOGE("Failed to send_video_frame %s.", frame->get_name());
}

return rc;
}

bool send_audio_frame(std::unique_ptr<audio_frame_t> &&frame,
error_handler_func_t &&error_handler = nullptr,
error_t *err = nullptr) {
TEN_ASSERT(c_ten_env, "Should not happen.");

Expand All @@ -133,15 +173,32 @@ class ten_env_t {
return false;
}

auto rc = ten_env_send_audio_frame(
c_ten_env, frame->get_underlying_msg(),
err != nullptr ? err->get_internal_representation() : nullptr);
auto rc = false;

if (error_handler == nullptr) {
rc = ten_env_send_audio_frame(
c_ten_env, frame->get_underlying_msg(), nullptr, nullptr,
err != nullptr ? err->get_internal_representation() : nullptr);
} else {
auto *error_handler_ptr =
new error_handler_func_t(std::move(error_handler));

rc = ten_env_send_audio_frame(
c_ten_env, frame->get_underlying_msg(), proxy_handle_error,
error_handler_ptr,
err != nullptr ? err->get_internal_representation() : nullptr);
if (!rc) {
delete error_handler_ptr;
}
}

if (rc) {
// Only when the message has been sent successfully, we should give back
// the ownership of the message to the TEN runtime.
auto *cpp_frame_ptr = frame.release();
delete cpp_frame_ptr;
} else {
TEN_LOGE("Failed to send_audio_frame %s.", frame->get_name());
}

return rc;
Expand Down Expand Up @@ -675,10 +732,9 @@ class ten_env_t {
return rc;
}

static void proxy_handle_result(TEN_UNUSED ten_extension_t *extension,
::ten_env_t *ten_env,
ten_shared_ptr_t *c_cmd_result,
void *cb_data) {
static void proxy_handle_result(::ten_env_t *ten_env,
ten_shared_ptr_t *c_cmd_result, void *cb_data,
ten_error_t *err) {
auto *result_handler = static_cast<result_handler_func_t *>(cb_data);
auto *cpp_ten_env =
static_cast<ten_env_t *>(ten_binding_handle_get_me_in_target_lang(
Expand All @@ -700,7 +756,12 @@ class ten_env_t {
// executing, processing should be based on this cached value.
bool is_completed = ten_cmd_result_is_completed(c_cmd_result, nullptr);

(*result_handler)(*cpp_ten_env, std::move(cmd_result));
if (err != nullptr) {
error_t cpp_err(err, false);
(*result_handler)(*cpp_ten_env, std::move(cmd_result), &cpp_err);
} else {
(*result_handler)(*cpp_ten_env, std::move(cmd_result), nullptr);
}

if (is_completed) {
// Only when is_final is true should the result handler be cleared.
Expand All @@ -709,6 +770,27 @@ class ten_env_t {
delete result_handler;
}
}

static void proxy_handle_error(::ten_env_t *ten_env,
ten_shared_ptr_t *c_cmd_result, void *cb_data,
ten_error_t *err) {
TEN_ASSERT(c_cmd_result == nullptr, "Should not happen.");

auto *error_handler = static_cast<error_handler_func_t *>(cb_data);
auto *cpp_ten_env =
static_cast<ten_env_t *>(ten_binding_handle_get_me_in_target_lang(
reinterpret_cast<ten_binding_handle_t *>(ten_env)));

if (err == nullptr) {
(*error_handler)(*cpp_ten_env, nullptr);
} else {
error_t cpp_err(err, false);
(*error_handler)(*cpp_ten_env, &cpp_err);
}

// The error handler should be cleared.
delete error_handler;
};
};

} // namespace ten
25 changes: 17 additions & 8 deletions core/include/ten_runtime/binding/cpp/detail/test/env_tester.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ namespace ten {
class ten_env_tester_t;
class extension_tester_t;

using ten_env_tester_send_cmd_result_handler_func_t =
std::function<void(ten_env_tester_t &, std::unique_ptr<cmd_result_t>)>;
using ten_env_tester_send_cmd_result_handler_func_t = std::function<void(
ten_env_tester_t &, std::unique_ptr<cmd_result_t>, error_t *)>;

class ten_env_tester_t {
public:
Expand Down Expand Up @@ -194,19 +194,28 @@ class ten_env_tester_t {
}

static void proxy_handle_result(::ten_env_tester_t *c_ten_env_tester,
ten_shared_ptr_t *c_cmd_result,
void *cb_data) {
ten_shared_ptr_t *c_cmd_result, void *cb_data,
ten_error_t *err) {
auto *result_handler =
static_cast<ten_env_tester_send_cmd_result_handler_func_t *>(cb_data);
auto *cpp_ten_env_tester = static_cast<ten_env_tester_t *>(
ten_binding_handle_get_me_in_target_lang(
reinterpret_cast<ten_binding_handle_t *>(c_ten_env_tester)));

auto cmd_result = cmd_result_t::create(
// Clone a C shared_ptr to be owned by the C++ instance.
ten_shared_ptr_clone(c_cmd_result));
std::unique_ptr<cmd_result_t> cmd_result = nullptr;

(*result_handler)(*cpp_ten_env_tester, std::move(cmd_result));
if (c_cmd_result != nullptr) {
cmd_result = cmd_result_t::create(
// Clone a C shared_ptr to be owned by the C++ instance.
ten_shared_ptr_clone(c_cmd_result));
}

if (err != nullptr) {
error_t cpp_err(err, false);
(*result_handler)(*cpp_ten_env_tester, std::move(cmd_result), &cpp_err);
} else {
(*result_handler)(*cpp_ten_env_tester, std::move(cmd_result), nullptr);
}

if (ten_cmd_result_is_final(c_cmd_result, nullptr)) {
// Only when is_final is true should the result handler be cleared.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
namespace ten {

using ten_client_proxy_send_cmd_result_handler_func_t =
std::function<void(std::unique_ptr<ten::cmd_result_t>)>;
std::function<void(std::unique_ptr<ten::cmd_result_t>, error_t *err)>;

class ten_client_proxy_event_handler_t {
public:
Expand Down Expand Up @@ -86,12 +86,13 @@ class ten_client_proxy_internal_impl_t : public ten::extension_tester_t {
}
}

static void proxy_on_cmd_response(
static void proxy_on_cmd_result(
std::unique_ptr<ten::cmd_result_t> cmd_result,
const ten::ten_client_proxy_send_cmd_result_handler_func_t
&result_handler) {
&result_handler,
error_t *err) {
if (result_handler) {
result_handler(std::move(cmd_result));
result_handler(std::move(cmd_result), err);
}
}

Expand All @@ -118,8 +119,9 @@ class ten_client_proxy_internal_impl_t : public ten::extension_tester_t {
ten_env_tester.send_cmd(
std::move(*cmd_shared),
[result_handler](ten::ten_env_tester_t & /*ten_env_tester*/,
std::unique_ptr<ten::cmd_result_t> cmd_result) {
proxy_on_cmd_response(std::move(cmd_result), result_handler);
std::unique_ptr<ten::cmd_result_t> cmd_result,
error_t *err) {
proxy_on_cmd_result(std::move(cmd_result), result_handler, err);
});
},
nullptr);
Expand Down
33 changes: 18 additions & 15 deletions core/include/ten_runtime/ten_env/internal/send.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,41 @@
#include <stdbool.h>

#include "ten_utils/lib/error.h"
#include "ten_utils/lib/json.h"
#include "ten_utils/lib/smart_ptr.h"

typedef struct ten_env_t ten_env_t;
typedef struct ten_extension_t ten_extension_t;

typedef void (*ten_env_cmd_result_handler_func_t)(
ten_extension_t *extension, ten_env_t *ten_env,
ten_shared_ptr_t *cmd_result, void *cmd_result_handler_user_data);
typedef void (*ten_env_msg_result_handler_func_t)(
ten_env_t *ten_env, ten_shared_ptr_t *cmd_result,
void *cmd_result_handler_user_data, ten_error_t *err);

typedef bool (*ten_env_send_cmd_func_t)(
ten_env_t *self, ten_shared_ptr_t *cmd,
ten_env_cmd_result_handler_func_t result_handler,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);

TEN_RUNTIME_API bool ten_env_send_cmd(
ten_env_t *self, ten_shared_ptr_t *cmd,
ten_env_cmd_result_handler_func_t result_handler,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);

TEN_RUNTIME_API bool ten_env_send_cmd_ex(
ten_env_t *self, ten_shared_ptr_t *cmd,
ten_env_cmd_result_handler_func_t result_handler,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);

TEN_RUNTIME_API bool ten_env_send_data(ten_env_t *self, ten_shared_ptr_t *data,
ten_error_t *err);
TEN_RUNTIME_API bool ten_env_send_data(
ten_env_t *self, ten_shared_ptr_t *data,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);

TEN_RUNTIME_API bool ten_env_send_video_frame(ten_env_t *self,
ten_shared_ptr_t *frame,
ten_error_t *err);
TEN_RUNTIME_API bool ten_env_send_video_frame(
ten_env_t *self, ten_shared_ptr_t *frame,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);

TEN_RUNTIME_API bool ten_env_send_audio_frame(ten_env_t *self,
ten_shared_ptr_t *frame,
ten_error_t *err);
TEN_RUNTIME_API bool ten_env_send_audio_frame(
ten_env_t *self, ten_shared_ptr_t *frame,
ten_env_msg_result_handler_func_t result_handler,
void *result_handler_user_data, ten_error_t *err);
3 changes: 2 additions & 1 deletion core/include/ten_runtime/test/env_tester.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ TEN_RUNTIME_API bool ten_env_tester_on_start_done(ten_env_tester_t *self,
ten_error_t *err);

typedef void (*ten_env_tester_cmd_result_handler_func_t)(
ten_env_tester_t *self, ten_shared_ptr_t *cmd_result, void *user_data);
ten_env_tester_t *self, ten_shared_ptr_t *cmd_result, void *user_data,
ten_error_t *error);

TEN_RUNTIME_API bool ten_env_tester_send_cmd(
ten_env_tester_t *self, ten_shared_ptr_t *cmd,
Expand Down
18 changes: 9 additions & 9 deletions core/include_internal/ten_runtime/binding/go/internal/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ TEN_RUNTIME_PRIVATE_API void ten_go_bridge_destroy_c_part(
TEN_RUNTIME_PRIVATE_API void ten_go_bridge_destroy_go_part(
ten_go_bridge_t *self);

TEN_RUNTIME_PRIVATE_API void ten_go_status_init_with_errno(
ten_go_status_t *self, ten_errno_t errno);
TEN_RUNTIME_PRIVATE_API void ten_go_error_init_with_errno(ten_go_error_t *self,
ten_errno_t errno);

TEN_RUNTIME_PRIVATE_API void ten_go_status_from_error(ten_go_status_t *self,
ten_error_t *err);
TEN_RUNTIME_PRIVATE_API void ten_go_error_from_error(ten_go_error_t *self,
ten_error_t *err);

TEN_RUNTIME_PRIVATE_API void ten_go_status_set_errno(ten_go_status_t *self,
ten_errno_t errno);
TEN_RUNTIME_PRIVATE_API void ten_go_error_set_errno(ten_go_error_t *self,
ten_errno_t errno);

TEN_RUNTIME_PRIVATE_API void ten_go_status_set(ten_go_status_t *self,
ten_errno_t errno,
const char *msg);
TEN_RUNTIME_PRIVATE_API void ten_go_error_set(ten_go_error_t *self,
ten_errno_t errno,
const char *msg);
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@
#include "ten_utils/lib/json.h"

TEN_RUNTIME_PRIVATE_API ten_json_t *ten_go_json_loads(const void *json_bytes,
int json_bytes_len,
ten_go_status_t *status);
int json_bytes_len,
ten_go_error_t *status);
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ TEN_RUNTIME_PRIVATE_API ten_go_handle_t
ten_go_ten_env_go_handle(ten_go_ten_env_t *self);

extern void tenGoCAsyncApiCallback(ten_go_handle_t callback,
ten_go_status_t status);
ten_go_error_t cgo_error);
Loading