Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions deepspeed/ops/adam/cpu_adam.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,14 +236,12 @@ def rollback_subgroup(self, sub_group_id: int, closure=None):
f"CPUAdam param is on {param.device} and must be 'cpu', "
f"make sure you enabled 'offload_optimizer': 'cpu' in your ZeRO config.")

# Decrement step count
subgroup_state['step'] -= 1

# Extract hyperparameters
beta1, beta2 = group['betas']

self.ds_opt_adam.adam_rollback(self.opt_id, subgroup_state['step'], group['lr'], beta1, beta2,
group['eps'], group['weight_decay'], group['bias_correction'],
param.data, param.grad.data, subgroup_state['exp_avg'],
subgroup_state['exp_avg_sq'])

subgroup_state['step'] -= 1
return loss
197 changes: 101 additions & 96 deletions deepspeed/runtime/superoffload/superoffload_stage3.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

import time
import torch
from collections import deque
from typing import List

from deepspeed.runtime.superoffload.superoffload_utils import SuperOffloadCPUOptimizer, TaskKeys, ResultKeys, EventTypes
Expand All @@ -18,6 +17,13 @@
OPTIMIZER_STEP_TIMER = 'optimizer_step'


def _validate_superoffload_accelerator():
"""Validate that the current accelerator is compatible with SuperOffload."""
accelerator = get_accelerator()
assert accelerator.device_name() == 'cuda', (
f"SuperOffload only supports NVIDIA CUDA GPUs, but found accelerator '{accelerator.device_name()}'.")


class SuperOffloadOptimizer_Stage3(DeepSpeedZeroOptimizer_Stage3):

