Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: bugfix for starting cross-app predefined graph #257

Merged
merged 10 commits into from
Nov 13, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_start_graph_get_long_running_mode(
TEN_RUNTIME_PRIVATE_API bool ten_cmd_start_graph_get_long_running_mode(
ten_shared_ptr_t *self);

TEN_RUNTIME_PRIVATE_API void ten_cmd_start_graph_set_predefined_graph_name(
ten_shared_ptr_t *self, const char *predefined_graph_name);

TEN_RUNTIME_PRIVATE_API ten_string_t *
ten_raw_cmd_start_graph_get_predefined_graph_name(ten_cmd_start_graph_t *self);

Expand Down
19 changes: 18 additions & 1 deletion core/src/ten_runtime/engine/internal/remote_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,23 @@ static void ten_engine_connect_to_remote_after_remote_is_created(
TEN_ASSERT(remote && ten_remote_check_integrity(remote, true),
"Invalid use of remote %p.", remote);

if (ten_engine_check_remote_is_duplicated(
engine, ten_string_get_raw_str(&remote->uri))) {
// Since the remote_t creation is asynchronous, the engine may have already
// established a new connection with the remote during the creation process.
// If it is found that a connection is about to be duplicated, the remote_t
// object can be directly destroyed as the physical connection has not
// actually been established yet.
// Additionally, there is no need to send the 'start_graph' command to the
// remote, as the graph must have already been started on the remote side.
TEN_LOGD("Destroy remote %p for %s because it's duplicated.", remote,
ten_string_get_raw_str(&remote->uri));

ten_remote_destroy(remote);
ten_shared_ptr_destroy(start_graph_cmd);
return;
}

// This channel might be duplicated with other channels between this TEN app
// and the remote TEN app. This situation may appear in a graph which
// contains loops.
Expand Down Expand Up @@ -452,7 +469,7 @@ ten_remote_t *ten_engine_check_remote_is_existed(ten_engine_t *self,
// This function is used to solve the connection duplication problem. If there
// are two physical connections between two TEN apps, the connection which
// connects a TEN app with a smaller URI to a TEN app with a larger URI would be
// keep, and the other connection would be dropped.
// kept, and the other connection would be dropped.
//
// ------->
// ----> TEN app 1 TEN app 2 <----
Expand Down
5 changes: 5 additions & 0 deletions core/src/ten_runtime/engine/msg_interface/start_graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ void ten_engine_handle_cmd_start_graph(ten_engine_t *self,
ten_shared_ptr_t *child_cmd = ten_msg_clone(cmd, NULL);
TEN_ASSERT(child_cmd, "Should not happen.");

// The remote app does not recognize the local app's
// 'predefined_graph_name', so this field should not be included in the
// 'start_graph' command which will be sent to the remote app.
ten_cmd_start_graph_set_predefined_graph_name(child_cmd, "");

// Use the uri of the local app to fill/override the value of 'from'
// field (even if there is any old value in the 'from' field), so that
// the remote could know who connects to them.
Expand Down
11 changes: 11 additions & 0 deletions core/src/ten_runtime/msg/cmd_base/cmd/start_graph/cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "include_internal/ten_runtime/msg/cmd_base/cmd/start_graph/cmd.h"
#include "include_internal/ten_runtime/msg/cmd_base/cmd/start_graph/field/field_info.h"
#include "include_internal/ten_runtime/msg/msg.h"
#include "include_internal/ten_utils/value/value_set.h"
#include "ten_runtime/app/app.h"
#include "ten_utils/container/list.h"
#include "ten_utils/container/list_node.h"
Expand Down Expand Up @@ -376,6 +377,16 @@ bool ten_cmd_start_graph_get_long_running_mode(ten_shared_ptr_t *self) {
return ten_raw_cmd_start_graph_get_long_running_mode(get_raw_cmd(self));
}

void ten_cmd_start_graph_set_predefined_graph_name(
ten_shared_ptr_t *self, const char *predefined_graph_name) {
TEN_ASSERT(self && ten_cmd_base_check_integrity(self) &&
ten_msg_get_type(self) == TEN_MSG_TYPE_CMD_START_GRAPH,
"Should not happen.");

ten_value_set_string(&get_raw_cmd(self)->predefined_graph_name,
predefined_graph_name);
}

ten_string_t *ten_raw_cmd_start_graph_get_predefined_graph_name(
ten_cmd_start_graph_t *self) {
TEN_ASSERT(self && ten_raw_cmd_check_integrity((ten_cmd_t *)self) &&
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
//
// Copyright © 2024 Agora
// This file is part of TEN Framework, an open source project.
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
#include "gtest/gtest.h"
#include "include_internal/ten_runtime/binding/cpp/ten.h"
#include "ten_utils/lib/time.h"
#include "tests/common/client/cpp/msgpack_tcp.h"
#include "tests/ten_runtime/smoke/extension_test/util/binding/cpp/check.h"

namespace {

class test_normal_extension_1 : public ten::extension_t {
public:
explicit test_normal_extension_1(const std::string &name)
: ten::extension_t(name) {}

void on_cmd(ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_t> cmd) override {
// Always by pass the command
ten_env.send_cmd(std::move(cmd));
}
};

class test_normal_extension_2 : public ten::extension_t {
public:
explicit test_normal_extension_2(const std::string &name)
: ten::extension_t(name) {}

void on_cmd(ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_t> cmd) override {
nlohmann::json json = nlohmann::json::parse(cmd->to_json());
if (json["_ten"]["name"] == "hello_world") {
auto cmd_result = ten::cmd_result_t::create(TEN_STATUS_CODE_OK);
cmd_result->set_property("detail", "hello world, too");
ten_env.return_result(std::move(cmd_result), std::move(cmd));
}
}
};

class test_predefined_graph : public ten::extension_t {
public:
explicit test_predefined_graph(const std::string &name)
: ten::extension_t(name) {}

void on_start(ten::ten_env_t &ten_env) override {
std::string start_graph_json = R"({
"_ten": {
"type": "start_graph",
"seq_id": "222",
"dest": [{
"app": "localhost"
}],
"predefined_graph": "graph_1"
}
})"_json.dump();

ten_env.send_json(
start_graph_json.c_str(),
[this, start_graph_json](ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_result_t> cmd) {
auto status_code = cmd->get_status_code();
ASSERT_EQ(status_code, TEN_STATUS_CODE_OK);

auto graph_id = cmd->get_property_string("detail");

nlohmann::json hello_cmd =
R"({
"_ten": {
"name": "hello_world",
"seq_id": "137",
"dest":[{
"app": "msgpack://127.0.0.1:8001/",
"extension_group": "start_predefined_graph_cross_app__normal_extension_group",
"extension": "normal_extension_1"
}]
}
})"_json;
hello_cmd["_ten"]["dest"][0]["graph"] = graph_id;

ten_env.send_json(
hello_cmd.dump().c_str(),
[this](ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_result_t> cmd) {
auto status_code = cmd->get_status_code();
ASSERT_EQ(status_code, TEN_STATUS_CODE_OK);

auto detail = cmd->get_property_string("detail");
ASSERT_EQ(detail, "hello world, too");

received_hello_world_resp = true;

if (test_cmd != nullptr) {
nlohmann::json detail = {{"id", 1}, {"name", "a"}};

auto cmd_result =
ten::cmd_result_t::create(TEN_STATUS_CODE_OK);
cmd_result->set_property_from_json("detail",
detail.dump().c_str());
ten_env.return_result(std::move(cmd_result),
std::move(test_cmd));
}
});
});

ten_env.on_start_done();
}

void on_cmd(ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_t> cmd) override {
nlohmann::json json = nlohmann::json::parse(cmd->to_json());
if (json["_ten"]["name"] == "test") {
if (received_hello_world_resp) {
nlohmann::json detail = {{"id", 1}, {"name", "a"}};

auto cmd_result = ten::cmd_result_t::create(TEN_STATUS_CODE_OK);
cmd_result->set_property_from_json("detail", detail.dump().c_str());
ten_env.return_result(std::move(cmd_result), std::move(cmd));
} else {
test_cmd = std::move(cmd);
return;
}
} else {
TEN_ASSERT(0, "Should not happen.");
}
}

private:
bool received_hello_world_resp{};
std::unique_ptr<ten::cmd_t> test_cmd;
};

class test_app_1 : public ten::app_t {
public:
void on_configure(ten::ten_env_t &ten_env) override {
ten::ten_env_internal_accessor_t ten_env_internal_accessor(&ten_env);
bool rc = ten_env_internal_accessor.init_manifest_from_json(
// clang-format off
R"({
"type": "app",
"name": "test_app",
"version": "0.1.0"
})"
// clang-format on
);
ASSERT_EQ(rc, true);

rc = ten_env.init_property_from_json(
// clang-format off
R"({
"_ten": {
"uri": "msgpack://127.0.0.1:8001/",
"log_level": 2,
"predefined_graphs": [{
"name": "default",
"auto_start": false,
"singleton": true,
"nodes": [{
"type": "extension",
"name": "predefined_graph",
"app": "msgpack://127.0.0.1:8001/",
"addon": "start_predefined_graph_cross_app__predefined_graph_extension",
"extension_group": "start_predefined_graph_cross_app__predefined_graph_group"
}]
},{
"name": "graph_1",
"auto_start": false,
"nodes": [{
"type": "extension",
"name": "normal_extension_1",
"app": "msgpack://127.0.0.1:8001/",
"addon": "start_predefined_graph_cross_app__normal_extension_1",
"extension_group": "start_predefined_graph_cross_app__normal_extension_group"
}, {
"type": "extension",
"name": "normal_extension_2",
"app": "msgpack://127.0.0.1:8002/",
"addon": "start_predefined_graph_cross_app__normal_extension_2",
"extension_group": "start_predefined_graph_cross_app__normal_extension_group"
}],
"connections": [{
"app": "msgpack://127.0.0.1:8001/",
"extension_group": "start_predefined_graph_cross_app__normal_extension_group",
"extension": "normal_extension_1",
"cmd": [{
"name": "hello_world",
"dest": [{
"app": "msgpack://127.0.0.1:8002/",
"extension_group": "start_predefined_graph_cross_app__normal_extension_group",
"extension": "normal_extension_2"
}]
}]
}]
}]
}
})"
// clang-format on
);
ASSERT_EQ(rc, true);

ten_env.on_configure_done();
}
};

