Skip to content

Commit 5dcad6f

Browse files
committed
Bugfix: fixed the following bugs
- fixed the issue where the timer memory managed by PeripheryTaskScheduler was not released correctly when the program exited
- fixed the issue where fiber worker threads might only run on a few CPU cores in a container environment with strict CPU affinity binding
- fixed the coredump issue during HTTP streaming client connection cleanup
- updated the connection active time as soon as data is read from the socket, so that a connection is not dropped while a packet that takes a long time to receive is still arriving
1 parent 0e5a67f commit 5dcad6f

File tree

13 files changed

+93
-19
lines changed

13 files changed

+93
-19
lines changed

trpc/metrics/prometheus/prometheus_metrics.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ void PrometheusMetrics::Start() noexcept {
8888

8989
void PrometheusMetrics::Stop() noexcept {
9090
if (push_gateway_task_id_ != 0) {
91-
PeripheryTaskScheduler::GetInstance()->StopInnerTask(push_gateway_task_id_);
91+
PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(push_gateway_task_id_);
9292
PeripheryTaskScheduler::GetInstance()->JoinInnerTask(push_gateway_task_id_);
9393
push_gateway_task_id_ = 0;
9494
}

trpc/naming/domain/selector_domain.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ void SelectorDomain::Start() noexcept {
306306

307307
void SelectorDomain::Stop() noexcept {
308308
if (task_id_) {
309-
PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
309+
PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
310310
task_id_ = 0;
311311
}
312312
}

trpc/rpcz/collector.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ void RpczCollectorTask::Destroy() {
8484

8585
void RpczCollectorTask::Stop() {
8686
if (task_id_) {
87-
trpc::PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
87+
trpc::PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
8888
task_id_ = 0;
8989
}
9090
}

trpc/runtime/common/heartbeat/heartbeat_report.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ void HeartBeatReport::Stop() {
117117

118118
enable_ = false;
119119

120-
PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
120+
PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
121121
PeripheryTaskScheduler::GetInstance()->JoinInnerTask(task_id_);
122122

123123
task_id_ = 0;

trpc/runtime/common/periphery_task_scheduler.cc

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ void PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::Stop() {
4747
}
4848

4949
void PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::Join() {
50-
for (unsigned i = 0; i < thread_num_; ++i) {
51-
if (workers_[i].joinable()) {
52-
workers_[i].join();
50+
for (auto & worker : workers_) {
51+
if (worker.joinable()) {
52+
worker.join();
5353
}
5454
}
5555
workers_.clear();
@@ -61,6 +61,11 @@ void PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::Join() {
6161
}
6262

6363
while (!tasks_.empty()) {
64+
// Compatibility handling for scenarios where the user does not call DetachTask/RemoveTask:
65+
// actively call Deref when the program exits.
66+
if (tasks_.top()->UnsafeRefCount() > 1) {
67+
tasks_.top()->Deref();
68+
}
6469
tasks_.pop();
6570
}
6671
}
@@ -135,6 +140,19 @@ bool PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::StopTaskImpl(uint64_t t
135140
return StopAndDestroyTask(task_id, false);
136141
}
137142

143+
bool PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::DetachTaskImpl(uint64_t task_id) {
144+
if (TRPC_UNLIKELY(exited_.load(std::memory_order_relaxed))) {
145+
return false;
146+
}
147+
148+
TaskPtr task_ptr = GetTaskPtr(task_id);
149+
if (task_ptr.Get() == nullptr) {
150+
return false;
151+
}
152+
153+
return true;
154+
}
155+
138156
bool PeripheryTaskScheduler::PeripheryTaskSchedulerImpl::JoinTaskImpl(uint64_t task_id) {
139157
if (TRPC_UNLIKELY(exited_.load(std::memory_order_relaxed))) {
140158
TRPC_FMT_ERROR("PeripheryTaskScheduler is already exited.");
@@ -323,6 +341,13 @@ bool PeripheryTaskScheduler::RemoveTask(uint64_t task_id) {
323341
return scheduler_->RemoveTaskImpl(task_id);
324342
}
325343

344+
bool PeripheryTaskScheduler::DetachTask(std::uint64_t task_id) {
345+
if (!scheduler_) {
346+
return false;
347+
}
348+
return scheduler_->DetachTaskImpl(task_id);
349+
}
350+
326351
bool PeripheryTaskScheduler::StopTask(uint64_t task_id) {
327352
if (!scheduler_) {
328353
return false;
@@ -368,6 +393,13 @@ bool PeripheryTaskScheduler::RemoveInnerTask(uint64_t task_id) {
368393
return inner_scheduler_->RemoveTaskImpl(task_id);
369394
}
370395

396+
bool PeripheryTaskScheduler::DetachInnerTask(std::uint64_t task_id) {
397+
if (!inner_scheduler_) {
398+
return false;
399+
}
400+
return inner_scheduler_->DetachTaskImpl(task_id);
401+
}
402+
371403
bool PeripheryTaskScheduler::StopInnerTask(uint64_t task_id) {
372404
if (!inner_scheduler_) {
373405
return false;

trpc/runtime/common/periphery_task_scheduler.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,16 @@ class PeripheryTaskScheduler {
102102
/// @brief Remove task, used in scenarios where it is not necessary to wait for tasks to complete before exiting.
103103
/// @param task_id task id
104104
/// @return on success, return true. on error, return false
105-
/// @note This interface can only be called once with the same ID.
105+
/// @note This interface can only be called once with the same ID. After calling it, the task_id can't be used again.
106106
bool RemoveTask(std::uint64_t task_id);
107107

108+
/// @brief Detach task, after calling this interface, the lifecycle of this task will be managed by the scheduler,
109+
/// users no longer need to concern themselves with the release of the task.
110+
/// @param task_id task id
111+
/// @return on success return true, otherwise return false
112+
/// @note the task_id can't be used again after calling this interface
113+
bool DetachTask(std::uint64_t task_id);
114+
108115
/// @brief Same as 'SubmitTask', but is used only internally by the framework.
109116
std::uint64_t SubmitInnerTask(Function<void()>&& task, const std::string& name = "");
110117

@@ -119,10 +126,13 @@ class PeripheryTaskScheduler {
119126
/// @brief Same as 'RemoveTask', but is used only internally by the framework.
120127
bool RemoveInnerTask(std::uint64_t task_id);
121128

122-
/// @brief Same as 'Stoptask', but is used only internally by the framework.
129+
/// @brief Same as 'DetachTask', but is used only internally by the framework.
130+
bool DetachInnerTask(std::uint64_t task_id);
131+
132+
/// @brief Same as 'StopTask', but is used only internally by the framework.
123133
bool StopInnerTask(std::uint64_t task_id);
124134

125-
/// @brief Same as 'Jointask', but is used only internally by the framework.
135+
/// @brief Same as 'JoinTask', but is used only internally by the framework.
126136
bool JoinInnerTask(std::uint64_t task_id);
127137

128138
/// @brief Used to destroy resources accessed by scheduled tasks after all scheduled task execution threads have
@@ -158,6 +168,7 @@ class PeripheryTaskScheduler {
158168
bool StopTaskImpl(std::uint64_t task_id);
159169
bool JoinTaskImpl(std::uint64_t task_id);
160170
bool RemoveTaskImpl(std::uint64_t task_id);
171+
bool DetachTaskImpl(uint64_t task_id);
161172

162173
void Init(size_t thread_num);
163174
void Start();

trpc/runtime/common/periphery_task_scheduler_test.cc

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,13 @@ void TestRemoveTask() {
7474
ASSERT_TRUE(PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id));
7575
}
7676

77+
void TestDetachTask() {
78+
std::uint64_t task_id = PeripheryTaskScheduler::GetInstance()->SubmitInnerTask([]() {});
79+
ASSERT_TRUE(task_id > 0);
80+
ASSERT_FALSE(PeripheryTaskScheduler::GetInstance()->DetachInnerTask(task_id + 1));
81+
ASSERT_TRUE(PeripheryTaskScheduler::GetInstance()->DetachInnerTask(task_id));
82+
}
83+
7784
void TestSubmitPeriodicTask() {
7885
int count = 0;
7986
Latch latch(1);
@@ -268,6 +275,8 @@ TEST_F(PeripheryTaskSchedulerTest, SubmitTaskTest) { TestSubmitTask(); }
268275

269276
TEST_F(PeripheryTaskSchedulerTest, RemoveTaskTest) { TestRemoveTask(); }
270277

278+
TEST_F(PeripheryTaskSchedulerTest, DetachTaskTest) { TestDetachTask(); }
279+
271280
TEST_F(PeripheryTaskSchedulerTest, SubmitPeriodicTaskTest) { TestSubmitPeriodicTask(); }
272281

273282
TEST_F(PeripheryTaskSchedulerTest, RemoveTaskAdvanceTest) { TestRemoveTaskAdvance(); }

trpc/runtime/common/runtime_info_report/runtime_info_reporter.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ void StopReportRuntimeInfo() {
173173
return;
174174
}
175175

176-
PeripheryTaskScheduler::GetInstance()->StopInnerTask(report_task_id);
176+
PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(report_task_id);
177177
PeripheryTaskScheduler::GetInstance()->JoinInnerTask(report_task_id);
178178

179179
report_task_id = 0;

trpc/runtime/common/stats/frame_stats.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ void FrameStats::Start() {
3737

3838
void FrameStats::Stop() {
3939
if (task_id_) {
40-
PeripheryTaskScheduler::GetInstance()->StopInnerTask(task_id_);
40+
PeripheryTaskScheduler::GetInstance()->RemoveInnerTask(task_id_);
4141
PeripheryTaskScheduler::GetInstance()->JoinInnerTask(task_id_);
4242
task_id_ = 0;
4343
}

trpc/runtime/iomodel/reactor/default/tcp_connection.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ int TcpConnection::HandleReadEvent() {
180180

181181
int ret = ReadIoData(read_buffer_.buffer);
182182
if (ret > 0) {
183+
SetConnActiveTime(trpc::time::GetMilliSeconds());
184+
GetConnectionHandler()->UpdateConnection();
185+
183186
std::deque<std::any> data;
184187
RefPtr ref(ref_ptr, this);
185188
int checker_ret = GetConnectionHandler()->CheckMessage(ref, read_buffer_.buffer, data);
@@ -197,9 +200,6 @@ int TcpConnection::HandleReadEvent() {
197200
if (TRPC_UNLIKELY(GetConnectionState() == ConnectionState::kUnconnected)) {
198201
return -1;
199202
}
200-
201-
SetConnActiveTime(trpc::time::GetMilliSeconds());
202-
GetConnectionHandler()->UpdateConnection();
203203
}
204204
} else if (checker_ret == kPacketError) {
205205
TRPC_LOG_ERROR("TcpConnection::HandleReadEvent fd:" << socket_.GetFd() << ", ip:" << GetPeerIp()

0 commit comments

Comments
 (0)