[RFC] Integrating TransferQueue into Vime: Decoupling Rollout and Training with a Streaming Data Bus #227

@miracle0517

Summary

This RFC proposes integrating TransferQueue into Vime as an optional data channel. TransferQueue would carry rollout samples across production, filtering, retry/backfill, and training consumption. The goal is not to replace vLLM, Megatron, or the existing rollout API. Instead, it adds a pluggable data bus outside the existing DataSource / RolloutFnTrainOutput boundaries, so Vime can gradually evolve from round-based rollout to streaming / near-on-policy / fully-async training modes.

Core goals:

  1. Streaming data consumption: once a rollout group is generated and has completed reward/filter processing, it can enter the training-side consumption queue without waiting for the full rollout round to finish.
  2. Controlled staleness: use policy_version / weight_version to bound the version gap between training samples and the current actor weights, supporting strict on-policy, near-on-policy, and fully-async modes.
  3. Preserve existing Vime contracts: the default path remains unchanged; integration points should first reuse --data-source-path, --rollout-function-path, Sample, RolloutDataSourceWithBuffer, and vime.rollout.vllm_rollout.
  4. Support future cross-cluster rollout: reserve a unified data plane for multiple rollout producers, elastic rollout engines, and training-sample production across Ray jobs or clusters.

Phased rollout:

  • Phase 1: use TransferQueue as an external Data Buffer, replacing only the backfill path for aborted/partial/overflow samples while keeping the training cadence unchanged.
  • Phase 2: use TransferQueue as a streaming rollout queue. Rollout producers continuously write samples, and the training side reads ready groups by batch.
  • Phase 3: use TransferQueue as a cross-service data bus, supporting external rollout clusters, multiple producers, elastic scaling, and more complete async training.

Motivation

Current Approach (Vime - Data Buffer + Round-based Rollout)

Vime's current main path uses RolloutManager to start the vLLM router/engine, calls vime.rollout.vllm_rollout.generate_rollout to produce a complete rollout batch, and then converts list[list[Sample]] into Megatron training data. RolloutDataSourceWithBuffer handles prompt reading and partial rollout backfill. fully_async_rollout already has an in-process background worker, but it still depends on the same data_buffer and the same RolloutManager lifecycle.

This design is simple and stable, and it works well for strict on-policy and synchronous training. However, in long-tail, agentic, multi-turn, multi-model, or external rollout scenarios, it has several bottlenecks:

| Problem | Impact |
|---|---|
| rollout round barrier | The training side usually waits for the entire batch to finish generation; long-tail samples can block the current training round. |
| data buffer is bound inside RolloutManager | Samples are hard to share across Ray actors, Ray jobs, or independent rollout clusters. |
| lack of per-field / sub-sample visibility | Intermediate artifacts such as reference logprobs, rewards, loss masks, and metadata cannot be consumed independently by streaming tasks. |
| partial / aborted samples are backfilled only in the local buffer | Failure recovery, resume behavior, and cross-process continued generation have unclear boundaries. |
| async staleness lacks unified data semantics | A fully-async rollout function can be written, but sample versioning, consumption deduplication, leases, and replay must be implemented separately for each path. |

Why TransferQueue

TransferQueue separates the control plane from the data plane. The control plane tracks the ready/consumed state of sample fields, while the data plane stores actual data through a pluggable storage backend. Its 2D row/column data model naturally fits RL post-training: each row is a sample, and each column is a task artifact, such as prompt, tokens, response, reward, logprobs, loss mask, or metadata.

For Vime, the value of TransferQueue is not changing the training or inference backends. It is adding a general streaming data plane:

  • rollout producers write completed Sample fields;
  • trainer consumers read trainable samples by group, version, and staleness;
  • partial / failed / aborted samples can be backfilled or retried via metadata;
  • multiple producers can scale independently while still appearing to the training side as one data source.

Proposed Solution

Overall Architecture

                 weight update / policy version
        +----------------------------------------------+
        |                                              |
        v                                              |
+-----------------+      write ready fields      +----------------------+
| Rollout Producer| ---------------------------> | TransferQueue         |
| vLLM + RM/filter|                              | controller + storage  |
+-----------------+                              +----------------------+
        ^                                                   |
        | prompt / retry                                    | get ready groups
        |                                                   v
+-----------------+                              +----------------------+
| Prompt Data     |                              | TransferQueueDataSource|
| local dataset   |                              | list[list[Sample]]     |
+-----------------+                              +----------------------+
                                                            |
                                                            v
                                                 +----------------------+
                                                 | Vime training path    |
                                                 | convert + Megatron    |
                                                 +----------------------+

Design principles:

  • Sample remains the canonical object inside Vime. TransferQueue is only the data carrier across actors or services.
  • The default path is unaffected. When TransferQueue is not enabled, the existing behavior of RolloutDataSourceWithBuffer, vllm_rollout, and fully_async_rollout remains unchanged.
  • Adapt data first, split services later. Phase 1 and Phase 2 can both be completed inside a single Ray job; Phase 3 introduces external rollout producers.
  • Prioritize group atomicity. Algorithms such as GRPO / DAPO compute advantages by prompt group, so all samples inside a group must use compatible policy_version and reward/filter semantics.

Data Model

Row Identity

Each training sample maps to one row in TransferQueue. Vime needs to explicitly write the following index fields:

| Field | Description |
|---|---|
| partition_id | Logical namespace, such as train, eval/<name>, or retry. |
| global_index | TransferQueue row index, used to uniquely locate data. |
| group_index | Vime prompt group index, preserving n_samples_per_prompt grouping. |
| sample_index | Vime Sample.index. |
| rollout_id | Vime rollout id that produced this sample; in fan-out scenarios, subsamples from the same trajectory share it. |
| policy_version | Actor weight version used by the rollout engine when generating this sample. |
| producer_id | Rollout producer identifier for debugging and failure recovery. |

Columns

Phase 1 can start with a sample_blob implementation to reduce integration risk. Starting in Phase 2, samples should be split into columnar fields so downstream consumers can read only what they need.

| Column | Vime Field | Required For |
|---|---|---|
| prompt | Sample.prompt | prompt replay / debug |
| tokens | Sample.tokens | required for training |
| response | Sample.response | logging, debug, reward replay |
| response_length | Sample.response_length | required for training |
| reward | Sample.reward | required for training; supports float or dict |
| loss_mask | Sample.loss_mask | partial rollout / agentic loss masking |
| status | Sample.status | training filtering and retry |
| rollout_log_probs | Sample.rollout_log_probs | off-policy correction / rollout logprob |
| rollout_routed_experts | Sample.rollout_routed_experts | MoE routing replay |
| multimodal_train_inputs | Sample.multimodal_train_inputs | VLM / omni-modal training |
| metadata | Sample.metadata | trace, tool results, verifier details, debug information |
| weight_versions | Sample.weight_versions | staleness metrics and consistency checks |

Codec

Add a lightweight codec layer:

Sample <--> TransferQueue row/columns

Suggested files:

| File | Role |
|---|---|
| vime/rollout/transferqueue_codec.py | Bidirectional conversion between Sample and TransferQueue columns. |
| vime/rollout/transferqueue_data_source.py | Implements DataSource, reading/backfilling groups from TransferQueue. |
| vime/rollout/transferqueue_rollout.py | Phase 2 streaming consumer rollout entrypoint. |
| vime/utils/transferqueue_utils.py | Client initialization, partition naming, and serialization helpers. |
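
The codec round trip can be illustrated with a minimal sketch. It assumes a trimmed-down stand-in for Sample (the real class carries more fields) and uses a plain dict as the row; MiniSample, encode_sample, and decode_sample are hypothetical names for illustration, not existing Vime or TransferQueue APIs.

```python
from dataclasses import asdict, dataclass, field
from typing import Any, Optional, Union


# Hypothetical, reduced stand-in for Vime's Sample; the real class has more fields.
@dataclass
class MiniSample:
    index: int
    prompt: str
    tokens: list = field(default_factory=list)
    response: str = ""
    response_length: int = 0
    reward: Optional[Union[float, dict]] = None
    status: str = "pending"
    metadata: dict = field(default_factory=dict)


# Row-identity fields that live next to the data columns.
_IDENTITY_FIELDS = {"partition_id", "group_index", "sample_index", "rollout_id", "policy_version"}


def encode_sample(sample: MiniSample, *, group_index: int, rollout_id: int,
                  policy_version: int, partition_id: str = "train") -> dict:
    """Flatten a sample into data columns plus row-identity fields."""
    row: dict = asdict(sample)
    row["sample_index"] = row.pop("index")  # Sample.index maps to sample_index
    row.update(partition_id=partition_id, group_index=group_index,
               rollout_id=rollout_id, policy_version=policy_version)
    return row


def decode_sample(row: dict) -> MiniSample:
    """Rebuild the sample from a row, dropping the identity fields."""
    columns = {k: v for k, v in row.items() if k not in _IDENTITY_FIELDS}
    return MiniSample(index=row["sample_index"], **columns)
```

Keeping the identity fields outside the Sample object means the canonical in-process type does not need to change when TransferQueue is enabled.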

Phase 1: TransferQueue as External Data Buffer

Core Idea

Use TransferQueue as an optional external buffer for rollout training data, placed between RolloutManager and the Megatron training actors, while keeping the existing rollout-round cadence and training loop behavior unchanged.

Current:

RolloutManager.generate()
  -> convert samples to train data
  -> split train data by DP rank
  -> return Ray ObjectRefs to training actors

Phase 1:

RolloutManager.generate()
  -> convert samples to train data
  -> write full train batch to TransferQueue partition train_{rollout_id}
  -> training actors fetch DP-local data from TransferQueue

This phase does not introduce streaming rollout production, external rollout clusters, lease/ack semantics, or async staleness policy. It only replaces the rollout-data transfer path with a TQ-backed data buffer when explicitly enabled.

Activation Condition

--enable-vime-transfer-queue

TransferQueue is disabled by default. When disabled, Vime continues to use the existing Ray ObjectRef rollout-data path.

The current implementation also supports:

--num-data-storage-units
--max-staleness
--polling-mode / --no-polling-mode
--transfer-queue-staleness-poll-interval
--transfer-queue-extra-data-fields

--enable-vime-transfer-queue is intentionally Vime-specific to avoid conflicting with vLLM transfer queue arguments.

Key Code Changes

| File | Changes |
|---|---|
| vime/utils/arguments.py | Add TQ arguments and validation. TQ is disabled by default and is not supported with --debug-train-only / --load-debug-rollout-data. |
| vime/utils/transfer_queue.py | Add TransferQueueBridge, which owns TQ initialization, actor connection, partition naming, TensorDict conversion, field selection, staleness backpressure, fetch, and cleanup helpers. |
| train.py | Initialize TQ once before Ray actors start. The driver owns partition lifecycle and clears train_{rollout_id} after training consumers finish. |
| vime/ray/rollout.py | When TQ is enabled, write converted rollout train data to TQ and return None instead of Ray ObjectRefs. |
| vime/backends/megatron_utils/actor.py | When TQ is enabled, fetch rollout data from TQ, convert it back to the existing RolloutBatch format, move tensors to GPU, and train through the existing actor path. |
| vime/ray/actor_group.py | Propagate TQ environment variables to Ray actors. |
| vime/ray/placement_group.py | Keep placement behavior compatible with TQ-enabled runs. |

Semantics

  • TransferQueueBridge.initialize(args) runs in the driver before rollout and training actors are created.
  • Ray actors call TransferQueueBridge.connect(args) and attach to the already-created TQ instance through args.tq_config.
  • Each rollout writes one partition named train_{rollout_id}.
  • Rollout data is written after RolloutManager converts Sample objects into train data.
  • Training actors request their DP-local batch from the same partition by dp_rank, task_name, and batch_index.
  • Only the model-parallel source rank fetches from TQ. The fetched payload is broadcast to the other TP / PP / CP ranks.
  • Fetched TensorDict payloads are converted back to Vime's list-based RolloutBatch structure before training.
  • The schedule fields global_batch_sizes, num_microbatches, and micro_batch_indices are generated on the training side when they are not present in the fetched payload.
  • The driver clears the TQ partition only after the rollout's training consumers finish.
  • --max-staleness limits how many unconsumed train_* partitions may exist before rollout waits.
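
The partition naming and the --max-staleness backpressure rule above can be sketched as follows. This is an illustration only: PartitionTracker is a hypothetical helper, and the exact comparison (rollout may proceed while the number of unconsumed partitions is at most max_staleness) is an assumption about how the limit is applied.

```python
class PartitionTracker:
    """Hypothetical driver-side bookkeeping for unconsumed train_* partitions."""

    def __init__(self, max_staleness: int) -> None:
        self.max_staleness = max_staleness
        self._pending = set()  # partitions written but not yet cleared

    @staticmethod
    def partition_name(rollout_id: int) -> str:
        # One partition per rollout, as described in the semantics above.
        return f"train_{rollout_id}"

    def mark_written(self, rollout_id: int) -> None:
        self._pending.add(self.partition_name(rollout_id))

    def mark_consumed(self, rollout_id: int) -> None:
        # Called after the training consumers for this rollout have finished.
        self._pending.discard(self.partition_name(rollout_id))

    def can_write(self) -> bool:
        # Assumption: rollout waits once more than max_staleness partitions are pending.
        return len(self._pending) <= self.max_staleness
```

A rollout loop would poll can_write() (for example at the interval given by --transfer-queue-staleness-poll-interval) before producing the next partition.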

The base rollout train-data contract requires:

tokens
response_lengths
loss_masks
rewards

The TQ bridge normalizes additional fields before write:

total_lengths
raw_reward
truncated
sample_indices

Mode-specific and optional fields are requested only when needed or configured:

rollout_log_probs
rollout_routed_experts
multimodal_train_inputs
teacher_log_probs
transfer_queue_extra_data_fields

Serialization rules:

  • scalar lists are stored as dense tensors;
  • variable-length numeric lists are stored as jagged nested tensors;
  • tensor lists are stacked or stored as jagged nested tensors;
  • rollout_routed_experts is normalized into a jagged int tensor representation;
  • non-tensor payloads such as metadata, multimodal_train_inputs, and prompt are stored with NonTensorData;
  • custom extra fields fall back to non-tensor storage when tensor conversion is not valid.
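
A rough sketch of how the serialization rules could be dispatched is shown below. It uses only plain Python values and returns a label instead of building tensors; storage_kind is a hypothetical helper, and the real bridge would additionally handle tensor inputs and the rollout_routed_experts normalization.

```python
from numbers import Number


def storage_kind(values: list) -> str:
    """Classify a per-sample field as 'dense', 'jagged', or 'non_tensor'."""
    # A list of scalars, one per sample, maps to a dense tensor.
    if all(isinstance(v, Number) for v in values):
        return "dense"
    # A list of numeric lists, possibly of different lengths, maps to a jagged nested tensor.
    if all(
        isinstance(v, (list, tuple)) and all(isinstance(x, Number) for x in v)
        for v in values
    ):
        return "jagged"
    # Everything else (dicts, strings, mixed payloads) falls back to non-tensor storage.
    return "non_tensor"
```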

Phase 2: Streaming Rollout Queue

Core Idea

Split rollout production and training consumption into a producer-consumer model. The producer continuously calls the existing generate_and_rm_group to produce complete groups and writes them to TransferQueue. On the training side, generate_rollout no longer performs generation directly; instead, it reads ready groups from TransferQueue and returns RolloutFnTrainOutput.

TransferQueueRolloutProducer
  +- data_source.get_samples(over_sampling_batch_size)
  +- generate_and_rm_group(...)
  +- dynamic_sampling_filter(...)
  +- tq_put(group, policy_version)

transferqueue_rollout.generate_rollout
  +- tq_get_ready_groups(rollout_batch_size, max_staleness)
  +- optional rollout_sample_filter
  +- RolloutFnTrainOutput(samples=groups, metrics=...)

New Arguments

| Argument | Default | Description |
|---|---|---|
| --transferqueue-mode {off,buffer,streaming,external} | off | TransferQueue integration mode. |
| --transferqueue-address | None | External TransferQueue controller / registry address; if empty, Vime starts it. |
| --transferqueue-partition | train | Training-sample partition. |
| --transferqueue-storage-backend | simple | Use SimpleStorage by default first; later support MooncakeStore / RayRDT and others. |
| --transferqueue-max-staleness | 0 | Maximum allowed gap between sample policy_version and current train version. |
| --transferqueue-lease-timeout-secs | 600 | Lease timeout after the training side takes samples. |
| --transferqueue-prefetch-groups | 0 | Number of groups to prefetch on the consumer side; 0 means no prefetch. |
| --transferqueue-ack-on-train-data | true | Ack after samples are converted to train data; failures wait for lease-timeout recovery. |

Staleness Semantics

Definition:

staleness = current_train_policy_version - sample.policy_version

Consumption rules:

| Mode | Rule | Use Case |
|---|---|---|
| strict on-policy | max_staleness = 0; consume only samples generated by the current weights. | Reproduce the existing synchronous semantics. |
| near-on-policy | 0 < max_staleness <= K | Trade a small amount of off-policy data for higher throughput. |
| fully async | larger max_staleness; producers do not wait for every weight update. | Long-running agentic / multi-turn scenarios. |

Constraints:

  • Samples under the same group_index must have the same or compatible policy_version.
  • If --use-rollout-logprobs, --use-tis, or custom off-policy correction is enabled, policy_version and weight_versions should be passed to the post-processing logic.
  • When max_staleness=0, producers need to pause submission during weight updates, or place old-version samples into a non-consumable state.
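
The staleness definition and the group constraint can be combined into a small consumer-side check. This is a sketch under one stated assumption: "compatible" policy versions are interpreted strictly as "identical". Both function names are hypothetical.

```python
def staleness(train_version: int, sample_version: int) -> int:
    """staleness = current_train_policy_version - sample.policy_version"""
    return train_version - sample_version


def group_is_consumable(policy_versions: list, train_version: int, max_staleness: int) -> bool:
    """Return True if a whole prompt group may be consumed at train_version."""
    # Assumption: a group is only valid when every sample shares one policy_version.
    if len(set(policy_versions)) != 1:
        return False
    gap = staleness(train_version, policy_versions[0])
    # Reject samples from a future version as well as samples that are too old.
    return 0 <= gap <= max_staleness
```

With max_staleness=0 this reduces to the strict on-policy rule: only groups generated by the current weights pass.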

Dynamic Filtering

Run the dynamic sampling filter on the producer side:

  1. finish generation + reward for a group;
  2. call dynamic_sampling_filter_path;
  3. write kept groups to the train partition;
  4. write dropped groups to dropped metadata or clear them directly.

This lets the training side see only ready trainable groups, without reimplementing DAPO / nonzero-std filtering logic.
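
As an illustration of producer-side filtering, the sketch below applies a nonzero-std check to per-group rewards and routes each group to a kept or dropped list. The function names and the dict-of-rewards input are hypothetical; the actual filter is whatever dynamic_sampling_filter_path points to.

```python
from statistics import pstdev


def nonzero_std_filter(rewards: list) -> bool:
    """Keep a group only if its rewards are not all identical."""
    return len(rewards) > 1 and pstdev(rewards) > 0.0


def route_groups(group_rewards: dict) -> tuple:
    """Split group indices into (kept, dropped) before anything is written."""
    kept, dropped = [], []
    for group_index, rewards in group_rewards.items():
        target = kept if nonzero_std_filter(rewards) else dropped
        target.append(group_index)
    return kept, dropped
```

Only the kept groups would be written to the train partition; the dropped list feeds the dropped metadata or is discarded.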

Ack / Lease

The TransferQueue consumer needs to avoid duplicate training and sample loss on crashes:

  1. get_samples() obtains ready groups from TransferQueue and marks them as leased;
  2. after RolloutManager completes convert_samples_to_train_data, it optionally calls ack_samples();
  3. if the process crashes or conversion fails, samples become ready again after the lease timeout;
  4. checkpoints save the current policy_version, partition, unacked leases, and sampler state.

Vime's current DataSource interface has no ack method. Phase 2 can use a compatible design: TransferQueueDataSource exposes ack_samples(), and RolloutManager calls it via hasattr(data_source, "ack_samples"); non-TransferQueue data sources are unaffected.
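
The lease and ack flow, together with the hasattr-style compatibility hook, can be sketched with an in-memory stand-in. LeaseQueue and maybe_ack are hypothetical and do not reflect the TransferQueue client API; the injectable clock exists only to make the timeout behavior easy to exercise.

```python
import time


class LeaseQueue:
    """In-memory stand-in for lease/ack semantics on ready groups."""

    def __init__(self, lease_timeout_secs: float, clock=time.monotonic) -> None:
        self.timeout = lease_timeout_secs
        self.clock = clock
        self.ready = []   # group ids that are ready for training
        self.leased = {}  # group id -> lease expiry time

    def put(self, group_id: int) -> None:
        self.ready.append(group_id)

    def _reclaim_expired(self) -> None:
        # Unacked groups whose lease has expired become ready again.
        now = self.clock()
        for group_id in [g for g, expiry in self.leased.items() if expiry <= now]:
            del self.leased[group_id]
            self.ready.append(group_id)

    def get_samples(self, n: int) -> list:
        self._reclaim_expired()
        taken, self.ready = self.ready[:n], self.ready[n:]
        expiry = self.clock() + self.timeout
        for group_id in taken:
            self.leased[group_id] = expiry
        return taken

    def ack_samples(self, group_ids: list) -> None:
        # Acked groups leave the lease table and are never handed out again.
        for group_id in group_ids:
            self.leased.pop(group_id, None)


def maybe_ack(data_source, group_ids: list) -> None:
    """Call ack_samples() only on data sources that provide it."""
    ack = getattr(data_source, "ack_samples", None)
    if callable(ack):
        ack(group_ids)
```

Data sources without ack_samples() pass through maybe_ack untouched, which is how the default path would stay unaffected.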


Phase 3: External / Elastic Rollout

Core Idea

The training Ray job and rollout Ray job can be separated. The Vime trainer only handles Megatron training, weight updates, and consuming samples from TransferQueue. Rollout producers run as independent services, connect to the vLLM router/engine, read prompts or task streams, and write into the same TransferQueue partition.

Proposed Flow

Trainer Job
  +- Megatron actor
  +- weight sync publisher
  +- TransferQueue consumer

Rollout Job(s)
  +- vLLM router/engines
  +- custom_generate / RM / verifier
  +- TransferQueue producer

Additional Requirements

| Requirement | Description |
|---|---|
| producer registry | producer online/offline state, current policy version, throughput, and health status |
| weight version handshake | producers must confirm that a specific actor version has been loaded before writing samples for that version |
| partition cleanup | after a successful checkpoint, clean up old partitions / old-version rows |
| cross-cluster auth | external TransferQueue address, storage backend, access permissions, and cleanup policy |
| eval isolation | keep eval partitions independent from train partitions to prevent evaluation samples from entering training |
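
One possible shape for the weight version handshake is sketched below. ProducerRegistry and its methods are hypothetical; a real registry would also track online/offline state, throughput, and health as listed in the table.

```python
class ProducerRegistry:
    """Hypothetical registry that records which actor version each producer has loaded."""

    def __init__(self) -> None:
        self._loaded_version = {}  # producer_id -> confirmed policy version

    def confirm_loaded(self, producer_id: str, policy_version: int) -> None:
        # The producer calls this once the new weights are fully loaded.
        self._loaded_version[producer_id] = policy_version

    def may_write(self, producer_id: str, sample_policy_version: int) -> bool:
        # Writes are accepted only for the version the producer has confirmed.
        return self._loaded_version.get(producer_id) == sample_policy_version
```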

Comparison: Vime Data Flow Modes

| Dimension | Current Data Buffer | fully_async_rollout | Phase 1: TQ Buffer | Phase 2: TQ Streaming | Phase 3: TQ External |
|---|---|---|---|---|---|
| Status | main branch | main branch | planned | planned | planned |
| Storage scope | RolloutManager memory | RolloutManager process | TransferQueue storage | TransferQueue storage | shared service |
| Generation barrier | full rollout round | background queue but same job | unchanged | reduced | reduced |
| Cross-job producer | no | no | no | optional later | yes |
| Staleness control | implicit sync | limited | unchanged | explicit max_staleness | explicit max_staleness |
| Partial retry | local buffer | local buffer | TransferQueue retry partition | TransferQueue retry partition | TransferQueue retry partition |
| Risk | baseline | low/medium | low | medium | high |

Correctness Invariants

  1. Complete sample schema: before training, each sample must at least contain tokens, response_length, reward, and status; when masks are required, it must also contain loss_mask.
  2. Do not split groups: algorithms that train by group must consume a complete n_samples_per_prompt group at once, and must not scatter samples from the same prompt across different train steps.
  3. Traceable versions: every sample must record policy_version; if vLLM returns weight_version, it should also be written into Sample.weight_versions.
  4. No duplicate consumption: the same sample can be acked only once within the same training task; lease timeout can only make unacked samples visible again.
  5. Default behavior is unchanged: with --transferqueue-mode off, Vime does not import TransferQueue and does not change existing DataSource, rollout, eval, or debug-dump behavior.
  6. Replayability: samples saved by --save-debug-rollout-data should be equivalent to samples returned by the TransferQueue consumer, so --load-debug-rollout-data can reproduce training-side issues.
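
Invariant 1 lends itself to a simple pre-training check. The sketch below assumes rows are plain dicts keyed by column name and treats a missing key or a None value as absent; missing_fields is a hypothetical helper.

```python
REQUIRED_FIELDS = ("tokens", "response_length", "reward", "status")


def missing_fields(row: dict, require_loss_mask: bool = False) -> list:
    """Return the required columns that are absent (or None) in a row."""
    required = REQUIRED_FIELDS + (("loss_mask",) if require_loss_mask else ())
    # Compare against None so that legitimate falsy values such as reward 0.0 pass.
    return [name for name in required if row.get(name) is None]
```

A consumer could reject or requeue any row for which this list is non-empty, rather than letting an incomplete sample reach the training path.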

Observability

Add metrics with the transferqueue/ prefix:

| Metric | Description |
|---|---|
| transferqueue/ready_groups | currently trainable groups |
| transferqueue/inflight_groups | leased but unacked groups |
| transferqueue/dropped_groups | groups dropped by dynamic filtering |
| transferqueue/retry_groups | partial / aborted backfill groups |
| transferqueue/get_latency | consumer batch-fetch latency |
| transferqueue/put_latency | producer write latency |
| transferqueue/sample_staleness/mean | mean staleness of consumed samples |
| transferqueue/sample_staleness/max | max staleness of consumed samples |
| transferqueue/producer_lag | how far the producer's current version lags behind the train version |
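
The two staleness metrics could be derived from the policy versions of a consumed batch roughly as follows. staleness_metrics is a hypothetical helper; only the metric names come from the table above.

```python
def staleness_metrics(train_version: int, sample_versions: list) -> dict:
    """Compute mean and max staleness for one consumed batch."""
    gaps = [train_version - version for version in sample_versions]
    if not gaps:
        return {}  # nothing consumed, so nothing to report
    return {
        "transferqueue/sample_staleness/mean": sum(gaps) / len(gaps),
        "transferqueue/sample_staleness/max": max(gaps),
    }
```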

On the tracing side, add spans where producers write and consumers read:

  • transferqueue_put_group
  • transferqueue_get_groups
  • transferqueue_ack_groups
  • transferqueue_requeue_group

Open Questions

  1. Dependency model: should TransferQueue be an optional extra, such as pip install vime[transferqueue], or should the documentation ask users to install it manually?
  2. CLI shape: should Vime explicitly provide --transferqueue-mode, or should this be enabled only through a combination of --data-source-path / --rollout-function-path?
  3. Ack interface: is it acceptable for RolloutManager to optionally call data_source.ack_samples(), or should ack happen inside get_samples() with at-most-once semantics after crashes?
  4. Default storage backend: should Phase 1 support only SimpleStorage, with MooncakeStore / RayRDT introduced in Phase 2?
  5. Eval integration: should eval rollout use a separate TransferQueue partition, or keep using the current synchronous eval path?
  6. Multi-model fields: should reference logprobs, reward model output, and judge output become standard columns, or remain inside metadata?
  7. Staleness and algorithms: which algorithms should allow max_staleness > 0 by default? Do PPO / GRPO / DAPO need different defaults?
  8. Debug dump boundary: should TransferQueue row metadata also be written into --save-debug-rollout-data, making it easier to debug duplicate consumption, version drift, and producer issues?