class test_app_2 : public ten::app_t {
public:
void on_configure(ten::ten_env_t &ten_env) override {
ten_env.init_property_from_json(
R"({
"_ten": {
"uri": "msgpack://127.0.0.1:8002/"
}
})");
ten_env.on_configure_done();
}
};

void *app_thread_1_main(TEN_UNUSED void *args) {
auto *app = new test_app_1();
app->run();
delete app;

return nullptr;
}

void *app_thread_2_main(TEN_UNUSED void *args) {
auto *app = new test_app_2();
app->run();
delete app;

return nullptr;
}

TEN_CPP_REGISTER_ADDON_AS_EXTENSION(
start_predefined_graph_cross_app__predefined_graph_extension,
test_predefined_graph);
TEN_CPP_REGISTER_ADDON_AS_EXTENSION(
start_predefined_graph_cross_app__normal_extension_1,
test_normal_extension_1);
TEN_CPP_REGISTER_ADDON_AS_EXTENSION(
start_predefined_graph_cross_app__normal_extension_2,
test_normal_extension_2);

} // namespace

TEST(ExtensionTest, StartPredefinedGraphCrossApp) { // NOLINT
auto *app_1_thread =
ten_thread_create("app thread 1", app_thread_1_main, nullptr);
auto *app_2_thread =
ten_thread_create("app thread 2", app_thread_2_main, nullptr);

ten_sleep(300);

// Create a client and connect to the app.
auto *client = new ten::msgpack_tcp_client_t("msgpack://127.0.0.1:8001/");

// Do not need to send 'start_graph' command first.
// The 'graph_id' MUST be "default" (a special string) if we want to send the
// request to predefined graph.
nlohmann::json resp = client->send_json_and_recv_resp_in_json(
R"({
"_ten": {
"name": "test",
"seq_id": "111",
"dest": [{
"app": "msgpack://127.0.0.1:8001/",
"graph": "default",
"extension_group": "start_predefined_graph_cross_app__predefined_graph_group",
"extension": "predefined_graph"
}]
}
})"_json);
ten_test::check_result_is(resp, "111", TEN_STATUS_CODE_OK,
R"({"id": 1, "name": "a"})");

delete client;

ten_thread_join(app_1_thread, -1);
ten_thread_join(app_2_thread, -1);
}
Loading
Loading