diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index c5a5d82ae0d..9de096d012c 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -192,6 +192,9 @@ "FD_XPU_ENABLE_MIXED_EP_MODE": lambda: bool(int(os.getenv("FD_XPU_ENABLE_MIXED_EP_MODE", "0"))), # Whether to use phi FP8 quantization,if 1,use paddle default. "FD_USE_PHI_FP8_QUANT": lambda: bool(int(os.getenv("FD_USE_PHI_FP8_QUANT", "1"))), + # Enables the Paddle/phi combined TopK operator only when topk_method == noaux_tc, + # intended for training alignment. Defaults to 0 (disabled). + "FD_USE_PHI_MOE_TOPK": lambda: bool(int(os.getenv("FD_USE_PHI_MOE_TOPK", "0"))), # Whether to use phi MOE permute,if 1,use paddle op. "FD_USE_PHI_MOE_PERMUTE": lambda: bool(int(os.getenv("FD_USE_PHI_MOE_PERMUTE", "0"))), # Control class SiluAndMul to use swiglu or fusid_bias_act operator in the forward_cuda function @@ -218,6 +221,8 @@ # has been observed to cause NaN computation errors. # Set to 1 to enable the lock; defaults to 0 (disabled). "FD_USE_KVCACHE_LOCK": lambda: bool(int(os.getenv("FD_USE_KVCACHE_LOCK", "0"))), + # Whether to probe MoE routing probabilities and use Fleet's fused SwiGLU kernel. + "FD_MOE_PROB_IN_ADVANCE": lambda: bool(int(os.getenv("FD_MOE_PROB_IN_ADVANCE", "0"))), # Suspend rollouting routing replay "FD_SUSPEND_ROUTING_REPLAY": lambda: bool(int(os.getenv("FD_SUSPEND_ROUTING_REPLAY", "0"))), } diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py index 264086f0d10..fb03afefb70 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py @@ -22,7 +22,10 @@ import fastdeploy from fastdeploy.model_executor.layers.moe.ep import deep_ep -from fastdeploy.model_executor.layers.quantization.fp8_utils import deep_gemm +from fastdeploy.model_executor.layers.quantization.fp8_utils import ( + deep_gemm, + paddlefleet_ops, +) from fastdeploy.model_executor.layers.utils import get_tensor from fastdeploy.model_executor.ops.gpu import count_tokens_per_expert_func from fastdeploy.platforms import current_platform @@ -84,6 +87,7 @@ def m_grouped_fp8_gemm_nt_contiguous_custom_python_op( layer_added_scale_attrs_1: paddle.Tensor, # getattr(layer, self.added_scale_attrs[1]) quant_config_weight_block_size_0: int, # self.quant_config.weight_block_size[0] disable_ue8m0_cast: bool, + dst_weights: paddle.Tensor, ): # up_gate_proj @@ -104,23 +108,30 @@ def m_grouped_fp8_gemm_nt_contiguous_custom_python_op( ) # swiglu - ffn_out = paddle.incubate.nn.functional.swiglu(ffn_out) - - # down_proj - if not fastdeploy.envs.FD_USE_PHI_FP8_QUANT: - ffn_in_x, ffn_in_x_scale_tensor = fastdeploy.model_executor.ops.gpu.per_token_quant( - ffn_out, quant_config_weight_block_size_0 + if fastdeploy.envs.FD_MOE_PROB_IN_ADVANCE: + ffn_in_x, ffn_in_x_scale_tensor = paddlefleet_ops.fuse_weighted_swiglu_fp8_quant( + ffn_out, dst_weights, using_pow2_scaling=True, use_ue8m0=not disable_ue8m0_cast ) - ffn_in_x_scale_tensor = ffn_in_x_scale_tensor.transpose([1, 0]).contiguous() - ffn_in_x_scale_tensor = ffn_in_x_scale_tensor.transpose([1, 0]) + ffn_in_x_scale_tensor = paddle.transpose(paddle.transpose(ffn_in_x_scale_tensor, [1, 0]).contiguous(), [1, 0]) else: - ffn_in_x, ffn_in_x_scale_tensor = paddle.incubate.nn.functional.fp8_quant_blockwise( - ffn_out, - using_pow2_scale=not disable_ue8m0_cast, - using_ue8m0_scale=not disable_ue8m0_cast, - ) - ffn_in_x_scale_tensor = 
ffn_in_x_scale_tensor.T[: ffn_in_x.shape[0]] + ffn_out = paddle.incubate.nn.functional.swiglu(ffn_out) + + # down_proj + if not fastdeploy.envs.FD_USE_PHI_FP8_QUANT: + ffn_in_x, ffn_in_x_scale_tensor = fastdeploy.model_executor.ops.gpu.per_token_quant( + ffn_out, quant_config_weight_block_size_0, not disable_ue8m0_cast + ) + + ffn_in_x_scale_tensor = ffn_in_x_scale_tensor.transpose([1, 0]).contiguous() + ffn_in_x_scale_tensor = ffn_in_x_scale_tensor.transpose([1, 0]) + else: + ffn_in_x, ffn_in_x_scale_tensor = paddle.incubate.nn.functional.fp8_quant_blockwise( + ffn_out, + using_pow2_scale=not disable_ue8m0_cast, + using_ue8m0_scale=not disable_ue8m0_cast, + ) + ffn_in_x_scale_tensor = ffn_in_x_scale_tensor.T[: ffn_in_x.shape[0]] ffn_out = paddle.empty( (permute_input.shape[0], layer_added_weight_attrs_1.shape[1]), @@ -136,6 +147,66 @@ def m_grouped_fp8_gemm_nt_contiguous_custom_python_op( return ffn_out +def moe_topk_select( + gating_output: paddle.Tensor, + n_group: int, + topk_group: int, + top_k: int, + routed_scaling_factor: float, + e_score_correction_bias: paddle.Tensor, + renormalize: bool = False, +): + """ + Topk selection using paddle PHI topk API. + + Args: + gating_output: gate output logits, shape [seq_len, n_experts] + n_group: number of expert groups + topk_group: number of top-k groups to select + top_k: number of top experts per token + routed_scaling_factor: scaling factor for routed experts + e_score_correction_bias: bias for expert selection + renormalize: whether to renormalize topk probabilities + + Returns: + topk_weights: normalized topk probabilities, shape [seq_len, top_k] + topk_ids: topk expert indices, shape [seq_len, top_k] + """ + # compute gate probs via sigmoid + gate_probs = paddle.nn.functional.sigmoid(gating_output) + # probs_for_choice includes correction bias for topk selection + probs_for_choice = gate_probs + e_score_correction_bias if e_score_correction_bias is not None else gate_probs + # group-based topk selection + n_group = n_group if n_group > 0 else 1 + topk_group = topk_group if topk_group > 0 else 1 + if n_group > 1 and topk_group < n_group: + seq_length, n_experts = probs_for_choice.shape + group_scores = ( + probs_for_choice.reshape([seq_length, n_group, -1]).topk(2, axis=-1)[0].sum(axis=-1) + ) # [seq_len, n_group] + group_idx = paddle.topk(group_scores, k=topk_group, axis=-1, sorted=True)[1] # [seq_len, topk_group] + group_mask = paddle.zeros_like(group_scores).put_along_axis( + group_idx, paddle.to_tensor(1.0, dtype=group_scores.dtype), axis=-1 + ) + score_mask = ( + group_mask.unsqueeze(-1).expand([seq_length, n_group, n_experts // n_group]).reshape([seq_length, -1]) + ) # [seq_len, n_experts] + probs_for_choice = probs_for_choice.masked_fill(~score_mask.astype(paddle.bool), float("-inf")) + + _, topk_ids = paddle.topk(probs_for_choice, top_k, axis=-1) + topk_weights = paddle.take_along_axis(gate_probs, topk_ids, axis=-1) + + # normalize combine weights + if renormalize: + topk_weights = topk_weights / paddle.clip(topk_weights.sum(-1, keepdim=True), min=1e-12) + + # apply routed scaling factor + if routed_scaling_factor: + topk_weights = topk_weights * routed_scaling_factor + + return topk_weights, topk_ids + + class DeepGemmFusedMoeMethod(MoEMethodBase): """ DeepGemmFusedMoeMethod is a class that implements the MoEMethodBase interface for DeepGemm backend. @@ -257,7 +328,22 @@ def apply_ep_prefill( hidden_size = x.shape[1] # 1. 
Select topk experts and weights - topk_idx, topk_weights = self.ep_prefill_runner.moe_select(layer, gate_out) + if ( + fastdeploy.envs.FD_USE_PHI_MOE_TOPK + and layer.redundant_table_manger is None + and layer.topk_method == "noaux_tc" + ): + topk_weights, topk_idx = moe_topk_select( + gate_out, + layer.n_group, + layer.topk_group, + layer.top_k, + layer.routed_scaling_factor, + layer.gate_correction_bias, + getattr(layer, "renormalize", True), + ) + else: + topk_idx, topk_weights = self.ep_prefill_runner.moe_select(layer, gate_out) if topk_ids_hookfunc is not None: topk_ids_hookfunc(topk_ids=topk_idx) @@ -371,22 +457,31 @@ def apply_ep_prefill( ) del permute_input - # swiglu - ffn_out = paddle.incubate.nn.functional.swiglu(ffn_out, None) + if fastdeploy.envs.FD_MOE_PROB_IN_ADVANCE: + ffn_in_x, ffn_in_x_scale_tensor = paddlefleet_ops.fuse_weighted_swiglu_fp8_quant( + ffn_out, dst_weights, using_pow2_scaling=True, use_ue8m0=self.quant_config.deepgemm_scale_ue8m0 + ) - # down_proj - if not fastdeploy.envs.FD_USE_PHI_FP8_QUANT: - ffn_in_x, ffn_in_x_scale_tensor = fastdeploy.model_executor.ops.gpu.per_token_quant( - ffn_out, self.quant_config.weight_block_size[0] + ffn_in_x_scale_tensor = paddle.transpose( + paddle.transpose(ffn_in_x_scale_tensor, [1, 0]).contiguous(), [1, 0] ) - ffn_in_x_scale_tensor = ffn_in_x_scale_tensor.transpose([1, 0]).contiguous().transpose([1, 0]) else: - ffn_in_x, ffn_in_x_scale_tensor = paddle.incubate.nn.functional.fp8_quant_blockwise( - ffn_out, - using_pow2_scale=self.quant_config.deepgemm_scale_ue8m0, - using_ue8m0_scale=self.quant_config.deepgemm_scale_ue8m0, - ) - ffn_in_x_scale_tensor = ffn_in_x_scale_tensor.T[: ffn_in_x.shape[0]] + # swiglu + ffn_out = paddle.incubate.nn.functional.swiglu(ffn_out, None) + + # down_proj + if not fastdeploy.envs.FD_USE_PHI_FP8_QUANT: + ffn_in_x, ffn_in_x_scale_tensor = fastdeploy.model_executor.ops.gpu.per_token_quant( + ffn_out, self.quant_config.weight_block_size[0], self.quant_config.deepgemm_scale_ue8m0 + ) + ffn_in_x_scale_tensor = ffn_in_x_scale_tensor.transpose([1, 0]).contiguous().transpose([1, 0]) + else: + ffn_in_x, ffn_in_x_scale_tensor = paddle.incubate.nn.functional.fp8_quant_blockwise( + ffn_out, + using_pow2_scale=self.quant_config.deepgemm_scale_ue8m0, + using_ue8m0_scale=self.quant_config.deepgemm_scale_ue8m0, + ) + ffn_in_x_scale_tensor = ffn_in_x_scale_tensor.T[: ffn_in_x.shape[0]] del ffn_out ffn_out = paddle.empty( @@ -408,7 +503,7 @@ def apply_ep_prefill( token_prob_unzipped=dst_weights, total_zipped_tokens=recv_x.shape[0], num_experts=layer.num_local_experts, - using_weighted_combine=True, + using_weighted_combine=not fastdeploy.envs.FD_MOE_PROB_IN_ADVANCE, ) else: @@ -523,15 +618,28 @@ def apply_tp( gate_out = gate(x.cast("float32")) if layer.topk_method == "noaux_tc": - _, topk_weights, topk_ids = fastdeploy.model_executor.layers.moe.moe.get_moe_scores( - gate_out, - layer.n_group, - layer.topk_group, - layer.top_k, - layer.routed_scaling_factor, - layer.gate_correction_bias, - getattr(layer, "renormalize", True), - ) + + if not fastdeploy.envs.FD_USE_PHI_MOE_TOPK: + _, topk_weights, topk_ids = fastdeploy.model_executor.layers.moe.moe.get_moe_scores( + gate_out, + layer.n_group, + layer.topk_group, + layer.top_k, + layer.routed_scaling_factor, + layer.gate_correction_bias, + getattr(layer, "renormalize", True), + ) + else: + topk_weights, topk_ids = moe_topk_select( + gate_out, + layer.n_group, + layer.topk_group, + layer.top_k, + layer.routed_scaling_factor, + layer.gate_correction_bias, + 
getattr(layer, "renormalize", True), + ) + else: topk_ids, topk_weights = fastdeploy.model_executor.ops.gpu.moe_topk_select( gate_out, @@ -613,6 +721,7 @@ def apply_tp( getattr(layer, self.added_scale_attrs[1]), self.quant_config.weight_block_size[0], disable_ue8m0_cast=not self.quant_config.deepgemm_scale_ue8m0, + dst_weights=dst_weights if fastdeploy.envs.FD_MOE_PROB_IN_ADVANCE else None, ) # prmt back per rank @@ -624,7 +733,7 @@ def apply_tp( token_prob_unzipped=dst_weights, total_zipped_tokens=recv_x.shape[0], num_experts=layer.num_experts, - using_weighted_combine=True, + using_weighted_combine=not fastdeploy.envs.FD_MOE_PROB_IN_ADVANCE, ) else: tmp_ffn_out = fastdeploy.model_executor.ops.gpu.ep_moe_expert_combine( diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py index c3f3e84d466..6de7524645e 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_triton_backend.py @@ -40,6 +40,7 @@ pass from fastdeploy.model_executor.layers.moe.moe import get_moe_scores from fastdeploy.model_executor.layers.quantization.fp8_utils import ( + fused_stack_transpose_quant, quant_weight_ue8m0, transform_scale_ue8m0, ) @@ -1622,22 +1623,45 @@ def _process_quantize(weight_idx): ) weight[expert_id].copy_(weight_quant, False) else: - weight = paddle.empty(shape=weight_shape, dtype=weight_dtype) - scale_list = [] - - for expert_id in range(layer.num_local_experts): - w_q, s_fp32 = quant_weight_ue8m0( - weight_dequant=getattr(layer, unquantized_weight_name)[expert_id] - .transpose([1, 0]) - .contiguous(), - weight_block_size=self.quant_config.weight_block_size, - ) - s_ue8m0 = transform_scale_ue8m0( - s_fp32, mn=w_q.shape[-2], weight_block_size=self.quant_config.weight_block_size - ) - weight[expert_id].copy_(w_q, False) - scale_list.append(s_ue8m0) - scale = paddle.to_tensor(scale_list) + if fastdeploy.envs.FD_USE_PHI_FP8_QUANT: + num_expert = layer.num_local_experts + expert_weight_list = [getattr(layer, unquantized_weight_name)[i] for i in range(num_expert)] + weight = paddle.empty(shape=weight_shape, dtype=weight_dtype) + scale_list = [] + chunk_size = 64 + + for start_idx in range(0, num_expert, chunk_size): + end_idx = min(start_idx + chunk_size, num_expert) + local_chunk_size = end_idx - start_idx + chunk_experts = [w.contiguous() for w in expert_weight_list[start_idx:end_idx]] + + w1_t_quant, w1_t_scale = fused_stack_transpose_quant( + chunk_experts, use_ue8m0=self.quant_config.deepgemm_scale_ue8m0 + ) + w1_t_quant = w1_t_quant.reshape([local_chunk_size, -1, w1_t_quant.shape[-1]]) + w1_t_scale = w1_t_scale.reshape([local_chunk_size, -1, w1_t_scale.shape[-1]]) + + weight[start_idx:end_idx].copy_(w1_t_quant, False) + scale_list.append(w1_t_scale) + + scale = paddle.concat(scale_list, axis=0) + else: + weight = paddle.empty(shape=weight_shape, dtype=weight_dtype) + scale_list = [] + + for expert_id in range(layer.num_local_experts): + w_q, s_fp32 = quant_weight_ue8m0( + weight_dequant=getattr(layer, unquantized_weight_name)[expert_id] + .transpose([1, 0]) + .contiguous(), + weight_block_size=self.quant_config.weight_block_size, + ) + s_ue8m0 = transform_scale_ue8m0( + s_fp32, mn=w_q.shape[-2], weight_block_size=self.quant_config.weight_block_size + ) + weight[expert_id].copy_(w_q, False) + scale_list.append(s_ue8m0) + scale = paddle.to_tensor(scale_list) free_tensor(getattr(layer, unquantized_weight_name)) free_tensor(getattr(layer, 
weight_name)) diff --git a/fastdeploy/model_executor/layers/quantization/fp8_utils.py b/fastdeploy/model_executor/layers/quantization/fp8_utils.py index d4cc8a210b4..ac838477234 100644 --- a/fastdeploy/model_executor/layers/quantization/fp8_utils.py +++ b/fastdeploy/model_executor/layers/quantization/fp8_utils.py @@ -14,6 +14,8 @@ # limitations under the License. """ +import importlib + import paddle from paddleformers.utils.log import logger @@ -22,6 +24,33 @@ from ..utils import get_sm_version +def try_import(modules, name=None, fail_msg=None): + """ + try_import + """ + if not isinstance(modules, (list, tuple)): + modules = [modules] + + for m in modules: + assert isinstance(m, str), m + try: + m = importlib.import_module(m) + except ImportError: + m = None + + if m is not None: + if name is None: + return m + elif hasattr(m, name): + return getattr(m, name) + + if fail_msg is not None: + logger.warning(fail_msg) + + +paddlefleet_ops = try_import(["paddlefleet.ops"]) + + def load_deep_gemm(): """ Load DeepGemm module according to FastDeploy env switch. @@ -130,3 +159,25 @@ def quant_weight_ue8m0(weight_dequant, weight_block_size): ) return out_w, out_s + + +def fused_stack_transpose_quant(expert_weight_list, use_ue8m0=False): + """fused_stack_transpose_quant""" + if hasattr(paddlefleet_ops, "fuse_stack_transpose_fp8_quant"): + # Blackwell (SM100) GPUs require pow2_scale quantization. + # Guard with is_cuda() so non-CUDA environments do not call into + # paddle.device.cuda.* and cause a crash. + use_pow2_scale = current_platform.is_cuda() and get_sm_version() == 100 + + w, scale = paddlefleet_ops.fuse_stack_transpose_fp8_quant( + expert_weight_list, + use_pow2_scale, + use_ue8m0, + use_ue8m0, + ) + if use_ue8m0: + scale = scale.T + else: + raise RuntimeError("'fuse_stack_transpose_fp8_quant' is not available in the current paddlefleet_ops.") + + return w, scale diff --git a/tests/layers/test_deepgemm_fused_moe.py b/tests/layers/test_deepgemm_fused_moe.py new file mode 100644 index 00000000000..5381ee866a3 --- /dev/null +++ b/tests/layers/test_deepgemm_fused_moe.py @@ -0,0 +1,511 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" + +from __future__ import annotations + +import sys +import types + +import paddle +import pytest + +# --------------------------------------------------------------------------- +# Minimal stub before any fastdeploy import: deep_ep requires distributed setup +# --------------------------------------------------------------------------- + +deep_ep_stub = types.ModuleType("fastdeploy.model_executor.layers.moe.ep.deep_ep") +deep_ep_stub.Buffer = types.SimpleNamespace(capture=lambda: object()) +sys.modules["fastdeploy.model_executor.layers.moe.ep.deep_ep"] = deep_ep_stub + +from fastdeploy.model_executor.layers.moe import ( # noqa: E402 + fused_moe_deepgemm_backend as backend, +) + +# --------------------------------------------------------------------------- +# Detect whether deepgemm JIT compilation works on this machine. +# It requires the host compiler to support C++17 (GCC >= 7). +# CI machines with older GCC will fail to compile the kernel. +# --------------------------------------------------------------------------- + + +def _deepgemm_available() -> bool: + """Try to JIT-compile a minimal deepgemm kernel; return False on failure.""" + try: + from fastdeploy.model_executor.layers.quantization.fp8_utils import deep_gemm + + lhs = paddle.zeros([128, 128], dtype="float8_e4m3fn") + lhs_scale = paddle.ones([128, 1], dtype="float32") + rhs = paddle.zeros([1, 128, 128], dtype="float8_e4m3fn") + rhs_scale = paddle.ones([1, 1, 1], dtype="float32") + out = paddle.empty([128, 128], dtype="bfloat16") + m_indices = paddle.zeros([128], dtype="int32") + deep_gemm.m_grouped_gemm_fp8_fp8_bf16_nt_contiguous((lhs, lhs_scale), (rhs, rhs_scale), out, m_indices) + return True + except Exception: + return False + + +_DEEPGEMM_AVAILABLE = _deepgemm_available() + +requires_deepgemm = pytest.mark.skipif( + not _DEEPGEMM_AVAILABLE, + reason="deepgemm JIT compilation requires C++17-capable host compiler (GCC >= 7)", +) + +# --------------------------------------------------------------------------- +# Test parameters – deepgemm requires: +# M alignment = 128 (tokens dispatched to each expert) +# N, K must be multiples of 128 +# --------------------------------------------------------------------------- +NUM_EXPERTS = 2 +HIDDEN_SIZE = 128 # K +MOE_INTER = 128 # moe_intermediate_size → N_up = 256, N_down = 128 +TOP_K = 2 +EP_SIZE = 1 +# Use ≥128 tokens so that after top-k expansion M≥128 (deepgemm alignment) +NUM_TOKENS = 128 # ensures token_all_num = NUM_TOKENS * TOP_K / ... 
≥ 128 + +# Weight block size matching deepgemm: 128×128 +WEIGHT_BLOCK_SIZE = (128, 128) + + +# --------------------------------------------------------------------------- +# Dummy helpers +# --------------------------------------------------------------------------- + + +class DummyQuantConfig: + def __init__(self): + self.weight_block_size = WEIGHT_BLOCK_SIZE + self.deepgemm_scale_ue8m0 = False + self.is_checkpoint_bf16 = False + + def name(self): + return "blockwise_fp8" + + +class DummyFDConfig: + def __init__(self): + self.load_config = types.SimpleNamespace(load_choices="default_v1", dynamic_load_weight=False) + self.model_config = types.SimpleNamespace( + enable_cache=False, + model="dummy", + # ep_size * this = max tokens buffer for masked GEMM; must be ≥ aligned M + num_max_dispatch_tokens_per_rank=128, + ) + self.scheduler_config = types.SimpleNamespace(max_num_batched_tokens=NUM_TOKENS) + self.parallel_config = types.SimpleNamespace(tensor_parallel_size=1) + + +class DummyLayer(paddle.nn.Layer): + """Layer with properly-shaped fp8 weights for deepgemm.""" + + def __init__(self): + super().__init__() + qc = DummyQuantConfig() + E = NUM_EXPERTS + K = HIDDEN_SIZE + N_up = MOE_INTER * 2 # 256 + N_down = HIDDEN_SIZE # 128 + K_down = MOE_INTER # 128 + + self.num_local_experts = E + self.num_experts = E + self.hidden_size = K + self.moe_intermediate_size = MOE_INTER + self.top_k = TOP_K + self.ep_size = EP_SIZE + self.n_group = 1 + self.topk_group = 1 + self.routed_scaling_factor = 1.0 + self.renormalize = True + self.gate_correction_bias = paddle.zeros([E], dtype="float32") + self.topk_method = "noaux_tc" + self.fd_config = DummyFDConfig() + self.quant_method = types.SimpleNamespace(quant_config=qc) + + # up_gate_proj_weight: [E, N_up, K] (deepgemm NT: each expert [N, K]) + self.up_gate_proj_weight = self.create_parameter( + shape=[E, N_up, K], + dtype="float8_e4m3fn", + default_initializer=paddle.nn.initializer.Constant(0), + ) + # down_proj_weight: [E, N_down, K_down] + self.down_proj_weight = self.create_parameter( + shape=[E, N_down, K_down], + dtype="float8_e4m3fn", + default_initializer=paddle.nn.initializer.Constant(0), + ) + # Scales: [E, ceil(N/128), ceil(K/128)] + self.up_gate_proj_weight_scale_inv = self.create_parameter( + shape=[E, N_up // 128, K // 128], # [2, 2, 1] + dtype="float32", + default_initializer=paddle.nn.initializer.Constant(1.0), + ) + self.down_proj_weight_scale_inv = self.create_parameter( + shape=[E, N_down // 128, K_down // 128], # [2, 1, 1] + dtype="float32", + default_initializer=paddle.nn.initializer.Constant(1.0), + ) + + +class DummyGate(paddle.nn.Layer): + def __init__(self, num_experts): + super().__init__() + self.num_experts = num_experts + + def forward(self, x): + return paddle.ones([x.shape[0], self.num_experts], dtype="float32") + + +def _make_method(): + qc = DummyQuantConfig() + method = backend.DeepGemmFusedMoeMethod(qc) + method.added_weight_attrs = ["up_gate_proj_weight", "down_proj_weight"] + method.added_scale_attrs = ["up_gate_proj_weight_scale_inv", "down_proj_weight_scale_inv"] + return method + + +# --------------------------------------------------------------------------- +# Tests: apply_tp +# --------------------------------------------------------------------------- + + +class TestApplyTp: + """apply_tp with FD_USE_PHI_FP8_QUANT=True, FD_USE_PHI_MOE_PERMUTE=True.""" + + @requires_deepgemm + def test_apply_tp_noaux_tc_path(self): + """noaux_tc: get_moe_scores → fp8_quant_blockwise → moe_permute → deepgemm → moe_unpermute.""" + 
layer = DummyLayer() + gate = DummyGate(layer.num_local_experts) + method = _make_method() + + x = paddle.randn([NUM_TOKENS, HIDDEN_SIZE], dtype="bfloat16") + captured = {} + + def hook(topk_ids): + captured["topk_ids"] = topk_ids + + out = method.apply_tp(layer, x, gate, topk_ids_hookfunc=hook) + + assert "topk_ids" in captured + assert list(out.shape) == [NUM_TOKENS, HIDDEN_SIZE] + + @requires_deepgemm + def test_apply_tp_aux_path(self): + """Non-noaux_tc: moe_topk_select → fp8_quant_blockwise → moe_permute → deepgemm → moe_unpermute.""" + layer = DummyLayer() + layer.topk_method = "greedy" + gate = DummyGate(layer.num_local_experts) + method = _make_method() + + x = paddle.randn([NUM_TOKENS, HIDDEN_SIZE], dtype="bfloat16") + out = method.apply_tp(layer, x, gate) + + assert list(out.shape) == [NUM_TOKENS, HIDDEN_SIZE] + + +# --------------------------------------------------------------------------- +# Tests: apply_ep_prefill +# --------------------------------------------------------------------------- + + +class TestApplyEpPrefill: + """apply_ep_prefill: stub only the EP communication runner.""" + + def _make_zero_runner(self, layer): + """Runner that returns 0 tokens per expert → zero-token branch.""" + + class ZeroRunner: + num_worst_tokens = 0 + ep_engine = types.SimpleNamespace(async_finish=False) + + def moe_select(self, _layer, gate_out): + n = gate_out.shape[0] + return ( + paddle.zeros([n, _layer.top_k], dtype="int64"), + paddle.ones([n, _layer.top_k], dtype="float32"), + ) + + def dispatch(self, x, topk_idx, topk_weights, **kwargs): + # x is already fp8 (after fp8_quant_blockwise), scale comes via x_scale_tensor kwarg + n = x.shape[0] + scale = kwargs.get("x_scale_tensor", paddle.ones([n, 1], dtype="float32")) + return ( + (x, scale), + topk_idx, + topk_weights, + [0, 0], + object(), + types.SimpleNamespace(current_stream_wait=lambda: None), + ) + + def combine(self, out, handle, weights, event): + return out, types.SimpleNamespace(current_stream_wait=lambda: None) + + return ZeroRunner() + + def _make_contiguous_runner(self, layer): + """Runner that returns token_all_num > 0 → contiguous GEMM branch.""" + + class ContiguousRunner: + num_worst_tokens = 0 + ep_engine = types.SimpleNamespace(async_finish=False) + + def moe_select(self, _layer, gate_out): + n = gate_out.shape[0] + # Route all tokens to expert 0 so count is deterministic + topk_ids = paddle.zeros([n, _layer.top_k], dtype="int64") + topk_weights = paddle.ones([n, _layer.top_k], dtype="float32") + return topk_ids, topk_weights + + def dispatch(self, x, topk_idx, topk_weights, **kwargs): + n = x.shape[0] + scale = kwargs.get("x_scale_tensor", paddle.ones([n, 1], dtype="float32")) + # non-zero counts so token_all_num > 0 + num_per_expert = [n * layer.top_k // layer.num_local_experts] * layer.num_local_experts + return ( + (x, scale), + topk_idx, + topk_weights, + num_per_expert, + object(), + types.SimpleNamespace(current_stream_wait=lambda: None), + ) + + def combine(self, out, handle, weights, event): + return out, types.SimpleNamespace(current_stream_wait=lambda: None) + + return ContiguousRunner() + + def test_ep_prefill_zero_token_path(self): + """All experts get 0 tokens → returns empty [0, hidden_size] tensor.""" + layer = DummyLayer() + layer.topk_method = "greedy" + gate = DummyGate(layer.num_local_experts) + method = _make_method() + method.ep_prefill_runner = self._make_zero_runner(layer) + + x = paddle.randn([NUM_TOKENS, HIDDEN_SIZE], dtype="bfloat16") + out = method.apply_ep_prefill(layer, x, gate) + 
assert list(out.shape) == [0, HIDDEN_SIZE] + + @requires_deepgemm + def test_ep_prefill_contiguous_path(self): + """token_all_num > 0, num_worst_tokens == 0 → moe_permute + contiguous deepgemm.""" + layer = DummyLayer() + layer.topk_method = "greedy" + gate = DummyGate(layer.num_local_experts) + method = _make_method() + method.ep_prefill_runner = self._make_contiguous_runner(layer) + + x = paddle.randn([NUM_TOKENS, HIDDEN_SIZE], dtype="bfloat16") + out = method.apply_ep_prefill(layer, x, gate) + assert len(out.shape) == 2 + assert out.shape[1] == HIDDEN_SIZE + + @requires_deepgemm + def test_ep_prefill_prob_in_advance_phi_moe_permute(self, monkeypatch): + """FD_MOE_PROB_IN_ADVANCE=True + FD_USE_PHI_MOE_PERMUTE=True: + fuse_weighted_swiglu_fp8_quant path → moe_unpermute with using_weighted_combine=False.""" + import fastdeploy + + monkeypatch.setattr(fastdeploy.envs, "FD_MOE_PROB_IN_ADVANCE", True) + monkeypatch.setattr(fastdeploy.envs, "FD_USE_PHI_MOE_PERMUTE", True) + + # Stub paddlefleet_ops.fuse_weighted_swiglu_fp8_quant + from fastdeploy.model_executor.layers.quantization import fp8_utils + + def fake_fuse_weighted_swiglu_fp8_quant(ffn_out, dst_weights, using_pow2_scaling=True, use_ue8m0=False): + half = ffn_out.shape[-1] // 2 + out_fp8 = ffn_out[:, :half].cast("float8_e4m3fn") + scale = paddle.ones([ffn_out.shape[0], 1], dtype="float32") + return out_fp8, scale + + fake_ops = types.SimpleNamespace(fuse_weighted_swiglu_fp8_quant=fake_fuse_weighted_swiglu_fp8_quant) + monkeypatch.setattr(fp8_utils, "paddlefleet_ops", fake_ops) + # Also patch the reference used in the backend module + monkeypatch.setattr(backend, "paddlefleet_ops", fake_ops) + + layer = DummyLayer() + layer.topk_method = "greedy" + gate = DummyGate(layer.num_local_experts) + method = _make_method() + method.ep_prefill_runner = self._make_contiguous_runner(layer) + + x = paddle.randn([NUM_TOKENS, HIDDEN_SIZE], dtype="bfloat16") + out = method.apply_ep_prefill(layer, x, gate) + assert len(out.shape) == 2 + assert out.shape[1] == HIDDEN_SIZE + + @requires_deepgemm + def test_ep_prefill_prob_in_advance_no_phi_moe_permute(self, monkeypatch): + """FD_MOE_PROB_IN_ADVANCE=True + FD_USE_PHI_MOE_PERMUTE=False: + fuse_weighted_swiglu_fp8_quant path → ep_moe_expert_combine.""" + import fastdeploy + + monkeypatch.setattr(fastdeploy.envs, "FD_MOE_PROB_IN_ADVANCE", True) + monkeypatch.setattr(fastdeploy.envs, "FD_USE_PHI_MOE_PERMUTE", False) + + from fastdeploy.model_executor.layers.quantization import fp8_utils + + def fake_fuse_weighted_swiglu_fp8_quant(ffn_out, dst_weights, using_pow2_scaling=True, use_ue8m0=False): + half = ffn_out.shape[-1] // 2 + out_fp8 = ffn_out[:, :half].cast("float8_e4m3fn") + scale = paddle.ones([ffn_out.shape[0], 1], dtype="float32") + return out_fp8, scale + + fake_ops = types.SimpleNamespace(fuse_weighted_swiglu_fp8_quant=fake_fuse_weighted_swiglu_fp8_quant) + monkeypatch.setattr(fp8_utils, "paddlefleet_ops", fake_ops) + monkeypatch.setattr(backend, "paddlefleet_ops", fake_ops) + + layer = DummyLayer() + layer.topk_method = "greedy" + gate = DummyGate(layer.num_local_experts) + method = _make_method() + method.ep_prefill_runner = self._make_contiguous_runner(layer) + + x = paddle.randn([NUM_TOKENS, HIDDEN_SIZE], dtype="bfloat16") + out = method.apply_ep_prefill(layer, x, gate) + assert len(out.shape) == 2 + assert out.shape[1] == HIDDEN_SIZE + + @requires_deepgemm + def test_ep_prefill_prob_in_advance_phi_fp8_quant(self, monkeypatch): + """FD_MOE_PROB_IN_ADVANCE=True + FD_USE_PHI_FP8_QUANT=True + 
FD_USE_PHI_MOE_PERMUTE=True: + fp8_quant_blockwise input quant → fuse_weighted_swiglu_fp8_quant → moe_unpermute path.""" + import fastdeploy + + monkeypatch.setattr(fastdeploy.envs, "FD_MOE_PROB_IN_ADVANCE", True) + monkeypatch.setattr(fastdeploy.envs, "FD_USE_PHI_FP8_QUANT", True) + monkeypatch.setattr(fastdeploy.envs, "FD_USE_PHI_MOE_PERMUTE", True) + + from fastdeploy.model_executor.layers.quantization import fp8_utils + + def fake_fuse_weighted_swiglu_fp8_quant(ffn_out, dst_weights, using_pow2_scaling=True, use_ue8m0=False): + half = ffn_out.shape[-1] // 2 + out_fp8 = ffn_out[:, :half].cast("float8_e4m3fn") + scale = paddle.ones([ffn_out.shape[0], 1], dtype="float32") + return out_fp8, scale + + fake_ops = types.SimpleNamespace(fuse_weighted_swiglu_fp8_quant=fake_fuse_weighted_swiglu_fp8_quant) + monkeypatch.setattr(fp8_utils, "paddlefleet_ops", fake_ops) + monkeypatch.setattr(backend, "paddlefleet_ops", fake_ops) + + layer = DummyLayer() + layer.topk_method = "greedy" + gate = DummyGate(layer.num_local_experts) + method = _make_method() + method.ep_prefill_runner = self._make_contiguous_runner(layer) + + x = paddle.randn([NUM_TOKENS, HIDDEN_SIZE], dtype="bfloat16") + out = method.apply_ep_prefill(layer, x, gate) + assert len(out.shape) == 2 + assert out.shape[1] == HIDDEN_SIZE + + @requires_deepgemm + def test_ep_prefill_prob_in_advance_phi_fp8_quant_no_phi_moe_permute(self, monkeypatch): + """FD_MOE_PROB_IN_ADVANCE=True + FD_USE_PHI_FP8_QUANT=True + FD_USE_PHI_MOE_PERMUTE=False: + fp8_quant_blockwise input quant → fuse_weighted_swiglu_fp8_quant → ep_moe_expert_combine path.""" + import fastdeploy + + monkeypatch.setattr(fastdeploy.envs, "FD_MOE_PROB_IN_ADVANCE", True) + monkeypatch.setattr(fastdeploy.envs, "FD_USE_PHI_FP8_QUANT", True) + monkeypatch.setattr(fastdeploy.envs, "FD_USE_PHI_MOE_PERMUTE", False) + + from fastdeploy.model_executor.layers.quantization import fp8_utils + + def fake_fuse_weighted_swiglu_fp8_quant(ffn_out, dst_weights, using_pow2_scaling=True, use_ue8m0=False): + half = ffn_out.shape[-1] // 2 + out_fp8 = ffn_out[:, :half].cast("float8_e4m3fn") + scale = paddle.ones([ffn_out.shape[0], 1], dtype="float32") + return out_fp8, scale + + fake_ops = types.SimpleNamespace(fuse_weighted_swiglu_fp8_quant=fake_fuse_weighted_swiglu_fp8_quant) + monkeypatch.setattr(fp8_utils, "paddlefleet_ops", fake_ops) + monkeypatch.setattr(backend, "paddlefleet_ops", fake_ops) + + layer = DummyLayer() + layer.topk_method = "greedy" + gate = DummyGate(layer.num_local_experts) + method = _make_method() + method.ep_prefill_runner = self._make_contiguous_runner(layer) + + x = paddle.randn([NUM_TOKENS, HIDDEN_SIZE], dtype="bfloat16") + out = method.apply_ep_prefill(layer, x, gate) + assert len(out.shape) == 2 + assert out.shape[1] == HIDDEN_SIZE + + +# --------------------------------------------------------------------------- +# Tests: apply_ep_decode +# --------------------------------------------------------------------------- + + +class TestApplyEpDecode: + """apply_ep_decode: stub only the EP communication runner.""" + + def _make_decode_runner(self, layer): + """Decode runner: dispatch returns fp8 tuple + token counts, combine aggregates.""" + max_dispatch = layer.fd_config.model_config.num_max_dispatch_tokens_per_rank + + class DecodeRunner: + ep_engine = types.SimpleNamespace(async_finish=False) + + def moe_select(self, _layer, gate_out): + n = gate_out.shape[0] + top_k = _layer.top_k + return ( + paddle.zeros([n, top_k], dtype="int64"), + paddle.ones([n, top_k], dtype="float32"), + ) 
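+            # NOTE(test stub): dispatch/combine below only mimic the EP decode runner's
+            # interface; dispatch hands back an (fp8, scale) pair shaped
+            # [num_local_experts, ep_size * num_max_dispatch_tokens_per_rank, hidden_size]
+            # plus per-expert token counts, so apply_ep_decode's masked grouped GEMM
+            # path can run without any real all-to-all communication.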
+ + def dispatch(self, x, topk_idx, topk_weights, use_fp8=False, use_ue8m0=False): + E = layer.num_local_experts + ep = layer.ep_size + K = layer.hidden_size + # Return (fp8_tensor, scale) tuple as expected by apply_ep_decode + x_fp8 = paddle.zeros([E, ep * max_dispatch, K], dtype="float8_e4m3fn") + scale = paddle.ones([E, ep * max_dispatch, 1], dtype="float32") + token_nums = paddle.zeros([E], dtype="int32") + return (x_fp8, scale), token_nums, object() + + def combine(self, ffn_out, topk_idx, topk_weights, handle): + n_tok = topk_idx.shape[0] + return paddle.zeros([n_tok, layer.hidden_size], dtype="bfloat16") + + return DecodeRunner() + + @requires_deepgemm + def test_ep_decode_masked_gemm_path(self): + """dispatch → masked deepgemm → fused_mask_swiglu_fp8_quant → masked deepgemm → combine.""" + layer = DummyLayer() + layer.topk_method = "greedy" + gate = DummyGate(layer.num_local_experts) + method = _make_method() + method.ep_decoder_runner = self._make_decode_runner(layer) + + x = paddle.randn([NUM_TOKENS, HIDDEN_SIZE], dtype="bfloat16") + captured = {} + + def hook(topk_ids): + captured["topk_ids"] = topk_ids + + out = method.apply_ep_decode(layer, x, gate, topk_ids_hookfunc=hook) + + assert "topk_ids" in captured + assert list(out.shape) == [NUM_TOKENS, HIDDEN_SIZE] diff --git a/tests/layers/test_fp8_ue8m0.py b/tests/layers/test_fp8_ue8m0.py index a6a44a4e338..25e45f2006d 100644 --- a/tests/layers/test_fp8_ue8m0.py +++ b/tests/layers/test_fp8_ue8m0.py @@ -20,6 +20,8 @@ import paddle +import fastdeploy.model_executor.layers.moe.fused_moe_triton_backend as moe_backend_module +import fastdeploy.model_executor.layers.quantization.fp8_utils as fp8_utils_module from fastdeploy.model_executor.layers.moe.fused_moe_triton_backend import ( BlockWiseFP8MoEMethod, ) @@ -101,11 +103,34 @@ def setUp(self): self.quant_config.deepgemm_scale_ue8m0 = True # set deepgemm_scale_ue8m0 to True def test_create_layer_with_ue8m0_scale(self): - def fake_per_block_cast_to_fp8(x, use_ue8m0=True): - out_w = x.astype(paddle.float8_e4m3fn) - out_s = paddle.ones([(x.shape[0] // 128), (x.shape[1] // 128)], dtype=paddle.float32) + # This test covers the quant_weight_ue8m0 branch in BlockWiseFP8MoEMethod.process_weights_after_loading + # The branch is entered when: + # - deepgemm_scale_ue8m0=True + # - FD_USE_PHI_FP8_QUANT=False (so we don't use fused_stack_transpose_quant) + # - is_checkpoint_bf16=True + + def fake_quant_weight_ue8m0(weight_dequant, weight_block_size): + # Mock quant_weight_ue8m0 behavior + n, k = weight_dequant.shape[-2], weight_dequant.shape[-1] + out_w = weight_dequant.astype(paddle.float8_e4m3fn) + # Scale shape: [ceil_div(n, 128), ceil_div(k, 128)] + out_s = paddle.ones([(n + 127) // 128, (k + 127) // 128], dtype="float32") return out_w, out_s + def fake_transform_scale_ue8m0(sf, mn, weight_block_size=None): + # Mock transform_scale_ue8m0 behavior + # For input [mn, k] where k is small (e.g., 2): + # After index_select: sf becomes [mn, k] + # After TMA align and pack: result is [mn, align(k, 4)//4] + # For k=2, align(2,4)=4, so result is [mn, 1] + if weight_block_size: + indices = paddle.arange(mn) // 128 + sf = paddle.index_select(sf, -2, indices) + # Final shape: [mn, align(k,4)//4] + aligned_k = ((sf.shape[-1] + 3) // 4) * 4 if sf.shape[-1] < 4 else ((sf.shape[-1] + 4 - 1) // 4) * 4 + result_shape = [sf.shape[0], aligned_k // 4] + return paddle.zeros(result_shape, dtype=paddle.int32) + fd_config = mock.MagicMock() fd_config.load_config.load_choices.return_value = "default_v1" layer = 
DummyFusedMoELayer( @@ -113,30 +138,333 @@ def fake_per_block_cast_to_fp8(x, use_ue8m0=True): ) method = BlockWiseFP8MoEMethod(quant_config=self.quant_config) - method.up_gate_proj_weight_shape = [1, 512, 256] - method.down_proj_weight_shape = [1, 256, 256] - method.up_gate_proj_scale_shape = [1, 512, 1] + # Call create_weights to initialize method attributes and layer parameters + method.create_weights(layer, model_format="torch") + + # Override the down_proj scale shape to match expected [1, 256, 1] method.down_proj_scale_shape = [1, 256, 1] - if "fastdeploy.model_executor.ops.gpu.deep_gemm.utils" in sys.modules: - # This is for sm90, which DeepGEMM does not support ue8m0 scale - fake = types.ModuleType("fastdeploy.model_executor.ops.gpu.deep_gemm") - fake2 = types.ModuleType("fastdeploy.model_executor.ops.gpu.deep_gemm.utils.math") - fake2.per_block_cast_to_fp8 = fake_per_block_cast_to_fp8 - fake3 = types.ModuleType("fastdeploy.model_executor.ops.gpu.deep_gemm.utils") - fake3.align = lambda x, y: (x + y - 1) // y * y - fake3.get_tma_aligned_size = lambda x, y: (x + 16 // y - 1) // (16 // y) * (16 // y) + # Mock quant_weight_ue8m0 and transform_scale_ue8m0 for the else branch + # Patch them after create_weights is called + with mock.patch.object(moe_backend_module, "quant_weight_ue8m0", fake_quant_weight_ue8m0): + with mock.patch.object(moe_backend_module, "transform_scale_ue8m0", fake_transform_scale_ue8m0): + with mock.patch.object(moe_backend_module, "free_tensor", lambda tensor: None): + import fastdeploy.envs as _fd_envs - fake.utils = fake3 - fake3.math = fake2 + # Set FD_USE_PHI_FP8_QUANT=False to enter the quant_weight_ue8m0 branch + with mock.patch.object(_fd_envs, "FD_USE_PHI_FP8_QUANT", False): + method.model_format = "torch" + method.process_weights_after_loading(layer) - sys.modules["fastdeploy.model_executor.ops.gpu.deep_gemm"].utils = fake3 + self.assertTrue(layer.down_proj_weight_scale_inv.dtype == paddle.int32) + self.assertEqual(layer.down_proj_weight_scale_inv.shape, method.down_proj_scale_shape) + + def _make_fake_fused_stack_transpose_quant(self, weight_shape): + """Return a fake fused_stack_transpose_quant compatible with weight_shape [N, H, W].""" + + def fake_fused(expert_list, use_ue8m0=False): + n = len(expert_list) + # weight_shape: [total_experts, dim_a, dim_b] + # fused_stack_transpose_quant returns a flat tensor that gets reshaped to + # [chunk_n, dim_a, dim_b] inside _process_quantize. + # We return [n*dim_a, dim_b] so reshape([n, -1, dim_b]) works. 
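+            # The dim_b // 128 granularity below mirrors the blockwise-FP8 scale layout
+            # assumed elsewhere in these tests; max(1, ...) keeps the shape valid for
+            # the small weight dims used here.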
+ _, dim_a, dim_b = weight_shape + w = paddle.zeros([n * dim_a, dim_b], dtype=paddle.float8_e4m3fn) + s = paddle.ones([n * dim_a, max(1, dim_b // 128)], dtype=paddle.float32) + return w, s + + return fake_fused + + def test_fleet_fp8_quant_single_chunk(self): + """FD_USE_PHI_FP8_QUANT=True, num_experts <= 64: single chunk path runs without error.""" + import fastdeploy.envs as _fd_envs + + fd_config = mock.MagicMock() + fd_config.load_config.load_choices.return_value = "default_v1" + num_experts = 4 + hidden_size = 256 + intermediate_size = 256 + + layer = DummyFusedMoELayer( + fd_config=fd_config, + num_local_experts=num_experts, + moe_intermediate_size=intermediate_size, + hidden_size=hidden_size, + ) + method = BlockWiseFP8MoEMethod(quant_config=self.quant_config) + up_gate_shape = [num_experts, hidden_size, intermediate_size * 2] + down_shape = [num_experts, intermediate_size, hidden_size] + method.up_gate_proj_weight_shape = up_gate_shape + method.down_proj_weight_shape = down_shape + method.up_gate_proj_scale_shape = [num_experts, intermediate_size * 2, 1] + method.down_proj_scale_shape = [num_experts, hidden_size, 1] method.model_format = "torch" - method.process_weights_after_loading(layer) + # Force gate_up branch so the fleet-FP8 chunk path is exercised for up_gate_proj_weight + with mock.patch.object(_fd_envs, "FD_USE_PHI_FP8_QUANT", True): + with mock.patch.object(moe_backend_module, "weight_fully_copied", return_value=True): + with mock.patch.object( + moe_backend_module, + "fused_stack_transpose_quant", + self._make_fake_fused_stack_transpose_quant(up_gate_shape), + ): + method.process_weights_after_loading(layer) - self.assertTrue(layer.down_proj_weight_scale_inv.dtype == paddle.int32) - self.assertEqual(layer.down_proj_weight_scale_inv.shape, method.down_proj_scale_shape) + # Weight must be fp8 and scale must cover all experts + self.assertEqual(layer.up_gate_proj_weight.dtype, paddle.float8_e4m3fn) + self.assertEqual(layer.up_gate_proj_weight_scale_inv.shape[0], num_experts) + + def test_fleet_fp8_quant_multi_chunk(self): + """FD_USE_PHI_FP8_QUANT=True, num_experts=70>64: two chunks processed and concat'd.""" + import fastdeploy.envs as _fd_envs + + fd_config = mock.MagicMock() + fd_config.load_config.load_choices.return_value = "default_v1" + num_experts = 70 # chunk_size=64 → two chunks: 64 + 6 + hidden_size = 128 + intermediate_size = 128 + + layer = DummyFusedMoELayer( + fd_config=fd_config, + num_local_experts=num_experts, + moe_intermediate_size=intermediate_size, + hidden_size=hidden_size, + ) + method = BlockWiseFP8MoEMethod(quant_config=self.quant_config) + up_gate_shape = [num_experts, hidden_size, intermediate_size * 2] + down_shape = [num_experts, intermediate_size, hidden_size] + method.up_gate_proj_weight_shape = up_gate_shape + method.down_proj_weight_shape = down_shape + method.up_gate_proj_scale_shape = [num_experts, intermediate_size * 2, 1] + method.down_proj_scale_shape = [num_experts, hidden_size, 1] + + chunks_seen = [] + original_fake = self._make_fake_fused_stack_transpose_quant(up_gate_shape) + + def recording_fake(expert_list, use_ue8m0=False): + chunks_seen.append(len(expert_list)) + return original_fake(expert_list, use_ue8m0) + + method.model_format = "torch" + # Force gate_up branch to exercise the multi-chunk concat logic + with mock.patch.object(_fd_envs, "FD_USE_PHI_FP8_QUANT", True): + with mock.patch.object(moe_backend_module, "weight_fully_copied", return_value=True): + with mock.patch.object(moe_backend_module, 
"fused_stack_transpose_quant", recording_fake): + method.process_weights_after_loading(layer) + + # Expect exactly two chunks: 64 then 6 + self.assertIn(64, chunks_seen, "First chunk should be 64 experts") + self.assertIn(6, chunks_seen, "Second chunk should be remaining 6 experts") + + # Final scale param shape[0] must equal num_experts (result of concat across chunks) + self.assertEqual(layer.up_gate_proj_weight_scale_inv.shape[0], num_experts) + + +class TestFusedStackTransposeQuant(unittest.TestCase): + """Unit tests for fp8_utils.fused_stack_transpose_quant.""" + + def _make_expert_weights(self, num_experts=4, out_features=128, in_features=64): + """Create a list of bfloat16 weight tensors as expert inputs.""" + return [paddle.randn([out_features, in_features], dtype="bfloat16") for _ in range(num_experts)] + + # ------------------------------------------------------------------ + # Helper: build a minimal fake paddlefleet_ops namespace + # ------------------------------------------------------------------ + def _fake_paddlefleet_ops(self, *, has_op=True, use_pow2_scale_result=False, num_experts=4, out=128, inp=64): + """Return a mock object that optionally exposes fuse_stack_transpose_fp8_quant.""" + fake_ops = mock.MagicMock() + if has_op: + stacked_w = paddle.zeros([num_experts, inp, out], dtype=paddle.float8_e4m3fn) + scale = paddle.ones([num_experts * inp, out // 128 if out >= 128 else 1], dtype=paddle.float32) + + def fake_quant(expert_weight_list, use_pow2_scale, use_ue8m0_w, use_ue8m0_s): + return stacked_w, scale + + fake_ops.fuse_stack_transpose_fp8_quant = fake_quant + else: + # Simulate that the attribute is absent + del fake_ops.fuse_stack_transpose_fp8_quant + return fake_ops + + # ------------------------------------------------------------------ + # Test: op not available → RuntimeError + # ------------------------------------------------------------------ + def test_raises_when_op_unavailable(self): + """fused_stack_transpose_quant should raise RuntimeError when paddlefleet_ops + does not expose fuse_stack_transpose_fp8_quant.""" + fake_ops = mock.MagicMock(spec=[]) # empty spec → no attributes + expert_weights = self._make_expert_weights() + with mock.patch.object(fp8_utils_module, "paddlefleet_ops", fake_ops): + with self.assertRaises(RuntimeError) as ctx: + fp8_utils_module.fused_stack_transpose_quant(expert_weights) + self.assertIn("fuse_stack_transpose_fp8_quant", str(ctx.exception)) + + # ------------------------------------------------------------------ + # Test: normal path (use_ue8m0=False, non-Blackwell) + # ------------------------------------------------------------------ + def test_normal_path_no_ue8m0(self): + """Returns (w, scale) without transposing scale when use_ue8m0=False.""" + num_experts, out, inp = 4, 128, 64 + stacked_w = paddle.zeros([num_experts, inp, out], dtype=paddle.float8_e4m3fn) + scale = paddle.ones([num_experts, out // 128 if out >= 128 else 1, inp], dtype=paddle.float32) + scale_shape_before = list(scale.shape) + + call_kwargs = {} + + def fake_quant(expert_weight_list, use_pow2_scale, use_ue8m0_w, use_ue8m0_s): + call_kwargs["use_pow2_scale"] = use_pow2_scale + call_kwargs["use_ue8m0_w"] = use_ue8m0_w + call_kwargs["use_ue8m0_s"] = use_ue8m0_s + return stacked_w, scale + + fake_ops = mock.MagicMock() + fake_ops.fuse_stack_transpose_fp8_quant = fake_quant + expert_weights = self._make_expert_weights(num_experts, out, inp) + + with mock.patch.object(fp8_utils_module, "paddlefleet_ops", fake_ops): + # Force non-Blackwell (SM < 100) + 
with mock.patch.object(fp8_utils_module, "get_sm_version", return_value=90): + with mock.patch.object(fp8_utils_module.current_platform, "is_cuda", return_value=True): + w_out, scale_out = fp8_utils_module.fused_stack_transpose_quant(expert_weights, use_ue8m0=False) + + self.assertIs(w_out, stacked_w) + self.assertIs(scale_out, scale) + # Scale must NOT be transposed when use_ue8m0 is False + self.assertEqual(list(scale_out.shape), scale_shape_before) + self.assertFalse(call_kwargs["use_pow2_scale"]) + self.assertFalse(call_kwargs["use_ue8m0_w"]) + self.assertFalse(call_kwargs["use_ue8m0_s"]) + + # ------------------------------------------------------------------ + # Test: use_ue8m0=True → scale is transposed + # ------------------------------------------------------------------ + def test_ue8m0_transposes_scale(self): + """When use_ue8m0=True the returned scale tensor should be transposed.""" + num_experts, out, inp = 2, 256, 128 + stacked_w = paddle.zeros([num_experts, inp, out], dtype=paddle.float8_e4m3fn) + # Deliberate non-square shape so transposition is detectable + scale = paddle.ones([num_experts * inp, out // 128], dtype=paddle.float32) + original_shape = list(scale.shape) + + def fake_quant(expert_weight_list, use_pow2_scale, use_ue8m0_w, use_ue8m0_s): + return stacked_w, scale + + fake_ops = mock.MagicMock() + fake_ops.fuse_stack_transpose_fp8_quant = fake_quant + expert_weights = self._make_expert_weights(num_experts, out, inp) + + with mock.patch.object(fp8_utils_module, "paddlefleet_ops", fake_ops): + with mock.patch.object(fp8_utils_module, "get_sm_version", return_value=90): + with mock.patch.object(fp8_utils_module.current_platform, "is_cuda", return_value=True): + w_out, scale_out = fp8_utils_module.fused_stack_transpose_quant(expert_weights, use_ue8m0=True) + + # Shape should be the transpose of original + self.assertEqual(list(scale_out.shape), list(reversed(original_shape))) + + # ------------------------------------------------------------------ + # Test: Blackwell GPU (SM 10) → use_pow2_scale=True + # ------------------------------------------------------------------ + def test_blackwell_sets_pow2_scale(self): + """On SM 10 (Blackwell) the op must be called with use_pow2_scale=True.""" + num_experts, out, inp = 2, 128, 64 + stacked_w = paddle.zeros([num_experts, inp, out], dtype=paddle.float8_e4m3fn) + scale = paddle.ones([num_experts, 1, inp], dtype=paddle.float32) + + received = {} + + def fake_quant(expert_weight_list, use_pow2_scale, use_ue8m0_w, use_ue8m0_s): + received["use_pow2_scale"] = use_pow2_scale + return stacked_w, scale + + fake_ops = mock.MagicMock() + fake_ops.fuse_stack_transpose_fp8_quant = fake_quant + expert_weights = self._make_expert_weights(num_experts, out, inp) + + with mock.patch.object(fp8_utils_module, "paddlefleet_ops", fake_ops): + with mock.patch.object(fp8_utils_module, "get_sm_version", return_value=100): + with mock.patch.object(fp8_utils_module.current_platform, "is_cuda", return_value=True): + fp8_utils_module.fused_stack_transpose_quant(expert_weights, use_ue8m0=False) + + self.assertTrue(received["use_pow2_scale"]) + + # ------------------------------------------------------------------ + # Test: non-Blackwell GPU (SM 9) → use_pow2_scale=False + # ------------------------------------------------------------------ + def test_non_blackwell_no_pow2_scale(self): + """On SM < 10 the op must be called with use_pow2_scale=False.""" + num_experts, out, inp = 2, 128, 64 + stacked_w = paddle.zeros([num_experts, inp, out], 
dtype=paddle.float8_e4m3fn) + scale = paddle.ones([num_experts, 1, inp], dtype=paddle.float32) + + received = {} + + def fake_quant(expert_weight_list, use_pow2_scale, use_ue8m0_w, use_ue8m0_s): + received["use_pow2_scale"] = use_pow2_scale + return stacked_w, scale + + fake_ops = mock.MagicMock() + fake_ops.fuse_stack_transpose_fp8_quant = fake_quant + expert_weights = self._make_expert_weights(num_experts, out, inp) + + with mock.patch.object(fp8_utils_module, "paddlefleet_ops", fake_ops): + with mock.patch.object(fp8_utils_module, "get_sm_version", return_value=90): + with mock.patch.object(fp8_utils_module.current_platform, "is_cuda", return_value=True): + fp8_utils_module.fused_stack_transpose_quant(expert_weights, use_ue8m0=False) + + self.assertFalse(received["use_pow2_scale"]) + + # ------------------------------------------------------------------ + # Test: op receives the correct expert_weight_list argument + # ------------------------------------------------------------------ + def test_expert_weight_list_forwarded(self): + """The expert weight list must be passed as-is to the underlying op.""" + stacked_w = paddle.zeros([2, 64, 128], dtype=paddle.float8_e4m3fn) + scale = paddle.ones([2, 1, 64], dtype=paddle.float32) + received_list = {} + + def fake_quant(expert_weight_list, use_pow2_scale, use_ue8m0_w, use_ue8m0_s): + received_list["weights"] = expert_weight_list + return stacked_w, scale + + fake_ops = mock.MagicMock() + fake_ops.fuse_stack_transpose_fp8_quant = fake_quant + expert_weights = self._make_expert_weights(num_experts=2, out_features=128, in_features=64) + + with mock.patch.object(fp8_utils_module, "paddlefleet_ops", fake_ops): + with mock.patch.object(fp8_utils_module, "get_sm_version", return_value=90): + with mock.patch.object(fp8_utils_module.current_platform, "is_cuda", return_value=True): + fp8_utils_module.fused_stack_transpose_quant(expert_weights) + + self.assertIs(received_list["weights"], expert_weights) + + # ------------------------------------------------------------------ + # Test: ue8m0=True on Blackwell → both flags propagate correctly + # ------------------------------------------------------------------ + def test_ue8m0_and_blackwell_combined(self): + """use_ue8m0=True on Blackwell: pow2_scale=True and ue8m0 flags=True, scale transposed.""" + num_experts, out, inp = 2, 256, 128 + stacked_w = paddle.zeros([num_experts, inp, out], dtype=paddle.float8_e4m3fn) + scale = paddle.ones([num_experts * inp, out // 128], dtype=paddle.float32) + original_shape = list(scale.shape) + received = {} + + def fake_quant(expert_weight_list, use_pow2_scale, use_ue8m0_w, use_ue8m0_s): + received.update(use_pow2_scale=use_pow2_scale, use_ue8m0_w=use_ue8m0_w, use_ue8m0_s=use_ue8m0_s) + return stacked_w, scale + + fake_ops = mock.MagicMock() + fake_ops.fuse_stack_transpose_fp8_quant = fake_quant + expert_weights = self._make_expert_weights(num_experts, out, inp) + + with mock.patch.object(fp8_utils_module, "paddlefleet_ops", fake_ops): + with mock.patch.object(fp8_utils_module, "get_sm_version", return_value=100): + with mock.patch.object(fp8_utils_module.current_platform, "is_cuda", return_value=True): + w_out, scale_out = fp8_utils_module.fused_stack_transpose_quant(expert_weights, use_ue8m0=True) + + self.assertTrue(received["use_pow2_scale"]) + self.assertTrue(received["use_ue8m0_w"]) + self.assertTrue(received["use_ue8m0_s"]) + self.assertEqual(list(scale_out.shape), list(reversed(original_shape))) if __name__ == "__main__": diff --git 
a/tests/layers/test_fused_moe_triton_backend.py b/tests/layers/test_fused_moe_triton_backend.py new file mode 100644 index 00000000000..1140cf72b16 --- /dev/null +++ b/tests/layers/test_fused_moe_triton_backend.py @@ -0,0 +1,697 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License" +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + +from __future__ import annotations + +import importlib +import sys +import types + +import paddle +import pytest + +if not hasattr(paddle, "compat"): + paddle.compat = types.SimpleNamespace(enable_torch_proxy=lambda scope=None: None) +if not hasattr(paddle.nn.functional, "swiglu"): + paddle.nn.functional.swiglu = lambda x: x + +from fastdeploy.model_executor.layers.moe import fused_moe_triton_backend as backend + + +class DummyQuantConfig: + def __init__(self, is_checkpoint_bf16=False, weight_block_size=(2, 2), name_value="wint8"): + self.is_checkpoint_bf16 = is_checkpoint_bf16 + self.weight_block_size = weight_block_size + self._name_value = name_value + self.deepgemm_scale_ue8m0 = False + + def name(self): + return self._name_value + + +class DummyQuantMethod: + def __init__(self, quant_config): + self.quant_config = quant_config + + +class DummyLoadConfig: + def __init__(self, load_choices="default_v1"): + self.load_choices = load_choices + self.dynamic_load_weight = False + + +class DummyFDConfig: + def __init__(self, load_choices="default_v1"): + self.load_config = DummyLoadConfig(load_choices) + self.model_config = types.SimpleNamespace(enable_cache=False) + + +class DummyGate(paddle.nn.Layer): + def __init__(self, num_experts): + super().__init__() + self.num_experts = num_experts + + def forward(self, x): + return paddle.ones([x.shape[0], self.num_experts], dtype="float32") + + +class DummyLayer(paddle.nn.Layer): + def __init__( + self, + quant_config, + num_local_experts=2, + hidden_size=4, + moe_intermediate_size=3, + top_k=2, + load_choices="default_v1", + weight_dtype="float16", + ): + super().__init__() + self.num_local_experts = num_local_experts + self.num_experts = num_local_experts + self.hidden_size = hidden_size + self.moe_intermediate_size = moe_intermediate_size + self.top_k = top_k + self.n_group = 1 + self.topk_group = 1 + self.routed_scaling_factor = 1.0 + self.renormalize = True + self.gate_correction_bias = paddle.zeros([num_local_experts], dtype="float32") + self.topk_method = "noaux_tc" + self.fd_config = DummyFDConfig(load_choices) + self.weight_dtype = weight_dtype + self.quant_method = DummyQuantMethod(quant_config) + self.weight_key_map = { + "up_gate_proj_expert_weight_scale_key": "up_gate_scale_{}", + "down_proj_expert_weight_scale_key": "down_proj_scale_{}", + "up_gate_proj_expert_in_scale_key": "up_gate_in_scale_{}", + "down_proj_expert_in_scale_key": "down_proj_in_scale_{}", + } + self._up_weights = None + self._down_weights = None + + def extract_moe_ffn_weights(self, state_dict): + return self._up_weights, self._down_weights, None, None + + +class DummyKernel: + def __init__(self): + self.calls = [] + + 
def __getitem__(self, grid): + def _runner(*args, **kwargs): + if len(args) > 2 and isinstance(args[2], paddle.Tensor): + args[2].set_value(paddle.zeros_like(args[2])) + self.calls.append({"grid": grid, "args": args, "kwargs": kwargs}) + + return _runner + + +@pytest.fixture(autouse=True) +def patch_float8(monkeypatch): + monkeypatch.setattr(paddle, "float8_e4m3fn", paddle.float16, raising=False) + return monkeypatch + + +@pytest.fixture() +def fake_ops(monkeypatch): + def fake_moe_topk_select(gate_out, gate_correction_bias, top_k, apply_norm_weight, use_softmax): + token_num = gate_out.shape[0] + topk_ids = paddle.zeros([token_num, top_k], dtype="int64") + topk_weights = paddle.ones([token_num, top_k], dtype="float32") + return topk_ids, topk_weights + + def fake_get_moe_scores(*args, **kwargs): + gate_out = args[0] + token_num = gate_out.shape[0] + top_k = args[3] + topk_ids = paddle.zeros([token_num, top_k], dtype="int64") + topk_weights = paddle.ones([token_num, top_k], dtype="float32") + return gate_out, topk_weights, topk_ids + + def fake_triton_preprocess(topk_ids, num_local_experts, block_size): + token_num = topk_ids.shape[0] + top_k = topk_ids.shape[1] + sorted_token_ids = paddle.arange(token_num * top_k, dtype="int32") + expert_ids = paddle.zeros_like(sorted_token_ids) + num_tokens_post_padded = paddle.to_tensor([token_num * top_k], dtype="int32") + return sorted_token_ids, expert_ids, num_tokens_post_padded + + def fake_scaled_fp8_quant(x, use_per_token_if_dynamic=True): + x_scale = paddle.ones([x.shape[0], 1], dtype="float32") + return x, x_scale + + def fake_hadamard_quant_fp8(x, scale, topk_ids, top_k, intermediate_size, tiled): + return x + + def fake_fp8_quant_blockwise(x, using_pow2_scale=False, output_scale_transpose=False): + scale = paddle.ones([x.shape[0], x.shape[1]], dtype="float32") + return x, scale + + monkeypatch.setattr( + backend.fastdeploy.model_executor.ops.gpu, + "moe_topk_select", + fake_moe_topk_select, + raising=False, + ) + monkeypatch.setattr(backend, "get_moe_scores", fake_get_moe_scores) + monkeypatch.setattr(backend, "tritonmoe_preprocess_func", fake_triton_preprocess, raising=False) + monkeypatch.setattr( + backend.fastdeploy.model_executor.ops.gpu, + "tritonmoe_preprocess_func", + fake_triton_preprocess, + raising=False, + ) + monkeypatch.setattr(backend, "scaled_fp8_quant", fake_scaled_fp8_quant) + monkeypatch.setattr( + backend.fastdeploy.model_executor.ops.gpu, + "moe_fused_hadamard_quant_fp8", + fake_hadamard_quant_fp8, + raising=False, + ) + monkeypatch.setattr(paddle.incubate.nn.functional, "fp8_quant_blockwise", fake_fp8_quant_blockwise, raising=False) + monkeypatch.setattr(paddle.incubate.nn.functional, "swiglu", lambda x: x, raising=False) + return monkeypatch + + +def _make_block_scale(weight_tensor, block_size): + return paddle.ones( + [ + (weight_tensor.shape[0] + block_size[0] - 1) // block_size[0], + (weight_tensor.shape[1] + block_size[1] - 1) // block_size[1], + ], + dtype="float32", + ) + + +class TestFusedMoeTritonBackend: + def test_backend_imports_kernel_module(self, monkeypatch): + kernel = DummyKernel() + monkeypatch.setattr( + backend.fastdeploy.model_executor.ops.gpu, + "tritonmoe_preprocess_func", + lambda *args, **kwargs: None, + raising=False, + ) + monkeypatch.setitem( + sys.modules, + "fastdeploy.model_executor.layers.moe.triton_moe_kernels", + types.SimpleNamespace(fused_moe_kernel_paddle=kernel), + ) + reloaded = importlib.reload(backend) + assert hasattr(reloaded, "fused_moe_kernel_paddle") + + def 
test_triton_weight_only_create_and_apply(self, fake_ops, monkeypatch): + quant_config = DummyQuantConfig(is_checkpoint_bf16=False) + layer = DummyLayer(quant_config) + method = backend.TritonWeightOnlyMoEMethod(quant_config) + method.create_weights(layer, model_format="torch") + + layer._up_weights = [ + paddle.arange(layer.hidden_size * layer.moe_intermediate_size * 2, dtype="float32").reshape( + [layer.hidden_size, layer.moe_intermediate_size * 2] + ) + for _ in range(layer.num_local_experts) + ] + layer._down_weights = [ + paddle.arange(layer.moe_intermediate_size * layer.hidden_size, dtype="float32").reshape( + [layer.moe_intermediate_size, layer.hidden_size] + ) + for _ in range(layer.num_local_experts) + ] + method.process_loaded_weights(layer, state_dict={}) + + assert paddle.any(layer.up_gate_proj_weight_scale > 0) + + kernel = DummyKernel() + monkeypatch.setattr(backend, "fused_moe_kernel_paddle", kernel, raising=False) + + x = paddle.randn([2, layer.hidden_size], dtype="float32") + gate = DummyGate(layer.num_local_experts) + captured = {} + + def hook(topk_ids): + captured["topk_ids"] = topk_ids + + _ = method.apply(layer, x, gate, topk_ids_hookfunc=hook) + assert "topk_ids" in captured + + empty_out = method.apply(layer, paddle.zeros([0, layer.hidden_size], dtype="float32"), gate) + assert empty_out.shape == [0, layer.hidden_size] + + def test_triton_weight_only_prequant_and_bf16_create(self, fake_ops): + quant_config = DummyQuantConfig(is_checkpoint_bf16=True) + layer = DummyLayer(quant_config, weight_dtype="float32") + method = backend.TritonWeightOnlyMoEMethod(quant_config) + assert method.process_prequanted_weights(layer, state_dict={}) is None + + method.create_weights(layer, model_format="not_torch") + assert list(layer.up_gate_proj_weight.shape) == [ + layer.num_local_experts, + layer.hidden_size, + layer.moe_intermediate_size * 2, + ] + + def test_triton_weight_only_process_weights_after_loading_bf16(self, fake_ops, monkeypatch): + quant_config = DummyQuantConfig(is_checkpoint_bf16=True) + layer = DummyLayer(quant_config, weight_dtype="float32") + method = backend.TritonWeightOnlyMoEMethod(quant_config) + method.create_weights(layer, model_format="torch") + method.model_format = "torch" + + monkeypatch.setattr(backend, "weight_fully_copied", lambda tensor: True) + transpose_calls = [] + monkeypatch.setattr(backend, "process_weight_transpose", lambda _layer, name: transpose_calls.append(name)) + monkeypatch.setattr(backend, "free_tensor", lambda tensor: None) + + method.process_weights_after_loading(layer) + + assert transpose_calls + + def test_triton_weight_only_process_weights_after_loading_return(self, fake_ops): + quant_config = DummyQuantConfig(is_checkpoint_bf16=False) + layer = DummyLayer(quant_config) + method = backend.TritonWeightOnlyMoEMethod(quant_config) + assert method.process_weights_after_loading(layer) is None + + def test_triton_weight_only_apply_aux_topk(self, fake_ops, monkeypatch): + quant_config = DummyQuantConfig(is_checkpoint_bf16=False) + layer = DummyLayer(quant_config) + layer.topk_method = "aux" + method = backend.TritonWeightOnlyMoEMethod(quant_config) + method.create_weights(layer, model_format="torch") + + kernel = DummyKernel() + monkeypatch.setattr(backend, "fused_moe_kernel_paddle", kernel, raising=False) + + called = {} + + def hook(topk_ids): + called["ids"] = topk_ids + + _ = method.apply( + layer, + paddle.randn([1, layer.hidden_size], dtype="float32"), + DummyGate(layer.num_local_experts), + hook, + ) + assert "ids" in called + + 
def test_wfp8afp8_method_apply_paths(self, fake_ops, monkeypatch): + quant_config = DummyQuantConfig(is_checkpoint_bf16=False) + layer = DummyLayer(quant_config) + layer.topk_method = "aux" + method = backend.Wfp8Afp8MoEMethod(quant_config) + method.create_weights(layer, model_format="torch") + + kernel = DummyKernel() + monkeypatch.setitem( + sys.modules, + "fastdeploy.model_executor.layers.moe.triton_moe_kernels", + types.SimpleNamespace(fused_moe_kernel_paddle=kernel), + ) + monkeypatch.setattr(backend, "fused_moe_kernel_paddle", kernel, raising=False) + + x = paddle.randn([1, layer.hidden_size], dtype="float32") + gate = DummyGate(layer.num_local_experts) + captured = {} + + def hook(topk_ids): + captured["ids"] = topk_ids + + _ = method.apply(layer, x, gate, topk_ids_hookfunc=hook) + assert "ids" in captured + + up_gate = [ + paddle.zeros([layer.moe_intermediate_size * 2, layer.hidden_size], dtype="float32") + for _ in range(layer.num_local_experts) + ] + down_proj = [ + paddle.zeros([layer.hidden_size, layer.moe_intermediate_size], dtype="float32") + for _ in range(layer.num_local_experts) + ] + method.check(layer, up_gate, down_proj) + + def test_wfp8afp8_prequant_raises(self, fake_ops): + quant_config = DummyQuantConfig(is_checkpoint_bf16=False) + layer = DummyLayer(quant_config) + method = backend.Wfp8Afp8MoEMethod(quant_config) + with pytest.raises(NotImplementedError): + method.process_prequanted_weights(layer, state_dict={}) + + def test_wfp8afp8_create_weights_bf16_branch(self, fake_ops): + quant_config = DummyQuantConfig(is_checkpoint_bf16=True) + layer = DummyLayer(quant_config, weight_dtype="float32") + method = backend.Wfp8Afp8MoEMethod(quant_config) + method.create_weights(layer, model_format="not_torch") + assert list(layer.down_proj_weight.shape) == [ + layer.num_local_experts, + layer.moe_intermediate_size, + layer.hidden_size, + ] + + def test_wfp8afp8_process_weights_after_loading_bf16(self, fake_ops, monkeypatch): + quant_config = DummyQuantConfig(is_checkpoint_bf16=True) + layer = DummyLayer(quant_config, weight_dtype="float32") + method = backend.Wfp8Afp8MoEMethod(quant_config) + method.create_weights(layer, model_format="torch") + method.model_format = "torch" + + monkeypatch.setattr(backend, "weight_fully_copied", lambda tensor: False) + transpose_calls = [] + monkeypatch.setattr(backend, "process_weight_transpose", lambda _layer, name: transpose_calls.append(name)) + monkeypatch.setattr(backend, "free_tensor", lambda tensor: None) + + def fake_per_token_cast_to_fp8(weight): + return weight.cast(paddle.float16), paddle.ones([weight.shape[1], 1], dtype="float32") + + monkeypatch.setattr( + backend.fastdeploy.model_executor.layers.utils, "per_token_cast_to_fp8", fake_per_token_cast_to_fp8 + ) + + method.process_weights_after_loading(layer) + assert transpose_calls + + def test_wfp8afp8_apply_noaux_and_empty(self, fake_ops, monkeypatch): + quant_config = DummyQuantConfig(is_checkpoint_bf16=False) + layer = DummyLayer(quant_config) + method = backend.Wfp8Afp8MoEMethod(quant_config) + method.create_weights(layer, model_format="torch") + + kernel = DummyKernel() + monkeypatch.setitem( + sys.modules, + "fastdeploy.model_executor.layers.moe.triton_moe_kernels", + types.SimpleNamespace(fused_moe_kernel_paddle=kernel), + ) + + _ = method.apply( + layer, paddle.randn([1, layer.hidden_size], dtype="float32"), DummyGate(layer.num_local_experts) + ) + empty_out = method.apply( + layer, paddle.zeros([0, layer.hidden_size], dtype="float32"), DummyGate(layer.num_local_experts) + 
) + assert empty_out.shape == [0, layer.hidden_size] + + def test_tensorwise_prequant_and_apply(self, fake_ops, monkeypatch): + quant_config = DummyQuantConfig(is_checkpoint_bf16=False) + layer = DummyLayer(quant_config) + method = backend.TensorWiseFP8MoEMethod(quant_method=DummyQuantMethod(quant_config)) + method.create_weights(layer) + + monkeypatch.setattr(backend, "get_tensor", lambda tensor: tensor) + + state_dict = {} + up_weight = paddle.ones([layer.hidden_size, layer.moe_intermediate_size * 2], dtype="float32") + down_weight = paddle.ones([layer.moe_intermediate_size, layer.hidden_size], dtype="float32") + layer._up_weights = [up_weight for _ in range(layer.num_local_experts)] + layer._down_weights = [down_weight for _ in range(layer.num_local_experts)] + monkeypatch.setattr(layer, "extract_moe_ffn_weights", lambda _state: (layer._up_weights, layer._down_weights)) + + for idx in range(layer.num_local_experts): + state_dict[f"up_gate_scale_{idx}"] = paddle.ones([1], dtype="float32") * (idx + 1) + state_dict[f"down_proj_scale_{idx}"] = paddle.ones([1], dtype="float32") * (idx + 2) + state_dict[f"up_gate_in_scale_{idx}"] = paddle.ones([1], dtype="float32") * (idx + 3) + state_dict[f"down_proj_in_scale_{idx}"] = paddle.ones([1], dtype="float32") * (idx + 4) + + method.process_prequanted_weights(layer, state_dict) + + assert paddle.all(layer.up_gate_proj_in_scale > 0) + + kernel = DummyKernel() + monkeypatch.setitem( + sys.modules, + "fastdeploy.model_executor.layers.moe.triton_moe_kernels", + types.SimpleNamespace(fused_moe_kernel_paddle=kernel), + ) + monkeypatch.setattr(backend, "fused_moe_kernel_paddle", kernel, raising=False) + + layer.topk_method = "aux" + x = paddle.randn([2, layer.hidden_size], dtype="float32") + gate = DummyGate(layer.num_local_experts) + called = {} + + def hook(topk_ids): + called["hooked"] = topk_ids + + _ = method.apply(layer, x, gate, topk_ids_hookfunc=hook) + assert "hooked" in called + + def test_python_op_fused_moe_kernel_paddle(self, fake_ops, monkeypatch): + quant_config = DummyQuantConfig(is_checkpoint_bf16=False, weight_block_size=(2, 2)) + layer = DummyLayer(quant_config) + + kernel = DummyKernel() + monkeypatch.setitem( + sys.modules, + "fastdeploy.model_executor.layers.moe.triton_moe_kernels", + types.SimpleNamespace(fused_moe_kernel_paddle=kernel), + ) + monkeypatch.setattr( + paddle.static, + "MetaTensor", + lambda shape, dtype: types.SimpleNamespace(shape=shape, dtype=dtype), + raising=False, + ) + + x = paddle.randn([2, layer.hidden_size], dtype="float32") + gate = DummyGate(layer.num_local_experts) + gate_out = gate(x) + + layer_added_weight_attrs_0 = paddle.randn( + [layer.num_local_experts, layer.moe_intermediate_size * 2, layer.hidden_size], dtype="float32" + ) + layer_added_weight_attrs1 = paddle.randn( + [layer.num_local_experts, layer.hidden_size, layer.moe_intermediate_size], dtype="float32" + ) + layer_added_scale_attrs_0 = paddle.ones([layer.num_local_experts, 2, 2], dtype="float32") + layer_added_scale_attrs1 = paddle.ones([layer.num_local_experts, 2, 2], dtype="float32") + + captured = {} + + def hook(topk_ids): + captured["topk"] = topk_ids + + config = { + "BLOCK_SIZE_M": 32, + "BLOCK_SIZE_N": 64, + "BLOCK_SIZE_K": 64, + "GROUP_SIZE_M": 1, + } + + _ = backend.python_op_fused_moe_kernel_paddle( + x, + layer_added_weight_attrs_0, + layer_added_scale_attrs_0, + layer_added_weight_attrs1, + layer_added_scale_attrs1, + gate_out, + layer.gate_correction_bias, + layer.top_k, + layer_added_weight_attrs_0.shape[1], + 
layer_added_weight_attrs1.shape[1], + layer.num_local_experts, + layer.moe_intermediate_size, + layer.hidden_size, + config, + quant_config, + hook, + ) + + assert "topk" in captured + + meta = backend.python_op_fused_moe_kernel_paddle_infer_meta( + x, + layer_added_weight_attrs_0, + layer_added_scale_attrs_0, + layer_added_weight_attrs1, + layer_added_scale_attrs1, + gate_out, + layer.gate_correction_bias, + layer.top_k, + layer_added_weight_attrs_0.shape[1], + layer_added_weight_attrs1.shape[1], + layer.num_local_experts, + layer.moe_intermediate_size, + layer.hidden_size, + config, + quant_config, + None, + ) + + assert meta.shape == [2, layer.hidden_size] + + def test_blockwise_create_weights_and_process(self, fake_ops, monkeypatch): + quant_config = DummyQuantConfig(is_checkpoint_bf16=False, weight_block_size=(2, 2)) + layer = DummyLayer(quant_config) + method = backend.BlockWiseFP8MoEMethod(quant_config) + method.create_weights(layer, model_format="not_torch") + + transpose_calls = [] + monkeypatch.setattr(backend, "process_weight_transpose", lambda _layer, name: transpose_calls.append(name)) + + method.process_weights_after_loading(layer) + assert transpose_calls + + up_weights = [ + paddle.arange(layer.hidden_size * layer.moe_intermediate_size * 2, dtype="float32").reshape( + [layer.hidden_size, layer.moe_intermediate_size * 2] + ) + for _ in range(layer.num_local_experts) + ] + down_weights = [ + paddle.arange(layer.moe_intermediate_size * layer.hidden_size, dtype="float32").reshape( + [layer.moe_intermediate_size, layer.hidden_size] + ) + for _ in range(layer.num_local_experts) + ] + layer._up_weights = up_weights + layer._down_weights = down_weights + + def fake_per_block_cast_to_fp8(weight, block_size): + return weight.cast(paddle.float16), _make_block_scale(weight.transpose([1, 0]), block_size) + + monkeypatch.setattr( + backend.fastdeploy.model_executor.layers.utils, "per_block_cast_to_fp8", fake_per_block_cast_to_fp8 + ) + + method.process_loaded_weights(layer, state_dict={}) + + assert paddle.any(layer.up_gate_proj_weight_scale_inv > 0) + + def test_blockwise_process_weights_after_loading_bf16(self, fake_ops, monkeypatch): + quant_config = DummyQuantConfig(is_checkpoint_bf16=True, weight_block_size=(2, 2)) + layer = DummyLayer(quant_config) + method = backend.BlockWiseFP8MoEMethod(quant_config) + method.create_weights(layer, model_format="torch") + method.model_format = "torch" + + monkeypatch.setattr(backend, "weight_fully_copied", lambda tensor: False) + + def fake_per_block_cast_to_fp8(weight, block_size): + return weight.cast(paddle.float16), _make_block_scale(weight, block_size) + + monkeypatch.setattr( + backend.fastdeploy.model_executor.layers.utils, "per_block_cast_to_fp8", fake_per_block_cast_to_fp8 + ) + monkeypatch.setattr(backend, "free_tensor", lambda tensor: None) + + method.process_weights_after_loading(layer) + + if not hasattr(layer, "up_gate_proj_weight_scale_inv"): + layer.up_gate_proj_weight_scale_inv = layer.create_parameter( + shape=method.up_gate_proj_scale_shape, + dtype="float32", + default_initializer=paddle.nn.initializer.Constant(0), + ) + + def fake_python_op(*args, **kwargs): + token_num = args[0].shape[0] + hidden_size = args[12] + return paddle.zeros([token_num, hidden_size], dtype=args[0].dtype) + + monkeypatch.setattr(backend, "python_op_fused_moe_kernel_paddle", fake_python_op) + + x = paddle.randn([2, layer.hidden_size], dtype="float32") + gate = DummyGate(layer.num_local_experts) + out = method.apply(layer, x, gate) + assert out.shape == 
[2, layer.hidden_size] + + def test_blockwise_check_and_apply_empty(self, fake_ops, monkeypatch): + quant_config = DummyQuantConfig(is_checkpoint_bf16=False, weight_block_size=(2, 2)) + layer = DummyLayer(quant_config) + method = backend.BlockWiseFP8MoEMethod(quant_config) + method.create_weights(layer, model_format="torch") + + up_gate = [ + paddle.zeros([layer.hidden_size, layer.moe_intermediate_size * 2], dtype="float32") + for _ in range(layer.num_local_experts) + ] + down_proj = [ + paddle.zeros([layer.moe_intermediate_size, layer.hidden_size], dtype="float32") + for _ in range(layer.num_local_experts) + ] + method.check(layer, up_gate, down_proj) + + def fake_python_op(*args, **kwargs): + token_num = args[0].shape[0] + hidden_size = args[12] + return paddle.zeros([token_num, hidden_size], dtype=args[0].dtype) + + monkeypatch.setattr(backend, "python_op_fused_moe_kernel_paddle", fake_python_op) + + out = method.apply( + layer, paddle.zeros([0, layer.hidden_size], dtype="float32"), DummyGate(layer.num_local_experts) + ) + assert out.shape == [0, layer.hidden_size] + + def test_blockwise_process_weights_ue8m0_branch(self, fake_ops, monkeypatch): + """Test the quant_weight_ue8m0 branch in BlockWiseFP8MoEMethod.process_weights_after_loading.""" + quant_config = DummyQuantConfig(is_checkpoint_bf16=True, weight_block_size=(128, 128)) + quant_config.deepgemm_scale_ue8m0 = True + layer = DummyLayer(quant_config, weight_dtype="bfloat16") + method = backend.BlockWiseFP8MoEMethod(quant_config) + method.create_weights(layer, model_format="torch") + method.model_format = "torch" + + # Set FD_USE_PHI_FP8_QUANT to False to enter the target branch + monkeypatch.setattr(backend.fastdeploy.envs, "FD_USE_PHI_FP8_QUANT", False) + monkeypatch.setattr(backend, "weight_fully_copied", lambda tensor: True) + + # Mock quant_weight_ue8m0 and transform_scale_ue8m0 + quant_calls = [] + transform_calls = [] + + def fake_quant_weight_ue8m0(weight_dequant, weight_block_size): + quant_calls.append({"weight_shape": weight_dequant.shape, "block_size": weight_block_size}) + # Return fake quantized weight and scale + n, k = weight_dequant.shape[-2], weight_dequant.shape[-1] + out_w = paddle.zeros(weight_dequant.shape, dtype=paddle.float8_e4m3fn) + out_s = paddle.ones([n, (k + 127) // 128], dtype="float32") + return out_w, out_s + + def fake_transform_scale_ue8m0(sf, mn, weight_block_size=None): + transform_calls.append({"sf_shape": sf.shape, "mn": mn, "block_size": weight_block_size}) + # Return fake transformed scale + return paddle.ones([sf.shape[0], sf.shape[1], 1], dtype="uint8") + + monkeypatch.setattr(backend, "quant_weight_ue8m0", fake_quant_weight_ue8m0) + monkeypatch.setattr(backend, "transform_scale_ue8m0", fake_transform_scale_ue8m0) + monkeypatch.setattr(backend, "free_tensor", lambda tensor: None) + monkeypatch.setattr(backend, "process_weight_transpose", lambda _layer, name: None) + + # Create unquantized weights for the layer + num_experts = layer.num_local_experts + hidden_size = layer.hidden_size + moe_intermediate_size = layer.moe_intermediate_size + + # Create weight attributes that the method expects + layer.up_gate_proj_weight = layer.create_parameter( + shape=[num_experts, moe_intermediate_size * 2, hidden_size], + dtype="bfloat16", + default_initializer=paddle.nn.initializer.Constant(0), + ) + layer.down_proj_weight = layer.create_parameter( + shape=[num_experts, hidden_size, moe_intermediate_size], + dtype="bfloat16", + default_initializer=paddle.nn.initializer.Constant(0), + ) + + 
method.process_weights_after_loading(layer) + + # Verify the quant_weight_ue8m0 branch was executed + assert len(quant_calls) > 0, "quant_weight_ue8m0 should have been called" + assert len(transform_calls) > 0, "transform_scale_ue8m0 should have been called" diff --git a/tests/operators/test_noaux_tc_redundant.py b/tests/operators/test_noaux_tc_redundant.py index 1afa24aabb8..60d1aad2a22 100644 --- a/tests/operators/test_noaux_tc_redundant.py +++ b/tests/operators/test_noaux_tc_redundant.py @@ -2,6 +2,9 @@ import paddle +from fastdeploy.model_executor.layers.moe.fused_moe_deepgemm_backend import ( + moe_topk_select, +) from fastdeploy.model_executor.layers.moe.moe import get_moe_scores @@ -106,6 +109,57 @@ def test_group_topk(self): print(f"topk_idx = {topk_idx}") assert equal_topk_value and equal_topk_ids + def test_group_topk_using_phi_topk(self): + + renormalize = True + + test_cases = [ + # (num_experts, n_group, topk_group, top_k, routed_scaling_factor) + (128, 1, 1, 8, 1.0), # glm45-air + (256, 8, 4, 8, 2.5), # deepseek + ] + + for case_tuple in test_cases: + num_experts, n_group, topk_group, top_k, routed_scaling_factor = case_tuple + for num_tokens in [1, 32, 64, 128]: + gating_output = paddle.rand([num_tokens, num_experts]) + e_score_correction_bias = paddle.rand([1, num_experts]) + + ref_topk_values, ref_topk_idx = self.native_group_topk( + gating_output=gating_output, + topk=top_k, + renormalize=renormalize, + num_expert_group=n_group, + topk_group=topk_group, + routed_scaling_factor=routed_scaling_factor, + e_score_correction_bias=e_score_correction_bias, + ) + + topk_values, topk_idx = moe_topk_select( + gating_output=gating_output, + n_group=n_group, + topk_group=topk_group, + top_k=top_k, + routed_scaling_factor=routed_scaling_factor, + e_score_correction_bias=e_score_correction_bias, + renormalize=renormalize, + ) + + equal_topk_value = paddle.allclose(topk_values, ref_topk_values, atol=1e-03, rtol=1e-03).item() + equal_topk_ids = paddle.allclose( + topk_idx.cast("int32"), ref_topk_idx.cast("int32"), atol=0.0, rtol=0.0 + ).item() + print( + f"Test Case[{case_tuple}], num_tokens = {num_tokens}, equal_topk_value: {equal_topk_value}, equal_topk_ids: {equal_topk_ids}" + ) + if not equal_topk_value: + print(f"ref_topk_values = {ref_topk_values}") + print(f"topk_values = {topk_values}") + if not equal_topk_ids: + print(f"ref_topk_idx = {ref_topk_idx}") + print(f"topk_idx = {topk_idx}") + assert equal_topk_value and equal_topk_ids + if __name__ == "__main__": unittest.main()
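
Note on the testing pattern used throughout this patch: the new suites never run the fused GPU kernels directly; they swap the fused ops (for example fp8_utils_module.paddlefleet_ops, tritonmoe_preprocess_func, or fused_moe_kernel_paddle) for small Python fakes via mock.patch.object or monkeypatch, then assert on what the fakes received. The snippet below is a minimal, self-contained sketch of that isolation pattern using only the standard library; the names fused_backend, quantize, and FakeOpIsolationTest are illustrative placeholders and are not part of this diff, and the flag wiring shown is an assumption for demonstration only.

import types
import unittest
from unittest import mock

# Stand-in for a module exposing a fused quantization op.
# (Hypothetical; the real tests patch attributes such as
#  fp8_utils_module.paddlefleet_ops with similar fakes.)
fused_backend = types.SimpleNamespace(
    fused_quant=lambda x, use_pow2_scale: ("real", use_pow2_scale),
)


def quantize(x, use_ue8m0):
    """Production-style wrapper that derives the kernel flag from a user-facing option."""
    return fused_backend.fused_quant(x, use_pow2_scale=use_ue8m0)


class FakeOpIsolationTest(unittest.TestCase):
    def test_flag_propagation(self):
        received = {}

        def fake_quant(x, use_pow2_scale):
            # Record the flag instead of doing any real quantization work.
            received["use_pow2_scale"] = use_pow2_scale
            return ("fake", x)

        # Swap the fused op for the fake only within this context,
        # so the wrapper's control flow can be checked on CPU.
        with mock.patch.object(fused_backend, "fused_quant", fake_quant):
            out = quantize([1.0, 2.0], use_ue8m0=False)

        self.assertEqual(out, ("fake", [1.0, 2.0]))
        self.assertFalse(received["use_pow2_scale"])


if __name__ == "__main__":
    unittest.main()

This keeps the assertions focused on flag propagation and tensor bookkeeping (the same concerns the patched-in fakes verify above), while remaining runnable on any machine without CUDA or the fused kernels installed.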