Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support retry mechanism in integrated protocol and remove ten_sleep in smoke tests #283

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@
"request": "launch",
"program": "${workspaceFolder}/out/linux/x64/tests/standalone/ten_runtime_smoke_test",
"args": [
"--gtest_filter=LogTest.LogFile"
"--gtest_filter=ExtensionTest.FailedToConnectToRemote"
],
"cwd": "${workspaceFolder}/out/linux/x64/tests/standalone/",
"env": {
Expand Down
12 changes: 7 additions & 5 deletions core/include/ten_runtime/protocol/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,18 +111,20 @@ typedef void (*ten_protocol_close_func_t)(ten_protocol_t *self);
typedef void (*ten_protocol_on_output_func_t)(ten_protocol_t *self,
ten_list_t *output);

typedef void (*ten_protocol_listen_func_t)(ten_protocol_t *self,
const char *uri);

typedef ten_connection_t *(*ten_protocol_on_client_accepted_func_t)(
ten_protocol_t *self, ten_protocol_t *new_protocol);

typedef bool (*ten_protocol_connect_to_func_t)(ten_protocol_t *self,
const char *uri);
typedef void (*ten_protocol_listen_func_t)(
ten_protocol_t *self, const char *uri,
ten_protocol_on_client_accepted_func_t on_client_accepted);

typedef void (*ten_protocol_on_server_connected_func_t)(ten_protocol_t *self,
bool success);

typedef void (*ten_protocol_connect_to_func_t)(
ten_protocol_t *self, const char *uri,
ten_protocol_on_server_connected_func_t on_server_connected);

typedef void (*ten_protocol_migrate_func_t)(ten_protocol_t *self,
ten_engine_t *engine,
ten_connection_t *connection,
Expand Down
2 changes: 2 additions & 0 deletions core/include/ten_utils/io/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,14 @@ struct ten_transport_t {
*/
void (*on_server_connected)(ten_transport_t *transport, ten_stream_t *stream,
int status);
void *on_server_connected_data;

/**
* Callback when a new rx stream is created
*/
void (*on_client_accepted)(ten_transport_t *transport, ten_stream_t *stream,
int status);
void *on_client_accepted_data;

/**
* Callback when transport closed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@

typedef struct ten_protocol_integrated_t ten_protocol_integrated_t;

TEN_RUNTIME_PRIVATE_API void ten_protocol_integrated_on_close(
ten_protocol_integrated_t *self);

TEN_RUNTIME_PRIVATE_API void ten_protocol_integrated_on_stream_closed(
ten_protocol_integrated_t *self);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,36 @@

#include "ten_runtime/ten_config.h"

#include "include_internal/ten_runtime/protocol/integrated/retry.h"
#include "include_internal/ten_runtime/protocol/protocol.h"
#include "ten_utils/io/stream.h"
#include "ten_utils/io/transport.h"

typedef struct ten_protocol_integrated_t ten_protocol_integrated_t;
typedef struct ten_timer_t ten_timer_t;

typedef void (*ten_protocol_integrated_on_input_func_t)(
ten_protocol_integrated_t *protocol, ten_buf_t buf, ten_list_t *input);

typedef ten_buf_t (*ten_protocol_integrated_on_output_func_t)(
ten_protocol_integrated_t *protocol, ten_list_t *output);

typedef struct ten_protocol_integrated_connect_to_context_t {
// The protocol which is trying to connect to the server.
ten_protocol_integrated_t *protocol;

// The server URI to connect to.
ten_string_t server_uri;

// The callback function to be called when the connection is established or
// failed.
//
// @note Set to NULL if the callback has been called.
ten_protocol_on_server_connected_func_t on_server_connected;

void *user_data;
} ten_protocol_integrated_connect_to_context_t;

/**
* @brief This is the base class of all the protocols which uses the event loop
* inside the TEN world.
Expand All @@ -43,9 +61,22 @@ struct ten_protocol_integrated_t {

// Used to convert TEN runtime messages to a buffer.
ten_protocol_integrated_on_output_func_t on_output;

// Used to configure the retry mechanism.
ten_protocol_integrated_retry_config_t retry_config;
ten_timer_t *retry_timer;
};

TEN_RUNTIME_API void ten_protocol_integrated_init(
ten_protocol_integrated_t *self, const char *name,
ten_protocol_integrated_on_input_func_t on_input,
ten_protocol_integrated_on_output_func_t on_output);

TEN_RUNTIME_PRIVATE_API ten_protocol_integrated_connect_to_context_t *
ten_protocol_integrated_connect_to_context_create(
ten_protocol_integrated_t *self, const char *server_uri,
ten_protocol_on_server_connected_func_t on_server_connected,
void *user_data);

TEN_RUNTIME_PRIVATE_API void ten_protocol_integrated_connect_to_context_destroy(
ten_protocol_integrated_connect_to_context_t *context);
28 changes: 28 additions & 0 deletions core/include_internal/ten_runtime/protocol/integrated/retry.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
//
// Copyright © 2024 Agora
// This file is part of TEN Framework, an open source project.
// Licensed under the Apache License, Version 2.0, with certain conditions.
// Refer to the "LICENSE" file in the root directory for more information.
//
#pragma once

#include "ten_runtime/ten_config.h"

#include <stdbool.h>
#include <stdint.h>

typedef struct ten_protocol_integrated_t ten_protocol_integrated_t;

typedef struct ten_protocol_integrated_retry_config_t {
// Whether to enable the retry mechanism.
bool enable;

// The max retry times.
uint32_t max_retries;

// The interval between retries.
uint32_t interval_ms;
} ten_protocol_integrated_retry_config_t;

TEN_RUNTIME_PRIVATE_API void ten_protocol_integrated_retry_config_init(
ten_protocol_integrated_retry_config_t *self);
16 changes: 1 addition & 15 deletions core/include_internal/ten_runtime/protocol/protocol.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,20 +139,6 @@ typedef struct ten_protocol_t {
// Used to handle the output TEN messages to the remote.
ten_protocol_on_output_func_t on_output;

// This is the callback function when a client connects to this protocol.
// Note that this function pointer can only be set in 'ten_protocol_listen'
// and the 'listen' method should be able to call back this function when
// the client successfully establishes a connection.
ten_protocol_on_client_accepted_func_t on_client_accepted;

// This is the callback function when this protocol connected to the remote
// server.
// Note that this function pointer can only be set in
// 'ten_protocol_connect_to' and the 'connect_to' method should be able to
// call back this function when the connection to the remote server is
// established.
ten_protocol_on_server_connected_func_t on_server_connected;

// This is the callback function when this protocol is migrated to the new
// runloop.
ten_protocol_on_migrated_func_t on_migrated;
Expand Down Expand Up @@ -211,7 +197,7 @@ TEN_RUNTIME_PRIVATE_API void ten_protocol_listen(
ten_protocol_t *self, const char *uri,
ten_protocol_on_client_accepted_func_t on_client_accepted);

TEN_RUNTIME_PRIVATE_API bool ten_protocol_connect_to(
TEN_RUNTIME_PRIVATE_API void ten_protocol_connect_to(
ten_protocol_t *self, const char *uri,
ten_protocol_on_server_connected_func_t on_server_connected);

Expand Down
13 changes: 12 additions & 1 deletion core/include_internal/ten_runtime/timer/timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,16 @@ struct ten_timer_t {
int32_t requested_times; // TEN_TIMER_INFINITE means "forever"
int32_t times;

// If the auto_restart flag is set to be 'false', it will __not__
// automatically restart timing after each timeout. Instead, the user needs to
// manually restart the timer (ten_timer_enable). When the number of timeouts
// exceeds the specified times, the timer will automatically close.
//
// Conversely, if auto_restart is set to be 'true' (by default), the timer
// will automatically decide whether to restart timing or close the timer
// based on its policy after each timeout.
bool auto_restart;

ten_loc_t src_loc;

ten_runloop_timer_t *backend;
Expand All @@ -58,7 +68,8 @@ TEN_RUNTIME_PRIVATE_API ten_timer_t *ten_timer_create_with_cmd(

TEN_RUNTIME_PRIVATE_API ten_timer_t *ten_timer_create(ten_runloop_t *runloop,
uint64_t timeout_in_us,
int32_t requested_times);
int32_t requested_times,
bool auto_restart);

TEN_RUNTIME_PRIVATE_API void ten_timer_destroy(ten_timer_t *self);

Expand Down
2 changes: 2 additions & 0 deletions core/include_internal/ten_utils/log/log.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,6 @@ TEN_UTILS_API void ten_log_global_deinit(void);

TEN_UTILS_API void ten_log_global_set_output_level(TEN_LOG_LEVEL level);

TEN_UTILS_API void ten_log_global_set_output_to_stderr(void);

TEN_UTILS_API void ten_log_global_set_output_to_file(const char *log_path);
9 changes: 9 additions & 0 deletions core/include_internal/ten_utils/log/output.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#include "ten_utils/ten_config.h"

#include <stdbool.h>

/**
* @brief String to put in the end of each log line (can be empty).
*/
Expand All @@ -20,8 +22,15 @@ typedef struct ten_string_t ten_string_t;

TEN_UTILS_API void ten_log_set_output_to_stderr(ten_log_t *self);

TEN_UTILS_PRIVATE_API void ten_log_output_to_file_cb(ten_string_t *msg,
void *user_data);

TEN_UTILS_PRIVATE_API void ten_log_output_to_stderr_cb(ten_string_t *msg,
void *user_data);

TEN_UTILS_PRIVATE_API void ten_log_set_output_to_file(ten_log_t *self,
const char *log_path);

TEN_UTILS_PRIVATE_API void ten_log_output_to_file_deinit(ten_log_t *self);

TEN_UTILS_PRIVATE_API bool ten_log_is_output_to_file(ten_log_t *self);
4 changes: 4 additions & 0 deletions core/src/ten_runtime/app/metadata.c
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,10 @@ bool ten_app_handle_ten_namespace_properties(ten_app_t *self) {
self->one_event_loop_per_engine = false;
self->long_running_mode = false;

// First, set the log-related configuration to default values. This way, if
// there are no log-related properties under the `ten` namespace, the default
// values will be used.
ten_log_global_set_output_to_stderr();
ten_log_global_set_output_level(DEFAULT_LOG_OUTPUT_LEVEL);

if (!ten_app_determine_ten_namespace_properties(self,
Expand Down
13 changes: 6 additions & 7 deletions core/src/ten_runtime/connection/connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -393,14 +393,13 @@ void ten_connection_connect_to(ten_connection_t *self, const char *uri,
"Should not happen.");
}

if (self->protocol) {
bool is_connected =
ten_protocol_connect_to(self->protocol, uri, on_server_connected);
TEN_ASSERT(
self->protocol && ten_protocol_check_integrity(self->protocol, true),
"Should not happen.");
TEN_ASSERT(ten_protocol_role_is_communication(self->protocol),
"Should not happen.");

if (!is_connected && on_server_connected) {
on_server_connected(self->protocol, false);
}
}
ten_protocol_connect_to(self->protocol, uri, on_server_connected);
}

void ten_connection_attach_to_remote(ten_connection_t *self,
Expand Down
4 changes: 2 additions & 2 deletions core/src/ten_runtime/extension/internal/path_timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ ten_timer_t *ten_extension_create_timer_for_in_path(ten_extension_t *self) {

ten_timer_t *timer = ten_timer_create(
ten_extension_thread_get_attached_runloop(extension_thread),
self->path_timeout_info.check_interval, TEN_TIMER_INFINITE);
self->path_timeout_info.check_interval, TEN_TIMER_INFINITE, true);

ten_timer_set_on_triggered(timer, ten_extension_in_path_timer_on_triggered,
self);
Expand All @@ -136,7 +136,7 @@ ten_timer_t *ten_extension_create_timer_for_out_path(ten_extension_t *self) {

ten_timer_t *timer = ten_timer_create(
ten_extension_thread_get_attached_runloop(extension_thread),
self->path_timeout_info.check_interval, TEN_TIMER_INFINITE);
self->path_timeout_info.check_interval, TEN_TIMER_INFINITE, true);

ten_timer_set_on_triggered(timer, ten_extension_out_path_timer_on_triggered,
self);
Expand Down
13 changes: 12 additions & 1 deletion core/src/ten_runtime/protocol/integrated/close.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include "include_internal/ten_runtime/protocol/integrated/close.h"
#include "include_internal/ten_runtime/protocol/integrated/protocol_integrated.h"
#include "include_internal/ten_runtime/protocol/protocol.h"
#include "include_internal/ten_runtime/timer/timer.h"
#include "ten_utils/io/stream.h"
#include "ten_utils/log/log.h"
#include "ten_utils/macro/check.h"
Expand Down Expand Up @@ -39,6 +40,10 @@ static bool ten_protocol_integrated_could_be_close(
if (self->role_facility.communication_stream) {
return false;
}

if (self->retry_timer) {
return false;
}
break;
default:
TEN_ASSERT(0, "Should not happen.");
Expand All @@ -48,7 +53,7 @@ static bool ten_protocol_integrated_could_be_close(
return true;
}

static void ten_protocol_integrated_on_close(ten_protocol_integrated_t *self) {
void ten_protocol_integrated_on_close(ten_protocol_integrated_t *self) {
TEN_ASSERT(self, "Should not happen.");

ten_protocol_t *protocol = &self->base;
Expand Down Expand Up @@ -141,6 +146,12 @@ void ten_protocol_integrated_close(ten_protocol_integrated_t *self) {
ten_stream_close(self->role_facility.communication_stream);
perform_any_closing_operation = true;
}

if (self->retry_timer) {
ten_timer_stop_async(self->retry_timer);
ten_timer_close_async(self->retry_timer);
perform_any_closing_operation = true;
}
break;

default:
Expand Down
Loading
Loading