def __init__(
Expand All @@ -29,24 +35,26 @@ def __init__(
ds_config,
**kwargs,
):
_validate_superoffload_accelerator()

self.sub_group_to_param_num = {}
self.params_in_ipg_bucket_buffer = deque()
self._cur_bucket_index = -1
self.sub_group_grad_partition_counts = {}
self.async_cpuadam_num = 0
self.max_grad_numel = 0

super().__init__(module, init_optimizer, param_names, timers, ds_config, **kwargs)

optimizer_config = {
"lr": self.optimizer.param_groups[0]["lr"],
"betas": self.optimizer.param_groups[0]["betas"],
"eps": self.optimizer.param_groups[0]["eps"],
"weight_decay": self.optimizer.param_groups[0]["weight_decay"],
"amsgrad": self.optimizer.param_groups[0]["amsgrad"]
}
optimizer_configs = []
for pg in self.optimizer.param_groups:
optimizer_configs.append({
"lr": pg["lr"],
"betas": pg["betas"],
"eps": pg["eps"],
"weight_decay": pg["weight_decay"],
"amsgrad": pg["amsgrad"],
})
Comment on lines +49 to +55
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve bias_correction when copying optimizer groups

The worker-side DeepSpeedCPUAdam still reads group['bias_correction'] during step_subgroup() (deepspeed/ops/adam/cpu_adam.py:199-202), but the new per-group config copy only forwards lr, betas, eps, weight_decay, and amsgrad. If a SuperOffload run has multiple optimizer groups with different bias_correction settings, every mirrored worker group silently falls back to the constructor default instead of the original group's value, so those subgroups will use the wrong Adam update rule.

Useful? React with 👍 / 👎.

cpuadam_cores_perc = kwargs.get("cpuadam_cores_perc", 0.8)
self.superoffload_cpu_optimizer = SuperOffloadCPUOptimizer(optimizer_config=optimizer_config,
self.superoffload_cpu_optimizer = SuperOffloadCPUOptimizer(optimizer_config=optimizer_configs,
cpuadam_cores_perc=cpuadam_cores_perc,
max_grad_numel=self.max_grad_numel)

Expand All @@ -56,6 +64,9 @@ def _create_fp16_sub_groups(self, params_group):
sub_group_size = self.sub_group_size

if sub_group_size is None or sub_group_size >= params_group_numel:
global_idx = len(self.sub_group_to_param_num)
self.sub_group_to_param_num[global_idx] = len(params_group)
self.max_grad_numel = max(self.max_grad_numel, params_group_numel)
return [params_group]

sub_groups = []
Expand All @@ -69,7 +80,8 @@ def _create_fp16_sub_groups(self, params_group):
if local_sub_group_size >= sub_group_size or id(param) == id(params_group[-1]):
self.max_grad_numel = max(self.max_grad_numel, local_sub_group_size)
sub_groups.append(sub_group)
self.sub_group_to_param_num[len(sub_groups) - 1] = len(sub_group)
global_idx = len(self.sub_group_to_param_num)
self.sub_group_to_param_num[global_idx] = len(sub_group)

sub_group = []
local_sub_group_size = 0
Expand All @@ -93,43 +105,16 @@ def step_with_gradscaler(optimizer):
step_with_gradscaler(self.backup_optimizer)
self.backup_optimizer.param_groups[param_group_id]['params'] = []

def reduce_independent_p_g_buckets_and_remove_grads(self, param):
comm_dtype = self.get_param_comm_dtype(param)
bucket = self.ipg_buckets[comm_dtype]
i, _, _ = self.grad_position[self.get_param_id(param)]

if len(bucket.params) == 0:
self._cur_bucket_index = i
if getattr(param, "ds_grad_is_ready", True):
self._DeepSpeedZeroOptimizer_Stage3__add_grad_to_ipg_bucket(param)

# If this is a single-parameter sub-group, reduce immediately
if self.sub_group_to_param_num[self._cur_bucket_index] == 1:
self._DeepSpeedZeroOptimizer_Stage3__reduce_and_partition_ipg_grads(comm_dtype)

elif i != self._cur_bucket_index:
# Parameter belongs to different sub-group, buffer it
self.params_in_ipg_bucket_buffer.append(param)
else:
# Parameter belongs to current bucket
if getattr(param, "ds_grad_is_ready", True):
self._DeepSpeedZeroOptimizer_Stage3__add_grad_to_ipg_bucket(param)

# Check if bucket is complete
if self.sub_group_to_param_num[self._cur_bucket_index] == len(bucket.params):
self._DeepSpeedZeroOptimizer_Stage3__reduce_and_partition_ipg_grads(comm_dtype)

# Process buffered parameters
while self.params_in_ipg_bucket_buffer:
buffered_param = self.params_in_ipg_bucket_buffer.popleft()
ci, _, _ = self.grad_position[self.get_param_id(buffered_param)]
self._cur_bucket_index = ci
if getattr(buffered_param, "ds_grad_is_ready", True):
self._DeepSpeedZeroOptimizer_Stage3__add_grad_to_ipg_bucket(buffered_param)
@instrument_w_nvtx
def independent_gradient_partition_epilogue(self):
super().independent_gradient_partition_epilogue()
self.sub_group_grad_partition_counts.clear()

@instrument_w_nvtx
def _reassign_or_swap_out_partitioned_parameters(self, sub_group_id):
if self.subgroup_to_device[sub_group_id] == 'cpu':
self.fp16_partitioned_groups_flat[sub_group_id].data.copy_(
self.fp32_partitioned_groups_flat[sub_group_id].data)
self._unflatten_partitioned_parameters(sub_group_id)
return

Expand All @@ -147,77 +132,76 @@ def _reassign_or_swap_out_partitioned_parameters_async(self, sub_group_id, updat

@instrument_w_nvtx
def partition_grads(self, params_to_release: List[Parameter], grad_partitions: List[Tensor]) -> None:
# print("[DEBUG] partition_grads called")
buffers = []
device_buffers = {}
buffer_numel_min = {}
buffer_numel_max = {}
completed_sub_groups = []

for param, grad_partition in zip(params_to_release, grad_partitions):
i, dest_offset, _ = self.grad_position[self.get_param_id(param)]

if self.is_gradient_accumulation_boundary:
self.norm_for_param_grads[self.get_param_id(param)] = self._constant_buffered_norm2(grad_partition)

buffer_numel = grad_partition.numel()
buffers.append(grad_partition)

if i not in device_buffers:
device_buffers[i] = []
device_buffers[i].append(grad_partition)

if i not in buffer_numel_min:
buffer_numel_min[i] = dest_offset
buffer_numel_max[i] = dest_offset + buffer_numel
# Accumulate gradient into the grad_buffer, mirroring base class logic
grad_buffer = self._DeepSpeedZeroOptimizer_Stage3__param_id_to_grad_partition[param.ds_id].narrow(
0, 0, grad_partition.numel())
if self.micro_step_id == 0:
grad_buffer.copy_(grad_partition, non_blocking=True)
grad_buffer = grad_buffer.to(grad_partition.device, non_blocking=True)
elif get_accelerator().on_accelerator(grad_buffer):
grad_buffer.add_(grad_partition.to(self.gradient_accumulation_dtype).view(grad_buffer.shape))
else:
buffer_numel_min[i] = min(buffer_numel_min[i], dest_offset)
buffer_numel_max[i] = max(buffer_numel_max[i], dest_offset + buffer_numel)
cuda_grad_buffer = grad_buffer.to(grad_partition.device, non_blocking=True)
cuda_grad_buffer.add_(grad_partition.to(self.gradient_accumulation_dtype).view(cuda_grad_buffer.shape))
grad_buffer.copy_(cuda_grad_buffer, non_blocking=True)
grad_buffer = cuda_grad_buffer

if self.is_gradient_accumulation_boundary:
self.norm_for_param_grads[self.get_param_id(param)] = self._constant_buffered_norm2(grad_buffer)

if self.is_gradient_accumulation_boundary:
for i in buffer_numel_min.keys():
fp32_grad_tensor = self.fp32_partitioned_groups_flat[i].grad.narrow(
0, buffer_numel_min[i], buffer_numel_max[i] - buffer_numel_min[i])
concatenated_buffer = torch.cat(device_buffers[i], dim=0).to(dtype=self.master_weights_and_grads_dtype)
0, dest_offset, grad_buffer.numel())
fp32_grad_tensor.copy_(grad_buffer.to(dtype=self.master_weights_and_grads_dtype), non_blocking=True)

if self.subgroup_to_device[i] == 'cpu':
# Trigger asynchronous CPU optimization
self.sub_group_grad_partition_counts[i] = self.sub_group_grad_partition_counts.get(i, 0) + 1
if self.sub_group_grad_partition_counts[i] == self.sub_group_to_param_num[i]:
completed_sub_groups.append(i)

if self.is_gradient_accumulation_boundary and completed_sub_groups:
get_accelerator().current_stream().synchronize()
for i in completed_sub_groups:
if self.subgroup_to_device[i] == 'cpu' and not self.clip_grad:
param_group_id = self.sub_group_to_group_id[i]
fp32_param = self.fp32_partitioned_groups_flat[i]
current_lr = self.optimizer.param_groups[param_group_id]['lr']

self.superoffload_cpu_optimizer.async_step(param_group_id, i, fp32_param.data,
concatenated_buffer.data)
self.superoffload_cpu_optimizer.async_step(param_group_id,
i,
fp32_param.data,
fp32_param.grad.data,
lr=current_lr)
self.async_cpuadam_num += 1

# Check for completed async operations
result = self.superoffload_cpu_optimizer.get_result()
if result is not None:
self._reassign_or_swap_out_partitioned_parameters_async(result[TaskKeys.SUB_GROUP_ID],
result[ResultKeys.UPDATED_PARAM])
self.async_cpuadam_num -= 1

fp32_grad_tensor.copy_(concatenated_buffer, non_blocking=True)
else:
fp32_grad_tensor.copy_(concatenated_buffer, non_blocking=True)

# Clean up parameter gradients
for param in params_to_release:
if not get_accelerator().is_synchronized_device():
param.grad.record_stream(get_accelerator().current_stream())
if param.grad is not None:
param.grad.record_stream(get_accelerator().current_stream())
param.grad = None

@instrument_w_nvtx
def step(self, closure=None):
"""
Not supporting closure.
"""
# Wait for any pending asynchronous CPU optimizer operations
self._wait_for_async_operations()

self._pre_step()
self._partition_all_parameters()

if self._overflow_check_and_loss_scale_update():
self._handle_overflow_rollback()
if not self.clip_grad:
self._handle_overflow_rollback()
return

norm_groups = self._get_norm_groups()
Expand All @@ -228,28 +212,45 @@ def step(self, closure=None):
timer_names.add(OPTIMIZER_STEP_TIMER)
self.timers(OPTIMIZER_STEP_TIMER).start()

if self.check_clip_grads(scaled_global_grad_norm):
self._handle_gradient_clipping(scaled_global_grad_norm)
if self.clip_grad:
self._step_with_clipping(scaled_global_grad_norm, timer_names)
else:
self._step_without_clipping(scaled_global_grad_norm, timer_names)
Comment on lines +215 to +218
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Gate the synchronous path on actual clipping

DeepSpeed's default gradient_clipping is 1.0 (deepspeed/runtime/constants.py:251), so branching on self.clip_grad here sends the default SuperOffload configuration into _step_with_clipping() on every step, even when check_clip_grads(...) would be false. Because partition_grads() now only launches async CPUAdam work when not self.clip_grad, this change disables the new backward/CPU overlap for most users and turns the patch into a throughput regression unless they explicitly set clipping to 0.0.

Useful? React with 👍 / 👎.


self.timers(OPTIMIZER_STEP_TIMER).stop()
self._post_step(timer_names)

def _step_without_clipping(self, scaled_global_grad_norm, timer_names):
"""Fast path: async CPU steps already completed during backward."""
for sub_group_id, group in enumerate(self.fp16_groups):
# Prepare optimizer states, gradients and fp32 parameters for update
self._prepare_sub_group(sub_group_id, timer_names)
self.unscale_and_clip_grads(sub_group_id, scaled_global_grad_norm)
self._optimizer_step(sub_group_id)
self._reassign_or_swap_out_partitioned_parameters(sub_group_id)
self._release_sub_group(sub_group_id, timer_names)

# Scale the fp32 gradients
def _step_with_clipping(self, scaled_global_grad_norm, timer_names):
"""Clipping path: no async steps were done during backward,
so we unscale+clip first, then step all sub-groups."""
for sub_group_id, group in enumerate(self.fp16_groups):
self._prepare_sub_group(sub_group_id, timer_names)
self.unscale_and_clip_grads(sub_group_id, scaled_global_grad_norm)

# Apply the optimizer step on the sub group and copy fp32 parameters to fp16
self._optimizer_step(sub_group_id)
if self.subgroup_to_device[sub_group_id] == 'cpu':
param_group_id = self.sub_group_to_group_id[sub_group_id]
fp32_param = self.fp32_partitioned_groups_flat[sub_group_id]
current_lr = self.optimizer.param_groups[param_group_id]['lr']
self._sync_cpu_optimizer_step(param_group_id,
sub_group_id,
fp32_param.data,
fp32_param.grad.data,
lr=current_lr)
else:
self._optimizer_step(sub_group_id)

# Put fp16 parameters in appropriate location
self._reassign_or_swap_out_partitioned_parameters(sub_group_id)

# Release memory or swap out optimizer states of fp32 parameters
self._release_sub_group(sub_group_id, timer_names)

self.timers(OPTIMIZER_STEP_TIMER).stop()
self._post_step(timer_names)

def _wait_for_async_operations(self, timeout_seconds=60):
"""Wait for all pending asynchronous CPU optimizer operations to complete with timeout error.

Expand Down Expand Up @@ -316,13 +317,15 @@ def _sync_cpu_optimizer_step(self,
fp32_param_data,
fp32_grad_data,
rollback: bool = False,
lr: float = None,
timeout_seconds: int = 60):
event_type = EventTypes.ROLLBACK if rollback else EventTypes.ADAM_STEP
self.superoffload_cpu_optimizer.async_step(param_group_id,
sub_group_id,
fp32_param_data,
fp32_grad_data,
rollback=rollback)
rollback=rollback,
lr=lr)
# Wait for completion
self._wait_for_single_async_result(event_type, timeout_seconds)

Expand Down Expand Up @@ -357,11 +360,13 @@ def _handle_gradient_clipping(self, scaled_global_grad_norm):
# Clip gradients and re-optimize
self.unscale_and_clip_grads(sub_group_id, scaled_global_grad_norm)

current_lr = self.optimizer.param_groups[param_group_id]['lr']
self._sync_cpu_optimizer_step(param_group_id,
sub_group_id,
fp32_param.data,
fp32_param.grad.data,
rollback=False)
rollback=False,
lr=current_lr)

@instrument_w_nvtx
def check_clip_grads(self, total_norm):
Expand Down
Loading
Loading