
Commit 56b13a1

Merge branch 'develop' into metax-dev-v1-loader
2 parents: ff3b7b2 + 88da9d9

8 files changed: +239 / -56 lines changed

fastdeploy/engine/sched/resource_manager_v1.py

Lines changed: 17 additions & 9 deletions
```diff
@@ -728,14 +728,7 @@ def _allocate_decode_and_extend():
         if scheduled_reqs:
             llm_logger.debug(f"schedued_reqs: {scheduled_reqs}")
 
-        # Update metrics
-        num_tasks = sum([1 if task else 0 for task in self.tasks_list])
-        num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
-        main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks)
-        main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
-        main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
-        main_process_metrics.num_requests_running.set(len(self.running))
-        main_process_metrics.num_requests_waiting.set(num_tasks - len(self.running))
+        self.update_metrics()
 
         return scheduled_reqs
 
@@ -962,7 +955,10 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
                 if request in self.running:  # normally run and finished
                     self.running.remove(request)
                     request.status = RequestStatus.FINISHED
-                    self._free_blocks(request)
+                    try:
+                        self._free_blocks(request)
+                    except Exception as e:
+                        llm_logger.warning(f"release block failed {req_id}: {e}")
                 if (
                     request.request_id in self.to_be_rescheduled_request_id_set
                 ):  # finished after preempted, blocks have been recycled.
@@ -981,7 +977,19 @@ def finish_requests(self, request_ids: Union[str, Iterable[str]]):
                     del self.req_dict[req_id]
         except Exception as e:
             llm_logger.error(f"finish_request err: {e}, {str(traceback.format_exc())}")
+        finally:
+            self.update_metrics()
 
     def clear_data(self):
         self.waiting: deque[Request] = deque()
         self.to_be_rescheduled_request_id_set = set()
+
+    def update_metrics(self):
+        # Update metrics
+        num_tasks = sum([1 if task else 0 for task in self.tasks_list])
+        num_blocks_used_by_tasks = sum([len(task.block_tables) if task else 0 for task in self.tasks_list])
+        main_process_metrics.available_gpu_block_num.set(self.total_block_number() - num_blocks_used_by_tasks)
+        main_process_metrics.batch_size.set(self.max_num_seqs - self.available_batch())
+        main_process_metrics.gpu_cache_usage_perc.set(self.get_gpu_cache_usage_perc())
+        main_process_metrics.num_requests_running.set(len(self.running))
+        main_process_metrics.num_requests_waiting.set(num_tasks - len(self.running))
```
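The net effect of these hunks is that the gauge updates previously inlined in the scheduler now live in a single `update_metrics()` method, and `finish_requests` calls it from a `finally:` block so the gauges are refreshed even when block release or request bookkeeping raises. A minimal standalone sketch of that pattern, using `prometheus_client` gauges as a stand-in for FastDeploy's `main_process_metrics` (the class and names below are illustrative, not the project's API):

```python
from prometheus_client import Gauge

# Stand-in gauges; FastDeploy keeps its own set in main_process_metrics.
NUM_RUNNING = Gauge("num_requests_running", "Requests currently running")
NUM_WAITING = Gauge("num_requests_waiting", "Requests waiting to be scheduled")


class MiniResourceManager:
    """Illustrative scheduler state, not the real ResourceManagerV1."""

    def __init__(self, max_num_seqs: int):
        self.tasks_list = [None] * max_num_seqs  # one slot per sequence
        self.running = []                        # requests currently on the GPU

    def update_metrics(self) -> None:
        # Recompute every gauge from current state in one place,
        # instead of repeating the same lines at each call site.
        num_tasks = sum(1 for task in self.tasks_list if task)
        NUM_RUNNING.set(len(self.running))
        NUM_WAITING.set(num_tasks - len(self.running))

    def finish_request(self, request) -> None:
        try:
            if request in self.running:
                self.running.remove(request)
        finally:
            # Runs even if the cleanup above raises, mirroring the
            # `finally: self.update_metrics()` added to finish_requests.
            self.update_metrics()
```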

fastdeploy/model_executor/layers/mtp_linear.py

Lines changed: 2 additions & 2 deletions
```diff
@@ -76,7 +76,7 @@ def __init__(
             self.linear.weight,
             {
                 "weight_loader": default_weight_loader(self.fd_config),
-                "model_format": self.fd_config.model_config.model_format,
+                "weight_need_transpose": self.fd_config.model_config.model_format == "torch",
             },
         )
         if self.bias_key is not None:
@@ -100,7 +100,7 @@ def __init__(
             self.linear.weight,
             {
                 "weight_loader": default_weight_loader(self.fd_config),
-                "model_format": self.fd_config.model_config.model_format,
+                "weight_need_transpose": self.fd_config.model_config.model_format == "torch",
             },
         )
         if self.nranks > 1:
```
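Here the weight attribute changes from the raw `model_format` string to a boolean `weight_need_transpose`, decided once at registration time via `model_format == "torch"`, so the loader only has to check a flag. A hedged sketch of how such a flag might be consumed; the `load_weight` helper below is illustrative and not FastDeploy's `default_weight_loader`:

```python
import numpy as np


def load_weight(param: np.ndarray, loaded: np.ndarray, weight_need_transpose: bool = False) -> None:
    """Copy a checkpoint tensor into a parameter, transposing first when the
    checkpoint layout (e.g. a torch-format export) stores the transpose."""
    if weight_need_transpose:
        loaded = loaded.T
    if param.shape != loaded.shape:
        raise ValueError(f"shape mismatch: {param.shape} vs {loaded.shape}")
    np.copyto(param, loaded)


# The decision is made once, where the weight attributes are registered:
model_format = "torch"
weight_attrs = {"weight_need_transpose": model_format == "torch"}

param = np.zeros((4, 8), dtype=np.float32)
checkpoint_tensor = np.ones((8, 4), dtype=np.float32)  # stored transposed
load_weight(param, checkpoint_tensor, **weight_attrs)
```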

fastdeploy/output/token_processor.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -847,7 +847,7 @@ def _record_speculative_decoding_mertics(self, accept_num):
     def clear_data(self):
         if envs.ENABLE_V1_KVCACHE_SCHEDULER:
             self.resource_manager.clear_data()
-        for i in range(self.cfg.max_num_seqs):
+        for i in range(self.resource_manager.max_num_seqs):
             if self.resource_manager.stop_flags[i]:
                 continue
             task = self.resource_manager.tasks_list[i]
```
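The loop bound now comes from the resource manager that owns `stop_flags` and `tasks_list` rather than from the engine config, so the iteration can never run past the slots that were actually allocated. A tiny illustration of the mismatch being avoided (the values are made up):

```python
# If the config advertised 8 slots but the resource manager allocated 4,
# iterating with the config's value would index past the end of the lists.
cfg_max_num_seqs = 8
stop_flags = [False, True, False, True]     # sized by the resource manager
tasks_list = ["req-0", None, "req-2", None]

for i in range(len(stop_flags)):  # bound by the owner of the lists, as in the fix
    if stop_flags[i]:
        continue
    print(f"clearing slot {i}: {tasks_list[i]}")
```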

fastdeploy/worker/worker_process.py

Lines changed: 11 additions & 9 deletions
```diff
@@ -340,11 +340,14 @@ def event_loop_normal(self) -> None:
         mmap_infos = create_mmap(
             [MODEL_MAIN_NAME], self.local_rank, self.ranks, shm_uuid=os.getenv("SHM_UUID", ""), logger=logger
         )
+
+        tp_size = self.parallel_config.tensor_parallel_size
         # Currently, only support single node
-        self.nnode = int((self.parallel_config.tensor_parallel_size + 7) // 8)
+        self.nnode = int((tp_size + 7) // 8)
         req_ids = []
         num_running_requests = 0
-        local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
+        tp_rank = self.local_rank % tp_size
+
         self.model_weights_signal = np.zeros([1], dtype=np.int32)
         while True:
             if self.eplb_config.enable_redundant_experts:
@@ -385,35 +388,34 @@ def event_loop_normal(self) -> None:
                     if self.local_rank == 0:
                         rearrange_experts_status_array[0] = RearrangeExpertState.done.value
                         logger.info("redundant_expert: done")
-            if self.local_rank % self.parallel_config.tensor_parallel_size == 0:
+            if tp_rank == 0:
                 if self.model_weights_status.value[0] != ModelWeightsStatus.NORMAL:
                     self.model_weights_signal[0] = int(self.model_weights_status.value[0])
                 if self.fd_config.load_config.dynamic_load_weight and self.parallel_config.enable_expert_parallel:
                     self.model_weights_signal[0] = self._broadcast_model_weights_signal(
                         src=0, group=self.parallel_config.ep_group
                     )
-                if self.fd_config.load_config.dynamic_load_weight and self.parallel_config.tensor_parallel_size > 1:
+                if self.fd_config.load_config.dynamic_load_weight and tp_size > 1:
                     self.model_weights_signal[0] = self._broadcast_model_weights_signal(
                         src=0, group=self.parallel_config.tp_group
                     )
 
             self.insert_step = False
             req_dicts = None
-            local_rank = self.local_rank % self.parallel_config.tensor_parallel_size
-            self.worker_healthy_live_signal.value[local_rank % self.max_chips_per_node] = int(time.time())
+            self.worker_healthy_live_signal.value[tp_rank % self.max_chips_per_node] = int(time.time())
 
             # The first worker detects whether there are tasks in the task queue
-            if local_rank == 0:
+            if tp_rank == 0:
                 if self.task_queue.num_tasks() > 0:
                     if envs.ENABLE_V1_KVCACHE_SCHEDULER or not (
                         self.fd_config.model_config.enable_mm and self.worker.exist_prefill()
                     ):
-                        if self.nnode > 1 and self.parallel_config.tensor_parallel_size > self.max_chips_per_node:
+                        if self.nnode > 1 and tp_size > self.max_chips_per_node:
                             self.task_queue.read_finish_flag.set(1)
                         else:
                             self.exist_task_signal.value[0] = ExistTaskStatus.EXIST
 
-            if self.parallel_config.tensor_parallel_size > 1:
+            if tp_size > 1:
                 # Synchronize the signal for other workers
                 self._tp_barrier_wait()
```
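The worker-loop hunks are a pure refactor: the tensor-parallel size and the worker's rank within its tensor-parallel group are computed once, before the `while True:` loop, and the loop body reads the locals `tp_size` / `tp_rank` instead of re-evaluating `self.parallel_config.tensor_parallel_size` (and recomputing `local_rank`) on every iteration. A minimal sketch of the hoisting pattern, with illustrative names standing in for the real worker fields:

```python
class Worker:
    """Illustrative event loop, not FastDeploy's worker_process implementation."""

    MAX_CHIPS_PER_NODE = 8

    def __init__(self, local_rank: int, tensor_parallel_size: int):
        self.local_rank = local_rank
        self.tensor_parallel_size = tensor_parallel_size

    def event_loop(self, steps: int = 2) -> None:
        # Hoisted once, outside the loop, as in the diff.
        tp_size = self.tensor_parallel_size
        tp_rank = self.local_rank % tp_size
        nnode = (tp_size + self.MAX_CHIPS_PER_NODE - 1) // self.MAX_CHIPS_PER_NODE

        for step in range(steps):
            if tp_rank == 0:
                # Only the first rank of the tensor-parallel group polls for tasks.
                print(f"step {step}: rank 0 of {tp_size} checks the task queue")
            if tp_size > 1:
                # Other ranks would wait on a barrier here; elided.
                pass
        print(f"tp_rank={tp_rank}, tp_size={tp_size}, nnode={nnode}")


Worker(local_rank=3, tensor_parallel_size=4).event_loop()
```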
