@@ -160,6 +160,34 @@ class DataResponder::Impl
160160        }
161161    }
162162
163+     void  sendResponse (std::vector<size_t > const & blockHashes, std::map<RequestIdType, Response>::iterator it)
164+     {
165+         auto  reqId = mCurrentRequest .value ();
166+         auto  count = --mRemainSendCount [reqId];
167+         TLLM_CHECK (count >= 0 );
168+         if  (count == 0 )
169+         {
170+             mRemainSendCount .erase (reqId);
171+ 
172+             //  TODO(zhengd): pass the hashes directly instead of update llmRequest
173+             auto  llmRequest = it->second .mRequest ;
174+             llmRequest->setRequestedBlockHashes (std::move (blockHashes));
175+ 
176+             if  (common::getEnvParallelCacheSend ())
177+             {
178+                 //  TODO: Use a thread pool and check for thread safety.
179+                 std::thread (&DataResponder::Impl::sendAndRemoveResponse, this , it->first , std::move (it->second ))
180+                     .detach ();
181+             }
182+             else 
183+             {
184+                 DataResponder::Impl::sendAndRemoveResponse (it->first , std::move (it->second ));
185+             }
186+             removeResponse (it);
187+         }
188+         mCurrentRequest  = std::nullopt ;
189+     }
190+ 
163191    void  response () noexcept 
164192    {
165193        try 
@@ -193,40 +221,22 @@ class DataResponder::Impl
193221                auto  it = getCurrentResponse ();
194222                if  (it != mReadyResponses .end ())
195223                {
196-                     auto  reqId = mCurrentRequest .value ();
197-                     auto  count = --mRemainSendCount [reqId];
198-                     TLLM_CHECK (count >= 0 );
199-                     if  (count == 0 )
224+                     sendResponse (blockHashes, it);
225+                 }
226+                 else 
227+                 {
228+                     auto  it = getCurrentResponse ();
229+                     while  (it == mReadyResponses .end ())
200230                    {
201-                         mRemainSendCount .erase (reqId);
202- 
203-                         //  TODO(zhengd): pass the hashes directly instead of update llmRequest
204-                         auto  llmRequest = it->second .mRequest ;
205-                         llmRequest->setRequestedBlockHashes (std::move (blockHashes));
206- 
207-                         if  (common::getEnvParallelCacheSend ())
208-                         {
209-                             //  TODO: Use a thread pool and check for thread safety.
210-                             std::thread (
211-                                 &DataResponder::Impl::sendAndRemoveResponse, this , it->first , std::move (it->second ))
212-                                 .detach ();
213-                         }
214-                         else 
231+                         std::unique_lock lk (mCondMutex );
232+                         mResponderCv .wait (lk, [this ]() { return  (mAnyReady  || mTerminate ); });
233+                         if  (mTerminate )
215234                        {
216-                             DataResponder::Impl::sendAndRemoveResponse (it-> first ,  std::move (it-> second )) ;
235+                             break ;
217236                        }
218-                         removeResponse (it );
237+                         it =  getCurrentResponse ( );
219238                    }
220-                     mCurrentRequest  = std::nullopt ;
221-                 }
222-                 else 
223-                 {
224-                     TLLM_CHECK_WITH_INFO (!mCurrentRequest .has_value (),
225-                         " This executor does not have a prepared KV cache for request ID: %zu, and the " 
226-                         " mReadyResponses size is: %zu. mpi rank :%d     " 
227-                         mCurrentRequest .value (), mReadyResponses .size (), mpi::MpiComm::world ().getRank ());
228-                     std::unique_lock lk (mCondMutex );
229-                     mResponderCv .wait (lk, [this ]() { return  (mAnyReady  || mTerminate ); });
239+                     sendResponse (blockHashes, it);
230240                }
231241            }
232242        }
0 commit comments