[Feature] TransferQueue Integration for Rollout-to-Training#265
[Feature] TransferQueue Integration for Rollout-to-Training#265miracle0517 wants to merge 4 commits into
Conversation
Documentation build overview
4 files changed± advanced/vllm-config.html± developer_guide/profiling.html± get_started/quick_start.html± get_started/usage.html |
There was a problem hiding this comment.
Code Review
This pull request integrates TransferQueue as an optional rollout-to-training data plane to optimize data transfer, introducing the TransferQueueBridge and updating the training loop, Ray actor/placement groups, and command-line arguments. Feedback on these changes suggests adding a sleep interval to the busy-wait polling loop in _get_rollout_data_from_transfer_queue to prevent CPU exhaustion, using getattr defensively when accessing actor node and GPU arguments to avoid potential AttributeErrors, and explicitly checking for NPU availability to prevent an inefficient CPU fallback on Ascend platforms.
| def _get_rollout_data_from_transfer_queue(self, rollout_id: int) -> tuple[RolloutBatch, Any]: | ||
| task_name = "critic_train" if self.role == "critic" else "actor_train" | ||
| rollout_data = None | ||
| batch_meta = None | ||
| data_fields = ( | ||
| TransferQueueBridge.actor_train_data_fields(self.args) | ||
| if self.role == "actor" | ||
| else TransferQueueBridge.default_train_data_fields(self.args) | ||
| ) | ||
| while rollout_data is None: | ||
| rollout_data, batch_meta = self.transfer_queue.get_data( | ||
| rollout_id, | ||
| task_name=task_name, | ||
| data_fields=data_fields, | ||
| ) | ||
| return self._postprocess_transfer_queue_rollout_data(rollout_data), batch_meta |
There was a problem hiding this comment.
The polling loop in _get_rollout_data_from_transfer_queue busy-waits continuously when rollout_data is None. Because this loop runs on all model-parallel ranks and performs distributed broadcasts (_broadcast_payload which uses dist.broadcast_object_list), busy-waiting without any sleep will completely exhaust CPU resources, cause massive synchronization overhead, and potentially lead to system hangs or instability.\n\nAdding a small sleep (e.g., time.sleep(1.0)) when no data is returned will prevent this high CPU usage and allow other processes to run efficiently.
def _get_rollout_data_from_transfer_queue(self, rollout_id: int) -> tuple[RolloutBatch, Any]:\n import time\n task_name = "critic_train" if self.role == "critic" else "actor_train"\n rollout_data = None\n batch_meta = None\n data_fields = (\n TransferQueueBridge.actor_train_data_fields(self.args)\n if self.role == "actor"\n else TransferQueueBridge.default_train_data_fields(self.args)\n )\n while rollout_data is None:\n rollout_data, batch_meta = self.transfer_queue.get_data(\n rollout_id,\n task_name=task_name,\n data_fields=data_fields,\n )\n if rollout_data is None:\n time.sleep(1.0)\n return self._postprocess_transfer_queue_rollout_data(rollout_data), batch_meta| @classmethod | ||
| def data_parallel_size(cls, args: Namespace) -> int: | ||
| """Return the Megatron DP size used by TQ before Megatron is initialized.""" | ||
| world_size = args.actor_num_nodes * args.actor_num_gpus_per_node |
There was a problem hiding this comment.
Accessing args.actor_num_nodes and args.actor_num_gpus_per_node directly can raise an AttributeError if the arguments are not fully populated or if a custom namespace is used. To match the defensive pattern used for other model parallel parameters in this method, consider using getattr with safe defaults.
| world_size = args.actor_num_nodes * args.actor_num_gpus_per_node | |
| world_size = getattr(args, "actor_num_nodes", 1) * getattr(args, "actor_num_gpus_per_node", 8) |
| device = torch.device(f"cuda:{torch.cuda.current_device()}") if torch.cuda.is_available() else torch.device( | ||
| "cpu" | ||
| ) |
There was a problem hiding this comment.
On Ascend NPU platforms, standard PyTorch uses torch.npu instead of torch.cuda. If torch.cuda.is_available() returns False (which is common on NPU setups without explicit CUDA hijacking), the device will silently fall back to cpu. This fallback will degrade the performance of distributed broadcasts in _broadcast_payload.\n\nConsider checking for NPU availability explicitly to ensure the correct accelerator device is selected.
if torch.cuda.is_available():\n device = torch.device(f"cuda:{torch.cuda.current_device()}")\n elif hasattr(torch, "npu") and torch.npu.is_available():\n device = torch.device(f"npu:{torch.npu.current_device()}")\n else:\n device = torch.device("cpu")
✨ Summary
Introduce an optional VIME TransferQueue data path for transferring rollout data to training.
When enabled via
--enable-vime-transfer-queue, rollout batches are written directly into the TransferQueue. Megatron actor/critic workers then fetch their DP-local training data straight from TQ, bypassing the previous Ray ObjectRef rollout payload path.🔧 What’s Changed
TransferQueueBridge – A new central component that handles:
Rollout Workers now publish normalized rollout batches directly into the TQ.
Megatron Actor Workers now fetch rollout data from the TQ instead of relying on Ray ObjectRefs.
Local Training Schedule Preservation – After fetching from TQ, the following fields are kept intact:
global_batch_sizesnum_microbatchesmicro_batch_indicesBackpressure & Cleanup – TQ staleness backpressure is enforced, and explicit partition cleanup runs once consumers have finished.
Extended Field Support – Both tensor and non‑tensor rollout fields are supported, including multimodal inputs, metadata, prompts, routing replay data, and extra configured fields.