From 59c5d017478ed97405ed753612cd5e7497e537d6 Mon Sep 17 00:00:00 2001 From: junq <22017000+QiJune@users.noreply.github.com> Date: Mon, 7 Jul 2025 16:04:31 +0800 Subject: [PATCH 1/4] Revert "fix _pad_attention_dp_dummy_request (#5583)" This reverts commit 3a58db88c83590f7ca293e61e801140f17046fa8. --- .../_torch/pyexecutor/model_engine.py | 61 ++++++++----------- tensorrt_llm/_torch/pyexecutor/py_executor.py | 33 +++++----- tensorrt_llm/_torch/speculative/mtp.py | 6 -- .../defs/accuracy/test_llm_api_pytorch.py | 3 +- 4 files changed, 44 insertions(+), 59 deletions(-) diff --git a/tensorrt_llm/_torch/pyexecutor/model_engine.py b/tensorrt_llm/_torch/pyexecutor/model_engine.py index 052350ecd89..3a454cb740f 100644 --- a/tensorrt_llm/_torch/pyexecutor/model_engine.py +++ b/tensorrt_llm/_torch/pyexecutor/model_engine.py @@ -1197,16 +1197,16 @@ def _prepare_tp_inputs( new_tokens_lens_device = new_tensors_device.new_tokens_lens # [batch] next_draft_tokens_device = new_tensors_device.next_draft_tokens # [batch, draft_len] - # Requests with draft tokens are treated like extend requests. CUDA graph dummy extend - # requests should be at the end of extend_requests. + # Requests with draft tokens are treated like extend requests. Dummy extend requests should be + # at the end of extend_requests. extend_requests = [] - extend_cuda_graph_dummy_requests = [] + extend_dummy_requests = [] generation_requests = [] for request in scheduled_requests.generation_requests: if len(request.py_draft_tokens ) > 0 or next_draft_tokens_device is not None: - if request.is_cuda_graph_dummy: - extend_cuda_graph_dummy_requests.append(request) + if request.is_dummy: + extend_dummy_requests.append(request) else: extend_requests.append(request) else: @@ -1219,8 +1219,8 @@ def _prepare_tp_inputs( pin_memory=True) mrope_config['mrope_position_deltas'].append( mrope_position_deltas.to('cuda', non_blocking=True)) + extend_requests += extend_dummy_requests - extend_requests = extend_cuda_graph_dummy_requests + extend_requests if not self._disable_overlap_scheduler and self.is_spec_decode: spec_dec_mode = self.spec_config.spec_dec_mode assert spec_dec_mode.support_overlap_scheduler( @@ -1229,18 +1229,18 @@ def _prepare_tp_inputs( # will contain previous batch incices of generation requests previous_batch_indices = [] previous_pos_indices = [] - request_ids_with_previous_batch = [] - num_extend_reqs_wo_previous_batch = 0 for request in extend_requests: # the request has no previous tensor: # (1) next_draft_tokens_device is None, which means overlap scheduler is disabled; or # (2) a dummy request; or # (3) the first step in the generation server of disaggregated serving if next_draft_tokens_device is None or request.is_dummy or request.py_batch_idx is None: - # get token ids, including input token ids and draft token ids - input_ids.append(request.get_last_tokens(0)) - input_ids.extend(request.py_draft_tokens) - draft_tokens.extend(request.py_draft_tokens) + # get token ids, including input token ids and draft token ids. For these dummy requests, + # no need to copy the token ids. 
+ if not request.is_dummy: + input_ids.append(request.get_last_tokens(0)) + input_ids.extend(request.py_draft_tokens) + draft_tokens.extend(request.py_draft_tokens) # get other ids and lengths num_draft_tokens = len(request.py_draft_tokens) past_seen_token_num = request.max_beam_num_tokens - 1 @@ -1268,7 +1268,6 @@ def _prepare_tp_inputs( # update batch index request.py_batch_idx = batch_idx batch_idx += 1 - num_extend_reqs_wo_previous_batch += 1 else: # update batch index previous_batch_idx = request.py_batch_idx @@ -1295,10 +1294,7 @@ def _prepare_tp_inputs( num_cached_tokens_per_seq.append(past_seen_token_num + self.max_draft_len + 1) prompt_lengths.append(request.py_prompt_len) - request_ids_with_previous_batch.append(request.py_request_id) - - # move requests with previous batch to the end of the list - request_ids.extend(request_ids_with_previous_batch) + request_ids.append(request.py_request_id) sequence_lengths.extend([1] * len(generation_requests)) gather_ids.extend( @@ -1333,6 +1329,7 @@ def _prepare_tp_inputs( num_tokens = len(input_ids) num_draft_tokens = len(draft_tokens) previous_batchs = len(previous_batch_indices) + num_requests = len(request_ids) total_num_tokens = len(position_ids) assert total_num_tokens <= self.max_num_tokens, ( "total_num_tokens should be less than or equal to max_num_tokens") @@ -1374,31 +1371,27 @@ def _prepare_tp_inputs( non_blocking=True) # prepare data for the preprocess inputs kv_len_offsets_device = new_tokens_lens_device - self.max_draft_len - 1 - pre_tokens_start_idx = num_extend_reqs_wo_previous_batch * ( - 1 + self.max_draft_len) - pre_tokens_end_idx = pre_tokens_start_idx + previous_batch_tokens - pre_batch_start_idx = num_extend_reqs_wo_previous_batch - pre_batch_end_idx = pre_batch_start_idx + previous_batchs previous_pos_indices = torch.tensor(previous_pos_indices, dtype=torch.int, pin_memory=True) - self.previous_pos_indices_cuda[ - pre_tokens_start_idx:pre_tokens_end_idx].copy_( - previous_pos_indices, non_blocking=True) + self.previous_pos_indices_cuda[0:previous_batch_tokens].copy_( + previous_pos_indices, non_blocking=True) self.previous_pos_id_offsets_cuda[ - pre_tokens_start_idx:pre_tokens_end_idx].copy_( + 0:previous_batch_tokens].copy_( new_tokens_lens_device[self.previous_pos_indices_cuda[ - pre_tokens_start_idx:pre_tokens_end_idx]], - non_blocking=True) - self.previous_kv_lens_offsets_cuda[ - pre_batch_start_idx:pre_batch_end_idx].copy_( - kv_len_offsets_device[ - self.previous_batch_indices_cuda[:previous_batchs]], + 0:previous_batch_tokens]], non_blocking=True) + self.previous_kv_lens_offsets_cuda[0:previous_batchs].copy_( + kv_len_offsets_device[ + self.previous_batch_indices_cuda[:previous_batchs]], + non_blocking=True) # for the requests that do not have previous batch, set the previous_pos_id_offsets and # previous_kv_lens_offsets to zeros to skip the value changes in _preprocess_inputs - self.previous_pos_id_offsets_cuda[:pre_tokens_start_idx] *= 0 - self.previous_kv_lens_offsets_cuda[:pre_batch_start_idx] *= 0 + self.previous_pos_id_offsets_cuda[ + previous_batch_tokens:num_requests * + (1 + self.max_draft_len)] *= 0 + self.previous_kv_lens_offsets_cuda[ + previous_batchs:num_requests] *= 0 else: # change the data to zeros to skip the value changes in _preprocess_inputs self.previous_pos_id_offsets_cuda *= 0 diff --git a/tensorrt_llm/_torch/pyexecutor/py_executor.py b/tensorrt_llm/_torch/pyexecutor/py_executor.py index 92cf91352bb..13508f85821 100644 --- a/tensorrt_llm/_torch/pyexecutor/py_executor.py +++ 
b/tensorrt_llm/_torch/pyexecutor/py_executor.py @@ -1492,7 +1492,7 @@ def _check_disagg_gen_transfer_status(self): @nvtx_range("_pad_attention_dp_dummy_request") def _pad_attention_dp_dummy_request(self): """ - Pad with dummy requests, if required, to avoid empty attention_dp rank. + Pad with a dummy request, if required, to ensure every attention_dp rank has at least one active request. """ if not self.enable_attention_dp: return @@ -1506,22 +1506,20 @@ def _pad_attention_dp_dummy_request(self): or req.is_disagg_generation_transmission_in_progress else 1 for req in self.active_requests ]) - num_dummy_request = self.expected_num_active_requests - num_active_request - if num_dummy_request > 0: - llm_request_list = self.kv_cache_manager.add_dummy_requests( - request_ids=list(range(num_dummy_request)), + + if self.expected_num_active_requests - num_active_request > 0 and num_active_request == 0: + llm_request = self.kv_cache_manager.add_dummy_requests( + request_ids=[0], is_gen=not self.has_context_request, prepare_resource=not self.has_context_request, max_num_draft_tokens=self.max_draft_tokens, - ) - for llm_request in llm_request_list: - llm_request.is_attention_dp_dummy = True + )[0] + llm_request.is_attention_dp_dummy = True spec_resource_manager = self.resource_manager.get_resource_manager( ResourceManagerType.SPEC_RESOURCE_MANAGER) if spec_resource_manager is not None: - spec_resource_manager.add_dummy_requests( - list(range(num_dummy_request))) - self.active_requests += llm_request_list + spec_resource_manager.add_dummy_requests([0]) + self.active_requests.append(llm_request) @nvtx_range("_prepare_disagg_gen_init") def _prepare_disagg_gen_init(self, fitting_disagg_gen_init_requests): @@ -1645,12 +1643,13 @@ def forward(scheduled_requests, resource_manager, new_tensors_device, def _update_request_states_tp(self, scheduled_requests: ScheduledRequests): # handle potential attention dp dummy request - for request in self.active_requests[:]: - if request.is_attention_dp_dummy: - request.state = LlmRequestState.GENERATION_COMPLETE - self.inflight_req_ids.erase(request.py_request_id) - self._terminate_request(request) - self.active_requests.remove(request) + if self.active_requests and self.active_requests[ + -1].is_attention_dp_dummy: + request = self.active_requests[-1] + request.state = LlmRequestState.GENERATION_COMPLETE + self.inflight_req_ids.erase(request.py_request_id) + self._terminate_request(request) + self.active_requests.remove(request) for request in scheduled_requests.context_requests: if request.state != LlmRequestState.GENERATION_COMPLETE: # skip failed requests diff --git a/tensorrt_llm/_torch/speculative/mtp.py b/tensorrt_llm/_torch/speculative/mtp.py index 29a17593e97..82ed94ab0fd 100644 --- a/tensorrt_llm/_torch/speculative/mtp.py +++ b/tensorrt_llm/_torch/speculative/mtp.py @@ -267,12 +267,6 @@ def update_requests(self, state: SampleStateMTP) -> None: request.py_decoding_iter += 1 idx += 1 - # skip the results of cuda graph dummy requests - if idx == 0: - num_cuda_graph_dummy_requests = len(new_tokens_list) - len( - state.scheduled_requests.generation_requests) - idx += num_cuda_graph_dummy_requests - for request in state.scheduled_requests.generation_requests: assert not request.py_return_context_logits, "return_context_logits not implemented for MTPSampler" assert not request.py_return_generation_logits, "return_generation_logits not implemented for MTPSampler" diff --git a/tests/integration/defs/accuracy/test_llm_api_pytorch.py 
b/tests/integration/defs/accuracy/test_llm_api_pytorch.py index a3b4e2eaf62..23d7a468269 100644 --- a/tests/integration/defs/accuracy/test_llm_api_pytorch.py +++ b/tests/integration/defs/accuracy/test_llm_api_pytorch.py @@ -646,8 +646,7 @@ def test_fp8_block_scales(self, mtp_nextn, fp8kv, attention_dp, cuda_graph, @pytest.mark.skip_device_not_contain(["H100"]) @parametrize_with_ids("mtp_nextn", [0, 2]) def test_fp8_block_scales_cuda_graph_padding(self, mtp_nextn): - # OOM on H100 with default free_gpu_memory_fraction=0.9 - kv_cache_config = KvCacheConfig(free_gpu_memory_fraction=0.8) + kv_cache_config = KvCacheConfig(free_gpu_memory_fraction=0.9) mtp_config = None if mtp_nextn > 0: mtp_config = MTPDecodingConfig(num_nextn_predict_layers=mtp_nextn) From 7b66676127a7492df8d750f539a21aebeabb7b60 Mon Sep 17 00:00:00 2001 From: junq <22017000+QiJune@users.noreply.github.com> Date: Mon, 7 Jul 2025 16:08:12 +0800 Subject: [PATCH 2/4] avoid nesting NCCL grouping in allgather OP Signed-off-by: junq <22017000+QiJune@users.noreply.github.com> --- cpp/tensorrt_llm/thop/allgatherOp.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/cpp/tensorrt_llm/thop/allgatherOp.cpp b/cpp/tensorrt_llm/thop/allgatherOp.cpp index 693e12d0e43..470d96dabdd 100644 --- a/cpp/tensorrt_llm/thop/allgatherOp.cpp +++ b/cpp/tensorrt_llm/thop/allgatherOp.cpp @@ -97,13 +97,21 @@ class AllgatherOp { std::vector output_list; output_list.reserve(input_list.size()); - ncclGroupStart(); + // NCCL Groups cannot be nested. + // Skips NCCL grouping when allgather v calls ncclGroupStart/ncclGroupEnd. + if (!sizes.has_value()) + { + ncclGroupStart(); + } for (auto const& input : input_list) { auto output = run(input, sizes); output_list.push_back(output); } - ncclGroupEnd(); + if (!sizes.has_value()) + { + ncclGroupEnd(); + } return output_list; } From 251d767b67cd6260ea7ba37abdf6e40f7caa69bf Mon Sep 17 00:00:00 2001 From: junq <22017000+QiJune@users.noreply.github.com> Date: Mon, 7 Jul 2025 16:08:49 +0800 Subject: [PATCH 3/4] polish Signed-off-by: junq <22017000+QiJune@users.noreply.github.com> --- cpp/tensorrt_llm/thop/allgatherOp.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/tensorrt_llm/thop/allgatherOp.cpp b/cpp/tensorrt_llm/thop/allgatherOp.cpp index 470d96dabdd..02f5b72f6b9 100644 --- a/cpp/tensorrt_llm/thop/allgatherOp.cpp +++ b/cpp/tensorrt_llm/thop/allgatherOp.cpp @@ -98,7 +98,7 @@ class AllgatherOp std::vector output_list; output_list.reserve(input_list.size()); // NCCL Groups cannot be nested. - // Skips NCCL grouping when allgather v calls ncclGroupStart/ncclGroupEnd. + // Skip NCCL grouping when allgather v calls ncclGroupStart/ncclGroupEnd. if (!sizes.has_value()) { ncclGroupStart(); From 66b2ffd138cf50cc5e7b858ed86b08fe3ff2d93a Mon Sep 17 00:00:00 2001 From: junq <22017000+QiJune@users.noreply.github.com> Date: Tue, 8 Jul 2025 08:48:14 +0800 Subject: [PATCH 4/4] polish code comment Signed-off-by: junq <22017000+QiJune@users.noreply.github.com> --- cpp/tensorrt_llm/thop/allgatherOp.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/tensorrt_llm/thop/allgatherOp.cpp b/cpp/tensorrt_llm/thop/allgatherOp.cpp index 02f5b72f6b9..a9f2288066b 100644 --- a/cpp/tensorrt_llm/thop/allgatherOp.cpp +++ b/cpp/tensorrt_llm/thop/allgatherOp.cpp @@ -98,7 +98,7 @@ class AllgatherOp std::vector output_list; output_list.reserve(input_list.size()); // NCCL Groups cannot be nested. - // Skip NCCL grouping when allgather v calls ncclGroupStart/ncclGroupEnd. 
+ // Skip NCCL grouping when AllgatherV (variable-length AllGather) calls ncclGroupStart/ncclGroupEnd. if (!sizes.has_value()) { ncclGroupStart();
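
Reviewer note: the guard in this patch only wraps the per-tensor loop in ncclGroupStart/ncclGroupEnd when `sizes` is unset, because (per the patch comment) the variable-length AllgatherV path already opens its own NCCL group inside the single-tensor call and groups must not be nested. Below is a minimal standalone sketch of that pattern, not the TensorRT-LLM implementation: `FakeTensor`, `runSingleAllgather`, and `allgatherList` are placeholders, and only ncclGroupStart/ncclGroupEnd are real NCCL calls.

// Sketch only: mirrors the conditional-grouping pattern added to allgatherOp.cpp.
// runSingleAllgather is a hypothetical stand-in for AllgatherOp::run(); when
// `sizes` is set it is assumed to call ncclGroupStart/ncclGroupEnd itself,
// which is why the outer group is skipped on that path.
#include <nccl.h>
#include <cstdint>
#include <optional>
#include <vector>

using FakeTensor = std::vector<float>; // placeholder for torch::Tensor

FakeTensor runSingleAllgather(FakeTensor const& input, std::optional<std::vector<int64_t>> const& sizes)
{
    // Real code would enqueue the NCCL allgather work here; this stub just echoes the input.
    return input;
}

std::vector<FakeTensor> allgatherList(
    std::vector<FakeTensor> const& inputs, std::optional<std::vector<int64_t>> const& sizes)
{
    std::vector<FakeTensor> outputs;
    outputs.reserve(inputs.size());

    // Only open an outer NCCL group for the fixed-size path; the AllgatherV
    // path is assumed to group its calls internally, so grouping here is skipped.
    bool const useOuterGroup = !sizes.has_value();
    if (useOuterGroup)
    {
        ncclGroupStart();
    }
    for (auto const& input : inputs)
    {
        outputs.push_back(runSingleAllgather(input, sizes));
    }
    if (useOuterGroup)
    {
        ncclGroupEnd();
    }
    return outputs;
}

Either way the loop body is identical; the only behavioral difference is which layer owns the ncclGroupStart/ncclGroupEnd pair.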