-
Notifications
You must be signed in to change notification settings - Fork 4.8k
fix(superoffload) preserve multi-group updates with shared cpu buffers (#7905) #7906
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a26e9af
f35fcc8
3d8ecb5
2895cd1
169f4b3
98e8f6d
6855b08
6ce2a69
fb98610
3d04899
17a3ac5
05d7f53
00732ed
7da1d33
e0e66a2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,7 +5,6 @@ | |
|
|
||
| import time | ||
| import torch | ||
| from collections import deque | ||
| from typing import List | ||
|
|
||
| from deepspeed.runtime.superoffload.superoffload_utils import SuperOffloadCPUOptimizer, TaskKeys, ResultKeys, EventTypes | ||
|
|
@@ -18,6 +17,13 @@ | |
| OPTIMIZER_STEP_TIMER = 'optimizer_step' | ||
|
|
||
|
|
||
| def _validate_superoffload_accelerator(): | ||
| """Validate that the current accelerator is compatible with SuperOffload.""" | ||
| accelerator = get_accelerator() | ||
| assert accelerator.device_name() == 'cuda', ( | ||
| f"SuperOffload only supports NVIDIA CUDA GPUs, but found accelerator '{accelerator.device_name()}'.") | ||
|
|
||
|
|
||
| class SuperOffloadOptimizer_Stage3(DeepSpeedZeroOptimizer_Stage3): | ||
|
|
||
| def __init__( | ||
|
|
@@ -29,24 +35,26 @@ def __init__( | |
| ds_config, | ||
| **kwargs, | ||
| ): | ||
| _validate_superoffload_accelerator() | ||
|
|
||
| self.sub_group_to_param_num = {} | ||
| self.params_in_ipg_bucket_buffer = deque() | ||
| self._cur_bucket_index = -1 | ||
| self.sub_group_grad_partition_counts = {} | ||
| self.async_cpuadam_num = 0 | ||
| self.max_grad_numel = 0 | ||
|
|
||
| super().__init__(module, init_optimizer, param_names, timers, ds_config, **kwargs) | ||
|
|
||
| optimizer_config = { | ||
| "lr": self.optimizer.param_groups[0]["lr"], | ||
| "betas": self.optimizer.param_groups[0]["betas"], | ||
| "eps": self.optimizer.param_groups[0]["eps"], | ||
| "weight_decay": self.optimizer.param_groups[0]["weight_decay"], | ||
| "amsgrad": self.optimizer.param_groups[0]["amsgrad"] | ||
| } | ||
| optimizer_configs = [] | ||
| for pg in self.optimizer.param_groups: | ||
| optimizer_configs.append({ | ||
| "lr": pg["lr"], | ||
| "betas": pg["betas"], | ||
| "eps": pg["eps"], | ||
| "weight_decay": pg["weight_decay"], | ||
| "amsgrad": pg["amsgrad"], | ||
| }) | ||
| cpuadam_cores_perc = kwargs.get("cpuadam_cores_perc", 0.8) | ||
| self.superoffload_cpu_optimizer = SuperOffloadCPUOptimizer(optimizer_config=optimizer_config, | ||
| self.superoffload_cpu_optimizer = SuperOffloadCPUOptimizer(optimizer_config=optimizer_configs, | ||
| cpuadam_cores_perc=cpuadam_cores_perc, | ||
| max_grad_numel=self.max_grad_numel) | ||
|
|
||
|
|
@@ -56,6 +64,9 @@ def _create_fp16_sub_groups(self, params_group): | |
| sub_group_size = self.sub_group_size | ||
|
|
||
| if sub_group_size is None or sub_group_size >= params_group_numel: | ||
| global_idx = len(self.sub_group_to_param_num) | ||
| self.sub_group_to_param_num[global_idx] = len(params_group) | ||
| self.max_grad_numel = max(self.max_grad_numel, params_group_numel) | ||
| return [params_group] | ||
|
|
||
| sub_groups = [] | ||
|
|
@@ -69,7 +80,8 @@ def _create_fp16_sub_groups(self, params_group): | |
| if local_sub_group_size >= sub_group_size or id(param) == id(params_group[-1]): | ||
| self.max_grad_numel = max(self.max_grad_numel, local_sub_group_size) | ||
| sub_groups.append(sub_group) | ||
| self.sub_group_to_param_num[len(sub_groups) - 1] = len(sub_group) | ||
| global_idx = len(self.sub_group_to_param_num) | ||
| self.sub_group_to_param_num[global_idx] = len(sub_group) | ||
|
|
||
| sub_group = [] | ||
| local_sub_group_size = 0 | ||
|
|
@@ -93,43 +105,16 @@ def step_with_gradscaler(optimizer): | |
| step_with_gradscaler(self.backup_optimizer) | ||
| self.backup_optimizer.param_groups[param_group_id]['params'] = [] | ||
|
|
||
| def reduce_independent_p_g_buckets_and_remove_grads(self, param): | ||
| comm_dtype = self.get_param_comm_dtype(param) | ||
| bucket = self.ipg_buckets[comm_dtype] | ||
| i, _, _ = self.grad_position[self.get_param_id(param)] | ||
|
|
||
| if len(bucket.params) == 0: | ||
| self._cur_bucket_index = i | ||
| if getattr(param, "ds_grad_is_ready", True): | ||
| self._DeepSpeedZeroOptimizer_Stage3__add_grad_to_ipg_bucket(param) | ||
|
|
||
| # If this is a single-parameter sub-group, reduce immediately | ||
| if self.sub_group_to_param_num[self._cur_bucket_index] == 1: | ||
| self._DeepSpeedZeroOptimizer_Stage3__reduce_and_partition_ipg_grads(comm_dtype) | ||
|
|
||
| elif i != self._cur_bucket_index: | ||
| # Parameter belongs to different sub-group, buffer it | ||
| self.params_in_ipg_bucket_buffer.append(param) | ||
| else: | ||
| # Parameter belongs to current bucket | ||
| if getattr(param, "ds_grad_is_ready", True): | ||
| self._DeepSpeedZeroOptimizer_Stage3__add_grad_to_ipg_bucket(param) | ||
|
|
||
| # Check if bucket is complete | ||
| if self.sub_group_to_param_num[self._cur_bucket_index] == len(bucket.params): | ||
| self._DeepSpeedZeroOptimizer_Stage3__reduce_and_partition_ipg_grads(comm_dtype) | ||
|
|
||
| # Process buffered parameters | ||
| while self.params_in_ipg_bucket_buffer: | ||
| buffered_param = self.params_in_ipg_bucket_buffer.popleft() | ||
| ci, _, _ = self.grad_position[self.get_param_id(buffered_param)] | ||
| self._cur_bucket_index = ci | ||
| if getattr(buffered_param, "ds_grad_is_ready", True): | ||
| self._DeepSpeedZeroOptimizer_Stage3__add_grad_to_ipg_bucket(buffered_param) | ||
| @instrument_w_nvtx | ||
| def independent_gradient_partition_epilogue(self): | ||
| super().independent_gradient_partition_epilogue() | ||
| self.sub_group_grad_partition_counts.clear() | ||
|
|
||
| @instrument_w_nvtx | ||
| def _reassign_or_swap_out_partitioned_parameters(self, sub_group_id): | ||
| if self.subgroup_to_device[sub_group_id] == 'cpu': | ||
| self.fp16_partitioned_groups_flat[sub_group_id].data.copy_( | ||
| self.fp32_partitioned_groups_flat[sub_group_id].data) | ||
| self._unflatten_partitioned_parameters(sub_group_id) | ||
| return | ||
|
|
||
|
|
@@ -147,77 +132,76 @@ def _reassign_or_swap_out_partitioned_parameters_async(self, sub_group_id, updat | |
|
|
||
| @instrument_w_nvtx | ||
| def partition_grads(self, params_to_release: List[Parameter], grad_partitions: List[Tensor]) -> None: | ||
| # print("[DEBUG] partition_grads called") | ||
| buffers = [] | ||
| device_buffers = {} | ||
| buffer_numel_min = {} | ||
| buffer_numel_max = {} | ||
| completed_sub_groups = [] | ||
|
|
||
| for param, grad_partition in zip(params_to_release, grad_partitions): | ||
| i, dest_offset, _ = self.grad_position[self.get_param_id(param)] | ||
|
|
||
| if self.is_gradient_accumulation_boundary: | ||
| self.norm_for_param_grads[self.get_param_id(param)] = self._constant_buffered_norm2(grad_partition) | ||
|
|
||
| buffer_numel = grad_partition.numel() | ||
| buffers.append(grad_partition) | ||
|
|
||
| if i not in device_buffers: | ||
| device_buffers[i] = [] | ||
| device_buffers[i].append(grad_partition) | ||
|
|
||
| if i not in buffer_numel_min: | ||
| buffer_numel_min[i] = dest_offset | ||
| buffer_numel_max[i] = dest_offset + buffer_numel | ||
| # Accumulate gradient into the grad_buffer, mirroring base class logic | ||
| grad_buffer = self._DeepSpeedZeroOptimizer_Stage3__param_id_to_grad_partition[param.ds_id].narrow( | ||
| 0, 0, grad_partition.numel()) | ||
| if self.micro_step_id == 0: | ||
| grad_buffer.copy_(grad_partition, non_blocking=True) | ||
| grad_buffer = grad_buffer.to(grad_partition.device, non_blocking=True) | ||
| elif get_accelerator().on_accelerator(grad_buffer): | ||
| grad_buffer.add_(grad_partition.to(self.gradient_accumulation_dtype).view(grad_buffer.shape)) | ||
| else: | ||
| buffer_numel_min[i] = min(buffer_numel_min[i], dest_offset) | ||
| buffer_numel_max[i] = max(buffer_numel_max[i], dest_offset + buffer_numel) | ||
| cuda_grad_buffer = grad_buffer.to(grad_partition.device, non_blocking=True) | ||
| cuda_grad_buffer.add_(grad_partition.to(self.gradient_accumulation_dtype).view(cuda_grad_buffer.shape)) | ||
| grad_buffer.copy_(cuda_grad_buffer, non_blocking=True) | ||
| grad_buffer = cuda_grad_buffer | ||
|
|
||
| if self.is_gradient_accumulation_boundary: | ||
| self.norm_for_param_grads[self.get_param_id(param)] = self._constant_buffered_norm2(grad_buffer) | ||
|
|
||
| if self.is_gradient_accumulation_boundary: | ||
| for i in buffer_numel_min.keys(): | ||
| fp32_grad_tensor = self.fp32_partitioned_groups_flat[i].grad.narrow( | ||
| 0, buffer_numel_min[i], buffer_numel_max[i] - buffer_numel_min[i]) | ||
| concatenated_buffer = torch.cat(device_buffers[i], dim=0).to(dtype=self.master_weights_and_grads_dtype) | ||
| 0, dest_offset, grad_buffer.numel()) | ||
| fp32_grad_tensor.copy_(grad_buffer.to(dtype=self.master_weights_and_grads_dtype), non_blocking=True) | ||
|
|
||
| if self.subgroup_to_device[i] == 'cpu': | ||
| # Trigger asynchronous CPU optimization | ||
| self.sub_group_grad_partition_counts[i] = self.sub_group_grad_partition_counts.get(i, 0) + 1 | ||
| if self.sub_group_grad_partition_counts[i] == self.sub_group_to_param_num[i]: | ||
| completed_sub_groups.append(i) | ||
|
|
||
| if self.is_gradient_accumulation_boundary and completed_sub_groups: | ||
| get_accelerator().current_stream().synchronize() | ||
| for i in completed_sub_groups: | ||
| if self.subgroup_to_device[i] == 'cpu' and not self.clip_grad: | ||
| param_group_id = self.sub_group_to_group_id[i] | ||
| fp32_param = self.fp32_partitioned_groups_flat[i] | ||
| current_lr = self.optimizer.param_groups[param_group_id]['lr'] | ||
|
|
||
| self.superoffload_cpu_optimizer.async_step(param_group_id, i, fp32_param.data, | ||
| concatenated_buffer.data) | ||
| self.superoffload_cpu_optimizer.async_step(param_group_id, | ||
| i, | ||
| fp32_param.data, | ||
| fp32_param.grad.data, | ||
| lr=current_lr) | ||
| self.async_cpuadam_num += 1 | ||
|
|
||
| # Check for completed async operations | ||
| result = self.superoffload_cpu_optimizer.get_result() | ||
| if result is not None: | ||
| self._reassign_or_swap_out_partitioned_parameters_async(result[TaskKeys.SUB_GROUP_ID], | ||
| result[ResultKeys.UPDATED_PARAM]) | ||
| self.async_cpuadam_num -= 1 | ||
|
|
||
| fp32_grad_tensor.copy_(concatenated_buffer, non_blocking=True) | ||
| else: | ||
| fp32_grad_tensor.copy_(concatenated_buffer, non_blocking=True) | ||
|
|
||
| # Clean up parameter gradients | ||
| for param in params_to_release: | ||
| if not get_accelerator().is_synchronized_device(): | ||
| param.grad.record_stream(get_accelerator().current_stream()) | ||
| if param.grad is not None: | ||
| param.grad.record_stream(get_accelerator().current_stream()) | ||
| param.grad = None | ||
|
|
||
| @instrument_w_nvtx | ||
| def step(self, closure=None): | ||
| """ | ||
| Not supporting closure. | ||
| """ | ||
| # Wait for any pending asynchronous CPU optimizer operations | ||
| self._wait_for_async_operations() | ||
|
|
||
| self._pre_step() | ||
| self._partition_all_parameters() | ||
|
|
||
| if self._overflow_check_and_loss_scale_update(): | ||
| self._handle_overflow_rollback() | ||
| if not self.clip_grad: | ||
| self._handle_overflow_rollback() | ||
| return | ||
|
|
||
| norm_groups = self._get_norm_groups() | ||
|
|
@@ -228,28 +212,45 @@ def step(self, closure=None): | |
| timer_names.add(OPTIMIZER_STEP_TIMER) | ||
| self.timers(OPTIMIZER_STEP_TIMER).start() | ||
|
|
||
| if self.check_clip_grads(scaled_global_grad_norm): | ||
| self._handle_gradient_clipping(scaled_global_grad_norm) | ||
| if self.clip_grad: | ||
| self._step_with_clipping(scaled_global_grad_norm, timer_names) | ||
| else: | ||
| self._step_without_clipping(scaled_global_grad_norm, timer_names) | ||
|
Comment on lines
+215
to
+218
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
DeepSpeed's default Useful? React with 👍 / 👎. |
||
|
|
||
| self.timers(OPTIMIZER_STEP_TIMER).stop() | ||
| self._post_step(timer_names) | ||
|
|
||
| def _step_without_clipping(self, scaled_global_grad_norm, timer_names): | ||
| """Fast path: async CPU steps already completed during backward.""" | ||
| for sub_group_id, group in enumerate(self.fp16_groups): | ||
| # Prepare optimizer states, gradients and fp32 parameters for update | ||
| self._prepare_sub_group(sub_group_id, timer_names) | ||
| self.unscale_and_clip_grads(sub_group_id, scaled_global_grad_norm) | ||
| self._optimizer_step(sub_group_id) | ||
| self._reassign_or_swap_out_partitioned_parameters(sub_group_id) | ||
| self._release_sub_group(sub_group_id, timer_names) | ||
|
|
||
| # Scale the fp32 gradients | ||
| def _step_with_clipping(self, scaled_global_grad_norm, timer_names): | ||
| """Clipping path: no async steps were done during backward, | ||
| so we unscale+clip first, then step all sub-groups.""" | ||
| for sub_group_id, group in enumerate(self.fp16_groups): | ||
| self._prepare_sub_group(sub_group_id, timer_names) | ||
| self.unscale_and_clip_grads(sub_group_id, scaled_global_grad_norm) | ||
|
|
||
| # Apply the optimizer step on the sub group and copy fp32 parameters to fp16 | ||
| self._optimizer_step(sub_group_id) | ||
| if self.subgroup_to_device[sub_group_id] == 'cpu': | ||
| param_group_id = self.sub_group_to_group_id[sub_group_id] | ||
| fp32_param = self.fp32_partitioned_groups_flat[sub_group_id] | ||
| current_lr = self.optimizer.param_groups[param_group_id]['lr'] | ||
| self._sync_cpu_optimizer_step(param_group_id, | ||
| sub_group_id, | ||
| fp32_param.data, | ||
| fp32_param.grad.data, | ||
| lr=current_lr) | ||
| else: | ||
| self._optimizer_step(sub_group_id) | ||
|
|
||
| # Put fp16 parameters in appropriate location | ||
| self._reassign_or_swap_out_partitioned_parameters(sub_group_id) | ||
|
|
||
| # Release memory or swap out optimizer states of fp32 parameters | ||
| self._release_sub_group(sub_group_id, timer_names) | ||
|
|
||
| self.timers(OPTIMIZER_STEP_TIMER).stop() | ||
| self._post_step(timer_names) | ||
|
|
||
| def _wait_for_async_operations(self, timeout_seconds=60): | ||
| """Wait for all pending asynchronous CPU optimizer operations to complete with timeout error. | ||
|
|
||
|
|
@@ -316,13 +317,15 @@ def _sync_cpu_optimizer_step(self, | |
| fp32_param_data, | ||
| fp32_grad_data, | ||
| rollback: bool = False, | ||
| lr: float = None, | ||
| timeout_seconds: int = 60): | ||
| event_type = EventTypes.ROLLBACK if rollback else EventTypes.ADAM_STEP | ||
| self.superoffload_cpu_optimizer.async_step(param_group_id, | ||
| sub_group_id, | ||
| fp32_param_data, | ||
| fp32_grad_data, | ||
| rollback=rollback) | ||
| rollback=rollback, | ||
| lr=lr) | ||
| # Wait for completion | ||
| self._wait_for_single_async_result(event_type, timeout_seconds) | ||
|
|
||
|
|
@@ -357,11 +360,13 @@ def _handle_gradient_clipping(self, scaled_global_grad_norm): | |
| # Clip gradients and re-optimize | ||
| self.unscale_and_clip_grads(sub_group_id, scaled_global_grad_norm) | ||
|
|
||
| current_lr = self.optimizer.param_groups[param_group_id]['lr'] | ||
| self._sync_cpu_optimizer_step(param_group_id, | ||
| sub_group_id, | ||
| fp32_param.data, | ||
| fp32_param.grad.data, | ||
| rollback=False) | ||
| rollback=False, | ||
| lr=current_lr) | ||
|
|
||
| @instrument_w_nvtx | ||
| def check_clip_grads(self, total_norm): | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`bias_correction` is dropped when copying optimizer groups. The worker-side
`DeepSpeedCPUAdam` still reads `group['bias_correction']` during `step_subgroup()` (deepspeed/ops/adam/cpu_adam.py:199-202), but the new per-group config copy only forwards `lr`, `betas`, `eps`, `weight_decay`, and `amsgrad`. If a SuperOffload run has multiple optimizer groups with different `bias_correction` settings, every mirrored worker group silently falls back to the constructor default instead of the original group's value, so those subgroups will use the wrong Adam update rule. Useful? React with 👍 / 👎.