diff --git a/examples/autoround/ddp/ddp_qwen3_moe_example.py b/examples/autoround/ddp/ddp_qwen3_moe_example.py new file mode 100644 index 0000000000..a5330a92ff --- /dev/null +++ b/examples/autoround/ddp/ddp_qwen3_moe_example.py @@ -0,0 +1,117 @@ +""" +DDP AutoRound quantization example for large MoE models. + +Uses the standard compressed-tensors DDP path: load_offloaded_model() +broadcasts weights from rank 0 to rank 1. GPUS_PER_GROUP controls how +many GPUs each rank uses for per-block model parallelism. + +Run with: + CUDA_VISIBLE_DEVICES=0,1,2,3 GPUS_PER_GROUP=2 torchrun \ + --nproc_per_node=2 ddp_qwen3_moe_example.py +""" + +import os +import time + +import torch +import torch.distributed as dist +from compressed_tensors.offload import load_offloaded_model +from loguru import logger +from transformers import AutoModelForCausalLM, AutoTokenizer +from llmcompressor import oneshot + +MODEL = "Qwen/Qwen3-235B-A22B-Instruct-2507" +SCHEME = "W4A16" +ITERS = 200 +NSAMPLES = 256 + +###### DDP INIT ##### +gpus_per_group = int(os.environ.get("GPUS_PER_GROUP", "1")) +local_rank = int(os.environ["LOCAL_RANK"]) +main_gpu = local_rank * gpus_per_group +torch.cuda.set_device(main_gpu) +dist.init_process_group( + backend="nccl", + init_method="env://", + device_id=torch.device(f"cuda:{main_gpu}"), +) + +rank = dist.get_rank() +world_size = dist.get_world_size() +logger.info( + f"[Rank {rank}/{world_size}] GPUs: {torch.cuda.device_count()}, " + f"main_gpu: {main_gpu}, group: [{main_gpu}-{main_gpu + gpus_per_group - 1}]" +) + +###### MODEL LOAD ##### +load_start = time.perf_counter() +with load_offloaded_model(): + model = AutoModelForCausalLM.from_pretrained( + MODEL, dtype="auto", device_map="auto_offload", + ) +logger.info(f"[Rank {rank}] Loaded in {time.perf_counter() - load_start:.1f}s") + +tokenizer = AutoTokenizer.from_pretrained(MODEL) + +###### DATASET ##### +os.environ["AR_DISABLE_DATASET_SUBPROCESS"] = "1" +from auto_round.calib_dataset import get_dataset +from llmcompressor.modifiers.autoround import AutoRoundModifier + +ds = get_dataset(tokenizer=tokenizer, seqlen=2048, nsamples=NSAMPLES) + +###### RECIPE ##### + +recipe = AutoRoundModifier( + targets="Linear", + scheme=SCHEME, + ignore=["lm_head", "re:.*mlp.gate$"], + iters=ITERS, + enable_torch_compile=False, +) + +###### QUANTIZE ##### +logger.info(f"[Rank {rank}] Starting oneshot...") +quant_start = time.perf_counter() +oneshot( + model=model, + dataset=ds, + recipe=recipe, + max_seq_length=2048, + num_calibration_samples=NSAMPLES, + shuffle_calibration_samples=False, +) +logger.info(f"[Rank {rank}] Quantization done in {time.perf_counter() - quant_start:.1f}s") + +###### SAVE ##### +# Both ranks must participate — save_pretrained internally calls +# collectives (broadcast_object_list). Only rank 0 writes to disk. +save_dir = ( + MODEL.rstrip("/").split("/")[-1] + + f"-{SCHEME}-AutoRound" + + f"-iters{ITERS}-nsamples{NSAMPLES}" + + f"-DDP{world_size}" +) +logger.info(f"[Rank {rank}] Saving to {save_dir}...") +model.save_pretrained(save_dir, save_compressed=True) +if rank == 0: + tokenizer.save_pretrained(save_dir) +logger.info(f"[Rank {rank}] Saved to {save_dir}") + +if dist.is_initialized(): + dist.barrier() + dist.destroy_process_group() + +###### SAMPLE GENERATION (rank 0 only) ##### +if rank == 0: + from compressed_tensors.offload import dispatch_model + + logger.info("========== SAMPLE GENERATION ==============") + dispatch_model(model) + sample = tokenizer("Hello my name is", return_tensors="pt") + sample = {key: value.to(model.device) for key, value in sample.items()} + output = model.generate(**sample, max_new_tokens=100) + logger.info(tokenizer.decode(output[0])) + logger.info("==========================================") + +logger.info(f"[Rank {rank}] SUCCESS") diff --git a/src/llmcompressor/modifiers/autoround/base.py b/src/llmcompressor/modifiers/autoround/base.py index 1058b2eece..d61859001c 100644 --- a/src/llmcompressor/modifiers/autoround/base.py +++ b/src/llmcompressor/modifiers/autoround/base.py @@ -1,4 +1,5 @@ from contextlib import contextmanager +import os import torch import torch.nn as nn @@ -8,6 +9,7 @@ from auto_round.utils import check_to_quantized from auto_round.wrapper import WrapperWALayer from compressed_tensors.offload import get_execution_device, get_offloaded_device +from compressed_tensors.offload.cache.base import OffloadCache from compressed_tensors.offload.module import offload_module, remove_module_offload from compressed_tensors.quantization import ( QuantizationMetadata, @@ -16,6 +18,7 @@ enable_quantization, ) from compressed_tensors.utils import align_module_device, match_named_modules +from llmcompressor.utils.dev import get_main_device from loguru import logger from pydantic import PrivateAttr @@ -30,6 +33,15 @@ __all__ = ["AutoRoundModifier", "fix_batch_if_needed"] +def _get_local_gpu_group_size() -> int: + return int( + os.environ.get( + "GPUS_PER_GROUP", + os.environ.get("GPUS_PER_RANK", "1"), + ) + ) + + class _LLModelWrapper(torch.nn.Module): def __init__(self): super().__init__() @@ -79,7 +91,7 @@ def suspend_offloading(model: nn.Module): """ offloading_info = dict() for name, module in model.named_modules(): - if not hasattr(module, "weight"): # skip SiLU or other non-weight layers + if not isinstance(module._parameters, OffloadCache): continue offloading_info[name] = ( get_execution_device(module), @@ -90,7 +102,7 @@ def suspend_offloading(model: nn.Module): yield for name, module in model.named_modules(): - if not hasattr(module, "weight"): # skip SiLU or other non-weight layers + if name not in offloading_info: continue offload_module(module, *offloading_info[name]) @@ -176,9 +188,18 @@ def on_initialize(self, state: State, **kwargs) -> bool: :param state: session state storing input model and calibration data """ - # apply config to model and prepare calibration hooks + # apply config to model and prepare calibration hooks. + # Wrap in disable_onloading to suppress DistributedCPUCache's + # per-param broadcast+barrier when creating quant params (scale, + # zero_point). With GPUS_PER_GROUP > 1, modules have varying GPU + # execution devices, causing GPU→CPU copy timing to vary between + # ranks → broadcast deadlock. Quant params are deterministic — + # each rank computes identical values, no sync needed. if QuantizationMixin.has_config(self): - QuantizationMixin.initialize_quantization(self, state.model) + from compressed_tensors.offload import disable_onloading + + with disable_onloading(): + QuantizationMixin.initialize_quantization(self, state.model) # prepare module names self._add_temporary_names(state.model) @@ -299,14 +320,26 @@ def apply_autoround(self, state, modules): ar_inputs = [((args, kwargs),) for args, kwargs in cur_inputs] self._set_attention_masks(ar, decoding_layer, cur_inputs) decoding_layer.tuning_device = device - # Leave offload for LLMC to handle if `device_ids` is not set + # Enable auto_offload when device_ids is explicitly set OR when + # GPUS_PER_GROUP > 1 (set by launch_multi_gpu.sh). + # This lets AutoRound load-balance the block's submodules + # across multiple GPUs within the rank. auto_offload = False - if self.device_ids is not None: - # When device_ids is set, we move decoding layer to CPU first, - # then the submodules will be re-dispatched by AutoRound. + needs_multi_gpu = ( + self.device_ids is not None + or _get_local_gpu_group_size() > 1 + or torch.cuda.device_count() > 1 + ) + if needs_multi_gpu: + # Let AutoRound own placement within the rank-local GPU group. + device = get_main_device() decoding_layer.to("cpu") auto_offload = True + # Ensure cached inputs are on the same device as the block. + # Calibration forward may have run on a different GPU. + cur_inputs = self._move_inputs_to(cur_inputs, device) + q_input, _ = ar.quantize_block( block=decoding_layer, inputs=ar_inputs, @@ -347,12 +380,21 @@ def get_unquantized_layer_names(self, wrapped_model: torch.nn.Module) -> list[st def _update_device_map_for_dp(self, ar_kwargs): if torch.distributed.is_initialized(): - rank = torch.distributed.get_rank() - ar_kwargs["device_map"] = ( - f"{torch.accelerator.current_accelerator().type}:{rank}" - if torch.accelerator.is_available() - else "cpu" - ) + if self.device_ids is not None: + return # user explicitly set device_ids, respect it + gpus_per_group = _get_local_gpu_group_size() + if gpus_per_group > 1: + local_rank = int(os.environ.get("LOCAL_RANK", "0")) + start_gpu = local_rank * gpus_per_group + ar_kwargs["device_map"] = ",".join( + str(start_gpu + i) for i in range(gpus_per_group) + ) + else: + ar_kwargs["device_map"] = ( + f"{torch.accelerator.current_accelerator().type}:0" + if torch.accelerator.is_available() + else "cpu" + ) def _unwrapper_quantized_layer(self, model: torch.nn.Module): # auto-round will return WrapperWALayer if activation is quantized @@ -376,6 +418,24 @@ def _remove_temporary_names(self, model: torch.nn.Module): if hasattr(mod, "_tmp_name"): del mod._tmp_name + @staticmethod + def _move_inputs_to( + inputs: list[tuple[tuple, dict]], device: torch.device + ) -> list[tuple[tuple, dict]]: + """Move all tensors in cached forward inputs to *device*.""" + return [ + ( + tuple( + x.to(device) if isinstance(x, torch.Tensor) else x for x in args + ), + { + k: v.to(device) if isinstance(v, torch.Tensor) else v + for k, v in kwargs.items() + }, + ) + for args, kwargs in inputs + ] + def _is_decoding_layer(self, module: torch.nn.Module) -> bool: return module.__class__.__name__ in self._sequential_targets diff --git a/src/llmcompressor/utils/dev.py b/src/llmcompressor/utils/dev.py index 9d6bd19953..97d3a8c1a2 100644 --- a/src/llmcompressor/utils/dev.py +++ b/src/llmcompressor/utils/dev.py @@ -160,7 +160,7 @@ def get_main_device() -> torch.device: elif torch.accelerator.is_available(): accel_type = torch.accelerator.current_accelerator().type - return torch.device(accel_type, rank) + return torch.device(accel_type, torch.accelerator.current_device_index()) else: logger.warning("No accelerator available! Compressing model on CPU instead") return torch.device("cpu")