RolloutWorkflow Reference

This document describes the RolloutWorkflow abstraction, the core interface for implementing rollout generation in AReaL's reinforcement learning pipeline.

Notes:

  1. This page targets developers seeking a deep understanding of the codebase. For agentic RL training, use the high-level API described in the Agentic RL Guide.

  2. Legacy pattern: Subclassing `RolloutWorkflow` directly is considered legacy and should be avoided in new code. For new agentic RL workflows, use the agent workflow pattern with `async def run()` instead.

Overview

A RolloutWorkflow defines how to generate training trajectories from input data. It encapsulates the logic for:

  • Tokenizing prompts and preparing model inputs
  • Calling the inference engine to generate completions
  • Computing rewards for generated outputs
  • Packaging results into tensor dictionaries for training

Interface

from abc import ABC, abstractmethod
from typing import Any

from areal.api.workflow_api import RolloutWorkflow

# The abstract interface, as defined in areal/api/workflow_api.py
# (InferenceEngine and InteractionWithTokenLogpReward are AReaL API types):
class RolloutWorkflow(ABC):
    @abstractmethod
    async def arun_episode(
        self, engine: InferenceEngine, data: dict[str, Any]
    ) -> dict[str, Any] | None | dict[str, InteractionWithTokenLogpReward]:
        """Run a single episode of the workflow."""
        ...

Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `engine` | `InferenceEngine` | Inference engine for generating model responses |
| `data` | `dict[str, Any]` | A single sample from the dataloader |

Return Types

The arun_episode method supports three return types:

| Return Type | Description |
| --- | --- |
| `dict[str, torch.Tensor]` | Standard tensor format for training |
| `dict[str, InteractionWithTokenLogpReward]` | Token-level interactions (auto-converted to tensors); produced by the high-level `ArealOpenAI` API |
| `None` | Rejected trajectory, excluded from training |

Tensor Dictionary Format

When returning a tensor dictionary, the following fields are expected:

| Field | Shape | Type | Required | Description |
| --- | --- | --- | --- | --- |
| `input_ids` | `[batch_size, seq_len]` | `int32` | Yes | Token IDs (prompt + completion) |
| `attention_mask` | `[batch_size, seq_len]` | `bool` | Yes | Valid token mask |
| `loss_mask` | `[batch_size, seq_len]` | `int32` | No | Completion token mask (1 = train) |
| `logprobs` | `[batch_size, seq_len]` | `float32` | No | Log probabilities per token |
| `rewards` | `[batch_size]` | `float32` | No | Per-sequence rewards |
| `versions` | `[batch_size, seq_len]` | `int32` | No | Weight version when each token was generated |

Example return value:

return {
    "input_ids": torch.tensor([[1, 2, 3, 4, 5]], dtype=torch.int32),
    "attention_mask": torch.ones(1, 5, dtype=torch.bool),
    "loss_mask": torch.tensor([[0, 0, 1, 1, 1]], dtype=torch.int32),
    "logprobs": torch.tensor([[0.0, 0.0, -0.5, -0.3, -0.2]], dtype=torch.float32),
    "rewards": torch.tensor([1.0], dtype=torch.float32),
    "versions": torch.tensor([[0, 0, 1, 1, 1]], dtype=torch.int32),
}
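
For reference, here is a minimal sketch of how such a dictionary might be assembled from prompt and completion token IDs. The helper name, its signature, and the zero-filled logprobs for prompt tokens are illustrative, not part of the AReaL API:

```python
import torch

def build_training_tensors(
    prompt_ids: list[int],
    completion_ids: list[int],
    completion_logprobs: list[float],
    reward: float,
    version: int,
) -> dict[str, torch.Tensor]:
    """Illustrative helper: pack one trajectory into the tensor format above."""
    seq = prompt_ids + completion_ids
    n_prompt, n_total = len(prompt_ids), len(seq)
    return {
        "input_ids": torch.tensor([seq], dtype=torch.int32),
        "attention_mask": torch.ones(1, n_total, dtype=torch.bool),
        # Train only on completion tokens: 0 for prompt, 1 for completion.
        "loss_mask": torch.tensor(
            [[0] * n_prompt + [1] * len(completion_ids)], dtype=torch.int32
        ),
        # Prompt tokens have no sampled logprob; pad them with 0.0 here.
        "logprobs": torch.tensor(
            [[0.0] * n_prompt + completion_logprobs], dtype=torch.float32
        ),
        "rewards": torch.tensor([reward], dtype=torch.float32),
        # A single weight version is assumed for all tokens here; mixed
        # versions per token are possible, as in the example above.
        "versions": torch.tensor([[version] * n_total], dtype=torch.int32),
    }
```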

Workflow Context

Inside arun_episode, access the execution context via the workflow_context module. Each workflow instance has its own isolated context:

from areal.infra import workflow_context

async def arun_episode(self, engine, data):
    # Get current execution context
    ctx = workflow_context.get()

    # Check if running in evaluation mode
    if ctx.is_eval:
        # Use different parameters for evaluation
        ...

    # Get task ID for logging
    task_id = ctx.task_id

    # Get stats scope based on mode ("rollout" or "eval-rollout")
    scope = workflow_context.stat_scope()
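
As a sketch, a workflow can use this context to tag logs and switch sampling behavior between training and evaluation. The logging setup and the greedy-eval choice below are illustrative, not prescribed by AReaL:

```python
import logging

from areal.infra import workflow_context

logger = logging.getLogger(__name__)

async def arun_episode(self, engine, data):
    ctx = workflow_context.get()
    scope = workflow_context.stat_scope()  # "rollout" or "eval-rollout"
    logger.info("[%s] starting episode for task %s", scope, ctx.task_id)

    # Illustrative choice: sample greedily during evaluation,
    # with temperature during training rollouts.
    temperature = 0.0 if ctx.is_eval else 1.0
    ...
```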

Trajectory Dumping

When InferenceEngineConfig.dump_to_file=True, trajectories are automatically saved to disk for debugging and analysis.

Configuration

rollout:
  dump_to_file: true
  fileroot: "/path/to/logs"
  tokenizer_path: "model/tokenizer" # Required for text decoding

Output Location

Trajectories are saved to:

{fileroot}/{experiment_name}/{trial_name}/[rollout|eval-rollout]/{version}/{task_id}.jsonl

Example:

/tmp/areal/my_exp/trial1/rollout/5/42.jsonl

Output Format

Each line in the JSONL file contains:

{
  "task_id": 42,
  "sample_idx": 0,
  "seqlen": 256,
  "prompt_len": 128,
  "head_version": 5,
  "tail_version": 5,
  "reward": 1.0,
  "prompt": "<|im_start|>user\nWhat is 2+2?<|im_end|>\n<|im_start|>assistant\n",
  "completion": "The answer is 4.<|im_end|>"
}
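
Dumped files can be inspected with standard tooling. For instance, a small script (the path here is illustrative) to summarize rewards across a version's trajectories:

```python
import json
from pathlib import Path

# Illustrative location; substitute your own fileroot/experiment/trial/version.
dump_dir = Path("/tmp/areal/my_exp/trial1/rollout/5")

rewards = [
    json.loads(line)["reward"]
    for path in sorted(dump_dir.glob("*.jsonl"))
    for line in path.read_text().splitlines()
]
if rewards:
    print(f"{len(rewards)} trajectories, mean reward = {sum(rewards) / len(rewards):.3f}")
```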

Grouped Rollout

Grouped rollout runs the same workflow multiple times per input prompt, producing diverse completions for training. This is useful for algorithms like GRPO that benefit from multiple samples per prompt.

Configuration

Set group_size when submitting rollouts:

engine.submit(
    data=sample,
    workflow=MyWorkflow,
    workflow_kwargs={...},
    group_size=4,  # Run workflow 4 times per input
)

Or via CLI:

rollout:
  group_size: 4

How It Works

When group_size > 1, the workflow is wrapped in GroupedRolloutWorkflow:

  1. The wrapper runs arun_episode concurrently group_size times using asyncio.gather
  2. Results are merged based on their type:
    • Tensor dictionaries: Concatenated along the batch dimension
    • InteractionWithTokenLogpReward dicts: Merged into a single dictionary
  3. If some runs return None (rejected), only valid results are kept
  4. If all runs return None, the entire grouped result is None

Output Shape

With group_size=4 and a workflow returning [1, seq_len] tensors, the grouped output has shape [4, seq_len] (4 samples concatenated).
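
This per-prompt grouping is what enables group-relative baselines. As an illustrative sketch (not AReaL's actual advantage code), GRPO-style normalization over a group's rewards looks like:

```python
import torch

# Rewards for group_size=4 completions of the same prompt.
rewards = torch.tensor([1.0, 0.0, 1.0, 0.0])

# Group-relative advantage: center and scale within the group.
advantages = (rewards - rewards.mean()) / (rewards.std() + 1e-6)
```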

Implementation

Simplified from areal/infra/remote_inf_engine.py:

class GroupedRolloutWorkflow(RolloutWorkflow):
    async def arun_episode(self, engine, data):
        # Run N times concurrently
        results = await asyncio.gather(
            *[self.workflow.arun_episode(engine, data)
              for _ in range(self.group_size)]
        )

        # Filter None results
        valid_results = [r for r in results if r is not None]
        if not valid_results:
            return None

        # Merge based on result type
        if all_interaction_dicts(valid_results):
            return merge_dicts(valid_results)
        else:
            return concat_padded_tensors(valid_results)
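
Because each run may produce a different sequence length, `concat_padded_tensors` pads every result to a common length before stacking. A minimal sketch of the idea (not the actual implementation):

```python
import torch
import torch.nn.functional as F

def concat_padded_tensors_sketch(
    results: list[dict[str, torch.Tensor]],
) -> dict[str, torch.Tensor]:
    """Right-pad every [1, seq_len] field to the longest seq_len, then
    concatenate all fields along the batch dimension."""
    max_len = max(r["input_ids"].shape[1] for r in results)
    out: dict[str, torch.Tensor] = {}
    for key in results[0]:
        parts = []
        for r in results:
            t = r[key]
            if t.dim() == 2:  # sequence-shaped field: right-pad with zeros
                t = F.pad(t, (0, max_len - t.shape[1]))
            parts.append(t)  # per-sequence fields (e.g. rewards) pass through
        out[key] = torch.cat(parts, dim=0)
    return out
```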

Implementing Custom Workflows

To create a custom workflow:

  1. Subclass RolloutWorkflow:
import uuid

from areal.api.workflow_api import RolloutWorkflow

class MyWorkflow(RolloutWorkflow):
    def __init__(self, tokenizer, gconfig, **kwargs):
        self.tokenizer = tokenizer
        self.gconfig = gconfig

    async def arun_episode(self, engine, data):
        # 1. Prepare input
        input_ids = self.tokenizer.encode(data["prompt"])

        # 2. Generate completion
        req = ModelRequest(  # ModelRequest comes from the AReaL API
            rid=uuid.uuid4().hex,
            input_ids=input_ids,
            gconfig=self.gconfig,
            tokenizer=self.tokenizer,
        )
        resp = await engine.agenerate(req)

        # 3. Compute reward (user-defined helper; see sketch after this list)
        reward = self.compute_reward(resp, data)

        # 4. Return tensor dict (or None to reject)
        if reward < 0:
            return None

        return self.build_tensor_dict(resp, reward)
  2. Register with trainer:
trainer.train(
    workflow=MyWorkflow,
    workflow_kwargs={
        "tokenizer": tokenizer,
        "gconfig": config.gconfig,
    },
)
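
The `compute_reward` and `build_tensor_dict` calls in step 1 are user-defined helpers, not part of the base class. A minimal sketch of a rule-based reward, assuming (hypothetically) that the dataloader sample carries a ground-truth "answer" string and that the response exposes generated token IDs as `output_tokens`:

```python
def compute_reward(self, resp, data) -> float:
    # Hypothetical exact-match check: decode the completion and look for
    # the reference answer in the generated text.
    completion = self.tokenizer.decode(resp.output_tokens)
    return 1.0 if data["answer"] in completion else 0.0
```

A `build_tensor_dict` helper would assemble the fields described in the Tensor Dictionary Format section, along the lines of the `build_training_tensors` sketch shown there.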

Workflow Resolution

Workflows can be specified in multiple ways:

| Format | Example | Description |
| --- | --- | --- |
| Instance | `MyWorkflow(...)` | Pre-instantiated workflow |
| Class | `MyWorkflow` | Class (requires kwargs) |
| String path | `"my_module.MyWorkflow"` | Dynamic import |
| Agent workflow | Any class with `async def run()` | Wrapped with proxy support |

The training system automatically resolves these to RolloutWorkflow instances.
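
For instance, the class and string forms below would be interchangeable (the `workflow_kwargs` shown repeat the earlier registration example):

```python
# Pass the class; the trainer instantiates it with workflow_kwargs.
trainer.train(
    workflow=MyWorkflow,
    workflow_kwargs={"tokenizer": tokenizer, "gconfig": config.gconfig},
)

# Pass a dotted string path; the class is imported dynamically at runtime.
trainer.train(
    workflow="my_module.MyWorkflow",
    workflow_kwargs={"tokenizer": tokenizer, "gconfig": config.gconfig},
)
```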

See Also