diff --git a/fastdeploy/engine/async_llm.py b/fastdeploy/engine/async_llm.py index d70962ba8fb..240e1620d06 100644 --- a/fastdeploy/engine/async_llm.py +++ b/fastdeploy/engine/async_llm.py @@ -722,6 +722,7 @@ def _setting_environ_variables(self): "FLAGS_use_append_attn": 1, "NCCL_ALGO": "Ring", "FLAGS_max_partition_size": int(os.getenv("FLAGS_max_partition_size", 1024)), + "OMP_NUM_THREADS": int(os.getenv("OMP_NUM_THREADS", 3)), } # environment variables needed by Dy2St variables.update( diff --git a/fastdeploy/engine/engine.py b/fastdeploy/engine/engine.py index e9919d4ef1a..c7d40c557e5 100644 --- a/fastdeploy/engine/engine.py +++ b/fastdeploy/engine/engine.py @@ -444,6 +444,7 @@ def _setting_environ_variables(self): "PROTOCOL_BUFFERS_PYTHON_IMPLEMENTATION": "python", "NCCL_ALGO": "Ring", "FLAGS_max_partition_size": int(os.getenv("FLAGS_max_partition_size", 1024)), + "OMP_NUM_THREADS": int(os.getenv("OMP_NUM_THREADS", 3)), } # environment variables needed by Dy2St variables.update( diff --git a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py index 1bda8567e75..2a7d4846084 100644 --- a/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py +++ b/fastdeploy/model_executor/layers/backends/xpu/moe/fused_moe.py @@ -95,7 +95,7 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): { "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, "weight_loader": extra_weight_attrs.get("weight_loader", default_weight_loader(layer.fd_config)), - "weight_need_transpose": extra_weight_attrs.get("model_format") == "torch", + "weight_need_transpose": not extra_weight_attrs.get("model_format") == "torch", "tensor_track": TensorTracker(shape=layer.up_gate_proj_weight.shape, output_dim=False), }, ) @@ -104,7 +104,7 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): { "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, "weight_loader": extra_weight_attrs.get("weight_loader", default_weight_loader(layer.fd_config)), - "weight_need_transpose": extra_weight_attrs.get("model_format") == "torch", + "weight_need_transpose": not extra_weight_attrs.get("model_format") == "torch", "tensor_track": TensorTracker(shape=layer.down_proj_weight.shape, output_dim=True), }, ) @@ -126,7 +126,6 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): "weight_loader": extra_weight_attrs.get( "weight_loader", default_weight_loader(layer.fd_config) ), - "model_format": extra_weight_attrs.get("model_format", ""), }, ) set_weight_attrs( @@ -135,7 +134,6 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): "weight_loader": extra_weight_attrs.get( "weight_loader", default_weight_loader(layer.fd_config) ), - "model_format": extra_weight_attrs.get("model_format", ""), }, ) if self.moe_quant_type in ["weight_only_int8", "weight_only_int4"]: diff --git a/fastdeploy/model_executor/layers/embeddings.py b/fastdeploy/model_executor/layers/embeddings.py index 69dbbe371c6..0e66268fffc 100644 --- a/fastdeploy/model_executor/layers/embeddings.py +++ b/fastdeploy/model_executor/layers/embeddings.py @@ -23,7 +23,7 @@ from paddle.distributed import fleet from fastdeploy.config import FDConfig -from fastdeploy.model_executor.utils import set_weight_attrs, slice_fn +from fastdeploy.model_executor.utils import h2d_copy, set_weight_attrs, slice_fn from .utils import ( DEFAULT_VOCAB_PADDING_SIZE, @@ -273,10 +273,10 @@ def weight_loader(self, param, loaded_weight, shard_id=None): shard_weight = 
slice_fn(loaded_weight, output_dim, start_idx, end_idx) if output_dim == 0: - param[: shard_weight.shape[0]].copy_(shard_weight, False) + h2d_copy(param[: shard_weight.shape[0]], shard_weight) param[shard_weight.shape[0] :].fill_(0) else: - param[:, : shard_weight.shape[1]].copy_(shard_weight, False) + h2d_copy(param[:, : shard_weight.shape[1]], shard_weight) param[:, shard_weight.shape[1] :].fill_(0) def forward(self, ids_remove_padding=None) -> paddle.Tensor: diff --git a/fastdeploy/model_executor/layers/linear.py b/fastdeploy/model_executor/layers/linear.py index 3a2edee8634..5f4291faf3e 100644 --- a/fastdeploy/model_executor/layers/linear.py +++ b/fastdeploy/model_executor/layers/linear.py @@ -25,6 +25,8 @@ from fastdeploy.model_executor.layers.quantization.quant_base import QuantMethodBase from fastdeploy.model_executor.utils import ( default_weight_loader, + h2d_copy, + process_weight_transpose, set_weight_attrs, slice_fn, ) @@ -43,8 +45,13 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): - output_dim: determines whether the split is applied along the output dimension (rows) or input dimension (columns) - weight_loader: a callable or method responsible for loading the weight data """ + self.model_format = extra_weight_attrs.get("model_format") + self.weight_shape = ( + layer.weight_shape[::-1] if extra_weight_attrs.get("model_format") == "torch" else layer.weight_shape + ) + layer.weight = layer.create_parameter( - shape=layer.weight_shape, + shape=self.weight_shape, dtype=layer.weight_dtype, is_bias=False, default_initializer=paddle.nn.initializer.Constant(0), @@ -52,15 +59,22 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): split_axis = extra_weight_attrs.get("split_axis") if hasattr(layer, "nranks") and layer.nranks > 0: _set_var_distributed(layer.weight, split_axis=split_axis) + + if self.model_format == "torch" and "output_dim" in extra_weight_attrs: + extra_weight_attrs["output_dim"] = not extra_weight_attrs["output_dim"] + set_weight_attrs( layer.weight, { **extra_weight_attrs, "weight_loader": extra_weight_attrs.get("weight_loader", default_weight_loader(layer.fd_config)), - "weight_need_transpose": extra_weight_attrs.get("model_format") == "torch", }, ) + def process_weights_after_loading(self, layer): + if self.model_format == "torch": + process_weight_transpose(layer, "weight") + def process_loaded_weights(self, layer, weights) -> None: # mlp.gate.weight is precision-sensitive, so we cast it to float32 for computation if layer.weight.dtype != weights.dtype: @@ -165,7 +179,7 @@ def __init__( if self.with_bias: self.bias = self.create_parameter( shape=[self.output_size], - dtype=self._dtype, + dtype=self.weight_dtype, is_bias=True, ) setattr( @@ -262,6 +276,7 @@ def __init__( skip_quant: bool = False, weight_dtype: str = "", weight_key: str = "", + model_format: Optional[str] = None, ): """ Initializes a replicated linear layer. 
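
Note: the embeddings.py and linear.py hunks above replace direct `param.copy_(loaded_weight, False)` calls with `h2d_copy(param, loaded_weight)`, imported from `fastdeploy.model_executor.utils`. The helper itself is not part of this diff; the following is a minimal sketch of the assumed host-to-device copy behaviour (the numpy handling and dtype cast are assumptions, not taken from this PR):

import numpy as np
import paddle

def h2d_copy(dst: paddle.Tensor, src) -> None:
    """Copy a host-side numpy array or paddle.Tensor into an existing device parameter or slice."""
    if isinstance(src, np.ndarray):
        src = paddle.to_tensor(src)   # materialize the host buffer on the default device
    if src.dtype != dst.dtype:
        src = src.cast(dst.dtype)     # match the destination dtype before the in-place copy
    dst.copy_(src, False)             # same copy_ call the loaders used before, now centralized

Centralizing the copy in one helper lets non-GPU backends adjust the host-to-device path in a single place instead of in every weight loader.
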
@@ -296,7 +311,7 @@ def __init__( weight_loader=( self.weight_loader if hasattr(self, "weight_loader") else default_weight_loader(self.fd_config) ), - model_format=fd_config.model_config.model_format, + model_format=fd_config.model_config.model_format if model_format is None else model_format, ) @@ -344,10 +359,8 @@ def __init__( def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = None): weight_need_transpose = getattr(param, "weight_need_transpose", False) - loaded_weight = get_tensor(loaded_weight) - if weight_need_transpose: - loaded_weight = loaded_weight.transpose([1, 0]) + loaded_weight = get_tensor(loaded_weight).transpose([1, 0]) assert loaded_shard_id in ["q_a", "kv_a"] if not param._is_initialized(): @@ -373,7 +386,9 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N loaded_weight = loaded_weight.view(param.dtype) else: loaded_weight = loaded_weight.cast(param.dtype) - param.copy_(loaded_weight, False) + # (bukejiyu) After this fix, the early H2D copy for non-GPU devices is no longer needed and can be safely removed. + loaded_weight = get_tensor(loaded_weight) + h2d_copy(param, loaded_weight) class ColumnParallelLinear(LinearBase): @@ -393,7 +408,7 @@ def __init__( with_bias: bool = False, add_bias: bool = False, skip_quant: bool = False, - weight_dtype="", + weight_dtype: str = "", ): """ Initializes a linear layer and provides additional parameters required for inference and quantization. @@ -493,6 +508,7 @@ def __init__( ) def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = None): + # for xpu and other backend weight_need_transpose = getattr(param, "weight_need_transpose", False) output_dim = getattr(param, "output_dim", None) assert output_dim is not None @@ -522,7 +538,7 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N loaded_weight = get_tensor(loaded_weight) loaded_weight = loaded_weight.transpose([1, 0]) # Tensor parallelism splits the weight along the output_dim - if self.nranks != 1: + if self.nranks > 1 and output_dim is not None: dim = -1 if output_dim else 0 if isinstance(loaded_weight, (np.ndarray, paddle.Tensor)): size = loaded_weight.shape[dim] @@ -532,7 +548,6 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N shard_offset = self.local_rank * block_size shard_size = (self.local_rank + 1) * block_size loaded_weight = slice_fn(loaded_weight, output_dim, start=shard_offset, end=shard_size) - loaded_weight = get_tensor(loaded_weight) if not param._is_initialized(): param.initialize() param_shard_size = output_size // 2 @@ -553,7 +568,8 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N loaded_weight = loaded_weight.view(param.dtype) else: loaded_weight = loaded_weight.cast(param.dtype) - param.copy_(loaded_weight, False) + + h2d_copy(param, loaded_weight) def load_state_dict(self, state_dict: dict): """ @@ -589,7 +605,19 @@ class QKVParallelLinear(ColumnParallelLinear): QKVParallelLinear Layer. """ - def __init__(self, fd_config, prefix, with_bias=False, add_bias=True): + def __init__( + self, + fd_config, + prefix, + with_bias=False, + add_bias=True, + num_heads: Optional[int] = None, + kv_num_heads: Optional[int] = None, + hidden_size: Optional[int] = None, + head_dim: Optional[int] = None, + skip_quant: bool = False, + weight_dtype: str = "", + ): """ Initialize the QKV Linear layer with given parameters. 
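
Note: across the linear.py changes, the per-parameter `weight_need_transpose` attribute is dropped in favour of a `process_weights_after_loading` step that calls `process_weight_transpose(layer, "weight")` when `model_format == "torch"`. That utility is imported from `fastdeploy.model_executor.utils` but not shown in this diff; below is a sketch of the assumed behaviour, mirroring the transpose-and-rebuild pattern the MoE backends use later in this PR (the helper body and its use of `free_tensor` are assumptions):

import paddle
from fastdeploy.model_executor.utils import free_tensor  # assumed to release the old parameter's storage

def process_weight_transpose(layer, weight_name: str) -> None:
    """Rebuild `layer.<weight_name>` with its last two axes swapped (torch layout -> paddle layout)."""
    weight = getattr(layer, weight_name)
    perm = list(range(len(weight.shape)))
    perm[-2], perm[-1] = perm[-1], perm[-2]   # e.g. [E, N, K] -> [E, K, N], or [N, K] -> [K, N]
    transposed = weight.transpose(perm)
    new_param = layer.create_parameter(
        shape=transposed.shape,
        dtype=transposed.dtype,
        default_initializer=paddle.nn.initializer.Constant(0),
    )
    new_param.copy_(transposed, False)        # copy the transposed data into the new parameter
    free_tensor(weight)                       # drop the original, pre-transpose storage
    setattr(layer, weight_name, new_param)
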
@@ -599,11 +627,15 @@ def __init__(self, fd_config, prefix, with_bias=False, add_bias=True): Can be arbitrarily named. with_bias (bool): Whether to include bias or not. Defaults to False. add_bias (bool): Whether to add bias in the current layer or in the pre/post layer. Defaults to True. + num_heads (Optional[int]): Number of attention heads in the model. + kv_num_heads (Optional[int]): Number of key/value heads, used for multi-query or grouped-query attention. + hidden_size (Optional[int]): Total hidden layer dimension, typically the embedding size. + head_dim (Optional[int]): Size of each attention head, usually computed as hidden_size divided by num_heads. """ - self.num_heads = fd_config.model_config.num_attention_heads - self.kv_num_heads = fd_config.model_config.num_key_value_heads - self.hidden_size = fd_config.model_config.hidden_size - self.head_dim = fd_config.model_config.head_dim + self.num_heads = fd_config.model_config.num_attention_heads if num_heads is None else num_heads + self.kv_num_heads = fd_config.model_config.num_key_value_heads if kv_num_heads is None else kv_num_heads + self.hidden_size = fd_config.model_config.hidden_size if hidden_size is None else hidden_size + self.head_dim = fd_config.model_config.head_dim if head_dim is None else head_dim self.nranks = fd_config.parallel_config.tensor_parallel_size self.local_rank = fd_config.parallel_config.tensor_parallel_rank self.num_heads_per_rank = divide(self.num_heads, self.nranks) @@ -623,6 +655,8 @@ def __init__(self, fd_config, prefix, with_bias=False, add_bias=True): output_size=output_size, with_bias=with_bias, add_bias=add_bias, + skip_quant=skip_quant, + weight_dtype=weight_dtype, ) def _get_shard_size_mapping(self, loaded_shard_id: str, head_dim: int): @@ -664,15 +698,13 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N loaded_weight = get_tensor(loaded_weight) loaded_weight = loaded_weight.transpose([1, 0]) # Tensor parallelism splits the weight along the output_dim - if self.nranks != 1: + if self.nranks > 1 and output_dim is not None: block_size = self._get_shard_size_mapping(loaded_shard_id, head_dim) shard_id = self.local_rank if loaded_shard_id == "q" else self.local_rank // self.num_kv_head_replicas shard_offset = shard_id * block_size shard_size = block_size loaded_weight = slice_fn(loaded_weight, output_dim, start=shard_offset, end=shard_offset + shard_size) - loaded_weight = get_tensor(loaded_weight) - if not param._is_initialized(): param.initialize() @@ -700,7 +732,7 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N loaded_weight = loaded_weight.view(param.dtype) else: loaded_weight = loaded_weight.cast(param.dtype) - param.copy_(loaded_weight, False) + h2d_copy(param, loaded_weight) def load_weight(self, state_dict: dict): """ @@ -798,7 +830,7 @@ def __init__( add_bias: bool = False, reduce_results: bool = True, skip_quant: bool = False, - weight_dtype="", + weight_dtype: str = "", layer_id: int = -1, ): """ @@ -857,10 +889,6 @@ def __init__( ), model_format=fd_config.model_config.model_format, ) - if self.nranks > 0: - if self.with_bias: - # col parallel - _set_var_distributed(self.bias, split_axis=0) self.reduce_results = reduce_results diff --git a/fastdeploy/model_executor/layers/lm_head.py b/fastdeploy/model_executor/layers/lm_head.py index ff1bdaa9217..bfc544ffbec 100644 --- a/fastdeploy/model_executor/layers/lm_head.py +++ b/fastdeploy/model_executor/layers/lm_head.py @@ -28,6 +28,7 @@ ) from 
fastdeploy.model_executor.utils import ( default_weight_loader, + free_tensor, set_weight_attrs, temporary_dtype, ) @@ -69,6 +70,7 @@ def __init__( self.bias_key: Optional[str] = prefix + ".bias" else: self.bias_key: Optional[str] = None + self.embedding_dim = embedding_dim self.tp_group = fd_config.parallel_config.tp_group self.column_cut = True self.nranks = fd_config.parallel_config.tensor_parallel_size @@ -77,34 +79,53 @@ def __init__( if num_embeddings % self.nranks != 0: num_embeddings = pad_vocab_size(num_embeddings, self.padding_size) + self.num_embeddings = num_embeddings + self.model_format = fd_config.model_config.model_format ColumnParallelLinear = fleet.meta_parallel.ColumnParallelLinear RowParallelLinear = fleet.meta_parallel.RowParallelLinear self.dtype = "float32" if fd_config.model_config.lm_head_fp32 else dtype self.tie_word_embeddings: bool = fd_config.model_config.tie_word_embeddings + self.need_gather = True with temporary_dtype(self.dtype): - if self.column_cut: - need_gather = True + if self.fd_config.load_config.load_choices == "default_v1" and ( + self.model_format == "torch" or self.tie_word_embeddings + ): + self.linear = RowParallelLinear( + num_embeddings, + embedding_dim, + mp_group=self.tp_group, + weight_attr=None, + has_bias=True if self.bias_key is not None else False, + input_is_parallel=False, + fuse_matmul_bias=False, + ) + set_weight_attrs( + self.linear.weight, + { + "weight_loader": default_weight_loader(self.fd_config), + }, + ) + set_weight_attrs(self.linear.weight, {"output_dim": False}) + elif self.column_cut: self.linear = ColumnParallelLinear( embedding_dim, num_embeddings, mp_group=self.tp_group, weight_attr=None, has_bias=True if self.bias_key is not None else False, - gather_output=need_gather, + gather_output=self.need_gather, fuse_matmul_bias=False, ) set_weight_attrs( self.linear.weight, { "weight_loader": default_weight_loader(self.fd_config), - "weight_need_transpose": self.fd_config.model_config.model_format == "torch", }, ) - if self.nranks > 1: - set_weight_attrs(self.linear.weight, {"output_dim": True}) + set_weight_attrs(self.linear.weight, {"output_dim": True}) else: self.linear = RowParallelLinear( embedding_dim, @@ -119,12 +140,32 @@ def __init__( self.linear.weight, { "weight_loader": default_weight_loader(self.fd_config), - "weight_need_transpose": self.fd_config.model_config.model_format == "torch", }, ) - - if self.nranks > 1: - set_weight_attrs(self.linear.weight, {"output_dim": False}) + set_weight_attrs(self.linear.weight, {"output_dim": False}) + + def process_weights_after_loading(self): + if not ( + self.fd_config.load_config.load_choices == "default_v1" + and (self.model_format == "torch" or self.tie_word_embeddings) + ): + return + if not self.linear.weight._is_initialized(): + self.linear.weight.initialize() + weight_transpose = self.linear.weight.transpose([1, 0]) + with temporary_dtype(self.dtype): + linear = fleet.meta_parallel.ColumnParallelLinear( + self.embedding_dim, + self.num_embeddings, + mp_group=self.tp_group, + weight_attr=None, + has_bias=True if self.bias_key is not None else False, + gather_output=self.need_gather, + fuse_matmul_bias=False, + ) + linear.weight.set_value(weight_transpose) + free_tensor(self.linear.weight) + self.linear = linear def load_state_dict(self, state_dict: Dict[str, paddle.Tensor | np.ndarray]): """ diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py index 978345a89c2..03191066713 100644 
--- a/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py @@ -19,7 +19,13 @@ import paddle from paddle import nn -from fastdeploy.model_executor.utils import default_weight_loader, set_weight_attrs +from fastdeploy.model_executor.utils import ( + TensorTracker, + default_weight_loader, + free_tensor, + set_weight_attrs, + weight_fully_copied, +) from fastdeploy.platforms import current_platform from ..quantization.quant_base import QuantMethodBase @@ -215,14 +221,21 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): num_experts = extra_weight_attrs.pop("num_experts") hidden_size = extra_weight_attrs.pop("hidden_size") moe_intermediate_size = extra_weight_attrs.pop("moe_intermediate_size") - if current_platform.is_cuda(): + self.model_format = extra_weight_attrs.get("model_format") + if current_platform.is_cuda() and self.model_format != "torch": self.up_gate_proj_weight_shape = [num_experts, hidden_size, moe_intermediate_size * 2] self.down_proj_weight_shape = [num_experts, moe_intermediate_size, hidden_size] - extra_weight_attrs = {**extra_weight_attrs, "SHARD_ID_TO_SHARDED_DIM": {"gate": 1, "down": 0, "up": 1}} + extra_weight_attrs = { + **(extra_weight_attrs or {}), + "SHARD_ID_TO_SHARDED_DIM": {"gate": 1, "down": 0, "up": 1}, + } else: self.up_gate_proj_weight_shape = [num_experts, moe_intermediate_size * 2, hidden_size] self.down_proj_weight_shape = [num_experts, hidden_size, moe_intermediate_size] - extra_weight_attrs = {**extra_weight_attrs, "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}} + extra_weight_attrs = { + **(extra_weight_attrs or {}), + "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, + } layer.up_gate_proj_weight = layer.create_parameter( shape=self.up_gate_proj_weight_shape, @@ -235,31 +248,46 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): dtype=layer.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ) - + extra_weight_attrs["weight_loader"] = extra_weight_attrs.get( + "weight_loader", default_weight_loader(layer.fd_config) + ) + if self.model_format != "torch": + up_gate_proj_attrs = extra_weight_attrs + down_proj_attrs = extra_weight_attrs + else: + up_gate_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker( + shape=layer.up_gate_proj_weight.shape, + output_dim=extra_weight_attrs["SHARD_ID_TO_SHARDED_DIM"]["gate"], + ), + } + down_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker( + shape=layer.down_proj_weight.shape, + output_dim=extra_weight_attrs["SHARD_ID_TO_SHARDED_DIM"]["down"], + ), + } set_weight_attrs( layer.up_gate_proj_weight, - { - "weight_loader": extra_weight_attrs.get("weight_loader", default_weight_loader(layer.fd_config)), - "weight_need_transpose": extra_weight_attrs.get("model_format") == "torch", - }, + up_gate_proj_attrs, ) set_weight_attrs( layer.down_proj_weight, - { - "weight_loader": extra_weight_attrs.get("weight_loader", default_weight_loader(layer.fd_config)), - "weight_need_transpose": extra_weight_attrs.get("model_format") == "torch", - }, + down_proj_attrs, ) if layer.with_bias: + # only pt model now layer.up_gate_proj_bias = layer.create_parameter( - shape=[layer.num_experts, layer.moe_intermediate_size * 2], + shape=[num_experts, moe_intermediate_size * 2], dtype=layer.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ) layer.down_proj_bias = layer.create_parameter( - shape=[layer.num_experts, layer.hidden_size], + 
shape=[num_experts, hidden_size], dtype=layer.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ) @@ -267,13 +295,37 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): layer.up_gate_proj_bias, { "weight_loader": extra_weight_attrs.get("weight_loader", default_weight_loader(layer.fd_config)), - "model_format": extra_weight_attrs.get("model_format", ""), }, ) set_weight_attrs( layer.down_proj_bias, { "weight_loader": extra_weight_attrs.get("weight_loader", default_weight_loader(layer.fd_config)), - "model_format": extra_weight_attrs.get("model_format", ""), }, ) + + def process_weights_after_loading(self, layer): + if self.model_format != "torch": + return + if not weight_fully_copied(layer.up_gate_proj_weight) or not weight_fully_copied(layer.down_proj_weight): + return + up_gate_proj_weight_transpose = layer.up_gate_proj_weight.transpose([0, 2, 1]) + down_proj_weight_transpose = layer.down_proj_weight.transpose([0, 2, 1]) + up_gate_proj = layer.create_parameter( + shape=up_gate_proj_weight_transpose.shape, + dtype=up_gate_proj_weight_transpose.dtype, + default_initializer=paddle.nn.initializer.Normal(mean=0.0, std=0.02), + is_bias=False, + ) + up_gate_proj.copy_(up_gate_proj_weight_transpose, False) + free_tensor(layer.up_gate_proj_weight) + layer.up_gate_proj_weight = up_gate_proj + down_proj = layer.create_parameter( + shape=down_proj_weight_transpose.shape, + dtype=down_proj_weight_transpose.dtype, + default_initializer=paddle.nn.initializer.Normal(mean=0.0, std=0.02), + is_bias=False, + ) + down_proj.copy_(down_proj_weight_transpose, False) + free_tensor(layer.down_proj_weight) + layer.down_proj_weight = down_proj diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py index a5deea656d0..05a4783599c 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_cutlass_backend.py @@ -40,7 +40,13 @@ ) from fastdeploy.model_executor.layers.moe.moe import get_moe_scores -from fastdeploy.model_executor.utils import TensorTracker, free_tensor, set_weight_attrs +from fastdeploy.model_executor.utils import ( + TensorTracker, + free_tensor, + process_weight_transpose, + set_weight_attrs, + weight_fully_copied, +) class CutlassMoEMethod(UnquantizedFusedMoEMethod): @@ -1084,33 +1090,60 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): ] self.up_gate_proj_scale_shape = [layer.num_local_experts, layer.moe_intermediate_size * 2] self.down_proj_scale_shape = [layer.num_local_experts, layer.hidden_size] + self.model_format = extra_weight_attrs.get("model_format") # TODO(bukejiyu): remove v1 loader check when v0 loader is removed if self.quant_config.is_checkpoint_bf16 and layer.fd_config.load_config.load_choices == "default_v1": + if self.model_format != "torch": + up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.hidden_size, + layer.moe_intermediate_size * 2, + ] + down_proj_weight_shape = [layer.num_local_experts, layer.moe_intermediate_size, layer.hidden_size] + up_gate_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=up_gate_proj_weight_shape, output_dim=True), + } + down_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=down_proj_weight_shape, output_dim=False), + } + else: + up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.moe_intermediate_size * 2, + layer.hidden_size, + ] + down_proj_weight_shape 
= [layer.num_local_experts, layer.hidden_size, layer.moe_intermediate_size] + up_gate_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=up_gate_proj_weight_shape, output_dim=False), + "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, + } + down_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=down_proj_weight_shape, output_dim=True), + "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, + } + layer.up_gate_proj_weight = layer.create_parameter( - shape=[layer.num_local_experts, layer.hidden_size, layer.moe_intermediate_size * 2], + shape=up_gate_proj_weight_shape, dtype=layer.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ) layer.down_proj_weight = layer.create_parameter( - shape=[layer.num_local_experts, layer.moe_intermediate_size, layer.hidden_size], + shape=down_proj_weight_shape, dtype=layer.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ) - extra_weight_attrs["weight_need_transpose"] = extra_weight_attrs.get("model_format") == "torch" set_weight_attrs( layer.up_gate_proj_weight, - { - **extra_weight_attrs, - "tensor_track": TensorTracker(shape=layer.up_gate_proj_weight.shape, output_dim=True), - }, + up_gate_proj_attrs, ) set_weight_attrs( layer.down_proj_weight, - { - **extra_weight_attrs, - "tensor_track": TensorTracker(shape=layer.down_proj_weight.shape, output_dim=False), - }, + down_proj_attrs, ) else: self.weight_dtype = "int8" @@ -1157,7 +1190,7 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): default_initializer=paddle.nn.initializer.Constant(0), ), ) - extra_weight_attrs["weight_need_transpose"] = not extra_weight_attrs.get("model_format") == "torch" + # The v1 loader currently does not support loading offline quantized weight-only weights. 
moe_extra_weight_attrs = {**extra_weight_attrs, "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}} set_weight_attrs(layer.up_gate_proj_weight, moe_extra_weight_attrs) set_weight_attrs(layer.down_proj_weight, moe_extra_weight_attrs) @@ -1191,66 +1224,70 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): ) def process_weights_after_loading(self, layer): - """ """ - if not self.quant_config.is_checkpoint_bf16: - return - weight_id_map = {"gate_up": 0, "down": 1} - if ( - hasattr(layer.up_gate_proj_weight, "tensor_track") - and layer.up_gate_proj_weight.tensor_track is not None - and layer.up_gate_proj_weight.tensor_track.is_fully_copied() - ): - weight_type = "gate_up" - else: - weight_type = "down" - - # 1.init shape and type - # weight - weight_name = self.added_weight_attrs[weight_id_map[weight_type]] - unquantized_weight_name = weight_name.replace("quant_weight", "weight") - weight_shape = self.up_gate_proj_weight_shape if weight_type == "gate_up" else self.down_proj_weight_shape - weight_dtype = "int8" - # scale - scale_name = self.added_scale_attrs[weight_id_map[weight_type]] - scale_shape = self.up_gate_proj_scale_shape if weight_type == "gate_up" else self.down_proj_scale_shape - scale_dtype = self.default_dtype - - # 2.crate tmp tensor - - weight = paddle.empty(weight_shape, dtype=weight_dtype) - scale = paddle.empty(scale_shape, dtype=scale_dtype) + def _process_quantize(weight_idx): + # 1.init shape and type + # quantized_weight_name + weight_name = self.added_weight_attrs[weight_idx] + unquantized_weight_name = weight_name.replace("quant_weight", "weight") + weight_shape = self.up_gate_proj_weight_shape if weight_type == "gate_up" else self.down_proj_weight_shape + weight_dtype = "int8" + # scale + scale_name = self.added_scale_attrs[weight_idx] + scale_shape = self.up_gate_proj_scale_shape if weight_type == "gate_up" else self.down_proj_scale_shape + scale_dtype = self.default_dtype + + # 2.crate tmp tensor + + weight = paddle.empty(weight_shape, dtype=weight_dtype) + scale = paddle.empty(scale_shape, dtype=scale_dtype) + + # 3.quantize weight + + for expert_id in range(layer.num_local_experts): + weight[expert_id], scale[expert_id] = weight_quantize( + getattr(layer, unquantized_weight_name)[expert_id], algo=self.moe_quant_type + ) - # 3.quantize weight + free_tensor(getattr(layer, unquantized_weight_name)) - for expert_id in range(layer.num_local_experts): - weight[expert_id], scale[expert_id] = weight_quantize( - getattr(layer, unquantized_weight_name)[expert_id], algo=self.moe_quant_type + # create weight + setattr( + layer, + weight_name, + layer.create_parameter( + shape=weight_shape, + dtype=weight_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + # create scale + setattr( + layer, + scale_name, + layer.create_parameter( + shape=scale_shape, + dtype=scale_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), ) + getattr(layer, weight_name).copy_(weight, False) + getattr(layer, scale_name).copy_(scale, False) - free_tensor(getattr(layer, unquantized_weight_name)) + if self.quant_config.is_checkpoint_bf16: + weight_id_map = {"gate_up": 0, "down": 1} + if weight_fully_copied(layer.up_gate_proj_weight): + weight_type = "gate_up" + else: + weight_type = "down" - # create weight - setattr( - layer, - weight_name, - layer.create_parameter( - shape=weight_shape, - dtype=weight_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - # create scale - setattr( - layer, - scale_name, - 
layer.create_parameter( - shape=scale_shape, - dtype=scale_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - getattr(layer, weight_name).copy_(weight, False) - getattr(layer, scale_name).copy_(scale, False) + if self.model_format == "torch": + unquantized_weight_name = self.added_weight_attrs[weight_id_map[weight_type]].replace( + "quant_weight", "weight" + ) + process_weight_transpose(layer, unquantized_weight_name) + _process_quantize(weight_id_map[weight_type]) + else: + return def process_loaded_weights(self, layer: nn.Layer, state_dict): """ diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py index 0a1a8a8ee61..9910ac54c92 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py @@ -22,10 +22,9 @@ from fastdeploy.distributed.communication import tensor_model_parallel_all_reduce from fastdeploy.model_executor.layers.utils import get_tensor from fastdeploy.model_executor.ops.gpu import count_tokens_per_expert_func, deep_gemm -from fastdeploy.model_executor.utils import TensorTracker, set_weight_attrs -from fastdeploy.utils import ceil_div from .fused_moe_backend_base import MoEMethodBase +from .fused_moe_triton_backend import BlockWiseFP8MoEMethod class DeepGemmFusedMoeMethod(MoEMethodBase): @@ -37,184 +36,11 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): """ deepgemm create weight process. """ - self.up_gate_proj_weight_shape = [ - layer.num_local_experts, - layer.moe_intermediate_size * 2, - layer.hidden_size, - ] - self.down_proj_weight_shape = [ - layer.num_local_experts, - layer.hidden_size, - layer.moe_intermediate_size, - ] - self.up_gate_proj_scale_shape = [ - layer.num_local_experts, - ceil_div(layer.moe_intermediate_size * 2, self.quant_config.weight_block_size[0]), - ceil_div(layer.hidden_size, self.quant_config.weight_block_size[1]), - ] - self.down_proj_scale_shape = [ - layer.num_local_experts, - ceil_div(layer.hidden_size, self.quant_config.weight_block_size[0]), - ceil_div(layer.moe_intermediate_size, self.quant_config.weight_block_size[1]), - ] - # TODO(bukejiyu): remove v1 loader check when v0 loader is removed - if self.quant_config.is_checkpoint_bf16 and layer.fd_config.load_config.load_choices == "default_v1": - layer.up_gate_proj_weight = layer.create_parameter( - shape=[layer.num_local_experts, layer.hidden_size, layer.moe_intermediate_size * 2], - dtype=layer.weight_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ) - - layer.down_proj_weight = layer.create_parameter( - shape=[layer.num_local_experts, layer.moe_intermediate_size, layer.hidden_size], - dtype=layer.weight_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ) - extra_weight_attrs["weight_need_transpose"] = extra_weight_attrs.get("model_format") == "torch" - set_weight_attrs( - layer.up_gate_proj_weight, - { - **extra_weight_attrs, - "tensor_track": TensorTracker(shape=layer.up_gate_proj_weight.shape, output_dim=True), - }, - ) - set_weight_attrs( - layer.down_proj_weight, - { - **extra_weight_attrs, - "tensor_track": TensorTracker(shape=layer.down_proj_weight.shape, output_dim=False), - }, - ) - else: - self.weight_dtype = paddle.float8_e4m3fn - self.added_scale_attrs = ["up_gate_proj_weight_scale_inv", "down_proj_weight_scale_inv"] - up_gate_proj_weight_name = self.added_weight_attrs[0] - down_proj_weight_name = 
self.added_weight_attrs[1] - up_gate_proj_scale_name = self.added_scale_attrs[0] - down_proj_scale_name = self.added_scale_attrs[1] - setattr( - layer, - up_gate_proj_weight_name, - layer.create_parameter( - shape=self.up_gate_proj_weight_shape, - dtype=self.weight_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - setattr( - layer, - down_proj_weight_name, - layer.create_parameter( - shape=self.down_proj_weight_shape, - dtype=self.weight_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - # weight_scale - setattr( - layer, - up_gate_proj_scale_name, - layer.create_parameter( - shape=self.up_gate_proj_scale_shape, - dtype="float32", - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - setattr( - layer, - down_proj_scale_name, - layer.create_parameter( - shape=self.down_proj_scale_shape, - dtype="float32", - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - extra_weight_attrs["weight_need_transpose"] = not extra_weight_attrs.get("model_format") == "torch" - extra_weight_attrs = {**extra_weight_attrs, "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}} - set_weight_attrs( - getattr(layer, up_gate_proj_weight_name), - extra_weight_attrs, - ) - set_weight_attrs( - getattr(layer, up_gate_proj_scale_name), - extra_weight_attrs, - ) - - set_weight_attrs( - getattr(layer, down_proj_weight_name), - extra_weight_attrs, - ) - set_weight_attrs( - getattr(layer, down_proj_scale_name), - extra_weight_attrs, - ) + BlockWiseFP8MoEMethod.create_weights(self, layer, **extra_weight_attrs) def process_weights_after_loading(self, layer): """ """ - if not self.quant_config.is_checkpoint_bf16: - return - weight_id_map = {"gate_up": 0, "down": 1} - if ( - hasattr(layer.up_gate_proj_weight, "tensor_track") - and layer.up_gate_proj_weight.tensor_track is not None - and layer.up_gate_proj_weight.tensor_track.is_fully_copied() - ): - weight_type = "gate_up" - layer.up_gate_proj_weight.tensor_track = None - else: - weight_type = "down" - layer.down_proj_weight.tensor_track = None - - # 1.init shape and type - self.added_scale_attrs = ["up_gate_proj_weight_scale_inv", "down_proj_weight_scale_inv"] - # weight - weight_name = self.added_weight_attrs[weight_id_map[weight_type]] - unquantized_weight_name = weight_name.replace("quant_weight", "weight") - weight_shape = self.up_gate_proj_weight_shape if weight_type == "gate_up" else self.down_proj_weight_shape - weight_dtype = paddle.float8_e4m3fn - # scale - scale_name = self.added_scale_attrs[weight_id_map[weight_type]] - scale_shape = self.up_gate_proj_scale_shape if weight_type == "gate_up" else self.down_proj_scale_shape - scale_dtype = "float32" - - # 2.crate tmp tensor - - weight = paddle.empty(shape=[weight_shape[0], weight_shape[2], weight_shape[1]], dtype=weight_dtype) - scale = paddle.empty(shape=[scale_shape[0], scale_shape[2], scale_shape[1]], dtype=scale_dtype) - - # 3.quantize weight - from fastdeploy.model_executor.layers.utils import per_block_cast_to_fp8 - - for expert_id in range(layer.num_local_experts): - weight_quant, scale[expert_id] = per_block_cast_to_fp8( - getattr(layer, unquantized_weight_name)[expert_id], self.quant_config.weight_block_size - ) - weight[expert_id].copy_(weight_quant, False) - - getattr(layer, unquantized_weight_name).value().get_tensor()._clear() - - # create weight - setattr( - layer, - weight_name, - layer.create_parameter( - shape=weight.shape, - dtype=weight_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - # create 
scale - setattr( - layer, - scale_name, - layer.create_parameter( - shape=scale.shape, - dtype=scale_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - getattr(layer, weight_name).copy_(weight.transpose([0, 2, 1]).contiguous(), False) - getattr(layer, scale_name).copy_(scale.transpose([0, 2, 1]).contiguous(), False) + BlockWiseFP8MoEMethod.process_weights_after_loading(self, layer) def process_loaded_weights(self, layer: nn.Layer, state_dict): """ diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py index d6d1a5a4dc1..75e9a09f437 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py @@ -20,7 +20,13 @@ import fastdeploy from fastdeploy.distributed.communication import tensor_model_parallel_all_reduce from fastdeploy.model_executor.layers.utils import get_tensor -from fastdeploy.model_executor.utils import TensorTracker, set_weight_attrs +from fastdeploy.model_executor.utils import ( + TensorTracker, + free_tensor, + process_weight_transpose, + set_weight_attrs, + weight_fully_copied, +) from fastdeploy.utils import ceil_div from ..quantization.quant_base import QuantMethodBase @@ -59,10 +65,7 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): """ Triton MoE create weight process. """ - self.weight_dtype = "int8" self.default_dtype = layer._helper.get_default_dtype() - up_gate_proj_weight_name = self.added_weight_attrs[0] - down_proj_weight_name = self.added_weight_attrs[1] self.up_gate_proj_weight_shape = [ layer.num_local_experts, layer.hidden_size, @@ -73,36 +76,69 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): layer.moe_intermediate_size, layer.hidden_size, ] + self.model_format = extra_weight_attrs.get("model_format") # TODO(bukejiyu): remove v1 loader check when v0 loader is removed if self.quant_config.is_checkpoint_bf16 and layer.fd_config.load_config.load_choices == "default_v1": + if self.model_format != "torch": + up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.hidden_size, + layer.moe_intermediate_size * 2, + ] + down_proj_weight_shape = [layer.num_local_experts, layer.moe_intermediate_size, layer.hidden_size] + up_gate_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=up_gate_proj_weight_shape, output_dim=True), + } + down_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=down_proj_weight_shape, output_dim=False), + } + else: + up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.moe_intermediate_size * 2, + layer.hidden_size, + ] + down_proj_weight_shape = [layer.num_local_experts, layer.hidden_size, layer.moe_intermediate_size] + up_gate_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=up_gate_proj_weight_shape, output_dim=False), + "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, + } + down_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=down_proj_weight_shape, output_dim=True), + "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, + } layer.up_gate_proj_weight = layer.create_parameter( - shape=self.up_gate_proj_weight_shape, + shape=up_gate_proj_weight_shape, dtype=layer.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ) layer.down_proj_weight = layer.create_parameter( - shape=self.down_proj_weight_shape, + shape=down_proj_weight_shape, 
dtype=layer.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ) - extra_weight_attrs["weight_need_transpose"] = extra_weight_attrs.get("model_format") == "torch" set_weight_attrs( layer.up_gate_proj_weight, - { - **extra_weight_attrs, - "tensor_track": TensorTracker(shape=layer.up_gate_proj_weight.shape, output_dim=True), - }, + up_gate_proj_attrs, ) set_weight_attrs( layer.down_proj_weight, - { - **extra_weight_attrs, - "tensor_track": TensorTracker(shape=layer.down_proj_weight.shape, output_dim=False), - }, + down_proj_attrs, ) else: + self.weight_dtype = "int8" + + up_gate_proj_weight_name = self.added_weight_attrs[0] + down_proj_weight_name = self.added_weight_attrs[1] + up_gate_proj_scale_name = self.added_scale_attrs[0] + down_proj_scale_name = self.added_scale_attrs[1] + setattr( layer, up_gate_proj_weight_name, @@ -124,7 +160,7 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): # weight_scale setattr( layer, - self.added_scale_attrs[0], + up_gate_proj_scale_name, layer.create_parameter( shape=[layer.num_local_experts, layer.moe_intermediate_size * 2], dtype=self.default_dtype, @@ -133,7 +169,7 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): ) setattr( layer, - self.added_scale_attrs[1], + down_proj_scale_name, layer.create_parameter( shape=[layer.num_local_experts, layer.hidden_size], dtype=self.default_dtype, @@ -185,59 +221,62 @@ def process_loaded_weights(self, layer: nn.Layer, state_dict): def process_weights_after_loading(self, layer): """ """ - if not self.quant_config.is_checkpoint_bf16: - return - algo = layer.quant_method.quant_config.name() - assert algo == "wint8" - max_bound = 127 - weight_id_map = {"gate_up": 0, "down": 1} - if ( - hasattr(layer.up_gate_proj_weight, "tensor_track") - and layer.up_gate_proj_weight.tensor_track is not None - and layer.up_gate_proj_weight.tensor_track.is_fully_copied() - ): - weight_type = "gate_up" - layer.up_gate_proj_weight.tensor_track = None - else: - weight_type = "down" - layer.down_proj_weight.tensor_track = None + def _process_quantize(weight_idx): + algo = layer.quant_method.quant_config.name() + assert algo == "wint8" + max_bound = 127 + # weight + weight_name = self.added_weight_attrs[weight_id_map[weight_type]] + # scale + scale_name = self.added_scale_attrs[weight_id_map[weight_type]] - # weight - weight_name = self.added_weight_attrs[weight_id_map[weight_type]] - # scale - scale_name = self.added_scale_attrs[weight_id_map[weight_type]] + weight_tensor = getattr(layer, weight_name) + quanted_weight_scale = weight_tensor.abs().max(axis=1) + quanted_weight = weight_tensor / quanted_weight_scale[:, None, :] * max_bound + quanted_weight = paddle.round(quanted_weight).astype("int8") + quanted_weight_scale = quanted_weight_scale / max_bound - weight_tensor = getattr(layer, weight_name) - quanted_weight_scale = weight_tensor.abs().max(axis=1) - quanted_weight = weight_tensor / quanted_weight_scale[:, None, :] * max_bound - quanted_weight = paddle.round(quanted_weight).astype("int8") - quanted_weight_scale = quanted_weight_scale / max_bound + free_tensor(getattr(layer, weight_name)) - getattr(layer, weight_name).value().get_tensor()._clear() + # create weight + setattr( + layer, + weight_name, + layer.create_parameter( + shape=weight_tensor.shape, + dtype=quanted_weight.dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + # create scale + setattr( + layer, + scale_name, + layer.create_parameter( + shape=quanted_weight_scale.shape, + 
dtype=quanted_weight_scale.dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + getattr(layer, weight_name).copy_(quanted_weight, False) + getattr(layer, scale_name).copy_(quanted_weight_scale, False) - # create weight - setattr( - layer, - weight_name, - layer.create_parameter( - shape=weight_tensor.shape, - dtype=quanted_weight.dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - # create scale - setattr( - layer, - scale_name, - layer.create_parameter( - shape=quanted_weight_scale.shape, - dtype=quanted_weight_scale.dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - getattr(layer, weight_name).copy_(quanted_weight, False) - getattr(layer, scale_name).copy_(quanted_weight_scale, False) + if self.quant_config.is_checkpoint_bf16: + weight_id_map = {"gate_up": 0, "down": 1} + if weight_fully_copied(layer.up_gate_proj_weight): + weight_type = "gate_up" + else: + weight_type = "down" + if self.model_format == "torch": + unquantized_weight_name = self.added_weight_attrs[weight_id_map[weight_type]].replace( + "quant_weight", "weight" + ) + process_weight_transpose(layer, unquantized_weight_name) + _process_quantize(weight_id_map[weight_type]) + + else: + return def apply( self, @@ -443,34 +482,59 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): layer.hidden_size, 1, ] + self.model_format = extra_weight_attrs.get("model_format") if self.quant_config.is_checkpoint_bf16 and layer.fd_config.load_config.load_choices == "default_v1": + if self.model_format != "torch": + up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.hidden_size, + layer.moe_intermediate_size * 2, + ] + down_proj_weight_shape = [layer.num_local_experts, layer.moe_intermediate_size, layer.hidden_size] + up_gate_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=up_gate_proj_weight_shape, output_dim=True), + } + down_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=down_proj_weight_shape, output_dim=False), + } + else: + up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.moe_intermediate_size * 2, + layer.hidden_size, + ] + down_proj_weight_shape = [layer.num_local_experts, layer.hidden_size, layer.moe_intermediate_size] + up_gate_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=up_gate_proj_weight_shape, output_dim=False), + "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, + } + down_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=down_proj_weight_shape, output_dim=True), + "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, + } + layer.up_gate_proj_weight = layer.create_parameter( - shape=[layer.num_local_experts, layer.hidden_size, layer.moe_intermediate_size * 2], + shape=up_gate_proj_weight_shape, dtype=layer.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ) layer.down_proj_weight = layer.create_parameter( - shape=[layer.num_local_experts, layer.moe_intermediate_size, layer.hidden_size], + shape=down_proj_weight_shape, dtype=layer.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ) - - extra_weight_attrs["weight_need_transpose"] = extra_weight_attrs.get("model_format") == "torch" - set_weight_attrs( layer.up_gate_proj_weight, - { - **extra_weight_attrs, - "tensor_track": TensorTracker(shape=layer.up_gate_proj_weight.shape, output_dim=True), - }, + up_gate_proj_attrs, ) set_weight_attrs( layer.down_proj_weight, - { - **extra_weight_attrs, - 
"tensor_track": TensorTracker(shape=layer.down_proj_weight.shape, output_dim=False), - }, + down_proj_attrs, ) else: self.weight_dtype = paddle.float8_e4m3fn @@ -518,66 +582,70 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): def process_weights_after_loading(self, layer): """ """ - if not self.quant_config.is_checkpoint_bf16: - return - weight_id_map = {"gate_up": 0, "down": 1} - if ( - hasattr(layer.up_gate_proj_weight, "tensor_track") - and layer.up_gate_proj_weight.tensor_track is not None - and layer.up_gate_proj_weight.tensor_track.is_fully_copied() - ): - weight_type = "gate_up" - layer.up_gate_proj_weight.tensor_track = None - else: - weight_type = "down" - layer.down_proj_weight.tensor_track = None - # weight - weight_name = self.added_weight_attrs[weight_id_map[weight_type]] - weight_shape = self.up_gate_proj_weight_shape if weight_type == "gate_up" else self.down_proj_weight_shape - weight_dtype = paddle.float8_e4m3fn - # scale - scale_name = self.added_scale_attrs[weight_id_map[weight_type]] - scale_shape = self.up_gate_proj_scale_shape if weight_type == "gate_up" else self.down_proj_scale_shape - scale_dtype = "float32" + def _process_quantize(weight_idx): + # weight + weight_name = self.added_weight_attrs[weight_idx] + weight_shape = self.up_gate_proj_weight_shape if weight_type == "gate_up" else self.down_proj_weight_shape + weight_dtype = paddle.float8_e4m3fn + # scale + scale_name = self.added_scale_attrs[weight_idx] + scale_shape = self.up_gate_proj_scale_shape if weight_type == "gate_up" else self.down_proj_scale_shape + scale_dtype = "float32" + + # 2.crate tmp tensor + + weight = paddle.empty(shape=weight_shape, dtype=weight_dtype) + scale = paddle.empty(shape=scale_shape, dtype=scale_dtype) - # 2.crate tmp tensor + # 3.quantize weight + from fastdeploy.model_executor.layers.utils import per_token_cast_to_fp8 - weight = paddle.empty(shape=weight_shape, dtype=weight_dtype) - scale = paddle.empty(shape=scale_shape, dtype=scale_dtype) + for expert_id in range(layer.num_experts): + weight_quant, scale[expert_id] = per_token_cast_to_fp8( + getattr(layer, weight_name)[expert_id].transpose([1, 0]).contiguous(), + ) + weight[expert_id].copy_(weight_quant, False) - # 3.quantize weight - from fastdeploy.model_executor.layers.utils import per_token_cast_to_fp8 + free_tensor(getattr(layer, weight_name)) - for expert_id in range(layer.num_experts): - weight_quant, scale[expert_id] = per_token_cast_to_fp8( - getattr(layer, weight_name)[expert_id].transpose([1, 0]).contiguous(), + # create weight + setattr( + layer, + weight_name, + layer.create_parameter( + shape=weight_shape, + dtype=weight_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), ) - weight[expert_id].copy_(weight_quant, False) - getattr(layer, weight_name).value().get_tensor()._clear() + # create scale + setattr( + layer, + scale_name, + layer.create_parameter( + shape=scale_shape, + dtype=scale_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + getattr(layer, weight_name).copy_(weight, False) + getattr(layer, scale_name).copy_(scale, False) + + if self.quant_config.is_checkpoint_bf16: + # dynamic quantize + weight_id_map = {"gate_up": 0, "down": 1} + if weight_fully_copied(layer.up_gate_proj_weight): + weight_type = "gate_up" + else: + weight_type = "down" + if self.model_format == "torch": + # pt model + process_weight_transpose(layer, self.added_weight_attrs[weight_id_map[weight_type]]) - # create weight - setattr( - layer, - weight_name, - 
layer.create_parameter( - shape=weight_shape, - dtype=weight_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - # create scale - setattr( - layer, - scale_name, - layer.create_parameter( - shape=scale_shape, - dtype=scale_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - getattr(layer, weight_name).copy_(weight, False) - getattr(layer, scale_name).copy_(scale, False) + _process_quantize(weight_id_map[weight_type]) + else: + return def check(self, layer: nn.Layer, up_gate_proj_weights, down_proj_weights): """ @@ -1107,45 +1175,123 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): ceil_div(layer.moe_intermediate_size, self.quant_config.weight_block_size[1]), ] # TODO(bukejiyu): remove v1 loader check when v0 loader is removed + self.model_format = extra_weight_attrs.get("model_format") + if self.quant_config.is_checkpoint_bf16 and layer.fd_config.load_config.load_choices == "default_v1": + if self.model_format != "torch": + up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.hidden_size, + layer.moe_intermediate_size * 2, + ] + down_proj_weight_shape = [layer.num_local_experts, layer.moe_intermediate_size, layer.hidden_size] + up_gate_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=up_gate_proj_weight_shape, output_dim=True), + } + down_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=down_proj_weight_shape, output_dim=False), + } + else: + up_gate_proj_weight_shape = [ + layer.num_local_experts, + layer.moe_intermediate_size * 2, + layer.hidden_size, + ] + down_proj_weight_shape = [layer.num_local_experts, layer.hidden_size, layer.moe_intermediate_size] + up_gate_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=up_gate_proj_weight_shape, output_dim=False), + "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, + } + down_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker(shape=down_proj_weight_shape, output_dim=True), + "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, + } layer.up_gate_proj_weight = layer.create_parameter( - shape=[layer.num_local_experts, layer.hidden_size, layer.moe_intermediate_size * 2], + shape=up_gate_proj_weight_shape, dtype=layer.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ) layer.down_proj_weight = layer.create_parameter( - shape=[layer.num_local_experts, layer.moe_intermediate_size, layer.hidden_size], + shape=down_proj_weight_shape, dtype=layer.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ) - extra_weight_attrs["weight_need_transpose"] = extra_weight_attrs.get("model_format") == "torch" + set_weight_attrs( layer.up_gate_proj_weight, - { - **extra_weight_attrs, - "tensor_track": TensorTracker(shape=layer.up_gate_proj_weight.shape, output_dim=True), - }, + up_gate_proj_attrs, ) set_weight_attrs( layer.down_proj_weight, - { - **extra_weight_attrs, - "tensor_track": TensorTracker(shape=layer.down_proj_weight.shape, output_dim=False), - }, + down_proj_attrs, ) else: + # 1.init shape + extra_weight_attrs = {**extra_weight_attrs} + if layer.fd_config.load_config.load_choices == "default_v1": + if self.model_format != "torch": + # transpose [0,2,1] + up_gate_proj_weight_shape = ( + self.up_gate_proj_weight_shape[:1] + self.up_gate_proj_weight_shape[1:][::-1] + ) + up_gate_proj_scale_shape = ( + self.up_gate_proj_scale_shape[:1] + self.up_gate_proj_scale_shape[1:][::-1] + ) + down_proj_weight_shape = 
self.down_proj_weight_shape[:1] + self.down_proj_weight_shape[1:][::-1] + down_proj_scale_shape = self.down_proj_scale_shape[:1] + self.down_proj_scale_shape[1:][::-1] + up_gate_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker( + shape=up_gate_proj_weight_shape, + output_dim=False, + ), + } + down_proj_attrs = { + **extra_weight_attrs, + "tensor_track": TensorTracker( + shape=down_proj_weight_shape, + output_dim=False, + ), + } + else: + up_gate_proj_weight_shape = self.up_gate_proj_weight_shape + up_gate_proj_scale_shape = self.up_gate_proj_scale_shape + down_proj_weight_shape = self.down_proj_weight_shape + down_proj_scale_shape = self.down_proj_scale_shape + up_gate_proj_attrs = { + **extra_weight_attrs, + "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, + } + down_proj_attrs = { + **extra_weight_attrs, + "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}, + } + else: + # v0 loader + up_gate_proj_weight_shape = self.up_gate_proj_weight_shape + up_gate_proj_scale_shape = self.up_gate_proj_scale_shape + down_proj_weight_shape = self.down_proj_weight_shape + down_proj_scale_shape = self.down_proj_scale_shape + up_gate_proj_attrs = {} + down_proj_attrs = {} + self.weight_dtype = paddle.float8_e4m3fn self.added_scale_attrs = ["up_gate_proj_weight_scale_inv", "down_proj_weight_scale_inv"] up_gate_proj_weight_name = self.added_weight_attrs[0] down_proj_weight_name = self.added_weight_attrs[1] up_gate_proj_scale_name = self.added_scale_attrs[0] down_proj_scale_name = self.added_scale_attrs[1] + setattr( layer, up_gate_proj_weight_name, layer.create_parameter( - shape=self.up_gate_proj_weight_shape, + shape=up_gate_proj_weight_shape, dtype=self.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ), @@ -1154,7 +1300,7 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): layer, down_proj_weight_name, layer.create_parameter( - shape=self.down_proj_weight_shape, + shape=down_proj_weight_shape, dtype=self.weight_dtype, default_initializer=paddle.nn.initializer.Constant(0), ), @@ -1164,7 +1310,7 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): layer, up_gate_proj_scale_name, layer.create_parameter( - shape=self.up_gate_proj_scale_shape, + shape=up_gate_proj_scale_shape, dtype="float32", default_initializer=paddle.nn.initializer.Constant(0), ), @@ -1173,97 +1319,116 @@ def create_weights(self, layer: nn.Layer, **extra_weight_attrs): layer, down_proj_scale_name, layer.create_parameter( - shape=self.down_proj_scale_shape, + shape=down_proj_scale_shape, dtype="float32", default_initializer=paddle.nn.initializer.Constant(0), ), ) - - extra_weight_attrs["weight_need_transpose"] = not extra_weight_attrs.get("model_format") == "torch" - extra_weight_attrs = {**extra_weight_attrs, "SHARD_ID_TO_SHARDED_DIM": {"gate": 0, "down": 1, "up": 0}} set_weight_attrs( getattr(layer, up_gate_proj_weight_name), - extra_weight_attrs, + up_gate_proj_attrs, ) set_weight_attrs( getattr(layer, up_gate_proj_scale_name), - extra_weight_attrs, + up_gate_proj_attrs, ) set_weight_attrs( getattr(layer, down_proj_weight_name), - extra_weight_attrs, + down_proj_attrs, ) set_weight_attrs( getattr(layer, down_proj_scale_name), - extra_weight_attrs, + down_proj_attrs, ) def process_weights_after_loading(self, layer): - """ """ - if not self.quant_config.is_checkpoint_bf16: - return - weight_id_map = {"gate_up": 0, "down": 1} - if ( - hasattr(layer.up_gate_proj_weight, "tensor_track") - and layer.up_gate_proj_weight.tensor_track is not None - and 
layer.up_gate_proj_weight.tensor_track.is_fully_copied() - ): - weight_type = "gate_up" - layer.up_gate_proj_weight.tensor_track = None - else: - weight_type = "down" - layer.down_proj_weight.tensor_track = None - - # 1.init shape and type - self.added_scale_attrs = ["up_gate_proj_weight_scale_inv", "down_proj_weight_scale_inv"] - # weight - weight_name = self.added_weight_attrs[weight_id_map[weight_type]] - unquantized_weight_name = weight_name.replace("quant_weight", "weight") - weight_shape = self.up_gate_proj_weight_shape if weight_type == "gate_up" else self.down_proj_weight_shape - weight_dtype = paddle.float8_e4m3fn - # scale - scale_name = self.added_scale_attrs[weight_id_map[weight_type]] - scale_shape = self.up_gate_proj_scale_shape if weight_type == "gate_up" else self.down_proj_scale_shape - scale_dtype = "float32" - - # 2.crate tmp tensor - - weight = paddle.empty(shape=[weight_shape[0], weight_shape[2], weight_shape[1]], dtype=weight_dtype) - scale = paddle.empty(shape=[scale_shape[0], scale_shape[2], scale_shape[1]], dtype=scale_dtype) - - # 3.quantize weight - from fastdeploy.model_executor.layers.utils import per_block_cast_to_fp8 - - for expert_id in range(layer.num_local_experts): - weight_quant, scale[expert_id] = per_block_cast_to_fp8( - getattr(layer, unquantized_weight_name)[expert_id], self.quant_config.weight_block_size - ) - weight[expert_id].copy_(weight_quant, False) - getattr(layer, unquantized_weight_name).value().get_tensor()._clear() - # create weight - setattr( - layer, - weight_name, - layer.create_parameter( - shape=weight.shape, - dtype=weight_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - # create scale - setattr( - layer, - scale_name, - layer.create_parameter( - shape=scale.shape, - dtype=scale_dtype, - default_initializer=paddle.nn.initializer.Constant(0), - ), - ) - getattr(layer, weight_name).copy_(weight.transpose([0, 2, 1]).contiguous(), False) - getattr(layer, scale_name).copy_(scale.transpose([0, 2, 1]).contiguous(), False) + def _process_quantize(weight_idx): + # 1.init shape and type + self.added_scale_attrs = ["up_gate_proj_weight_scale_inv", "down_proj_weight_scale_inv"] + # weight + weight_name = self.added_weight_attrs[weight_idx] + unquantized_weight_name = weight_name.replace("quant_weight", "weight") + weight_shape = self.up_gate_proj_weight_shape if weight_type == "gate_up" else self.down_proj_weight_shape + weight_dtype = paddle.float8_e4m3fn + # scale + scale_name = self.added_scale_attrs[weight_idx] + scale_shape = self.up_gate_proj_scale_shape if weight_type == "gate_up" else self.down_proj_scale_shape + scale_dtype = "float32" + + # 2.crate tmp tensor + + weight = paddle.empty(shape=[weight_shape[0], weight_shape[2], weight_shape[1]], dtype=weight_dtype) + scale = paddle.empty(shape=[scale_shape[0], scale_shape[2], scale_shape[1]], dtype=scale_dtype) + + # 3.quantize weight + from fastdeploy.model_executor.layers.utils import per_block_cast_to_fp8 + + for expert_id in range(layer.num_local_experts): + weight_quant, scale[expert_id] = per_block_cast_to_fp8( + getattr(layer, unquantized_weight_name)[expert_id], self.quant_config.weight_block_size + ) + weight[expert_id].copy_(weight_quant, False) + + free_tensor(getattr(layer, unquantized_weight_name)) + + # create weight + setattr( + layer, + weight_name, + layer.create_parameter( + shape=weight.shape, + dtype=weight_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + # create scale + setattr( + layer, + scale_name, + 
layer.create_parameter( + shape=scale.shape, + dtype=scale_dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + getattr(layer, weight_name).copy_(weight.transpose([0, 2, 1]).contiguous(), False) + getattr(layer, scale_name).copy_(scale.transpose([0, 2, 1]).contiguous(), False) + + if self.quant_config.is_checkpoint_bf16: + # dynamic quantize + weight_id_map = {"gate_up": 0, "down": 1} + if weight_fully_copied(layer.up_gate_proj_weight): + weight_type = "gate_up" + else: + weight_type = "down" + if self.model_format == "torch": + # pt model + unquantized_weight_name = self.added_weight_attrs[weight_id_map[weight_type]].replace( + "quant_weight", "weight" + ) + process_weight_transpose(layer, unquantized_weight_name) + _process_quantize(weight_id_map[weight_type]) + else: + if self.model_format != "torch": + up_gate_proj_weight_name = self.added_weight_attrs[0] + down_proj_weight_name = self.added_weight_attrs[1] + up_gate_proj_scale_name = self.added_scale_attrs[0] + down_proj_scale_name = self.added_scale_attrs[1] + if ( + not weight_fully_copied(getattr(layer, up_gate_proj_weight_name)) + or not weight_fully_copied(getattr(layer, down_proj_weight_name)) + or not weight_fully_copied(getattr(layer, up_gate_proj_scale_name)) + or not weight_fully_copied(getattr(layer, down_proj_scale_name)) + ): + return + process_weight_transpose(layer, up_gate_proj_weight_name) + process_weight_transpose(layer, down_proj_weight_name) + process_weight_transpose(layer, up_gate_proj_scale_name) + process_weight_transpose(layer, down_proj_scale_name) + else: + return def process_loaded_weights(self, layer: nn.Layer, state_dict): """ diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index a27015c1a84..897e129e1e7 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -16,14 +16,13 @@ from typing import Optional -import numpy as np import paddle from paddle import nn from paddleformers.utils.log import logger from fastdeploy import envs from fastdeploy.model_executor.layers.utils import get_tensor -from fastdeploy.model_executor.utils import slice_fn +from fastdeploy.model_executor.utils import h2d_copy, slice_fn from fastdeploy.platforms import current_platform from fastdeploy.worker.experts_manager import RedundantExpertManger @@ -31,6 +30,7 @@ from fastdeploy.model_executor.ops.gpu import noaux_tc except: logger.warning("import noaux_tc Failed!") +import numpy as np def get_moe_method(): @@ -118,6 +118,7 @@ def __init__( weight_key_map: dict = {}, with_bias: bool = False, activation="swiglu", + model_format: Optional[str] = None, ): """ Initialize the Moe layer with given parameters. @@ -201,7 +202,7 @@ def __init__( self.quant_method.create_weights( self, weight_loader=self.weight_loader, - model_format=fd_config.model_config.model_format, + model_format=fd_config.model_config.model_format if model_format is None else model_format, num_experts=self.num_local_experts if self.ep_size > 1 else self.num_experts, hidden_size=self.hidden_size, moe_intermediate_size=self.moe_intermediate_size, @@ -214,72 +215,68 @@ def __init__( tp_size={self.tp_size}." 
) - def weight_loader(self, param, loaded_weight, expert_id, shard_id: Optional[str] = None): + def weight_loader( + self, param, loaded_weight, expert_id, shard_id: Optional[str] = None, source: Optional[str] = None + ): + """ + source:Avoid redundant transpose of fused weights when weight_loader is called iteratively + """ if expert_id is None and shard_id is None: # MoE experts has been fused in disk self._load_fused_experts_weight(param, loaded_weight) return + if hasattr(param, "SHARD_ID_TO_SHARDED_DIM"): + SHARD_ID_TO_SHARDED_DIM = param.SHARD_ID_TO_SHARDED_DIM + elif current_platform.is_cuda() or current_platform.is_iluvatar(): + SHARD_ID_TO_SHARDED_DIM = {"gate": 1, "down": 0, "up": 1} + else: + SHARD_ID_TO_SHARDED_DIM = {"gate": 0, "down": 1, "up": 0} - if expert_id - self.expert_id_offset >= 0 and expert_id - self.expert_id_offset < self.num_local_experts: - if hasattr(param, "SHARD_ID_TO_SHARDED_DIM"): - SHARD_ID_TO_SHARDED_DIM = param.SHARD_ID_TO_SHARDED_DIM - elif current_platform.is_cuda() or current_platform.is_iluvatar(): - SHARD_ID_TO_SHARDED_DIM = {"gate": 1, "down": 0, "up": 1} - else: - SHARD_ID_TO_SHARDED_DIM = {"gate": 0, "down": 1, "up": 0} - - if not param._is_initialized(): - param.initialize() - - if shard_id is None: - # 1.gate up fused in disk - weight_need_transpose = getattr(param, "weight_need_transpose", False) - output_size = param[expert_id - self.expert_id_offset].shape[SHARD_ID_TO_SHARDED_DIM["gate"]] - per_rank = output_size // 2 - start = self.tp_rank * per_rank - loaded_weight_shard_gate = slice_fn( - loaded_weight, weight_need_transpose ^ SHARD_ID_TO_SHARDED_DIM["gate"], start, start + per_rank - ) - self._load_gate_up_weight( - param, - expert_id, - loaded_weight_shard_gate, - "gate", - SHARD_ID_TO_SHARDED_DIM["gate"], - is_sharded=True, - ) - start_up = output_size // 2 * self.tp_size + self.tp_rank * per_rank - loaded_weight_shard_up = slice_fn( - loaded_weight, weight_need_transpose ^ SHARD_ID_TO_SHARDED_DIM["up"], start_up, start_up + per_rank - ) - self._load_gate_up_weight( - param, expert_id, loaded_weight_shard_up, "up", SHARD_ID_TO_SHARDED_DIM["up"], is_sharded=True - ) - else: - # 2.gate up splited in disk - assert shard_id in ["gate", "down", "up"] - self._load_expert_weight( - param=param, - expert_id=expert_id, - loaded_weight=loaded_weight, - shard_id=shard_id, - shard_dim=SHARD_ID_TO_SHARDED_DIM[shard_id], + if not param._is_initialized(): + param.initialize() + if not (expert_id - self.expert_id_offset >= 0 and expert_id - self.expert_id_offset < self.num_local_experts): + return + weight_need_transpose = getattr(param, "weight_need_transpose", False) + if shard_id is None: + # 1.gate up fused in disk + if weight_need_transpose: + loaded_weight = get_tensor(loaded_weight) + loaded_weight = loaded_weight.transpose([1, 0]) + output_size = param[expert_id - self.expert_id_offset].shape[SHARD_ID_TO_SHARDED_DIM["gate"]] + shard_offsets = [ + # (shard_id, shard_offset, shard_size) + ("gate", 0, output_size // 2 * self.tp_size), + ("up", output_size // 2 * self.tp_size, output_size // 2 * self.tp_size), + ] + + for shard_id, shard_offset, shard_size in shard_offsets: + loaded_weight_shard = slice_fn( + loaded_weight, SHARD_ID_TO_SHARDED_DIM[shard_id], shard_offset, shard_offset + shard_size ) + self.weight_loader(param, loaded_weight_shard, expert_id, shard_id, "fused") + else: + if weight_need_transpose and source != "fused": + loaded_weight = get_tensor(loaded_weight) + loaded_weight = loaded_weight.transpose([1, 0]) + # 2.gate up splited 
in disk + assert shard_id in ["gate", "down", "up"] + self._load_expert_weight( + param=param, + expert_id=expert_id, + loaded_weight=loaded_weight, + shard_id=shard_id, + shard_dim=SHARD_ID_TO_SHARDED_DIM[shard_id], + ) def _load_gate_up_weight(self, param, expert_id, loaded_weight, shard_id, shard_dim=None, is_sharded=False): - weight_need_transpose = getattr(param, "weight_need_transpose", False) if self.tp_size > 1 and not is_sharded: - tp_shard_dim = weight_need_transpose ^ shard_dim + tp_shard_dim = shard_dim weight_dim = -1 if tp_shard_dim else 0 - if isinstance(loaded_weight, (np.ndarray, paddle.Tensor)): - size = loaded_weight.shape[weight_dim] - else: - size = loaded_weight.get_shape()[weight_dim] + size = loaded_weight.shape[weight_dim] block_size = size // self.tp_size shard_offset = self.tp_rank * block_size shard_size = (self.tp_rank + 1) * block_size loaded_weight = slice_fn(loaded_weight, tp_shard_dim, shard_offset, shard_size) - loaded_weight = get_tensor(loaded_weight) expert_param = param[expert_id - self.expert_id_offset] dim = -1 if shard_dim else 0 param_shard_size = expert_param.shape[dim] // 2 @@ -310,22 +307,17 @@ def _load_gate_up_weight(self, param, expert_id, loaded_weight, shard_id, shard_ loaded_weight = loaded_weight.view(expert_param.dtype) else: loaded_weight = loaded_weight.cast(expert_param.dtype) - expert_param.copy_(loaded_weight, False) + h2d_copy(dst=expert_param, src=loaded_weight) def _load_down_weight(self, param, expert_id, loaded_weight, shard_id, shard_dim=None): - weight_need_transpose = getattr(param, "weight_need_transpose", False) if self.tp_size > 1 and shard_dim is not None: - tp_shard_dim = weight_need_transpose ^ shard_dim + tp_shard_dim = shard_dim dim = -1 if tp_shard_dim else 0 - if isinstance(loaded_weight, paddle.Tensor): - size = loaded_weight.shape[dim] - else: - size = loaded_weight.get_shape()[dim] + size = loaded_weight.shape[dim] block_size = size // self.tp_size shard_offset = self.tp_rank * block_size shard_size = (self.tp_rank + 1) * block_size loaded_weight = slice_fn(loaded_weight, tp_shard_dim, shard_offset, shard_size) - loaded_weight = get_tensor(loaded_weight) expert_param = param[expert_id - self.expert_id_offset] if hasattr(param, "tensor_track"): # for dyn quant @@ -341,7 +333,7 @@ def _load_down_weight(self, param, expert_id, loaded_weight, shard_id, shard_dim loaded_weight = loaded_weight.view(expert_param.dtype) else: loaded_weight = loaded_weight.cast(expert_param.dtype) - expert_param.copy_(loaded_weight, False) + h2d_copy(dst=expert_param, src=loaded_weight) def _load_fused_experts_weight(self, param, loaded_weight): if self.tp_size > 1: @@ -357,8 +349,7 @@ def _load_fused_experts_weight(self, param, loaded_weight): assert param.shape == loaded_weight.shape, ( f"Attempted to load weight ({loaded_weight.shape}) " f"into parameter ({param.shape})" ) - loaded_weight = get_tensor(loaded_weight) - param.copy_(loaded_weight, False) + h2d_copy(dst=param, src=loaded_weight) if hasattr(param, "tensor_track"): for i in range(self.num_local_experts): diff --git a/fastdeploy/model_executor/layers/quantization/block_wise_fp8.py b/fastdeploy/model_executor/layers/quantization/block_wise_fp8.py index e138b546f89..a7b61fc0ef8 100644 --- a/fastdeploy/model_executor/layers/quantization/block_wise_fp8.py +++ b/fastdeploy/model_executor/layers/quantization/block_wise_fp8.py @@ -22,10 +22,15 @@ from fastdeploy import envs from fastdeploy.model_executor.layers.linear import ( MergedColumnParallelLinear, + MergedReplicatedLinear, 
QKVParallelLinear, ) from fastdeploy.model_executor.layers.moe import FusedMoE -from fastdeploy.model_executor.utils import TensorTracker, set_weight_attrs +from fastdeploy.model_executor.utils import ( + TensorTracker, + process_weight_transpose, + set_weight_attrs, +) from ..utils import get_tensor, per_block_cast_to_fp8 from .quant_base import QuantConfigBase, QuantMethodBase @@ -90,51 +95,66 @@ def __init__( def create_weights(self, layer, **extra_weight_attrs): # TODO(bukejiyu): remove v1 loader check when v0 loader is removed + self.model_format = extra_weight_attrs.get("model_format") if self.quant_config.is_checkpoint_bf16 and layer.fd_config.load_config.load_choices == "default_v1": + weight_shape = layer.weight_shape[::-1] if self.model_format == "torch" else layer.weight_shape layer.weight = layer.create_parameter( - shape=layer.weight_shape, + shape=weight_shape, dtype=layer.weight_dtype, is_bias=False, default_initializer=paddle.nn.initializer.Constant(0), ) - extra_weight_attrs["weight_need_transpose"] = extra_weight_attrs.get("model_format") == "torch" quant_attrs = extra_weight_attrs - if isinstance(layer, MergedColumnParallelLinear) or isinstance(layer, QKVParallelLinear): + if ( + isinstance(layer, MergedColumnParallelLinear) + or isinstance(layer, QKVParallelLinear) + or isinstance(layer, MergedReplicatedLinear) + ): + tensor_output_dim = (self.model_format == "torch") ^ quant_attrs.get("output_dim", True) quant_attrs = { **extra_weight_attrs, - "tensor_track": TensorTracker( - shape=layer.weight_shape, output_dim=extra_weight_attrs.get("output_dim") - ), + "tensor_track": TensorTracker(shape=weight_shape, output_dim=tensor_output_dim), } + if self.model_format == "torch" and "output_dim" in quant_attrs: + quant_attrs["output_dim"] = not quant_attrs["output_dim"] set_weight_attrs( layer.weight, quant_attrs, ) else: layer.weight_shape.reverse() + weight_scale_inv_shape = [ + (layer.weight_shape[0] + self.quant_config.weight_block_size[0] - 1) + // self.quant_config.weight_block_size[0], + (layer.weight_shape[1] + self.quant_config.weight_block_size[1] - 1) + // self.quant_config.weight_block_size[1], + ] + + if self.model_format != "torch" and layer.fd_config.load_config.load_choices == "default_v1": + weight_shape = layer.weight_shape[::-1] + weight_scale_inv_shape = weight_scale_inv_shape[::-1] + else: + # v0 loader or torch model format + weight_shape = layer.weight_shape + weight_scale_inv_shape = weight_scale_inv_shape + extra_weight_attrs["output_dim"] = ( + not extra_weight_attrs["output_dim"] if extra_weight_attrs["output_dim"] is not None else None + ) + layer.weight_dtype = "float8_e4m3fn" layer.weight = layer.create_parameter( - shape=layer.weight_shape, + shape=weight_shape, dtype=layer.weight_dtype, is_bias=False, default_initializer=paddle.nn.initializer.Constant(0), ) layer.weight_scale_inv = layer.create_parameter( - shape=[ - (layer.weight_shape[0] + self.quant_config.weight_block_size[0] - 1) - // self.quant_config.weight_block_size[0], - (layer.weight_shape[1] + self.quant_config.weight_block_size[1] - 1) - // self.quant_config.weight_block_size[1], - ], + shape=weight_scale_inv_shape, dtype="float32", is_bias=False, ) - extra_weight_attrs["output_dim"] = ( - not extra_weight_attrs["output_dim"] if extra_weight_attrs["output_dim"] is not None else None - ) - extra_weight_attrs["weight_need_transpose"] = not extra_weight_attrs.get("model_format") == "torch" set_weight_attrs( layer.weight, extra_weight_attrs, @@ -148,31 +168,41 @@ def create_weights(self, 
layer, **extra_weight_attrs): ) def process_weights_after_loading(self, layer) -> None: - if not self.quant_config.is_checkpoint_bf16: - return - weight_tensor = layer.weight.transpose([1, 0]) - quanted_weight_tensor, weight_block_scale_tensor = per_block_cast_to_fp8(weight_tensor) + def _process_quantize(): + weight_tensor = layer.weight.transpose([1, 0]) + quanted_weight_tensor, weight_block_scale_tensor = per_block_cast_to_fp8(weight_tensor) - if hasattr(layer.weight, "tensor_track"): - layer.weight.tensor_track = None - layer.weight.value().get_tensor()._clear() - del layer.weight + if hasattr(layer.weight, "tensor_track"): + layer.weight.tensor_track = None + layer.weight.value().get_tensor()._clear() + del layer.weight - layer.weight = layer.create_parameter( - shape=quanted_weight_tensor.shape, - dtype="float8_e4m3fn", - is_bias=False, - default_initializer=paddle.nn.initializer.Constant(0), - ) - layer.weight_scale_inv = layer.create_parameter( - shape=weight_block_scale_tensor.shape, - dtype="float32", - is_bias=False, - default_initializer=paddle.nn.initializer.Constant(0), - ) + layer.weight = layer.create_parameter( + shape=quanted_weight_tensor.shape, + dtype="float8_e4m3fn", + is_bias=False, + default_initializer=paddle.nn.initializer.Constant(0), + ) + layer.weight_scale_inv = layer.create_parameter( + shape=weight_block_scale_tensor.shape, + dtype="float32", + is_bias=False, + default_initializer=paddle.nn.initializer.Constant(0), + ) - layer.weight.copy_(quanted_weight_tensor, False) - layer.weight_scale_inv.copy_(weight_block_scale_tensor, False) + layer.weight.copy_(quanted_weight_tensor, False) + layer.weight_scale_inv.copy_(weight_block_scale_tensor, False) + + if self.quant_config.is_checkpoint_bf16: + if self.model_format == "torch": + process_weight_transpose(layer, "weight") + _process_quantize() + else: + if self.model_format != "torch": + process_weight_transpose(layer, "weight") + process_weight_transpose(layer, "weight_scale_inv") + else: + return def process_loaded_weights(self, layer, weights) -> None: weight_tensor = weights.transpose([1, 0]) diff --git a/fastdeploy/model_executor/layers/quantization/mix_quant.py b/fastdeploy/model_executor/layers/quantization/mix_quant.py index cb068a9919c..9bd29b02adc 100644 --- a/fastdeploy/model_executor/layers/quantization/mix_quant.py +++ b/fastdeploy/model_executor/layers/quantization/mix_quant.py @@ -55,7 +55,6 @@ def __init__( self.quant_round_type = 0 self.is_permuted = is_permuted self.is_checkpoint_bf16 = not is_quantized - self.is_quantized = is_quantized self.hadamard_block_size = hadamard_block_size def name(self) -> str: @@ -83,7 +82,7 @@ def get_quant_method(self, layer) -> Optional[QuantMethodBase]: .from_config( { "is_permuted": self.is_permuted, - "is_quantized": self.is_quantized, + "is_quantized": not self.is_checkpoint_bf16, "hadamard_block_size": self.hadamard_block_size, } ) @@ -95,7 +94,7 @@ def get_quant_method(self, layer) -> Optional[QuantMethodBase]: .from_config( { "is_permuted": self.is_permuted, - "is_quantized": self.is_quantized, + "is_quantized": not self.is_checkpoint_bf16, "hadamard_block_size": self.hadamard_block_size, } ) @@ -113,6 +112,6 @@ def get_quant_method(self, layer) -> Optional[QuantMethodBase]: else: return ( get_quantization_config(self.dense_quant_type) - .from_config({"is_quantized": self.is_quantized}) + .from_config({"is_quantized": not self.is_checkpoint_bf16}) .get_quant_method(layer) ) diff --git a/fastdeploy/model_executor/layers/quantization/weight_only.py 
b/fastdeploy/model_executor/layers/quantization/weight_only.py index 35a4eeb6db6..a248775cd43 100644 --- a/fastdeploy/model_executor/layers/quantization/weight_only.py +++ b/fastdeploy/model_executor/layers/quantization/weight_only.py @@ -28,7 +28,12 @@ MergedReplicatedLinear, QKVParallelLinear, ) -from fastdeploy.model_executor.utils import TensorTracker, free_tensor, set_weight_attrs +from fastdeploy.model_executor.utils import ( + TensorTracker, + free_tensor, + process_weight_transpose, + set_weight_attrs, +) from fastdeploy.platforms import current_platform if current_platform.is_xpu(): @@ -231,26 +236,33 @@ def __init__( def create_weights(self, layer, **extra_weight_attrs): # TODO(bukejiyu): remove v1 loader check when v0 loader is removed + self.model_format = extra_weight_attrs.get("model_format") if self.quant_config.is_checkpoint_bf16 and layer.fd_config.load_config.load_choices == "default_v1": + weight_shape = layer.weight_shape[::-1] if self.model_format == "torch" else layer.weight_shape layer.weight = layer.create_parameter( - shape=layer.weight_shape, + shape=weight_shape, dtype=layer.weight_dtype, is_bias=False, default_initializer=paddle.nn.initializer.Constant(0), ) - extra_weight_attrs["weight_need_transpose"] = extra_weight_attrs.get("model_format") == "torch" + quant_attrs = extra_weight_attrs + if ( isinstance(layer, MergedColumnParallelLinear) or isinstance(layer, QKVParallelLinear) or isinstance(layer, MergedReplicatedLinear) ): + # Only MergedReplicatedLinear uses the default outdim. + tensor_output_dim = (self.model_format == "torch") ^ quant_attrs.get("output_dim", True) quant_attrs = { - **extra_weight_attrs, - "tensor_track": TensorTracker( - shape=layer.weight_shape, output_dim=extra_weight_attrs.get("output_dim", True) - ), + **quant_attrs, + "tensor_track": TensorTracker(shape=weight_shape, output_dim=tensor_output_dim), } + + if self.model_format == "torch" and "output_dim" in quant_attrs: + quant_attrs["output_dim"] = not quant_attrs["output_dim"] + set_weight_attrs( layer.weight, quant_attrs, @@ -279,16 +291,11 @@ def create_weights(self, layer, **extra_weight_attrs): default_initializer=paddle.nn.initializer.Constant(0), ) - output_dim = extra_weight_attrs.get("output_dim") - output_dim = not output_dim - weight_loader = extra_weight_attrs.get("weight_loader") + if "output_dim" in extra_weight_attrs: + extra_weight_attrs["output_dim"] = not extra_weight_attrs["output_dim"] set_weight_attrs( layer.weight, - { - "weight_loader": weight_loader, - "output_dim": output_dim, - "weight_need_transpose": not extra_weight_attrs.get("model_format") == "torch", - }, + extra_weight_attrs, ) layer.weight_scale = layer.create_parameter( @@ -299,47 +306,49 @@ def create_weights(self, layer, **extra_weight_attrs): set_weight_attrs( layer.weight_scale, - { - "weight_loader": weight_loader, - "output_dim": output_dim, - }, + extra_weight_attrs, ) def process_weights_after_loading(self, layer) -> None: - if not self.quant_config.is_checkpoint_bf16: - return - if isinstance(self, MacheteWeightOnlyLinearMethod): - - # Using group scale for machete - quanted_weight_tensor, weight_scale_tensor = machete_quantize_and_pack( - w=layer.weight, - atype=layer._dtype, - quant_type="uint4b8" if self.quant_config.name() == "wint4" else "uint8b128", - group_size=self.quant_config.group_size, + def _process_quantize(): + if isinstance(self, MacheteWeightOnlyLinearMethod): + # Using group scale for machete + quanted_weight_tensor, weight_scale_tensor = machete_quantize_and_pack( + 
w=layer.weight, + atype=layer._dtype, + quant_type="uint4b8" if self.quant_config.name() == "wint4" else "uint8b128", + group_size=self.quant_config.group_size, + ) + else: + quanted_weight_tensor, weight_scale_tensor = weight_quantize( + layer.weight, + algo=self.quant_config.algo, + arch=self.quant_config.weight_only_linear_arch, + ) + + free_tensor(layer.weight) + + layer.weight = layer.create_parameter( + shape=quanted_weight_tensor.shape, + dtype="int8" if not isinstance(self, MacheteWeightOnlyLinearMethod) else "int32", + is_bias=False, + default_initializer=paddle.nn.initializer.Constant(0), ) - else: - quanted_weight_tensor, weight_scale_tensor = weight_quantize( - layer.weight, - algo=self.quant_config.algo, - arch=self.quant_config.weight_only_linear_arch, + layer.weight_scale = layer.create_parameter( + shape=weight_scale_tensor.shape, + dtype=layer._dtype, + is_bias=False, + default_initializer=paddle.nn.initializer.Constant(0), ) + layer.weight.copy_(quanted_weight_tensor, False) + layer.weight_scale.copy_(weight_scale_tensor, False) - free_tensor(layer.weight) - - layer.weight = layer.create_parameter( - shape=quanted_weight_tensor.shape, - dtype="int8" if not isinstance(self, MacheteWeightOnlyLinearMethod) else "int32", - is_bias=False, - default_initializer=paddle.nn.initializer.Constant(0), - ) - layer.weight_scale = layer.create_parameter( - shape=weight_scale_tensor.shape, - dtype=layer._dtype, - is_bias=False, - default_initializer=paddle.nn.initializer.Constant(0), - ) - layer.weight.copy_(quanted_weight_tensor, False) - layer.weight_scale.copy_(weight_scale_tensor, False) + if self.quant_config.is_checkpoint_bf16: + if self.model_format == "torch": + process_weight_transpose(layer, "weight") + _process_quantize() + else: + return @abstractmethod def process_loaded_weights(self, layer, weights) -> None: diff --git a/fastdeploy/model_executor/layers/quantization/wfp8afp8.py b/fastdeploy/model_executor/layers/quantization/wfp8afp8.py index 8ba0a0764ef..819700a477e 100644 --- a/fastdeploy/model_executor/layers/quantization/wfp8afp8.py +++ b/fastdeploy/model_executor/layers/quantization/wfp8afp8.py @@ -21,6 +21,7 @@ from fastdeploy.model_executor.layers.linear import ( MergedColumnParallelLinear, + MergedReplicatedLinear, QKVParallelLinear, ) from fastdeploy.model_executor.layers.moe import FusedMoE @@ -33,7 +34,11 @@ QuantMethodBase, ) from fastdeploy.model_executor.layers.utils import per_token_cast_to_fp8 -from fastdeploy.model_executor.utils import TensorTracker, set_weight_attrs +from fastdeploy.model_executor.utils import ( + TensorTracker, + process_weight_transpose, + set_weight_attrs, +) class WFP8AFP8Config(QuantConfigBase): @@ -101,22 +106,28 @@ def create_weights(self, layer, **extra_weight_attrs): (weight_shape[i] + weight_block_size[i] - 1) // weight_block_size[i] if weight_block_size[i] > 0 else 1 ) scale_shape = scale_shape[::-1] + self.model_format = extra_weight_attrs.get("model_format") if self.quant_config.is_checkpoint_bf16 and layer.fd_config.load_config.load_choices == "default_v1": + weight_shape = weight_shape[::-1] if self.model_format == "torch" else weight_shape layer.weight = layer.create_parameter( shape=weight_shape, dtype=layer.weight_dtype, is_bias=False, default_initializer=paddle.nn.initializer.Constant(0), ) - extra_weight_attrs["weight_need_transpose"] = extra_weight_attrs.get("model_format") == "torch" quant_attrs = extra_weight_attrs - if isinstance(layer, MergedColumnParallelLinear) or isinstance(layer, QKVParallelLinear): + if ( + 
isinstance(layer, MergedColumnParallelLinear) + or isinstance(layer, QKVParallelLinear) + or isinstance(layer, MergedReplicatedLinear) + ): + tensor_output_dim = (self.model_format == "torch") ^ quant_attrs.get("output_dim", True) quant_attrs = { **extra_weight_attrs, - "tensor_track": TensorTracker( - shape=layer.weight_shape, output_dim=extra_weight_attrs.get("output_dim") - ), + "tensor_track": TensorTracker(shape=weight_shape, output_dim=tensor_output_dim), } + if self.model_format == "torch" and "output_dim" in quant_attrs: + quant_attrs["output_dim"] = not quant_attrs["output_dim"] set_weight_attrs( layer.weight, quant_attrs, @@ -142,30 +153,39 @@ def create_weights(self, layer, **extra_weight_attrs): def process_weights_after_loading(self, layer) -> None: if not self.quant_config.is_checkpoint_bf16: return - weight_tensor = layer.weight.transpose([1, 0]).contiguous() - assert self.quant_config.weight_block_size == [-1, 1] - qweight, weight_scale = per_token_cast_to_fp8(weight_tensor) - - if hasattr(layer.weight, "tensor_track"): - layer.weight.tensor_track = None - layer.weight.value().get_tensor()._clear() - del layer.weight - - layer.weight = layer.create_parameter( - shape=qweight.shape, - dtype="float8_e4m3fn", - is_bias=False, - default_initializer=paddle.nn.initializer.Constant(0), - ) - layer.weight_scale = layer.create_parameter( - shape=weight_scale.shape, - dtype="float32", - is_bias=False, - default_initializer=paddle.nn.initializer.Constant(0), - ) - layer.weight.copy_(qweight, False) - layer.weight_scale.copy_(weight_scale, False) + def _process_quantize(): + weight_tensor = layer.weight.transpose([1, 0]).contiguous() + assert self.quant_config.weight_block_size == [-1, 1] + qweight, weight_scale = per_token_cast_to_fp8(weight_tensor) + + if hasattr(layer.weight, "tensor_track"): + layer.weight.tensor_track = None + layer.weight.value().get_tensor()._clear() + del layer.weight + + layer.weight = layer.create_parameter( + shape=qweight.shape, + dtype="float8_e4m3fn", + is_bias=False, + default_initializer=paddle.nn.initializer.Constant(0), + ) + layer.weight_scale = layer.create_parameter( + shape=weight_scale.shape, + dtype="float32", + is_bias=False, + default_initializer=paddle.nn.initializer.Constant(0), + ) + + layer.weight.copy_(qweight, False) + layer.weight_scale.copy_(weight_scale, False) + + if self.quant_config.is_checkpoint_bf16: + if self.model_format == "torch": + process_weight_transpose(layer, "weight") + _process_quantize() + else: + return def process_loaded_weights(self, layer, weights) -> None: """ """ diff --git a/fastdeploy/model_executor/load_weight_utils.py b/fastdeploy/model_executor/load_weight_utils.py index 88bffcc3482..e1b502be66a 100644 --- a/fastdeploy/model_executor/load_weight_utils.py +++ b/fastdeploy/model_executor/load_weight_utils.py @@ -41,7 +41,7 @@ from fastdeploy.model_executor.models.tp_utils import ( check_tensor_parallel_prerequisites, ) -from fastdeploy.model_executor.utils import switch_config_context +from fastdeploy.model_executor.utils import multi_switch_config_context from fastdeploy.platforms import current_platform @@ -58,10 +58,14 @@ def pdparams_weight_iterator(paddle_file_list: list[str]): def load_weights_from_cache(model, weights_iterator): params_dict = dict(model.named_parameters()) for loaded_weight_name, loaded_weight in weights_iterator: + if loaded_weight_name not in params_dict: + logger.info(f"{loaded_weight_name} is not in model parameters.") + continue param = params_dict[loaded_weight_name] 
param.copy_(loaded_weight, False) if "embeddings" in loaded_weight_name and getattr(model, "tie_word_embeddings", False): - model.lm_head.load_state_dict({model.lm_head.weight_key: loaded_weight}) + model.lm_head.linear.weight.set_value(loaded_weight) + model.lm_head.process_weights_after_loading() for _, model_sublayer in model.named_sublayers(): if isinstance(model_sublayer, KVBatchLinear): model_sublayer.process_weights_after_loading() @@ -70,17 +74,18 @@ def load_weights_from_cache(model, weights_iterator): def get_weight_iterator(model_path: str): _, files_list, use_safetensors = get_all_weights_file(model_path) if use_safetensors: - weights_iterator = fast_weights_iterator(files_list) + weights_iterator = safetensors_weights_iterator(files_list) else: weights_iterator = pdparams_weight_iterator(files_list) return weights_iterator def is_weight_cache_enabled(fd_config, weight_cache_path=".cache"): + weight_cache_context = contextlib.nullcontext() weight_cache_dir = None enable_cache = False - if envs.FD_ENABLE_MODEL_LOAD_CACHE: + if envs.FD_ENABLE_MODEL_LOAD_CACHE and fd_config.quant_config is not None: model_weight_cache_path = os.path.join(fd_config.model_config.model, weight_cache_path) # model_type + quantization + tp_size + ep_size weight_cache_key = "_".join( @@ -99,7 +104,11 @@ def is_weight_cache_enabled(fd_config, weight_cache_path=".cache"): f"Loading will prioritize cached models. Users are responsible for ensuring the saved model is correct. If any error occurs, deleting the cache at {weight_cache_dir} may resolve it." ) enable_cache = True - weight_cache_context = switch_config_context(fd_config.quant_config, "is_quantized", True) + + weight_cache_context = multi_switch_config_context( + (fd_config.quant_config, "is_checkpoint_bf16", False), + (fd_config.model_config, "model_format", "paddle"), + ) return enable_cache, weight_cache_dir, weight_cache_context @@ -127,32 +136,34 @@ def wrapper(*args, **kwargs): tp_weight_cache_dir = os.path.join( weight_cache_dir, f"rank{str(fd_config.parallel_config.tensor_parallel_rank)}" ) - context = switch_config_context(fd_config.model_config, "model", tp_weight_cache_dir) + context = multi_switch_config_context((fd_config.model_config, "model", tp_weight_cache_dir)) else: context = contextlib.nullcontext() with context: result = func(*args, **kwargs) - if ( - envs.FD_ENABLE_MODEL_LOAD_CACHE - and weight_cache_dir is not None - and not os.path.exists(weight_cache_dir) - ): - assert fd_config.quant_config is not None and getattr( - fd_config.quant_config, "is_checkpoint_bf16", False - ), "Save cache only for dynamic quantization" + + if envs.FD_ENABLE_MODEL_LOAD_CACHE: + if not ( + fd_config.quant_config is not None and getattr(fd_config.quant_config, "is_checkpoint_bf16", False) + ): + # Save cache only for dynamic quantization + return result + if weight_cache_dir is None: + return result tp_weight_cache_dir = os.path.join( weight_cache_dir, f"rank{str(fd_config.parallel_config.tensor_parallel_rank)}" ) - logger.info(f"Saving model to {tp_weight_cache_dir}") - os.makedirs( - tp_weight_cache_dir, - exist_ok=True, - ) - _save_model(model.state_dict(), os.path.join(tp_weight_cache_dir, "cache.pdparams")) - else: - reason = "weights already cached" if envs.FD_ENABLE_MODEL_LOAD_CACHE else "cache disabled" - logger.info(f"Skip saving ,{reason}") + if not os.path.exists(tp_weight_cache_dir): + logger.info(f"Saving model to {tp_weight_cache_dir}") + os.makedirs( + tp_weight_cache_dir, + exist_ok=True, + ) + _save_model(model.state_dict(), 
os.path.join(tp_weight_cache_dir, "cache.pdparams")) + else: + reason = "weights already cached" if envs.FD_ENABLE_MODEL_LOAD_CACHE else "cache disabled" + logger.info(f"Skip saving ,{reason}") return result return wrapper @@ -310,7 +321,7 @@ def safetensors_weights_iterator(safe_tensor_list: list[str]): safe_tensor_list, desc="Loading safetensors checkpoint shards", ): - with safe_open(st_file, framework="np") as f: + with safe_open(st_file, framework="paddle", device="cpu") as f: for name in f.keys(): param = f.get_tensor(name) yield name, param @@ -377,7 +388,7 @@ def load_pre_sharded_checkpoint(model_path: str, local_rank: int, use_fastsafete _, safetensor_files, _ = get_all_weights_file(os.path.join(model_path, f"rank{local_rank}")) weights_iterator = safetensors_weights_iterator(safetensor_files) for name, weight in weights_iterator: - state_dict[name] = weight + state_dict[name] = weight.clone() return state_dict diff --git a/fastdeploy/model_executor/model_loader/default_loader_v1.py b/fastdeploy/model_executor/model_loader/default_loader_v1.py index 0193b259d04..d688e1dde80 100644 --- a/fastdeploy/model_executor/model_loader/default_loader_v1.py +++ b/fastdeploy/model_executor/model_loader/default_loader_v1.py @@ -29,6 +29,7 @@ from fastdeploy.model_executor.model_loader.base_loader import BaseModelLoader from fastdeploy.model_executor.models.adapters import as_embedding_model from fastdeploy.model_executor.models.model_base import ModelRegistry +from fastdeploy.model_executor.utils import process_final_after_loading from fastdeploy.platforms import current_platform @@ -55,6 +56,8 @@ def load_weights(self, model, fd_config: FDConfig, enable_cache: bool = False) - load_weights_from_cache(model, weights_iterator) else: model.load_weights(weights_iterator) + if fd_config.speculative_config.model_type != "mtp": + process_final_after_loading(model, fd_config) self.clean_memory_fragments() diff --git a/fastdeploy/model_executor/models/deepseek_v3.py b/fastdeploy/model_executor/models/deepseek_v3.py index 691c05bda7a..07aaa28a19c 100644 --- a/fastdeploy/model_executor/models/deepseek_v3.py +++ b/fastdeploy/model_executor/models/deepseek_v3.py @@ -671,7 +671,7 @@ def load_weights(self, weights_iterator) -> None: param_down_proj_name="experts.down_proj_", ) params_dict = dict(self.named_parameters()) - process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers())) + process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers()), self.fd_config) for loaded_weight_name, loaded_weight in weights_iterator: loaded_weight_name = loaded_weight_name.replace("deepseek_v3", "model") diff --git a/fastdeploy/model_executor/models/ernie4_5_moe.py b/fastdeploy/model_executor/models/ernie4_5_moe.py index 2bbc5845a90..7f38551609f 100644 --- a/fastdeploy/model_executor/models/ernie4_5_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_moe.py @@ -562,7 +562,9 @@ def load_weights(self, weights_iterator) -> None: ) params_dict = dict(self.named_parameters()) - process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers())) + process_weights_after_loading_fn = process_weights_after_loading( + dict(self.named_sublayers()), fd_config=self.fd_config + ) for loaded_weight_name, loaded_weight in weights_iterator: loaded_weight_name = loaded_weight_name.replace("model", "ernie") @@ -598,7 +600,7 @@ def load_weights(self, weights_iterator) -> None: process_weights_after_loading_fn(model_sublayer_name, param) if 
self.tie_word_embeddings: - self.lm_head.load_state_dict({self.lm_head.weight_key: self.ernie.embed_tokens.embeddings.weight}) + self.lm_head.linear.weight.set_value(self.ernie.embed_tokens.embeddings.weight) def compute_logits(self, hidden_states: paddle.Tensor): logits = self.lm_head(hidden_states) diff --git a/fastdeploy/model_executor/models/ernie4_5_mtp.py b/fastdeploy/model_executor/models/ernie4_5_mtp.py index 70a57660d6a..0aedb040062 100644 --- a/fastdeploy/model_executor/models/ernie4_5_mtp.py +++ b/fastdeploy/model_executor/models/ernie4_5_mtp.py @@ -403,7 +403,7 @@ def load_weights(self, weights_iterator) -> None: params_dict = dict(self.named_parameters()) shard_id = None - process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers())) + process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers()), self.fd_config) for loaded_weight_name, loaded_weight in weights_iterator: for param_name, weight_name, exp_id, shard_id in all_param_mapping: if weight_name not in loaded_weight_name: diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py b/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py index d2ecec40e6e..2d8c53b2218 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/dfnrope/modeling.py @@ -34,7 +34,7 @@ from paddleformers.transformers.model_utils import PretrainedModel from fastdeploy.model_executor.layers.utils import divide, get_tensor -from fastdeploy.model_executor.utils import set_weight_attrs +from fastdeploy.model_executor.utils import fd_cast, h2d_copy, set_weight_attrs from fastdeploy.platforms import current_platform from .activation import ACT2FN @@ -210,7 +210,7 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N if load_bias: head_dim = self.hidden_size // self.num_heads shard_weight = loaded_weight[...].reshape([3, self.num_heads, head_dim]) - shard_weight = np.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] + shard_weight = paddle.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] shard_weight = shard_weight.reshape([-1]) else: shard_weight = loaded_weight[...].reshape( @@ -221,13 +221,14 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N self.head_dim, ] ) - shard_weight = np.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] + shard_weight = paddle.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] shard_weight = shard_weight.reshape([self.hidden_size, -1]) shard_weight = get_tensor(shard_weight) + shard_weight = fd_cast(shard_weight, param) assert param.shape == shard_weight.shape, ( f" Attempted to load weight ({shard_weight.shape}) " f"into parameter ({param.shape})" ) - param.copy_(shard_weight, False) + h2d_copy(param, shard_weight) def forward( self, diff --git a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py index 689f75964b5..be774413103 100644 --- a/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py +++ b/fastdeploy/model_executor/models/ernie4_5_vl/ernie4_5_vl_moe.py @@ -166,13 +166,7 @@ def __init__( skip_quant=True, weight_dtype="float32", weight_key="weight" if moe_tag == "Text" else "weight_1", - ) - - # TODO(hehongyu): remove this after fix model network - setattr( - 
self.gate.weight, - "weight_need_transpose", - False, + model_format="", ) def forward(self, hidden_states: paddle.Tensor): @@ -683,7 +677,7 @@ def load_weights(self, weights_iterator) -> None: all_param_mapping = general_params_mapping + text_expert_params_mapping + image_expert_params_mapping params_dict = dict(self.named_parameters()) - process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers())) + process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers()), self.fd_config) expert_id = None shard_id = None for loaded_weight_name, loaded_weight in weights_iterator: @@ -724,10 +718,7 @@ def load_weights(self, weights_iterator) -> None: ) process_weights_after_loading_fn(model_sublayer_name, param) if self.tie_word_embeddings: - # because we use lazy guard and is not initialized by default - if not self.lm_head.linear.weight._is_initialized(): - self.lm_head.linear.weight.initialize() - self.lm_head.load_state_dict({self.lm_head.weight_key: self.ernie.embed_tokens.embeddings.weight}) + self.lm_head.linear.weight.set_value(self.ernie.embed_tokens.embeddings.weight) @paddle.no_grad() def set_state_dict(self, state_dict: Dict[str, Union[np.ndarray, paddle.Tensor]]): diff --git a/fastdeploy/model_executor/models/glm4_moe.py b/fastdeploy/model_executor/models/glm4_moe.py index 8df18447f42..8850ce81243 100644 --- a/fastdeploy/model_executor/models/glm4_moe.py +++ b/fastdeploy/model_executor/models/glm4_moe.py @@ -442,7 +442,7 @@ def load_weights(self, weights_iterator) -> None: param_down_proj_name="experts.down_proj_", ) params_dict = dict(self.named_parameters()) - process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers())) + process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers()), self.fd_config) for loaded_weight_name, loaded_weight in weights_iterator: for param_name, weight_name, shard_id in stacked_params_mapping: if weight_name not in loaded_weight_name: diff --git a/fastdeploy/model_executor/models/gpt_oss.py b/fastdeploy/model_executor/models/gpt_oss.py index be2c714c020..e951fff92f5 100644 --- a/fastdeploy/model_executor/models/gpt_oss.py +++ b/fastdeploy/model_executor/models/gpt_oss.py @@ -121,6 +121,7 @@ def __init__(self, fd_config: FDConfig, layer_id: int, prefix: str = ""): weight_key_map=weight_key_map, with_bias=True, activation="swigluoai", + model_format="", ) def forward(self, hidden_states: paddle.Tensor): @@ -270,7 +271,7 @@ def load_weights(self, weights_iterator) -> None: ("down_proj_bias", "down_proj_bias", None, None), ] params_dict = dict(self.named_parameters()) - process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers())) + process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers()), self.fd_config) for loaded_weight_name, loaded_weight in weights_iterator: for param_name, weight_name, shard_id in stacked_params_mapping: if weight_name not in loaded_weight_name: diff --git a/fastdeploy/model_executor/models/paddleocr_vl/paddleocr_vl.py b/fastdeploy/model_executor/models/paddleocr_vl/paddleocr_vl.py index 225f64f4319..13afbe3c985 100644 --- a/fastdeploy/model_executor/models/paddleocr_vl/paddleocr_vl.py +++ b/fastdeploy/model_executor/models/paddleocr_vl/paddleocr_vl.py @@ -158,7 +158,7 @@ def load_weights(self, weights_iterator) -> None: ] params_dict = dict(self.named_parameters()) - process_weights_after_loading_fn = 
process_weights_after_loading(dict(self.named_sublayers())) + process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers()), self.fd_config) for loaded_weight_name, loaded_weight in weights_iterator: loaded_weight_name = ( self.process_weights_before_loading_fn(loaded_weight_name) diff --git a/fastdeploy/model_executor/models/paddleocr_vl/projector.py b/fastdeploy/model_executor/models/paddleocr_vl/projector.py index 7f1f4be1260..f1b5ef60928 100644 --- a/fastdeploy/model_executor/models/paddleocr_vl/projector.py +++ b/fastdeploy/model_executor/models/paddleocr_vl/projector.py @@ -20,8 +20,6 @@ import paddle import paddle.nn as nn -from fastdeploy.model_executor.layers.utils import get_tensor - class GELUActivation(nn.Layer): """ @@ -98,7 +96,6 @@ def forward(self, image_features, image_grid_thw): return hidden_states def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = None): - loaded_weight = get_tensor(loaded_weight) loaded_weight = loaded_weight.transpose([1, 0]) assert param.shape == loaded_weight.shape, ( f" Attempted to load weight ({loaded_weight.shape}) " f"into parameter ({param.shape})" diff --git a/fastdeploy/model_executor/models/paddleocr_vl/siglip.py b/fastdeploy/model_executor/models/paddleocr_vl/siglip.py index 9bc93da4bf9..0bb256cd51f 100644 --- a/fastdeploy/model_executor/models/paddleocr_vl/siglip.py +++ b/fastdeploy/model_executor/models/paddleocr_vl/siglip.py @@ -23,8 +23,7 @@ import paddle.nn.functional as F from paddleformers.transformers.model_utils import PretrainedModel -from fastdeploy.model_executor.layers.utils import get_tensor -from fastdeploy.model_executor.utils import slice_fn +from fastdeploy.model_executor.utils import h2d_copy, slice_fn from .config import PaddleOCRVisionConfig from .siglip_ops import get_activation_fn, neox_rope_embedding @@ -71,7 +70,6 @@ def __init__(self, config): def qkv_weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = None): # Tensor parallelism splits the weight along the output_dim - loaded_weight = get_tensor(loaded_weight) if loaded_weight.dim() == 2: loaded_weight = loaded_weight.transpose([1, 0]) @@ -98,10 +96,9 @@ def qkv_weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] loaded_weight = loaded_weight.view(param.dtype) else: loaded_weight = loaded_weight.cast(param.dtype) - param.copy_(loaded_weight, False) + h2d_copy(param, loaded_weight) def out_proj_weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = None): - loaded_weight = get_tensor(loaded_weight) loaded_weight = loaded_weight.transpose([1, 0]) assert param.shape == loaded_weight.shape, ( f" Attempted to load weight ({loaded_weight.shape}) " f"into parameter ({param.shape})" @@ -289,7 +286,6 @@ def __init__(self, config): self.fc2.weight.weight_loader = self.weight_loader def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = None): - loaded_weight = get_tensor(loaded_weight) loaded_weight = loaded_weight.transpose([1, 0]) assert param.shape == loaded_weight.shape, ( f" Attempted to load weight ({loaded_weight.shape}) " f"into parameter ({param.shape})" diff --git a/fastdeploy/model_executor/models/qwen2.py b/fastdeploy/model_executor/models/qwen2.py index 7dd6dd040ae..69b010c0bf0 100644 --- a/fastdeploy/model_executor/models/qwen2.py +++ b/fastdeploy/model_executor/models/qwen2.py @@ -348,7 +348,7 @@ def load_weights(self, weights_iterator) -> None: ] params_dict = dict(self.named_parameters()) - 
process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers())) + process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers()), self.fd_config) for loaded_weight_name, loaded_weight in weights_iterator: loaded_weight_name = ( self.process_weights_before_loading_fn(loaded_weight_name) @@ -377,7 +377,7 @@ def load_weights(self, weights_iterator) -> None: model_sublayer_name = re.sub(r"\.(weight)$", "", model_param_name) process_weights_after_loading_fn(model_sublayer_name, param) if self.tie_word_embeddings: - self.lm_head.load_state_dict({self.lm_head.weight_key: self.qwen2.embed_tokens.embeddings.weight}) + self.lm_head.linear.weight.set_value(self.qwen2.embed_tokens.embeddings.weight) @classmethod def name(self): diff --git a/fastdeploy/model_executor/models/qwen2_5_vl/dfnrope/modeling.py b/fastdeploy/model_executor/models/qwen2_5_vl/dfnrope/modeling.py index b0d4a0282d6..8f7b262432f 100644 --- a/fastdeploy/model_executor/models/qwen2_5_vl/dfnrope/modeling.py +++ b/fastdeploy/model_executor/models/qwen2_5_vl/dfnrope/modeling.py @@ -32,7 +32,7 @@ from paddleformers.transformers.model_utils import PretrainedModel from fastdeploy.model_executor.layers.utils import divide, get_tensor -from fastdeploy.model_executor.utils import set_weight_attrs +from fastdeploy.model_executor.utils import fd_cast, h2d_copy, set_weight_attrs from .activation import ACT2FN from .configuration import DFNRopeVisionTransformerConfig @@ -134,7 +134,7 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N if load_bias: head_dim = self.hidden_size // self.num_heads shard_weight = loaded_weight[...].reshape([3, self.num_heads, head_dim]) - shard_weight = np.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] + shard_weight = paddle.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] shard_weight = shard_weight.reshape([-1]) else: shard_weight = loaded_weight[...].reshape( @@ -145,13 +145,13 @@ def weight_loader(self, param, loaded_weight, loaded_shard_id: Optional[str] = N self.head_dim, ] ) - shard_weight = np.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] + shard_weight = paddle.split(shard_weight, self.tensor_parallel_degree, axis=-2)[self.tensor_parallel_rank] shard_weight = shard_weight.reshape([self.hidden_size, -1]) - shard_weight = get_tensor(shard_weight) + shard_weight = fd_cast(shard_weight, param) assert param.shape == shard_weight.shape, ( f" Attempted to load weight ({shard_weight.shape}) " f"into parameter ({param.shape})" ) - param.copy_(shard_weight, False) + h2d_copy(param, shard_weight) def forward( self, diff --git a/fastdeploy/model_executor/models/qwen2_5_vl/qwen2_5_vl.py b/fastdeploy/model_executor/models/qwen2_5_vl/qwen2_5_vl.py index 91237cd5ffc..53f5c766ec6 100644 --- a/fastdeploy/model_executor/models/qwen2_5_vl/qwen2_5_vl.py +++ b/fastdeploy/model_executor/models/qwen2_5_vl/qwen2_5_vl.py @@ -208,7 +208,7 @@ def load_weights(self, weights_iterator) -> None: ] params_dict = dict(self.named_parameters()) - process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers())) + process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers()), self.fd_config) for loaded_weight_name, loaded_weight in weights_iterator: for param_name, weight_name, shard_id in stacked_params_mapping: if weight_name not in loaded_weight_name: @@ -231,10 +231,7 @@ def 
load_weights(self, weights_iterator) -> None: process_weights_after_loading_fn(model_sublayer_name, param) if self.tie_word_embeddings: - # because we use lazy guard and is not initialized by default - if not self.lm_head.linear.weight._is_initialized(): - self.lm_head.linear.weight.initialize() - self.lm_head.load_state_dict({self.lm_head.weight_key: self.model.embed_tokens.embeddings.weight}) + self.lm_head.linear.weight.set_value(self.model.embed_tokens.embeddings.weight) @paddle.no_grad() def set_state_dict(self, state_dict: Dict[str, Union[np.ndarray, paddle.Tensor]]): diff --git a/fastdeploy/model_executor/models/qwen3.py b/fastdeploy/model_executor/models/qwen3.py index 8e1816a0ea2..8b1004d7622 100644 --- a/fastdeploy/model_executor/models/qwen3.py +++ b/fastdeploy/model_executor/models/qwen3.py @@ -292,7 +292,7 @@ def load_weights(self, weights_iterator) -> None: for param_name, param in params_dict.items() } - process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers())) + process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers()), self.fd_config) for loaded_weight_name, loaded_weight in weights_iterator: for param_name, weight_name, shard_id in stacked_params_mapping: @@ -320,7 +320,7 @@ def load_weights(self, weights_iterator) -> None: process_weights_after_loading_fn(model_sublayer_name, param) if self.tie_word_embeddings and not is_pooling_model: - self.lm_head.load_state_dict({self.lm_head.weight_key: self.model.embed_tokens.embeddings.weight}) + self.lm_head.linear.weight.set_value(self.model.embed_tokens.embeddings.weight) @paddle.no_grad() def set_state_dict(self, state_dict): diff --git a/fastdeploy/model_executor/models/qwen3moe.py b/fastdeploy/model_executor/models/qwen3moe.py index 64b813eb589..9537b84f22c 100644 --- a/fastdeploy/model_executor/models/qwen3moe.py +++ b/fastdeploy/model_executor/models/qwen3moe.py @@ -360,7 +360,7 @@ def load_weights(self, weights_iterator) -> None: ] expert_params_mapping = self.get_expert_mapping() params_dict = dict(self.named_parameters()) - process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers())) + process_weights_after_loading_fn = process_weights_after_loading(dict(self.named_sublayers()), self.fd_config) for loaded_weight_name, loaded_weight in weights_iterator: for param_name, weight_name, shard_id in stacked_params_mapping: if weight_name not in loaded_weight_name: diff --git a/fastdeploy/model_executor/utils.py b/fastdeploy/model_executor/utils.py index 40af040e9a7..5e7c734f723 100644 --- a/fastdeploy/model_executor/utils.py +++ b/fastdeploy/model_executor/utils.py @@ -128,13 +128,35 @@ def slice_fn(weight_or_paramter, output_dim, start, end, step=1): return weight_or_paramter -def process_weights_after_loading(sublayers_dict: dict): +def process_weight_transpose(layer, weight_name): + weight = getattr(layer, weight_name) + if len(weight.shape) == 2: + weight_transpose = weight.transpose([1, 0]) + elif len(weight.shape) == 3: + weight_transpose = weight.transpose([0, 2, 1]) + + weight_tmp = layer.create_parameter( + shape=weight_transpose.shape, + dtype=weight_transpose.dtype, + default_initializer=paddle.nn.initializer.Constant(0), + is_bias=False, + ) + weight_tmp.copy_(weight_transpose, False) + free_tensor(weight) + setattr(layer, weight_name, weight_tmp) + + +def process_weights_after_loading(sublayers_dict: dict, fd_config: FDConfig): """ - process_weights_after_loading: e.g., handle extracted weights (quantization, 
reshaping, etc.) + process_weights_after_loading: """ def fn(model_sublayer_name: str, param=None): - from fastdeploy.model_executor.layers.linear import KVBatchLinear + from fastdeploy.model_executor.layers.linear import ( + KVBatchLinear, + UnquantizedLinearMethod, + ) + from fastdeploy.model_executor.layers.moe.moe import get_moe_method if model_sublayer_name not in sublayers_dict: return @@ -143,6 +165,10 @@ def fn(model_sublayer_name: str, param=None): model_sublayer.process_weights_after_loading() if hasattr(model_sublayer, "quant_method"): quant_method = getattr(model_sublayer, "quant_method", None) + unquant_moe_cls = type(get_moe_method()) + if type(quant_method) is UnquantizedLinearMethod or type(quant_method) is unquant_moe_cls: + # skip unquantized linear + return if not hasattr(quant_method, "process_weights_after_loading"): return if param is not None and hasattr(param, "tensor_track") and param.tensor_track is None: @@ -184,6 +210,36 @@ def fn(weight_name): return fn +def weight_fully_copied(weight): + return ( + hasattr(weight, "tensor_track") and weight.tensor_track is not None and weight.tensor_track.is_fully_copied() + ) + + +def process_final_after_loading(model, fd_config: FDConfig): + # process_final_after_loading handles the post-loading process for cases other than dynamic quantization. + from fastdeploy.model_executor.layers.linear import ( + KVBatchLinear, + UnquantizedLinearMethod, + ) + from fastdeploy.model_executor.layers.moe.moe import get_moe_method + + for name, sublayer in model.named_sublayers(): + quant_method = getattr(sublayer, "quant_method", None) + if quant_method is not None: + unquant_moe_cls = type(get_moe_method()) + if not (type(quant_method) is UnquantizedLinearMethod or type(quant_method) is unquant_moe_cls): + continue + if hasattr(quant_method, "process_weights_after_loading"): + quant_method.process_weights_after_loading(sublayer) + if isinstance(sublayer, KVBatchLinear): + continue + if not hasattr(sublayer, "process_weights_after_loading"): + continue + # Only for specific layers, such as lmhead + sublayer.process_weights_after_loading() + + def free_tensor(tensor): if hasattr(tensor, "tensor_track"): tensor.tensor_track = None @@ -191,6 +247,15 @@ def free_tensor(tensor): del tensor +def fd_cast(weight, param): + if weight.dtype != param.dtype: + if weight.dtype == paddle.int8 and param.dtype == paddle.float8_e4m3fn: + weight = weight.view(param.dtype) + else: + weight = weight.cast(param.dtype) + return weight + + def default_weight_loader(fd_config: FDConfig = None) -> None: """Default weight loader""" @@ -200,7 +265,6 @@ def fn(param, loaded_weight, shard_id: Optional[Union[int, str]] = None): output_dim = getattr(param, "output_dim", None) weight_need_transpose = getattr(param, "weight_need_transpose", False) if weight_need_transpose: - loaded_weight = get_tensor(loaded_weight) loaded_weight = loaded_weight.transpose([1, 0]) # Tensor parallelism splits the weight along the output_dim if output_dim is not None and fd_config is not None and fd_config.parallel_config.tensor_parallel_size > 1: @@ -214,20 +278,15 @@ def fn(param, loaded_weight, shard_id: Optional[Union[int, str]] = None): shard_size = (fd_config.parallel_config.tensor_parallel_rank + 1) * block_size loaded_weight = slice_fn(loaded_weight, output_dim, shard_offset, shard_size) - loaded_weight = get_tensor(loaded_weight) # mlp.gate.weight is precision-sensitive, so we cast it to float32 for computation - if param.dtype != loaded_weight.dtype: - if loaded_weight.dtype == 
-                loaded_weight = loaded_weight.view(param.dtype)
-            else:
-                loaded_weight = loaded_weight.cast(param.dtype)
+        loaded_weight = fd_cast(loaded_weight, param)
         if param.shape != loaded_weight.shape:
             # for e_score_correction_bias
             loaded_weight = loaded_weight.reshape(param.shape)
         assert param.shape == loaded_weight.shape, (
             f" Attempted to load weight ({loaded_weight.shape}) " f"into parameter ({param.shape})"
         )
-        param.copy_(loaded_weight, False)
+        h2d_copy(dst=param, src=loaded_weight)
 
     return fn
 
@@ -255,10 +314,44 @@ def is_paddle_support_v1_loader():
     return is_same
 
 
+_support_new_h2d = None
+
+
+def is_paddle_support_new_h2d():
+    import subprocess
+    import sys
+
+    global _support_new_h2d
+    if _support_new_h2d is not None:
+        return _support_new_h2d
+
+    code = """
+import paddle
+try:
+    dst = paddle.zeros([2, 4], dtype='bfloat16')
+    src = paddle.ones([2, 2], dtype='bfloat16', device='cpu')
+    dst = dst[..., :2]
+    dst.copy_(src)
+    print(1)
+except:
+    print(0)
+"""
+    result = subprocess.run([sys.executable, "-c", code], capture_output=True)
+    _support_new_h2d = result.stdout.strip() == b"1"
+    return _support_new_h2d
+
+
+def h2d_copy(dst, src, blocking=True):
+    if not current_platform.is_cuda() or not is_paddle_support_new_h2d():
+        # For non-GPU devices, data is transferred to device (H2D) in advance.
+        src = get_tensor(src)
+    if not dst._is_initialized():
+        dst.initialize()
+    dst.copy_(src, blocking)
+
+
 def v1_loader_support(fd_config):
-    _v1_no_support_archs = [
-        "Qwen2VLForConditionalGeneration",
-    ]
+    _v1_no_support_archs = ["Qwen2VLForConditionalGeneration"]
 
     def _err_msg(msg: str) -> str:
         logger.info(msg + "; fallback to the v0 loader for model loading.")
@@ -310,14 +403,20 @@ def temporary_dtype(dtype: str):
 
 
 @contextmanager
-def switch_config_context(config_obj, config_attr_name, value):
-    """switch_config_context"""
-    origin_value = getattr(config_obj, config_attr_name)
-    setattr(config_obj, config_attr_name, value)
+def multi_switch_config_context(*changes):
+    """
+    changes: (obj, attr, new_value)
+    """
+    originals = []
     try:
+        for obj, attr, new_value in changes:
+            old_value = getattr(obj, attr)
+            originals.append((obj, attr, old_value))
+            setattr(obj, attr, new_value)
         yield
     finally:
-        setattr(config_obj, config_attr_name, origin_value)
+        for obj, attr, old_value in originals:
+            setattr(obj, attr, old_value)
 
 
 def rename_offline_ckpt_suffix_to_fd_suffix(
diff --git a/requirements.txt b/requirements.txt
index ab726f74511..1abb05ec055 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -32,6 +32,7 @@ fastsafetensors==0.1.14
 msgpack
 gunicorn
 modelscope
+safetensors==0.7.0rc0
 opentelemetry-api>=1.24.0
 opentelemetry-sdk>=1.24.0
 opentelemetry-instrumentation-redis
diff --git a/tests/model_loader/test_load_mtp.py b/tests/model_loader/test_load_mtp.py
index 59438166f32..aa3d47dc115 100644
--- a/tests/model_loader/test_load_mtp.py
+++ b/tests/model_loader/test_load_mtp.py
@@ -53,8 +53,8 @@ def setUp(self):
 
     def test_load_weights_normal_case(self):
         weights_iterator = [
-            ("ernie.embed_tokens.weight", np.random.rand(32000, 768).astype("float32")),
-            ("ernie.mtp_block.0.self_attn.qkv_proj.weight", np.random.rand(768, 768 * 3).astype("float32")),
+            ("ernie.embed_tokens.weight", paddle.rand([32000, 768], dtype="float32")),
+            ("ernie.mtp_block.0.self_attn.qkv_proj.weight", paddle.rand([768, 768 * 3], dtype="float32")),
         ]
         for k, v in self.model.named_parameters():
             print("{}".format(k))
@@ -65,8 +65,8 @@ def test_load_weights_with_unexpected_keys(self):
         weights_iterator = [
-            ("unknown_key", np.random.rand(10, 10).astype("float32")),
-            ("ernie.embed_tokens.weight", np.random.rand(32000, 768).astype("float32")),
+            ("unknown_key", paddle.rand([10, 10], dtype="float32")),
+            ("ernie.embed_tokens.weight", paddle.rand([32000, 768], dtype="float32")),
         ]
         self.model.load_weights(iter(weights_iterator))
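
Reviewer note (illustrative, not part of the patch): multi_switch_config_context generalizes the old single-attribute switch_config_context, so call sites that previously nested several context managers can pass multiple (obj, attr, new_value) tuples in one call. The sketch below mirrors the patched helper to show the calling convention; the SimpleNamespace config objects and attribute names are hypothetical stand-ins, not taken from the repository.

    from contextlib import contextmanager
    from types import SimpleNamespace


    @contextmanager
    def multi_switch_config_context(*changes):
        # Mirror of the patched helper: each change is (obj, attr, new_value);
        # every attribute is restored on exit, even if the body raises.
        originals = []
        try:
            for obj, attr, new_value in changes:
                originals.append((obj, attr, getattr(obj, attr)))
                setattr(obj, attr, new_value)
            yield
        finally:
            for obj, attr, old_value in originals:
                setattr(obj, attr, old_value)


    # Hypothetical stand-ins for config sub-objects.
    model_cfg = SimpleNamespace(dtype="bfloat16")
    load_cfg = SimpleNamespace(load_choices="default")

    with multi_switch_config_context(
        (model_cfg, "dtype", "float32"),
        (load_cfg, "load_choices", "default_v1"),
    ):
        # Both attributes are overridden only inside this block.
        assert model_cfg.dtype == "float32"
        assert load_cfg.load_choices == "default_v1"

    # Original values are restored after the block exits.
    assert model_cfg.dtype == "bfloat16"
    assert load_cfg.load_choices == "default"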