-
Notifications
You must be signed in to change notification settings - Fork 318
feat: refactor kv buffer #1265
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
feat: refactor kv buffer #1265
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| from .kv_buffer import KvBuffer | ||
| from .quant_kv_buffer import QuantKvBuffer, PPLInt4QuantKvBuffer, PPLInt8QuantKvBuffer | ||
|
|
||
| __all__ = [ | ||
| "KvBuffer", | ||
| "QuantKvBuffer", | ||
| "PPLInt4QuantKvBuffer", | ||
| "PPLInt8QuantKvBuffer", | ||
| ] |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,95 @@ | ||||||||||||||||||||||||||||||||||||||||
| from typing import Any, List, Optional | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| import torch | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| from lightllm.common.mamba_cache_mem_manager.cache_manager import MambaCacheManager | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| from .kv_buffer import KvBuffer | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| class HybridKvBuffer(KvBuffer): | ||||||||||||||||||||||||||||||||||||||||
| def __init__( | ||||||||||||||||||||||||||||||||||||||||
| self, | ||||||||||||||||||||||||||||||||||||||||
| buffers: List[Optional[torch.Tensor]], | ||||||||||||||||||||||||||||||||||||||||
| head_num: int, | ||||||||||||||||||||||||||||||||||||||||
| full_attention_interval: int, | ||||||||||||||||||||||||||||||||||||||||
| mamba_cache_size: int, | ||||||||||||||||||||||||||||||||||||||||
| linear_attn_layer_num: int, | ||||||||||||||||||||||||||||||||||||||||
| conv_state_dtype: torch.dtype, | ||||||||||||||||||||||||||||||||||||||||
| ssm_state_dtype: torch.dtype, | ||||||||||||||||||||||||||||||||||||||||
| conv_kernel_size: int, | ||||||||||||||||||||||||||||||||||||||||
| num_linear_k_heads: int, | ||||||||||||||||||||||||||||||||||||||||
| num_linear_v_heads: int, | ||||||||||||||||||||||||||||||||||||||||
| head_linear_k_dim: int, | ||||||||||||||||||||||||||||||||||||||||
| head_linear_v_dim: int, | ||||||||||||||||||||||||||||||||||||||||
| ): | ||||||||||||||||||||||||||||||||||||||||
| self._buffers = buffers | ||||||||||||||||||||||||||||||||||||||||
| self._head_num = head_num | ||||||||||||||||||||||||||||||||||||||||
| self._full_attention_interval = full_attention_interval | ||||||||||||||||||||||||||||||||||||||||
| self.mamba_cache_manager = MambaCacheManager( | ||||||||||||||||||||||||||||||||||||||||
| size=mamba_cache_size, | ||||||||||||||||||||||||||||||||||||||||
| layer_num=linear_attn_layer_num, | ||||||||||||||||||||||||||||||||||||||||
| conv_state_dtype=conv_state_dtype, | ||||||||||||||||||||||||||||||||||||||||
| ssm_state_dtype=ssm_state_dtype, | ||||||||||||||||||||||||||||||||||||||||
| conv_kernel_size=conv_kernel_size, | ||||||||||||||||||||||||||||||||||||||||
| num_linear_k_heads=num_linear_k_heads, | ||||||||||||||||||||||||||||||||||||||||
| num_linear_v_heads=num_linear_v_heads, | ||||||||||||||||||||||||||||||||||||||||
| head_linear_k_dim=head_linear_k_dim, | ||||||||||||||||||||||||||||||||||||||||
| head_linear_v_dim=head_linear_v_dim, | ||||||||||||||||||||||||||||||||||||||||
| ) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def create_adapter(self): | ||||||||||||||||||||||||||||||||||||||||
| from .hybrid_kv_buffer_adapter import HybridKvBufferAdapter | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| return HybridKvBufferAdapter(self) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def get_mamba_cache(self, layer_idx: int): | ||||||||||||||||||||||||||||||||||||||||
| layer_idx_in_linear = layer_idx - (layer_idx // self._full_attention_interval) | ||||||||||||||||||||||||||||||||||||||||
| return self.mamba_cache_manager.get_mamba_cache(layer_idx_in_linear) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def __getitem__(self, item): | ||||||||||||||||||||||||||||||||||||||||
| return self._buffers[item] | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def copy_kv_to_mem_manager(self, layer_index: int, mem_index: torch.Tensor, kv: torch.Tensor) -> None: | ||||||||||||||||||||||||||||||||||||||||
| from lightllm.common.basemodel.triton_kernel.destindex_copy_kv import destindex_copy_kv | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| layer_buffer = self._buffers[layer_index] | ||||||||||||||||||||||||||||||||||||||||
| if layer_buffer is None: | ||||||||||||||||||||||||||||||||||||||||
| raise RuntimeError(f"layer {layer_index} does not have kv cache storage") | ||||||||||||||||||||||||||||||||||||||||
| destindex_copy_kv(kv, mem_index, layer_buffer) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def get_att_input_params(self, layer_index: int) -> Any: | ||||||||||||||||||||||||||||||||||||||||
| layer_buffer = self._buffers[layer_index] | ||||||||||||||||||||||||||||||||||||||||
| if layer_buffer is None: | ||||||||||||||||||||||||||||||||||||||||
| raise RuntimeError(f"layer {layer_index} does not have kv cache storage") | ||||||||||||||||||||||||||||||||||||||||
| k = layer_buffer[:, : self._head_num, :] | ||||||||||||||||||||||||||||||||||||||||
| v = layer_buffer[:, self._head_num :, :] | ||||||||||||||||||||||||||||||||||||||||
| return k, v | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def get_index_kv_buffer(self, index: Any) -> dict: | ||||||||||||||||||||||||||||||||||||||||
| return {"kv_buffer": [None if layer_buffer is None else layer_buffer[index] for layer_buffer in self._buffers]} | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def load_index_kv_buffer(self, index: Any, payload: dict) -> None: | ||||||||||||||||||||||||||||||||||||||||
| for layer_index, layer_payload in enumerate(payload["kv_buffer"]): | ||||||||||||||||||||||||||||||||||||||||
| if layer_payload is None: | ||||||||||||||||||||||||||||||||||||||||
| continue | ||||||||||||||||||||||||||||||||||||||||
| layer_buffer = self._buffers[layer_index] | ||||||||||||||||||||||||||||||||||||||||
| if layer_buffer is None: | ||||||||||||||||||||||||||||||||||||||||
| raise RuntimeError(f"layer {layer_index} does not have kv cache storage") | ||||||||||||||||||||||||||||||||||||||||
| layer_buffer[index].copy_(layer_payload) | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def get_device(self) -> int: | ||||||||||||||||||||||||||||||||||||||||
| for layer_buffer in self._buffers: | ||||||||||||||||||||||||||||||||||||||||
| if layer_buffer is not None: | ||||||||||||||||||||||||||||||||||||||||
| return layer_buffer.get_device() | ||||||||||||||||||||||||||||||||||||||||
| raise RuntimeError("HybridKvBuffer does not contain any kv cache tensor") | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| def find_layer_index(self, k: torch.Tensor, v: torch.Tensor) -> int: | ||||||||||||||||||||||||||||||||||||||||
| key = min(k.data_ptr(), v.data_ptr()) | ||||||||||||||||||||||||||||||||||||||||
| find_dict = { | ||||||||||||||||||||||||||||||||||||||||
| layer_buffer.data_ptr(): layer_index | ||||||||||||||||||||||||||||||||||||||||
| for layer_index, layer_buffer in enumerate(self._buffers) | ||||||||||||||||||||||||||||||||||||||||
| if layer_buffer is not None | ||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||
| assert key in find_dict | ||||||||||||||||||||||||||||||||||||||||
| return find_dict[key] | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+87
to
+95
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar to
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| from typing import Optional | ||
|
|
||
| import torch | ||
|
|
||
| from .hybrid_kv_buffer import HybridKvBuffer | ||
| from .kv_buffer_adapter import KvBufferAdapter | ||
|
|
||
|
|
||
| class HybridKvBufferAdapter(KvBufferAdapter): | ||
| def __init__(self, kv_buffer: HybridKvBuffer): | ||
| super().__init__(kv_buffer) | ||
|
|
||
| def write_to_page_buffer( | ||
| self, mem_indexes: torch.Tensor, page_tensor: torch.Tensor, tp_index: int, tp_world_size: int | ||
| ) -> None: | ||
| raise NotImplementedError(f"{self.__class__.__name__} does not support paged kv write") | ||
|
|
||
| def read_from_page_buffer( | ||
| self, mem_indexes: torch.Tensor, page_tensor: torch.Tensor, tp_index: int, tp_world_size: int | ||
| ) -> None: | ||
| raise NotImplementedError(f"{self.__class__.__name__} does not support paged kv read") | ||
|
|
||
| def write_from_mla_page_buffer(self, mem_indexes: torch.Tensor, page_tensor: torch.Tensor) -> None: | ||
| raise NotImplementedError(f"{self.__class__.__name__} does not support mla paged kv write") | ||
|
|
||
| def read_from_mla_page_buffer(self, mem_indexes: torch.Tensor, page_tensor: torch.Tensor) -> None: | ||
| raise NotImplementedError(f"{self.__class__.__name__} does not support mla paged kv read") | ||
|
|
||
| def load_from_cpu_cache( | ||
| self, | ||
| gpu_mem_indexes: torch.Tensor, | ||
| cpu_kv_cache: torch.Tensor, | ||
| cpu_kv_cache_scale: Optional[torch.Tensor], | ||
| page_indexes: torch.Tensor, | ||
| tp_index: int, | ||
| tp_world_size: int, | ||
| grid_num: int, | ||
| ) -> None: | ||
| raise NotImplementedError(f"{self.__class__.__name__} does not support cpu cache load") | ||
|
|
||
| def offload_to_cpu_cache( | ||
| self, | ||
| token_indexes: torch.Tensor, | ||
| cpu_kv_cache: torch.Tensor, | ||
| cpu_kv_cache_scale: Optional[torch.Tensor], | ||
| page_indexes: torch.Tensor, | ||
| page_readies: torch.Tensor, | ||
| tp_index: int, | ||
| tp_world_size: int, | ||
| grid_num: int, | ||
| ) -> None: | ||
| raise NotImplementedError(f"{self.__class__.__name__} does not support cpu cache offload") | ||
|
|
||
| def copy_kv_from_other_dp_ranks( | ||
| self, | ||
| mem_managers, | ||
| move_token_indexes: torch.Tensor, | ||
| token_dp_indexes: torch.Tensor, | ||
| mem_indexes: torch.Tensor, | ||
| dp_size_in_node: int, | ||
| rank_in_dp: int, | ||
| ) -> None: | ||
| raise NotImplementedError(f"{self.__class__.__name__} does not support dp kv copy") |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,65 @@ | ||||||||||||||||||||||||
| from typing import Any, Optional | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| import torch | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| class KvBuffer: | ||||||||||||||||||||||||
| """KV cache 的数据封装类。 | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| 这个类的职责是管理 kv buffer 本身的存储与访问语义,关注点是 | ||||||||||||||||||||||||
| "这块缓存里存了什么、怎么按层读写、怎么导入导出"。 | ||||||||||||||||||||||||
| 因此这里的方法应当主要围绕 kv buffer 自身的数据操作展开, | ||||||||||||||||||||||||
| 不承载 page io、cpu cache、dp 传输这类业务流程逻辑。 | ||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def __init__(self, buffer: torch.Tensor, head_num: int): | ||||||||||||||||||||||||
| self._buffer = buffer | ||||||||||||||||||||||||
| self._head_num = head_num | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def create_adapter(self): | ||||||||||||||||||||||||
| # 业务逻辑由 adapter 承接,KvBuffer 只负责提供底层存储对象。 | ||||||||||||||||||||||||
| from .kv_buffer_adapter import KvBufferAdapter | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| return KvBufferAdapter(self) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def __getitem__(self, item): | ||||||||||||||||||||||||
| return self._buffer[item] | ||||||||||||||||||||||||
|
Comment on lines
+25
to
+26
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| @property | ||||||||||||||||||||||||
| def shape(self): | ||||||||||||||||||||||||
| return self._buffer.shape | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def get_storage_tensor(self) -> torch.Tensor: | ||||||||||||||||||||||||
| return self._buffer | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def get_storage_data_ptr(self) -> int: | ||||||||||||||||||||||||
| return self._buffer.data_ptr() | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def get_scale_buffer(self) -> Optional[torch.Tensor]: | ||||||||||||||||||||||||
| return None | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def copy_kv_to_mem_manager(self, layer_index: int, mem_index: torch.Tensor, kv: torch.Tensor) -> None: | ||||||||||||||||||||||||
| from lightllm.common.basemodel.triton_kernel.destindex_copy_kv import destindex_copy_kv | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| destindex_copy_kv(kv, mem_index, self._buffer[layer_index]) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def get_att_input_params(self, layer_index: int) -> Any: | ||||||||||||||||||||||||
| layer_buffer = self._buffer[layer_index] | ||||||||||||||||||||||||
| k = layer_buffer[:, : self._head_num, :] | ||||||||||||||||||||||||
| v = layer_buffer[:, self._head_num :, :] | ||||||||||||||||||||||||
| return k, v | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def get_index_kv_buffer(self, index: Any) -> dict: | ||||||||||||||||||||||||
| return {"kv_buffer": self._buffer[:, index]} | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def load_index_kv_buffer(self, index: Any, payload: dict) -> None: | ||||||||||||||||||||||||
| self._buffer[:, index].copy_(payload["kv_buffer"]) | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def get_device(self) -> int: | ||||||||||||||||||||||||
| return self._buffer.get_device() | ||||||||||||||||||||||||
|
|
||||||||||||||||||||||||
| def find_layer_index(self, k: torch.Tensor, v: torch.Tensor) -> int: | ||||||||||||||||||||||||
| key = min(k.data_ptr(), v.data_ptr()) | ||||||||||||||||||||||||
| find_dict = {self._buffer[i].data_ptr(): i for i in range(len(self._buffer))} | ||||||||||||||||||||||||
| assert key in find_dict | ||||||||||||||||||||||||
| return find_dict[key] | ||||||||||||||||||||||||
|
Comment on lines
+61
to
+65
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
HybridKvBufferdoes not support multi-dimensional indexing or item assignment, which are used inMemoryManager(e.g., lines 234 and 271). Becauseself._buffersis a list, indexing it with a tuple or slice will fail. This will cause crashes in models usingHybridKvBuffer(like Qwen3Next) when features like PD separation are enabled.