Skip to content

Commit

Permalink
fix: bugfix for starting cross-app predefined graph (#257)
Browse files Browse the repository at this point in the history
Co-authored-by: Hu Yueh-Wei <[email protected]>
  • Loading branch information
sunxilin and halajohn authored Nov 13, 2024
1 parent 4e84d7b commit 59604f1
Show file tree
Hide file tree
Showing 7 changed files with 826 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ TEN_RUNTIME_PRIVATE_API bool ten_raw_cmd_start_graph_get_long_running_mode(
TEN_RUNTIME_PRIVATE_API bool ten_cmd_start_graph_get_long_running_mode(
ten_shared_ptr_t *self);

TEN_RUNTIME_PRIVATE_API void ten_cmd_start_graph_set_predefined_graph_name(
ten_shared_ptr_t *self, const char *predefined_graph_name);

TEN_RUNTIME_PRIVATE_API ten_string_t *
ten_raw_cmd_start_graph_get_predefined_graph_name(ten_cmd_start_graph_t *self);

Expand Down
19 changes: 18 additions & 1 deletion core/src/ten_runtime/engine/internal/remote_interface.c
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,23 @@ static void ten_engine_connect_to_remote_after_remote_is_created(
TEN_ASSERT(remote && ten_remote_check_integrity(remote, true),
"Invalid use of remote %p.", remote);

if (ten_engine_check_remote_is_duplicated(
engine, ten_string_get_raw_str(&remote->uri))) {
// Since the remote_t creation is asynchronous, the engine may have already
// established a new connection with the remote during the creation process.
// If it is found that a connection is about to be duplicated, the remote_t
// object can be directly destroyed as the physical connection has not
// actually been established yet.
// Additionally, there is no need to send the 'start_graph' command to the
// remote, as the graph must have already been started on the remote side.
TEN_LOGD("Destroy remote %p for %s because it's duplicated.", remote,
ten_string_get_raw_str(&remote->uri));

ten_remote_destroy(remote);
ten_shared_ptr_destroy(start_graph_cmd);
return;
}

// This channel might be duplicated with other channels between this TEN app
// and the remote TEN app. This situation may appear in a graph which
// contains loops.
Expand Down Expand Up @@ -452,7 +469,7 @@ ten_remote_t *ten_engine_check_remote_is_existed(ten_engine_t *self,
// This function is used to solve the connection duplication problem. If there
// are two physical connections between two TEN apps, the connection which
// connects a TEN app with a smaller URI to a TEN app with a larger URI would be
// keep, and the other connection would be dropped.
// kept, and the other connection would be dropped.
//
// ------->
// ----> TEN app 1 TEN app 2 <----
Expand Down
5 changes: 5 additions & 0 deletions core/src/ten_runtime/engine/msg_interface/start_graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ void ten_engine_handle_cmd_start_graph(ten_engine_t *self,
ten_shared_ptr_t *child_cmd = ten_msg_clone(cmd, NULL);
TEN_ASSERT(child_cmd, "Should not happen.");

// The remote app does not recognize the local app's
// 'predefined_graph_name', so this field should not be included in the
// 'start_graph' command which will be sent to the remote app.
ten_cmd_start_graph_set_predefined_graph_name(child_cmd, "");

// Use the uri of the local app to fill/override the value of 'from'
// field (even if there is any old value in the 'from' field), so that
// the remote could know who connects to them.
Expand Down
11 changes: 11 additions & 0 deletions core/src/ten_runtime/msg/cmd_base/cmd/start_graph/cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "include_internal/ten_runtime/msg/cmd_base/cmd/start_graph/cmd.h"
#include "include_internal/ten_runtime/msg/cmd_base/cmd/start_graph/field/field_info.h"
#include "include_internal/ten_runtime/msg/msg.h"
#include "include_internal/ten_utils/value/value_set.h"
#include "ten_runtime/app/app.h"
#include "ten_utils/container/list.h"
#include "ten_utils/container/list_node.h"
Expand Down Expand Up @@ -376,6 +377,16 @@ bool ten_cmd_start_graph_get_long_running_mode(ten_shared_ptr_t *self) {
return ten_raw_cmd_start_graph_get_long_running_mode(get_raw_cmd(self));
}

void ten_cmd_start_graph_set_predefined_graph_name(
ten_shared_ptr_t *self, const char *predefined_graph_name) {
TEN_ASSERT(self && ten_cmd_base_check_integrity(self) &&
ten_msg_get_type(self) == TEN_MSG_TYPE_CMD_START_GRAPH,
"Should not happen.");

ten_value_set_string(&get_raw_cmd(self)->predefined_graph_name,
predefined_graph_name);
}

ten_string_t *ten_raw_cmd_start_graph_get_predefined_graph_name(
ten_cmd_start_graph_t *self) {
TEN_ASSERT(self && ten_raw_cmd_check_integrity((ten_cmd_t *)self) &&
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
//
// Copyright © 2024 Agora
// This file is part of TEN Framework, an open source project.
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
#include "gtest/gtest.h"
#include "include_internal/ten_runtime/binding/cpp/ten.h"
#include "ten_utils/lib/time.h"
#include "tests/common/client/cpp/msgpack_tcp.h"
#include "tests/ten_runtime/smoke/extension_test/util/binding/cpp/check.h"

namespace {

class test_normal_extension_1 : public ten::extension_t {
public:
explicit test_normal_extension_1(const std::string &name)
: ten::extension_t(name) {}

void on_cmd(ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_t> cmd) override {
// Always by pass the command
ten_env.send_cmd(std::move(cmd));
}
};

class test_normal_extension_2 : public ten::extension_t {
public:
explicit test_normal_extension_2(const std::string &name)
: ten::extension_t(name) {}

void on_cmd(ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_t> cmd) override {
nlohmann::json json = nlohmann::json::parse(cmd->to_json());
if (json["_ten"]["name"] == "hello_world") {
auto cmd_result = ten::cmd_result_t::create(TEN_STATUS_CODE_OK);
cmd_result->set_property("detail", "hello world, too");
ten_env.return_result(std::move(cmd_result), std::move(cmd));
}
}
};

class test_predefined_graph : public ten::extension_t {
public:
explicit test_predefined_graph(const std::string &name)
: ten::extension_t(name) {}

void on_start(ten::ten_env_t &ten_env) override {
std::string start_graph_json = R"({
"_ten": {
"type": "start_graph",
"seq_id": "222",
"dest": [{
"app": "localhost"
}],
"predefined_graph": "graph_1"
}
})"_json.dump();

ten_env.send_json(
start_graph_json.c_str(),
[this, start_graph_json](ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_result_t> cmd) {
auto status_code = cmd->get_status_code();
ASSERT_EQ(status_code, TEN_STATUS_CODE_OK);

auto graph_id = cmd->get_property_string("detail");

nlohmann::json hello_cmd =
R"({
"_ten": {
"name": "hello_world",
"seq_id": "137",
"dest":[{
"app": "msgpack://127.0.0.1:8001/",
"extension_group": "start_predefined_graph_cross_app__normal_extension_group",
"extension": "normal_extension_1"
}]
}
})"_json;
hello_cmd["_ten"]["dest"][0]["graph"] = graph_id;

ten_env.send_json(
hello_cmd.dump().c_str(),
[this](ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_result_t> cmd) {
auto status_code = cmd->get_status_code();
ASSERT_EQ(status_code, TEN_STATUS_CODE_OK);

auto detail = cmd->get_property_string("detail");
ASSERT_EQ(detail, "hello world, too");

received_hello_world_resp = true;

if (test_cmd != nullptr) {
nlohmann::json detail = {{"id", 1}, {"name", "a"}};

auto cmd_result =
ten::cmd_result_t::create(TEN_STATUS_CODE_OK);
cmd_result->set_property_from_json("detail",
detail.dump().c_str());
ten_env.return_result(std::move(cmd_result),
std::move(test_cmd));
}
});
});

ten_env.on_start_done();
}

void on_cmd(ten::ten_env_t &ten_env,
std::unique_ptr<ten::cmd_t> cmd) override {
nlohmann::json json = nlohmann::json::parse(cmd->to_json());
if (json["_ten"]["name"] == "test") {
if (received_hello_world_resp) {
nlohmann::json detail = {{"id", 1}, {"name", "a"}};

auto cmd_result = ten::cmd_result_t::create(TEN_STATUS_CODE_OK);
cmd_result->set_property_from_json("detail", detail.dump().c_str());
ten_env.return_result(std::move(cmd_result), std::move(cmd));
} else {
test_cmd = std::move(cmd);
return;
}
} else {
TEN_ASSERT(0, "Should not happen.");
}
}

private:
bool received_hello_world_resp{};
std::unique_ptr<ten::cmd_t> test_cmd;
};

class test_app_1 : public ten::app_t {
public:
void on_configure(ten::ten_env_t &ten_env) override {
ten::ten_env_internal_accessor_t ten_env_internal_accessor(&ten_env);
bool rc = ten_env_internal_accessor.init_manifest_from_json(
// clang-format off
R"({
"type": "app",
"name": "test_app",
"version": "0.1.0"
})"
// clang-format on
);
ASSERT_EQ(rc, true);

rc = ten_env.init_property_from_json(
// clang-format off
R"({
"_ten": {
"uri": "msgpack://127.0.0.1:8001/",
"log_level": 2,
"predefined_graphs": [{
"name": "default",
"auto_start": false,
"singleton": true,
"nodes": [{
"type": "extension",
"name": "predefined_graph",
"app": "msgpack://127.0.0.1:8001/",
"addon": "start_predefined_graph_cross_app__predefined_graph_extension",
"extension_group": "start_predefined_graph_cross_app__predefined_graph_group"
}]
},{
"name": "graph_1",
"auto_start": false,
"nodes": [{
"type": "extension",
"name": "normal_extension_1",
"app": "msgpack://127.0.0.1:8001/",
"addon": "start_predefined_graph_cross_app__normal_extension_1",
"extension_group": "start_predefined_graph_cross_app__normal_extension_group"
}, {
"type": "extension",
"name": "normal_extension_2",
"app": "msgpack://127.0.0.1:8002/",
"addon": "start_predefined_graph_cross_app__normal_extension_2",
"extension_group": "start_predefined_graph_cross_app__normal_extension_group"
}],
"connections": [{
"app": "msgpack://127.0.0.1:8001/",
"extension_group": "start_predefined_graph_cross_app__normal_extension_group",
"extension": "normal_extension_1",
"cmd": [{
"name": "hello_world",
"dest": [{
"app": "msgpack://127.0.0.1:8002/",
"extension_group": "start_predefined_graph_cross_app__normal_extension_group",
"extension": "normal_extension_2"
}]
}]
}]
}]
}
})"
// clang-format on
);
ASSERT_EQ(rc, true);

ten_env.on_configure_done();
}
};

