[Feature] TransferQueue Integration for Rollout-to-Training#242
[Feature] TransferQueue Integration for Rollout-to-Training#242miracle0517 wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces TransferQueueBridge as an optional, high-performance rollout-to-training data plane (TransferQueue) to replace the existing Ray ObjectRef path. It integrates this bridge across the training loop, rollout manager, and Megatron actor/critic backends, while adding corresponding CLI arguments and validation. Feedback on these changes highlights two key areas for improvement: first, a busy-wait loop in _get_rollout_data_from_transfer_queue should include a small sleep to prevent high CPU utilization and network congestion during collective broadcasts; second, dict_to_tensordict should explicitly handle numpy.ndarray elements to avoid slow fallback serialization via NonTensorData.
| while rollout_data is None: | ||
| rollout_data, batch_meta = self.transfer_queue.get_data( | ||
| rollout_id, | ||
| task_name=task_name, | ||
| data_fields=data_fields, | ||
| ) |
There was a problem hiding this comment.
The while rollout_data is None: loop busy-waits infinitely when the TransferQueue does not have data ready yet. Because get_data performs collective broadcasts (_broadcast_payload) across all ranks in the model parallel group, this tight loop will cause all ranks to continuously execute collective communications (dist.broadcast_object_list). This will peg 100% CPU on all training processes, cause severe network congestion, and potentially lead to desynchronization or NCCL timeouts.\n\nAdding a small sleep (e.g., time.sleep(0.1)) inside the loop when rollout_data is None will gracefully poll the queue and prevent CPU/network saturation.
while rollout_data is None:\n rollout_data, batch_meta = self.transfer_queue.get_data(\n rollout_id,\n task_name=task_name,\n data_fields=data_fields,\n )\n if rollout_data is None:\n import time\n time.sleep(0.1)| if value and isinstance(value[0], torch.Tensor): | ||
| tensors = [] | ||
| for item in value: | ||
| tensor = item.detach() | ||
| if tensor.device.type != "cpu": | ||
| tensor = tensor.cpu() | ||
| if device is not None: | ||
| tensor = tensor.to(device) | ||
| tensors.append(tensor) | ||
| if tensors[0].ndim == 0: | ||
| result[key] = torch.stack(tensors) | ||
| else: | ||
| result[key] = torch.nested.as_nested_tensor(tensors, layout=torch.jagged) | ||
| continue |
There was a problem hiding this comment.
In dict_to_tensordict, list elements that are numpy.ndarray are not handled by the isinstance(value[0], torch.Tensor) check. Consequently, they fall back to the nesting_depth checks. If the numpy arrays have different shapes (e.g., variable sequence lengths), tensor_1d or tensor_2d will raise a ValueError, causing the field to silently fall back to non_tensor_stack (which wraps them in NonTensorData).\n\nNonTensorData uses standard Python pickling, which is extremely slow, lacks zero-copy serialization, and cannot be easily moved to GPU. Supporting numpy.ndarray directly in the tensor check by converting them to torch.Tensor using torch.from_numpy will keep them as highly efficient nested/jagged tensors.
if value and isinstance(value[0], (torch.Tensor, np.ndarray)):\n tensors = []\n for item in value:\n if isinstance(item, np.ndarray):\n tensor = torch.from_numpy(item)\n else:\n tensor = item.detach()\n if tensor.device.type != "cpu":\n tensor = tensor.cpu()\n if device is not None:\n tensor = tensor.to(device)\n tensors.append(tensor)\n if tensors[0].ndim == 0:\n result[key] = torch.stack(tensors)\n else:\n result[key] = torch.nested.as_nested_tensor(tensors, layout=torch.jagged)\n continueacfb259 to
63e5d80
Compare
|
Hi, thanks for the contribution, is there any experiment results? |
The test results are here: #244 |
|
This scale is not big enough, could you test with bigger vl models? |
We will conduct follow-up tests on multi-machine scenarios with the vl model to obtain performance data. Thank you. |
Cool, thank you. |
✨ Summary
Introduce an optional VIME TransferQueue data path for transferring rollout data to training.
When enabled via
--enable-vime-transfer-queue, rollout batches are written directly into the TransferQueue. Megatron actor/critic workers then fetch their DP-local training data straight from TQ, bypassing the previous Ray ObjectRef rollout payload path.🔧 What’s Changed
TransferQueueBridge – A new central component that handles:
Rollout Workers now publish normalized rollout batches directly into the TQ.
Megatron Actor Workers now fetch rollout data from the TQ instead of relying on Ray ObjectRefs.
Local Training Schedule Preservation – After fetching from TQ, the following fields are kept intact:
global_batch_sizesnum_microbatchesmicro_batch_indicesBackpressure & Cleanup – TQ staleness backpressure is enforced, and explicit partition cleanup runs once consumers have finished.
Extended Field Support – Both tensor and non‑tensor rollout fields are supported, including multimodal inputs, metadata, prompts, routing replay data, and extra configured fields.