[Feature] Support MegaMoE #7943
Thanks for your contribution!
Pull request overview
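This PR adds MegaMoE support to FastDeploy's MoE path: it introduces a CUDA custom operator for pre-dispatch/quantization preparation, wires the corresponding buffers and execution flow into the DeepGEMM MoE backend, and adds operator-level unit tests.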
Changes:
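- Adds the `mega_moe_pre_dispatch` CUDA custom operator and compiles it into the package conditionally by architecture in the custom ops build script.
- Adds MegaMoE buffer initialization, weight quantization layout handling, and the `apply_mage_moe` execution path to `DeepGemmFusedMoeMethod`.
- Adds `tests/operators/test_mega_moe_pre_dispatch.py` to verify the correctness of the pre-dispatch output.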
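Additionally, the PR title follows the required format, but the Motivation, Modifications, Usage, and Accuracy Tests sections of the PR description are all unfilled. Please complete them, in particular how to enable the feature (for example, the environment variable), the supported hardware and operator dependencies, and the accuracy results.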
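Since the overview asks for the enabling mechanism to be documented, the following is a minimal sketch of how the switch behaves, assuming it is parsed with the same `bool(int(os.getenv(...)))` expression quoted from `fastdeploy/envs.py` in this PR. The function name `mega_moe_enabled` and the `environ` parameter are hypothetical and exist only to make the snippet testable.

```python
import os


def mega_moe_enabled(environ=None):
    """Mimic the envs.py entry: the value must be an integer string."""
    environ = os.environ if environ is None else environ
    # Unset falls back to "0" (disabled); any non-zero integer enables it.
    return bool(int(environ.get("FD_ENABLE_MAGE_MOE", "0")))


if __name__ == "__main__":
    print(mega_moe_enabled({"FD_ENABLE_MAGE_MOE": "1"}))
```

Under this parsing, a value such as `true` or `yes` does not enable the feature but raises `ValueError` from `int()`, which is worth stating in the Usage section.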
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.
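| File | Description |
|---|---|
| tests/operators/test_mega_moe_pre_dispatch.py | Adds the MegaMoE pre-dispatch unit test (currently has issues with its imports and with the availability of distributed initialization) |
| fastdeploy/model_executor/layers/quantization/fp8_utils.py | Adds helper functions for weight interleaving and scale layout transformation to match the MegaMoE weight format |
| fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py | Wires MegaMoE into the DeepGEMM MoE backend: buffers, quantization flow, and execution entry point |
| fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py | Switches to the MegaMoE path in the base class `apply` based on an environment variable |
| fastdeploy/envs.py | Adds the `FD_ENABLE_MAGE_MOE` environment variable switch |
| custom_ops/setup_ops.py | Adds mega_moe_pre_dispatch.cu to the SM100/103 build sources |
| custom_ops/gpu_ops/mega_moe_pre_dispatch.cu | Adds the MegaMoE pre-dispatch CUDA kernel and static operator registration |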
if layer.fd_config.scheduler_config.splitwise_role == "mixed" and is_moe_start_layer:
    self.ep_decoder_runner.clean_low_latency_buffer()
return self.apply_ep_decode(
if envs.FD_ENABLE_MAGE_MOE:

from fastdeploy.model_executor.ops.gpu import (
    count_tokens_per_expert_func,
    depermute_prefill_combine,
    prefill_permute_to_masked_gemm,
    mega_moe_pre_dispatch,
)

# 1. Select topk experts and weights.
topk_idx, topk_weights = self.moe_select(layer, gate_out)

mega_moe_pre_dispatch(
    x,
    topk_idx,
    topk_weights,
    self.mega_moe_buffer.x,
    self.mega_moe_buffer.x_sf,
    self.mega_moe_buffer.topk_idx,
    self.mega_moe_buffer.topk_weights,
    self.num_max_tokens_per_rank,
    32,  # group_size
)

return ffn_out

@singleton

# Whether enable mega moe
"FD_ENABLE_MAGE_MOE": lambda: bool(int(os.getenv("FD_ENABLE_MAGE_MOE", "0"))),

from ernie5_serving.mm_custom_ops import mega_moe_pre_dispatch
from fastdeploy.model_executor.layers.moe.fused_moe_deepgemm_backend import MegaMoEBuffer

@classmethod
def setUpClass(cls):
    paddle.seed(2025)
    strategy = fleet.DistributedStrategy()
    cls.expert_parallel_size = 8
    strategy.hybrid_configs = {
        "dp_degree": 1,
        "mp_degree": cls.expert_parallel_size,
        "pp_degree": 1,
        "sharding_degree": 1,
    }
    fleet.init(is_collective=True, strategy=strategy)
    cls.ep_group = dist.new_group(range(cls.expert_parallel_size))

self.x = paddle.randn([self.num_tokens, self.hidden_size], dtype=paddle.bfloat16)
scores = paddle.randn((self.num_tokens, self.num_experts), dtype=paddle.float32)
self.topk_weights, self.topk_idx = paddle.topk(scores, self.top_k, axis=-1, largest=True, sorted=False)
self.topk_idx = self.topk_idx.astype("int32")
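The test imports above assume the operator is importable when the test module is collected. As a hedged sketch (not the PR's code), a test module could resolve the op defensively and skip with a visible reason when it has not been built. `load_op` and the test class are hypothetical; the module path is the one the review recommends.

```python
import importlib
import unittest


def load_op(module_name, attr):
    """Return `attr` from `module_name`, or None if it cannot be imported."""
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        return None
    return getattr(module, attr, None)


# Resolved once at import time; None means the op is not available here.
mega_moe_pre_dispatch = load_op("fastdeploy.model_executor.ops.gpu", "mega_moe_pre_dispatch")


@unittest.skipIf(mega_moe_pre_dispatch is None, "mega_moe_pre_dispatch op is not built")
class TestMegaMoEPreDispatchImport(unittest.TestCase):
    def test_op_is_callable(self):
        self.assertTrue(callable(mega_moe_pre_dispatch))
```

A skip shows up in the test report with its reason, whereas a failed top-level import aborts collection of the whole file. On the target SM100 CI, where the op is expected to exist, a hard failure is still the safer choice.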
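The CI report is generated from the following code (refreshed every 30 minutes):
1 Task overview
Not all required tasks have passed: 4 required tasks failed and 0 required tasks are pending or running. Prioritize the Approval, code formatting, and MegaMoE unit test import / operator export issues; the CE timeout can be checked with a rerun once the blocking issues are fixed.
2 Task status summary
Log column note: failed tasks link directly to the CI log; optional tasks do not block merging and are for reference only.
2.1 Required tasks: 6/10 passed
2.2 Optional tasks: 27/31 passed
3 Failure details (required only)
Approval: manual approval (confidence: high)
This job requires manual approval; CI continues only after the approval is granted.
Pre Commit: code style (confidence: high)
Root cause details:
Key log:
-
+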
- 32, # group_size
+ 32, # group_size
- raise ValueError(
- f"MegaMoE buffer capacity exceeded: num_tokens={num_tokens}, capacity={buffer_capacity}"
- )
+ raise ValueError(f"MegaMoE buffer capacity exceeded: num_tokens={num_tokens}, capacity={buffer_capacity}")
Suggested fix:
Fix summary: run pre-commit and commit the formatted result.
Related changes:
Run FastDeploy Unit Tests and Coverage / run_tests_with_coverage: test failure (confidence: high)
Failed cases:
Root cause details:
Key log:
Suggested fix:
Fix summary: build/export the op and fix the test import.
Related changes:
Extracted partial CE model tasks to run in CI. / run_ce_cases: timeout (confidence: medium)
Failed cases:
Root cause details:
Key log:
Suggested fix:
Fix summary: rerun first; if the failure reproduces, shorten the output.
Related changes: no direct code link to the MegaMoE changes in this PR was found; the failing case is a pre-existing |
PaddlePaddle-bot
left a comment
🤖 Paddle-CI-Agent | pr_review | 2026-05-27 19:48:08
📋 Review summary
PR overview: adds a MegaMoE execution path targeting SM100 (Blackwell) GPUs, implementing efficient EP MoE inference based on FP8 pre-dispatch and FP4 weight quantization.
Scope of changes: custom_ops/gpu_ops/, fastdeploy/envs.py, model_executor/layers/moe/, model_executor/layers/quantization/fp8_utils.py, tests/operators/
Impact tags: [Feature] [OP] [Quantization]
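To make the "FP8 pre-dispatch" step concrete, here is a minimal pure-Python sketch of per-group power-of-two (UE8M0-style) scale selection with a group size of 32, the value passed to `mega_moe_pre_dispatch` in this PR. It assumes FP8 E4M3 with a maximum finite value of 448 and a round-up rule for the exponent. The actual CUDA kernel may choose scales differently, and `ue8m0_group_scales` is a hypothetical name.

```python
import math

FP8_E4M3_MAX = 448.0  # largest finite value representable in FP8 E4M3


def ue8m0_group_scales(row, group_size=32):
    """Return one power-of-two scale per group of `group_size` values."""
    if len(row) % group_size != 0:
        raise ValueError("row length must be a multiple of group_size")
    scales = []
    for start in range(0, len(row), group_size):
        amax = max(abs(v) for v in row[start:start + group_size])
        if amax == 0.0:
            # Assumption: an all-zero group gets a neutral scale of 1.
            scales.append(1.0)
            continue
        # Round the ideal scale up to a power of two so amax / scale <= 448.
        exponent = math.ceil(math.log2(amax / FP8_E4M3_MAX))
        scales.append(2.0 ** exponent)
    return scales
```

A pure-Python reference of this kind could serve as the expected-value oracle in an operator test, provided it is first aligned with the rounding rule the kernel actually uses.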
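Issues
| Severity | File | Summary |
|---|---|---|
| 🔴 Bug | fused_moe_backend_base.py:234 | apply_mage_moe is called in the base class but defined only in the DeepGEMM subclass; enabling the env var with any other backend crashes |
| 🔴 Bug | test_mega_moe_pre_dispatch.py:24 | Depends on the internal package ernie5_serving, so it cannot run in the public repository and the unit test is effectively dead |
| 🟡 Suggestion | fastdeploy/envs.py:301 | The env var name FD_ENABLE_MAGE_MOE is spelled inconsistently with the feature name "MegaMoE" (MAGE vs MEGA) |
| 🟡 Suggestion | fused_moe_deepgemm_backend.py | In apply_mage_moe, the buffer overflow check comes after the mega_moe_pre_dispatch call, so the order is reversed |
| 🟡 Suggestion | fp8_utils.py:_transpose_sf_for_utccp | assert is used for runtime input validation and is silently disabled under Python -O |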
🔴 Bug 1: apply_mage_moe is defined only in the subclass, so the base-class call crashes other backends
Location: fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py, inside the apply() method
Problem: MoEMethodBase.apply() calls self.apply_mage_moe(...) directly in the `if envs.FD_ENABLE_MAGE_MOE:` branch, but that method is defined only in DeepGemmFusedMoeMethod. If an instance of another MoE backend (cutlass, triton, blackwell, and so on) runs with ep_size > 1 and the user has set FD_ENABLE_MAGE_MOE=1, it raises:
AttributeError: 'XxxMoeMethod' object has no attribute 'apply_mage_moe'
Suggested fix: add a default implementation to MoEMethodBase:
def apply_mage_moe(self, layer, x, gate, topk_ids_hookfunc, shared_experts, fc1_latent_proj, fc2_latent_proj):
    raise NotImplementedError(
        f"{type(self).__name__} does not support MegaMoE. "
        "Please use DeepGemmFusedMoeMethod or unset FD_ENABLE_MAGE_MOE."
    )
Alternatively, move the FD_ENABLE_MAGE_MOE check into an override of DeepGemmFusedMoeMethod.apply(), so that the base class never calls an undefined method directly.
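The first option can be illustrated with a self-contained sketch. The classes below are simplified stand-ins that share names with the real ones but model only the dispatch logic; `apply_default`, the `enable_mega_moe` argument, and `CutlassMoEMethod` are hypothetical and used purely for illustration.

```python
class MoEMethodBase:
    """Stand-in base class: only the MegaMoE dispatch is modeled."""

    def apply(self, x, enable_mega_moe=False):
        if enable_mega_moe:
            return self.apply_mage_moe(x)
        return self.apply_default(x)

    def apply_default(self, x):
        return ("default", x)

    def apply_mage_moe(self, x):
        # Default implementation: fail with an actionable message.
        raise NotImplementedError(
            f"{type(self).__name__} does not support MegaMoE. "
            "Please use DeepGemmFusedMoeMethod or unset FD_ENABLE_MAGE_MOE."
        )


class DeepGemmFusedMoeMethod(MoEMethodBase):
    def apply_mage_moe(self, x):
        return ("mega_moe", x)


class CutlassMoEMethod(MoEMethodBase):
    """Hypothetical backend that does not override apply_mage_moe."""
```

With this shape, an unsupported backend fails with a message that names the class and the remedy, rather than an `AttributeError` raised from inside `apply()`.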
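🔴 Bug 2: the unit test depends on the internal package ernie5_serving
Location: tests/operators/test_mega_moe_pre_dispatch.py:24
from ernie5_serving.mm_custom_ops import mega_moe_pre_dispatch  # internal package, unavailable in the public repository
ernie5_serving is a Baidu-internal serving package that is not part of the public FastDeploy repository. Any external contributor or CI environment without the internal dependencies hits ModuleNotFoundError when running this test, so the unit test is effectively dead.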
Suggested fix:
# Change to:
from fastdeploy.model_executor.ops.gpu import mega_moe_pre_dispatch
🟡 Suggestion 1: the env var name is spelled inconsistently with the feature name (MAGE vs MEGA)
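Location: fastdeploy/envs.py
"FD_ENABLE_MAGE_MOE": lambda: bool(int(os.getenv("FD_ENABLE_MAGE_MOE", "0"))),
The feature is named MegaMoE and the comment also says "mega moe", but the environment variable and the function name (apply_mage_moe) both use MAGE. A user who guesses FD_ENABLE_MEGA_MOE=1 from the feature name gets no effect and no error message. Rename the variable to FD_ENABLE_MEGA_MOE and update every reference.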
🟡 Suggestion 2: the buffer overflow check is in the wrong order
Location: fused_moe_deepgemm_backend.py, apply_mage_moe method
mega_moe_pre_dispatch(...)  # dispatch is called first
buffer_capacity = ...
if num_tokens > buffer_capacity:  # checked afterwards; effectively dead code (the C++ side has already run PD_CHECK)
    raise ValueError(...)
The Python-side defensive check should run before dispatch:
buffer_capacity = self.mega_moe_buffer.x.shape[0]
if num_tokens > buffer_capacity:
    raise ValueError(f"MegaMoE buffer capacity exceeded: ...")
mega_moe_pre_dispatch(...)
🟡 Suggestion 3: assert is used for runtime validation
Location: fastdeploy/model_executor/layers/quantization/fp8_utils.py, _transpose_sf_for_utccp
assert sf.dtype == paddle.int and mn % 128 == 0
assert statements are removed under Python's -O optimization mode, so invalid input silently flows into the subsequent reshape and produces errors that are hard to diagnose. Use explicit checks instead:
if sf.dtype != paddle.int32:
    raise ValueError(f"sf.dtype must be paddle.int32, got {sf.dtype}")
if mn % 128 != 0:
    raise ValueError(f"mn must be a multiple of 128, got {mn}")
📝 PR convention check
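In the PR description, ## Motivation, ## Modifications, ## Usage or Command, and ## Accuracy Tests are all empty (only the template placeholder comments remain), and no Checklist item is checked, which does not meet the template requirements.
Suggested title (can be copied directly):
[Feature] Support MegaMoE for Blackwell SM100 with FP8 pre-dispatch and FP4 weights
Suggested PR description (can be copied directly):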
## Motivation
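Adds a MegaMoE execution path for Blackwell (SM100) GPUs. Built on a symmetric communication buffer, FP8 input quantization (per-group UE8M0), and FP4 weight quantization, it runs the EP MoE forward pass through `deep_gemm.fp8_fp4_mega_moe`, replacing the existing DeepEP-based prefill/decode dual path. Enable it with the environment variable `FD_ENABLE_MAGE_MOE=1`.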
## Modifications
- `custom_ops/gpu_ops/mega_moe_pre_dispatch.cu`: adds a CUDA kernel that quantizes the BF16 input to FP8 (per-group UE8M0 scale), writes it into the symmetric communication buffer, and copies topk_idx/topk_weights into the buffer
- `custom_ops/setup_ops.py`: adds `mega_moe_pre_dispatch.cu` to the SM100 build branch
- `fastdeploy/envs.py`: adds the `FD_ENABLE_MAGE_MOE` environment variable switch (off by default)
- `fastdeploy/model_executor/layers/moe/fused_moe_backend_base.py`: adds an `FD_ENABLE_MAGE_MOE` branch to `MoEMethodBase.apply()` that routes to `apply_mage_moe`
- `fastdeploy/model_executor/layers/moe/fused_moe_deepgemm_backend.py`: adds the `init_ep` (MegaMoE buffer initialization), `process_weights_after_loading` (FP4 weight quantization), `moe_select`, and `apply_mage_moe` methods to `DeepGemmFusedMoeMethod`; adds the `MegaMoEBuffer` singleton to manage the buffer lifecycle
- `fastdeploy/model_executor/layers/quantization/fp8_utils.py`: adds the `_interleave_weights` and `_transpose_sf_for_utccp` helper functions for FP4 weight layout conversion
- `tests/operators/test_mega_moe_pre_dispatch.py`: adds a unit test for the `mega_moe_pre_dispatch` operator
## Usage or Command
```bash
FD_ENABLE_MAGE_MOE=1 python -m fastdeploy.entrypoints.openai.api_server \
--model <model_path> --tensor-parallel-size 1 --expert-parallel-size 8
```
## Accuracy Tests
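N/A (this PR adds a new feature and provides no accuracy comparison data; consider adding an output-consistency test against the original EP prefill/decode path)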
## Checklist
- [x] Add at least a tag in the PR title.
- Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [x] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
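- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.
Overall assessment
The overall design is clear, the CUDA kernel is well structured, and managing the MegaMoE buffer lifecycle with a singleton is reasonable. However, two blocking issues need to be fixed: the base class calls an undefined method, which crashes other backends, and the unit test depends on an internal package, so it cannot run in the public repository. Fixing the env var spelling (MAGE → MEGA) and the buffer check order at the same time is also recommended.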
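If the environment variable is renamed as recommended, existing deployments that already set the misspelled name would silently lose the feature. One possible migration, sketched here under the assumption that a transition period is wanted, accepts both spellings and warns on the old one. `read_mega_moe_flag` and its `environ` parameter are hypothetical and not part of FastDeploy.

```python
import os
import warnings


def read_mega_moe_flag(environ=None):
    """Return True if MegaMoE is enabled via either spelling of the variable."""
    environ = os.environ if environ is None else environ
    new_value = environ.get("FD_ENABLE_MEGA_MOE")
    old_value = environ.get("FD_ENABLE_MAGE_MOE")
    if new_value is None and old_value is not None:
        # Honor the old spelling for now, but tell the user to migrate.
        warnings.warn(
            "FD_ENABLE_MAGE_MOE is a misspelling; use FD_ENABLE_MEGA_MOE instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        new_value = old_value
    # Treat unset or empty as "0"; the corrected name wins when both are set.
    return bool(int(new_value or "0"))
```

The corrected name takes precedence when both are present, so an explicit `FD_ENABLE_MEGA_MOE=0` is not overridden by a stale `FD_ENABLE_MAGE_MOE=1` left in a launch script.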
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## develop #7943 +/- ##
==========================================
Coverage ? 63.49%
==========================================
Files ? 467
Lines ? 65155
Branches ? 9989
==========================================
Hits ? 41368
Misses ? 21014
Partials ? 2773
Motivation
Modifications
Usage or Command
Accuracy Tests
Checklist
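- [ ] Add at least a tag in the PR title.
  - Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
  - You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [ ] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.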