Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 117 additions & 0 deletions examples/autoround/ddp/ddp_qwen3_moe_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
DDP AutoRound quantization example for large MoE models.

Uses the standard compressed-tensors DDP path: load_offloaded_model()
broadcasts weights from rank 0 to rank 1. GPUS_PER_GROUP controls how
many GPUs each rank uses for per-block model parallelism.

Run with:
CUDA_VISIBLE_DEVICES=0,1,2,3 GPUS_PER_GROUP=2 torchrun \
--nproc_per_node=2 ddp_qwen3_moe_example.py
"""

import os
import time

import torch
import torch.distributed as dist
from compressed_tensors.offload import load_offloaded_model
from loguru import logger
from transformers import AutoModelForCausalLM, AutoTokenizer
from llmcompressor import oneshot

MODEL = "Qwen/Qwen3-235B-A22B-Instruct-2507"
SCHEME = "W4A16"
ITERS = 200
NSAMPLES = 256

###### DDP INIT #####
gpus_per_group = int(os.environ.get("GPUS_PER_GROUP", "1"))
local_rank = int(os.environ["LOCAL_RANK"])
main_gpu = local_rank * gpus_per_group
torch.cuda.set_device(main_gpu)
dist.init_process_group(
backend="nccl",
init_method="env://",
device_id=torch.device(f"cuda:{main_gpu}"),
)

rank = dist.get_rank()
world_size = dist.get_world_size()
logger.info(
f"[Rank {rank}/{world_size}] GPUs: {torch.cuda.device_count()}, "
f"main_gpu: {main_gpu}, group: [{main_gpu}-{main_gpu + gpus_per_group - 1}]"
)

###### MODEL LOAD #####
load_start = time.perf_counter()
with load_offloaded_model():
model = AutoModelForCausalLM.from_pretrained(
MODEL, dtype="auto", device_map="auto_offload",
)
logger.info(f"[Rank {rank}] Loaded in {time.perf_counter() - load_start:.1f}s")

tokenizer = AutoTokenizer.from_pretrained(MODEL)

###### DATASET #####
os.environ["AR_DISABLE_DATASET_SUBPROCESS"] = "1"
from auto_round.calib_dataset import get_dataset
from llmcompressor.modifiers.autoround import AutoRoundModifier

ds = get_dataset(tokenizer=tokenizer, seqlen=2048, nsamples=NSAMPLES)

###### RECIPE #####

recipe = AutoRoundModifier(
targets="Linear",
scheme=SCHEME,
ignore=["lm_head", "re:.*mlp.gate$"],
iters=ITERS,
enable_torch_compile=False,
)

###### QUANTIZE #####
logger.info(f"[Rank {rank}] Starting oneshot...")
quant_start = time.perf_counter()
oneshot(
model=model,
dataset=ds,
recipe=recipe,
max_seq_length=2048,
num_calibration_samples=NSAMPLES,
shuffle_calibration_samples=False,
)
logger.info(f"[Rank {rank}] Quantization done in {time.perf_counter() - quant_start:.1f}s")

###### SAVE #####
# Both ranks must participate — save_pretrained internally calls
# collectives (broadcast_object_list). Only rank 0 writes to disk.
save_dir = (
MODEL.rstrip("/").split("/")[-1]
+ f"-{SCHEME}-AutoRound"
+ f"-iters{ITERS}-nsamples{NSAMPLES}"
+ f"-DDP{world_size}"
)
logger.info(f"[Rank {rank}] Saving to {save_dir}...")
model.save_pretrained(save_dir, save_compressed=True)
if rank == 0:
tokenizer.save_pretrained(save_dir)
logger.info(f"[Rank {rank}] Saved to {save_dir}")

if dist.is_initialized():
dist.barrier()
dist.destroy_process_group()

###### SAMPLE GENERATION (rank 0 only) #####
if rank == 0:
from compressed_tensors.offload import dispatch_model

logger.info("========== SAMPLE GENERATION ==============")
dispatch_model(model)
sample = tokenizer("Hello my name is", return_tensors="pt")
sample = {key: value.to(model.device) for key, value in sample.items()}
output = model.generate(**sample, max_new_tokens=100)
logger.info(tokenizer.decode(output[0]))
logger.info("==========================================")

logger.info(f"[Rank {rank}] SUCCESS")
88 changes: 74 additions & 14 deletions src/llmcompressor/modifiers/autoround/base.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from contextlib import contextmanager
import os

import torch
import torch.nn as nn
Expand All @@ -8,6 +9,7 @@
from auto_round.utils import check_to_quantized
from auto_round.wrapper import WrapperWALayer
from compressed_tensors.offload import get_execution_device, get_offloaded_device
from compressed_tensors.offload.cache.base import OffloadCache
from compressed_tensors.offload.module import offload_module, remove_module_offload
from compressed_tensors.quantization import (
QuantizationMetadata,
Expand All @@ -16,6 +18,7 @@
enable_quantization,
)
from compressed_tensors.utils import align_module_device, match_named_modules
from llmcompressor.utils.dev import get_main_device
from loguru import logger
from pydantic import PrivateAttr

Expand All @@ -30,6 +33,15 @@
__all__ = ["AutoRoundModifier", "fix_batch_if_needed"]


def _get_local_gpu_group_size() -> int:
return int(
os.environ.get(
"GPUS_PER_GROUP",
os.environ.get("GPUS_PER_RANK", "1"),
)
)


class _LLModelWrapper(torch.nn.Module):
def __init__(self):
super().__init__()
Expand Down Expand Up @@ -79,7 +91,7 @@ def suspend_offloading(model: nn.Module):
"""
offloading_info = dict()
for name, module in model.named_modules():
if not hasattr(module, "weight"): # skip SiLU or other non-weight layers
if not isinstance(module._parameters, OffloadCache):
continue
offloading_info[name] = (
get_execution_device(module),
Expand All @@ -90,7 +102,7 @@ def suspend_offloading(model: nn.Module):
yield

for name, module in model.named_modules():
if not hasattr(module, "weight"): # skip SiLU or other non-weight layers
if name not in offloading_info:
continue
offload_module(module, *offloading_info[name])

Expand Down Expand Up @@ -176,9 +188,18 @@ def on_initialize(self, state: State, **kwargs) -> bool:

:param state: session state storing input model and calibration data
"""
# apply config to model and prepare calibration hooks
# apply config to model and prepare calibration hooks.
# Wrap in disable_onloading to suppress DistributedCPUCache's
# per-param broadcast+barrier when creating quant params (scale,
# zero_point). With GPUS_PER_GROUP > 1, modules have varying GPU
# execution devices, causing GPU→CPU copy timing to vary between
# ranks → broadcast deadlock. Quant params are deterministic —
# each rank computes identical values, no sync needed.
if QuantizationMixin.has_config(self):
QuantizationMixin.initialize_quantization(self, state.model)
from compressed_tensors.offload import disable_onloading

with disable_onloading():
QuantizationMixin.initialize_quantization(self, state.model)

# prepare module names
self._add_temporary_names(state.model)
Expand Down Expand Up @@ -299,14 +320,26 @@ def apply_autoround(self, state, modules):
ar_inputs = [((args, kwargs),) for args, kwargs in cur_inputs]
self._set_attention_masks(ar, decoding_layer, cur_inputs)
decoding_layer.tuning_device = device
# Leave offload for LLMC to handle if `device_ids` is not set
# Enable auto_offload when device_ids is explicitly set OR when
# GPUS_PER_GROUP > 1 (set by launch_multi_gpu.sh).
# This lets AutoRound load-balance the block's submodules
# across multiple GPUs within the rank.
auto_offload = False
if self.device_ids is not None:
# When device_ids is set, we move decoding layer to CPU first,
# then the submodules will be re-dispatched by AutoRound.
needs_multi_gpu = (
self.device_ids is not None
or _get_local_gpu_group_size() > 1
or torch.cuda.device_count() > 1
)
if needs_multi_gpu:
# Let AutoRound own placement within the rank-local GPU group.
device = get_main_device()
decoding_layer.to("cpu")
auto_offload = True

# Ensure cached inputs are on the same device as the block.
# Calibration forward may have run on a different GPU.
cur_inputs = self._move_inputs_to(cur_inputs, device)

q_input, _ = ar.quantize_block(
block=decoding_layer,
inputs=ar_inputs,
Expand Down Expand Up @@ -347,12 +380,21 @@ def get_unquantized_layer_names(self, wrapped_model: torch.nn.Module) -> list[st

def _update_device_map_for_dp(self, ar_kwargs):
if torch.distributed.is_initialized():
rank = torch.distributed.get_rank()
ar_kwargs["device_map"] = (
f"{torch.accelerator.current_accelerator().type}:{rank}"
if torch.accelerator.is_available()
else "cpu"
)
if self.device_ids is not None:
return # user explicitly set device_ids, respect it
gpus_per_group = _get_local_gpu_group_size()
if gpus_per_group > 1:
local_rank = int(os.environ.get("LOCAL_RANK", "0"))
start_gpu = local_rank * gpus_per_group
ar_kwargs["device_map"] = ",".join(
str(start_gpu + i) for i in range(gpus_per_group)
)
else:
ar_kwargs["device_map"] = (
f"{torch.accelerator.current_accelerator().type}:0"
if torch.accelerator.is_available()
else "cpu"
)

def _unwrapper_quantized_layer(self, model: torch.nn.Module):
# auto-round will return WrapperWALayer if activation is quantized
Expand All @@ -376,6 +418,24 @@ def _remove_temporary_names(self, model: torch.nn.Module):
if hasattr(mod, "_tmp_name"):
del mod._tmp_name

@staticmethod
def _move_inputs_to(
inputs: list[tuple[tuple, dict]], device: torch.device
) -> list[tuple[tuple, dict]]:
"""Move all tensors in cached forward inputs to *device*."""
return [
(
tuple(
x.to(device) if isinstance(x, torch.Tensor) else x for x in args
),
{
k: v.to(device) if isinstance(v, torch.Tensor) else v
for k, v in kwargs.items()
},
)
for args, kwargs in inputs
]

def _is_decoding_layer(self, module: torch.nn.Module) -> bool:
return module.__class__.__name__ in self._sequential_targets

Expand Down
2 changes: 1 addition & 1 deletion src/llmcompressor/utils/dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ def get_main_device() -> torch.device:

elif torch.accelerator.is_available():
accel_type = torch.accelerator.current_accelerator().type
return torch.device(accel_type, rank)
return torch.device(accel_type, torch.accelerator.current_device_index())
else:
logger.warning("No accelerator available! Compressing model on CPU instead")
return torch.device("cpu")
Expand Down
Loading