# Copyright 2016-2023 Cerebras Systems
# SPDX-License-Identifier: BSD-3-Clause
import functools
from abc import ABC, abstractmethod
from collections import defaultdict
from collections.abc import Iterable
from typing import List
import torch
from torch.optim.optimizer import Optimizer
from .init import InitMethodType, make_init_method
class BaseSparsityOptimizer(Optimizer, ABC):
    r"""
    Abstract base class for a dynamic sparsity optimizer.

    Subclasses must implement :meth:`_get_target_sparsity_level_of_group` and
    :meth:`step`.

    Args:
        params (iterable): iterable of ``(name, parameter)`` tuples to
            sparsify, or dicts defining parameter groups to sparsify (see
            :meth:`add_param_group` for the accepted ``"params"`` forms)
        init_method (InitMethodType): method by which sparsity is initialized
        defaults (dict): Additional defaults for param_groups
    """

    def __init__(
        self, params, init_method: "InitMethodType" = 'random', defaults=None
    ):
        # Work on a copy so the caller's ``defaults`` dict is never mutated.
        defaults = dict(defaults) if defaults else {}
        defaults['init_method'] = init_method

        super().__init__(params, defaults)

        # (optimizer, state_names) pairs registered through
        # manage_optimizer_state_sparsity().
        self.optimizers_and_state_names = []

        self._init_sparsity_called = False
        self._hook_module_called = False
        self._apply_sparsity_called = False

        # parameter -> handle returned by ``register_hook``; used to make sure
        # each parameter gets the gradient hook only once.
        self._param_grad_hooks = {}

        # Imported lazily, inside the constructor rather than at module level.
        from cerebras_pytorch.backend import current_backend_impl

        self.backend = current_backend_impl()
        self.backend.register_optimizer(self)

    def initialize_sparsity(self):
        """
        Compute the initial sparsity pattern for each parameter.

        Does nothing if sparsity was already initialized (or loaded through
        :meth:`load_state_dict`).
        """
        if self._init_sparsity_called:
            # Don't re-initialize
            return
        self._init_sparsity_called = True

        num_params = sum(len(group["params"]) for group in self.param_groups)

        # Set up initialization progress bar
        if self.backend.progress_tracker is not None:
            self.backend.progress_tracker.reset(total=num_params)
            self.backend.progress_tracker.set_postfix()
            self.backend.progress_tracker.set_description(
                "Initializing sparsity patterns"
            )

        with self.backend.device:
            for group in self.param_groups:
                self._init_sparsity_of_group(group)

        self.visit_state(lambda x: x.to(self.backend.torch_device))

        # After initializing new masks, we'll need to double check that
        # apply_sparsity() gets called once before step()
        self._apply_sparsity_called = False

    def _init_sparsity_of_group(self, group):
        """
        Compute the initial sparsity pattern for each of the parameters in the
        given group and store it as ``self.state[p]['mask']``.
        """
        sparsity = self._get_target_sparsity_level_of_group(group)
        initializer = group['init_method']

        for name, p in zip(group["param_names"], group['params']):
            if self.backend.progress_tracker is not None:
                self.backend.progress_tracker.set_postfix(name=name)

            if self.backend.is_csx:
                # Sparsity and the sparsity pattern must be initialized on
                # CPU during model pre-initialization, since the CSX device
                # only captures the computation graph during training.
                # These device copies from CSX to cpu and back are actually
                # just shared-memory tricks used to influence which device
                # performs the computations since tensors are only copied
                # into the appliance once execution starts.
                sparsity = sparsity.to("cpu")
            else:
                sparsity = sparsity.to(p.device)

            self.state[p]['mask'] = initializer(p, sparsity).to(p.device)

            if self.backend.progress_tracker is not None:
                self.backend.progress_tracker.update()

    @abstractmethod
    def _get_target_sparsity_level_of_group(self, group) -> torch.FloatTensor:
        """
        Returns the target sparsity level for parameters in the group.

        Returns:
            sparsity_level: a rankless FloatTensor holding the sparsity level
        """

    def manage_optimizer_state_sparsity(
        self, optimizer: Optimizer, state_names: List[str]
    ):
        """
        Manage the sparsity of an optimizer's state. For any parameters that
        this SparsityOptimizer manages, apply the sparsity pattern to all
        states named `state_names`
        """
        self.optimizers_and_state_names.append((optimizer, state_names))

    def _yield_optimizer_states(self, p):
        """
        Yield the given parameter's optimizer states which need sparsity
        applied.

        Only states that actually exist in a registered optimizer for ``p``
        are yielded; missing parameters or state names are skipped.
        """
        for opt, state_names in self.optimizers_and_state_names:
            if p in opt.state:
                state = opt.state[p]
                for s_name in state_names:
                    if s_name in state:
                        yield state[s_name]

    def annotate_sparsity(self):
        """
        Annotate sparsity as performance hints for the cerebras compiler

        Groups without a ``"csx_annotated_sparsity"`` entry are skipped. The
        entry is unpacked as ``(min, max, ending)`` and set as the
        ``min_sparsity``/``max_sparsity``/``sparsity`` attributes of each
        parameter and of each of its managed optimizer states.
        """
        for group in self.param_groups:
            sparsity = group.get("csx_annotated_sparsity")
            if sparsity is None:
                continue
            min_v, max_v, ending_v = sparsity

            for p in group['params']:
                self.backend.set_attribute(p, "min_sparsity", min_v)
                self.backend.set_attribute(p, "max_sparsity", max_v)
                self.backend.set_attribute(p, "sparsity", ending_v)

                for state in self._yield_optimizer_states(p):
                    self.backend.set_attribute(state, "min_sparsity", min_v)
                    self.backend.set_attribute(state, "max_sparsity", max_v)
                    self.backend.set_attribute(state, "sparsity", ending_v)

    def hook_module(self, module: torch.nn.Module):
        """
        Hook the given module such that the sparsity pattern is applied to both
        the parameters before forward() and gradients after backward()
        """
        self._hook_module_called = True

        def forward_pre_hook(module, input):
            self.annotate_sparsity()
            self.apply_sparsity()

        module.register_forward_pre_hook(forward_pre_hook)

    def _ensure_sparsity_applied(self):
        """
        Raise a RuntimeError if :meth:`apply_sparsity` has not been called
        since sparsity was last initialized.

        The message depends on whether :meth:`hook_module` was used.
        """
        if not self._apply_sparsity_called:
            error = (
                "apply_sparsity() must be called before forward() to apply "
                "sparsity to parameters and optimizer state. "
            )
            if self._hook_module_called:
                error += (
                    "A module hook was installed which should have taken care "
                    "of calling it, but did not. Check that you have not "
                    "disabled module hooks."
                )
            else:
                error += (
                    "For your convenience, the SparsityOptimizer method "
                    "``.hook_module()`` can add a torch.nn.Module forward_pre "
                    "hook to automatically apply sparsity."
                )
            raise RuntimeError(error)

    def zero_grad(self, set_to_none: bool = True):
        """
        Override default torch.optim.Optimizer to never zero gradients: This
        optimizer is slightly unique in that it isn't responsible for the
        `main` weight update of the params it manages (and thus doesn't consult
        or "maintain" their gradients), but it does manage the sparsity pattern
        of the params.

        Can be further overridden in other SparsityOptimizers if they deal with
        gradients (like RigL).
        """

    def state_dict(self):
        """
        Return the optimizer state keyed by parameter *name*.

        Returns:
            dict with ``"state"`` (param name -> per-param state) and
            ``"param_groups"`` (group dicts without ``"params"``; values with
            a ``state_dict`` attribute are replaced by their ``state_dict()``
            and other callables are dropped).
        """
        # Adapted from torch.optim.Optimizer, but we use param_names
        # param_names used in place of params
        param_groups = []

        # map parameter -> name
        name_map = {}
        for group in self.param_groups:
            name_map.update(dict(zip(group["params"], group["param_names"])))

            group = group.copy()
            del group["params"]

            # Some objects may themselves be stateful, so we store their state
            # instead of them
            for k, v in list(group.items()):
                if hasattr(v, "state_dict"):
                    group[k] = v.state_dict()
                elif callable(v):
                    # Don't serialize callable objects
                    del group[k]

            param_groups.append(group)

        state = {name_map[p]: v for p, v in self.state.items()}

        return {"state": state, "param_groups": param_groups}

    def load_state_dict(self, state_dict):
        """
        Load state produced by :meth:`state_dict`.

        The passed ``state_dict`` is not modified.

        Raises:
            ValueError: if the number of parameter groups or the parameter
                names of any group differ from the current optimizer.
        """
        # Adapted from torch.optim.Optimizer, but we use param_names

        # Validate the state_dict
        groups = self.param_groups
        saved_groups = state_dict['param_groups']

        if len(groups) != len(saved_groups):
            raise ValueError(
                "loaded state dict has a different number of parameter groups"
            )

        # map name -> parameter
        name_map = {}
        for group in self.param_groups:
            name_map.update(dict(zip(group["param_names"], group["params"])))

        for group, saved_group in zip(groups, saved_groups):
            # Compare as lists: add_param_group may store the names as a
            # tuple while a deserialized checkpoint may hold a list.
            if list(group["param_names"]) != list(saved_group["param_names"]):
                raise ValueError(
                    "loaded state dict contains different parameters than "
                    "the current optimizer"
                )

        def to_device(param, value):
            """
            Transfer each value to the same device as param.
            """
            if isinstance(value, torch.Tensor):
                return value.to(param.device)
            elif isinstance(value, (str, bytes)):
                # Strings are Iterable, but rebuilding one from a generator
                # would yield the generator's repr. Keep them untouched.
                return value
            elif isinstance(value, dict):
                return {k: to_device(param, v) for k, v in value.items()}
            elif isinstance(value, Iterable):
                return type(value)(to_device(param, v) for v in value)
            else:
                return value

        # Copy state associated with params (moving tensors to param device).
        state = defaultdict(dict)
        for param_name, v in state_dict['state'].items():
            param = name_map[param_name]
            state[param] = to_device(param, v)

        # Update parameter groups, resetting their 'params' value
        def update_group(group, new_group):
            # Shallow copy so the caller's state_dict is left untouched.
            new_group = dict(new_group)
            new_group['params'] = group['params']

            # Some Sparsity param_group entries are complex and need to be
            # serialized specially.
            for k, v in group.items():
                if hasattr(v, "load_state_dict"):
                    # Use the old object, but with loaded state.
                    v.load_state_dict(new_group[k])
                    new_group[k] = v
                elif k not in new_group:
                    # Some items were omitted from the state_dict. Keep their
                    # old value.
                    new_group[k] = v

            return new_group

        param_groups = [
            update_group(g, ng) for g, ng in zip(groups, saved_groups)
        ]
        self.__setstate__({'state': state, 'param_groups': param_groups})

        # Loading state counts as initializing it, don't re-init
        self._init_sparsity_called = True

    def visit_state(self, fn):
        """
        Applies a lambda to each stateful value.

        A per-parameter state value is replaced by ``fn(value)`` unless ``fn``
        returns ``None``. Param-group values that expose ``visit_state`` are
        visited recursively.
        """
        for state in self.state.values():
            for key, val in state.items():
                new_val = fn(val)
                if new_val is not None:
                    state[key] = new_val

        for group in self.param_groups:
            for v in group.values():
                if hasattr(v, "visit_state"):
                    v.visit_state(fn)

    def add_param_group(self, param_group):
        """
        Add a parameter group described by *named* parameters.

        Args:
            param_group (dict): group options. ``param_group["params"]`` must
                be either a list of ``(name, parameter)`` tuples or a single
                ``(name, parameter)`` tuple.

        Returns:
            The newly added param group.

        Raises:
            TypeError: if ``param_group["params"]`` is neither a list nor a
                tuple.
        """
        # SparsityOptimizer accepts named_params tuples instead
        named_params = param_group["params"]
        if isinstance(named_params, list):
            # list of tuples
            names, params = zip(*named_params)
        elif isinstance(named_params, tuple):
            # single tuple
            names, params = named_params
            params = [params]
            names = [names]
        else:
            # Fail with a clear message instead of an UnboundLocalError.
            raise TypeError(
                "named_params must be a list of (name, param) tuples or a "
                f"single (name, param) tuple, got "
                f"{type(named_params).__name__}"
            )

        param_group["params"] = params
        param_group["param_names"] = names

        super().add_param_group(param_group)

        # Hydrate the initializer
        param_group["init_method"] = make_init_method(
            param_group["init_method"]
        )

        # Ensure every group has a name
        if "name" not in param_group:
            if len(names) == 1:
                # Single weight group
                param_group["name"] = names[0]
            else:
                param_group["name"] = f"group_{len(self.param_groups)}"

        # Return the newly added param_group
        return self.param_groups[-1]

    @torch.no_grad()
    def apply_sparsity(self):
        """
        Apply the sparsity pattern to the parameters and optimizer states.

        Raises:
            RuntimeError: on a CSX backend if sparsity has not been
                initialized yet.
        """
        if not self._init_sparsity_called:
            if self.backend.is_csx:
                raise RuntimeError(
                    "Sparsity must be initialized before execution"
                )
            # We can init lazily on CPU/GPU though.
            self.initialize_sparsity()

        self._apply_sparsity_called = True
        self._apply_masks_to_params()
        self._apply_masks_to_opt_state()

    def _grad_hook(self, p, grad):
        """
        Gradient hook: return ``grad`` with every position that is masked out
        for ``p`` replaced by zero.
        """
        # In the case there any NaNs in the unused gradients that correspond to
        # zero'd out weights, we use a selection to replace these NaNs with
        # zeros. (multiplying with the mask would preserve them).
        # DLS will skip a weight update if there is a NaN in the gradient, but
        # we only want this to happen if there is a NaN in gradients
        # corresponding to non-zero weights. This is the behavior of the CS2
        # which doesn't even compute the full gradients on most steps.
        zero = torch.zeros_like(grad)
        mask = self.state[p]['mask']
        # Return modified gradient.
        return torch.where(mask, grad, zero)

    @torch.no_grad()
    def _apply_masks_to_params(self):
        """
        Multiply every managed parameter by its mask in place and install the
        gradient hook on parameters that do not have one yet.
        """
        for group in self.param_groups:
            for p in group['params']:
                # Apply sparsity.
                p.mul_(self.state[p]['mask'])

                # Set up autograd to apply sparsity to gradients too.
                if p not in self._param_grad_hooks:
                    self._param_grad_hooks[p] = p.register_hook(
                        functools.partial(self._grad_hook, p)
                    )

    @torch.no_grad()
    def _apply_masks_to_opt_state(self):
        """
        Multiply every managed optimizer state by its parameter's mask in
        place.
        """
        for group in self.param_groups:
            for p in group['params']:
                for state in self._yield_optimizer_states(p):
                    state.mul_(self.state[p]['mask'])