class test_app_2 : public ten::app_t {
public:
void on_configure(ten::ten_env_t &ten_env) override {
ten_env.init_property_from_json(
R"({
"_ten": {
"uri": "msgpack://127.0.0.1:8002/"
}
})");
ten_env.on_configure_done();
}
};

void *app_thread_1_main(TEN_UNUSED void *args) {
auto *app = new test_app_1();
app->run();
delete app;

return nullptr;
}

void *app_thread_2_main(TEN_UNUSED void *args) {
auto *app = new test_app_2();
app->run();
delete app;

return nullptr;
}

TEN_CPP_REGISTER_ADDON_AS_EXTENSION(
start_predefined_graph_cross_app__predefined_graph_extension,
test_predefined_graph);
TEN_CPP_REGISTER_ADDON_AS_EXTENSION(
start_predefined_graph_cross_app__normal_extension_1,
test_normal_extension_1);
TEN_CPP_REGISTER_ADDON_AS_EXTENSION(
start_predefined_graph_cross_app__normal_extension_2,
test_normal_extension_2);

} // namespace

TEST(ExtensionTest, StartPredefinedGraphCrossApp) { // NOLINT
auto *app_1_thread =
ten_thread_create("app thread 1", app_thread_1_main, nullptr);
auto *app_2_thread =
ten_thread_create("app thread 2", app_thread_2_main, nullptr);

ten_sleep(300);

// Create a client and connect to the app.
auto *client = new ten::msgpack_tcp_client_t("msgpack://127.0.0.1:8001/");

// Do not need to send 'start_graph' command first.
// The 'graph_id' MUST be "default" (a special string) if we want to send the
// request to predefined graph.
nlohmann::json resp = client->send_json_and_recv_resp_in_json(
R"({
"_ten": {
"name": "test",
"seq_id": "111",
"dest": [{
"app": "msgpack://127.0.0.1:8001/",
"graph": "default",
"extension_group": "start_predefined_graph_cross_app__predefined_graph_group",
"extension": "predefined_graph"
}]
}
})"_json);
ten_test::check_result_is(resp, "111", TEN_STATUS_CODE_OK,
R"({"id": 1, "name": "a"})");

delete client;

ten_thread_join(app_1_thread, -1);
ten_thread_join(app_2_thread, -1);
}
Loading

0 comments on commit 59604f1

Please sign in to comment.