@@ -135,9 +135,8 @@ class DataResponder::Impl
135135 if (common::getEnvParallelCacheSend ())
136136 {
137137 // TODO: Use a thread pool and check for thread safety.
138- std::thread (
139- &DataResponder::Impl::sendAndRemoveResponse, this , reqId, std::move (readyResponseIt->second ))
140- .detach ();
138+ mSendThreads .emplace_back (
139+ &DataResponder::Impl::sendAndRemoveResponse, this , reqId, std::move (readyResponseIt->second ));
141140 }
142141 else
143142 {
@@ -177,6 +176,11 @@ class DataResponder::Impl
177176
178177 ~Impl ()
179178 {
179+ for (auto & thread : mSendThreads )
180+ {
181+ thread.join ();
182+ }
183+ mSendThreads .clear ();
180184 terminate ();
181185 }
182186
@@ -220,10 +224,8 @@ class DataResponder::Impl
220224 {
221225 break ;
222226 }
223- std::vector<size_t > blockHashes;
224227 auto const & requestInfo = mSender ->recvRequestInfo ();
225228 auto reqId = requestInfo.getRequestId ();
226- blockHashes = requestInfo.getBlockHashes ();
227229 {
228230 std::unique_lock lk (mSendMutex );
229231 mRequestInfoMap [reqId] = std::move (requestInfo);
@@ -276,6 +278,7 @@ class DataResponder::Impl
276278 std::condition_variable mResponderCv ;
277279 std::future<void > mResponseFuture ;
278280 std::unique_ptr<DataSender> mSender ;
281+ std::vector<std::thread> mSendThreads ;
279282 std::unordered_map<LlmRequest::RequestIdType, int > mRemainSendCount ;
280283 std::unordered_map<LlmRequest::RequestIdType, RequestInfo> mRequestInfoMap ;
281284 int mDeviceId {-1 };
0 commit comments