Custom Agent Workflows

This guide shows how to create custom agents for RL training. AReaL supports any agent framework (OpenAI Agents SDK, LangChain, CAMEL-AI, etc.) with minimal integration.

Notes:

  1. Agent workflows are supported on local and slurm schedulers only. The ray scheduler is incompatible with the HTTP proxy architecture.

  2. For internal architecture details, see the Agent Workflow Reference.

Quick Start

An agent workflow is any class with an async def run(data, **extra_kwargs) method that returns a reward. AReaL automatically wraps it for RL training.

import os

from openai import AsyncOpenAI


class MyAgent:
    async def run(self, data, **extra_kwargs):
        # Get injected client and URL
        http_client = extra_kwargs.get("http_client")
        base_url = extra_kwargs.get("base_url") or os.getenv("OPENAI_BASE_URL")
        api_key = extra_kwargs.get("api_key") or os.getenv("OPENAI_API_KEY")

        # Use standard OpenAI SDK
        client = AsyncOpenAI(
            base_url=base_url,
            api_key=api_key,
            http_client=http_client,
            max_retries=0,
        )

        response = await client.chat.completions.create(
            model="default",
            messages=data["messages"],
        )

        # Return reward (float or dict[str, float]).
        # compute_reward is your own scoring function, not an AReaL API.
        return compute_reward(response, data["answer"])

Pass the agent to the trainer:

trainer.train(workflow="my_module.MyAgent")
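The Quick Start example calls compute_reward without defining it. As a minimal sketch, assuming the task has a single reference answer and that exact string match is an acceptable reward signal, such a helper could look like the following. The function body is hypothetical and is not provided by AReaL; only the response layout (choices[0].message.content) comes from the OpenAI SDK.

```python
def compute_reward(response, answer: str) -> float:
    """Hypothetical exact-match reward: 1.0 if the reply equals the answer."""
    # Chat Completions responses carry the reply text in the first choice.
    content = response.choices[0].message.content or ""
    # Compare after trimming whitespace so trailing newlines do not matter.
    return 1.0 if content.strip() == str(answer).strip() else 0.0
```

Real tasks usually need a more forgiving check (answer extraction, numeric tolerance, a verifier), but the return type stays a plain float.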

Method Signature

The run method must follow this signature:

async def run(self, data: dict, **extra_kwargs) -> float | dict[str, float]

| Parameter | Description |
| --- | --- |
| data | A sample from your dataset (dict with your data keys) |
| extra_kwargs | AReaL-injected arguments (see below) |
| Return | float: reward for the last completion; dict[str, float]: maps completion IDs to rewards |

Injected Arguments

AReaL injects these arguments via extra_kwargs:

| Key | Type | Description |
| --- | --- | --- |
| base_url | str | URL to AReaL's proxy server |
| api_key | str | Session-wise API key to AReaL's proxy server |
| http_client | httpx.AsyncClient | Shared HTTP client (reduces overhead) |
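Since the same three keys are read in every agent, it can help to gather them in one place. The sketch below assumes the fallback order used in the Quick Start example (injected value first, then the OPENAI_BASE_URL and OPENAI_API_KEY environment variables). The helper name resolve_proxy_settings is hypothetical and not part of AReaL.

```python
import os


def resolve_proxy_settings(extra_kwargs: dict) -> dict:
    """Collect the injected connection settings, falling back to env vars."""
    return {
        "base_url": extra_kwargs.get("base_url") or os.getenv("OPENAI_BASE_URL"),
        "api_key": extra_kwargs.get("api_key") or os.getenv("OPENAI_API_KEY"),
        # May be None; the OpenAI SDK then creates its own HTTP client.
        "http_client": extra_kwargs.get("http_client"),
    }
```

Inside run, the result can be unpacked directly into the SDK constructor, for example AsyncOpenAI(**resolve_proxy_settings(extra_kwargs), max_retries=0).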

Execution Modes

AReaL supports two execution modes, configured via rollout.openai.mode:

Inline Mode (Default)

The agent runs in the same process as the rollout worker. Recommended for most use cases.

rollout:
  openai:
    mode: inline

Requirements:

  • The run method must be async
  • Use extra_kwargs["base_url"] for LLM calls
  • Optionally use extra_kwargs["http_client"] to reduce overhead

Advantages:

  • No serialization overhead
  • Direct access to shared HTTP client
  • Lower latency

Subprocess Mode

The agent runs in a separate process pool. Use this when your agent code is not async-compatible or uses libraries that conflict with the main process.

rollout:
  openai:
    mode: subproc
    subproc_max_workers: 4  # Process pool size

Requirements:

  • The agent class must be picklable (serializable)
  • Read OPENAI_BASE_URL and OPENAI_API_KEY from the environment instead of extra_kwargs

Example:

import os
from openai import OpenAI  # Sync client is OK

class MySyncAgent:
    async def run(self, data, **extra_kwargs):
        # In subproc mode, base_url and api_key come from environment
        client = OpenAI(
            base_url=os.getenv("OPENAI_BASE_URL"),
            api_key=os.getenv("OPENAI_API_KEY"),
        )

        response = client.chat.completions.create(
            model="default",
            messages=data["messages"],
        )

        return compute_reward(response, data["answer"])

Note: The method signature remains async def run(...) even in subprocess mode. AReaL wraps the call with asyncio.run() internally, so synchronous code inside the method is fine.

Trade-offs:

  • Pickling overhead for agent and data
  • No access to shared HTTP client
  • Higher latency per call
  • Useful for non-async libraries or process isolation
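Because subprocess mode requires a picklable agent, a round trip through pickle before launching training can surface problems early. The sketch below is a generic standard-library check, not an AReaL utility. It assumes agent instances are what gets serialized, and the names is_picklable, PlainAgent, and LambdaAgent are hypothetical.

```python
import pickle


def is_picklable(obj) -> bool:
    """Return True if obj survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except Exception:
        return False


class PlainAgent:
    """Hypothetical agent that stores only simple state (strings, numbers)."""

    def __init__(self, system_prompt: str):
        self.system_prompt = system_prompt


class LambdaAgent:
    """Hypothetical agent holding a lambda, which pickle cannot serialize."""

    def __init__(self):
        self.scorer = lambda text: len(text)
```

Common culprits are lambdas, open file handles, and live network clients stored on the instance. Creating such objects inside run instead of __init__ usually avoids the issue.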

Reward Assignment

Simple Reward

Return a single float to assign reward to the last LLM completion:

async def run(self, data, **extra_kwargs):
    # ... agent logic ...
    return 1.0 if is_correct else 0.0

Per-Completion Rewards

For multi-turn conversations, return a dict mapping completion IDs to rewards:

async def run(self, data, **extra_kwargs):
    # ... multi-turn agent logic ...
    return {
        "completion-id-1": 0.5,
        "completion-id-2": 1.0,
    }

Access completion IDs from the response:

response = await client.chat.completions.create(...)
completion_id = response.id  # Use this ID for reward mapping
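Putting the two pieces together, a multi-turn loop can record response.id after each call and build the reward dict as it goes. This is a sketch under assumptions: run_turns and score_turn are hypothetical names, the client is any object exposing the OpenAI-style chat.completions.create coroutine, and scoring each turn independently is only one possible policy.

```python
async def run_turns(client, messages: list, prompts: list, score_turn) -> dict:
    """Send each prompt as a new user turn; map completion IDs to rewards."""
    rewards = {}
    for prompt in prompts:
        messages.append({"role": "user", "content": prompt})
        response = await client.chat.completions.create(
            model="default",
            messages=messages,
        )
        reply = response.choices[0].message.content
        # Key the reward by the completion ID returned for this turn.
        rewards[response.id] = float(score_turn(prompt, reply))
        # Keep the assistant reply in the history for the next turn.
        messages.append({"role": "assistant", "content": reply})
    return rewards
```

An agent's run method could then return the result of run_turns directly, since it already has the dict[str, float] shape described above.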

Configuration

Agent workflow settings are in rollout.openai:

rollout:
  openai:
    mode: inline              # "inline" or "subproc"
    turn_discount: 0.9        # Reward discount for earlier turns
    export_style: individual  # "individual" or "concat"
    subproc_max_workers: 4    # Process pool size (subproc mode only)

| Field | Default | Description |
| --- | --- | --- |
| mode | inline | Execution mode |
| turn_discount | 1.0 | Geometric discount for multi-turn rewards |
| export_style | individual | How to export interactions for training |
| subproc_max_workers | 4 | Max worker processes for subprocess mode |
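To build intuition for turn_discount, the sketch below shows one plausible reading of "geometric discount": the last turn keeps the full reward, and each earlier turn is scaled by one more factor of turn_discount. This is an assumption for illustration. The exact propagation rule AReaL applies is not specified in this guide, and discounted_rewards is a hypothetical helper.

```python
def discounted_rewards(
    final_reward: float, num_turns: int, turn_discount: float
) -> list:
    """Assumed scheme: the turn k steps before the last gets reward * discount**k."""
    return [
        final_reward * turn_discount ** (num_turns - 1 - i)
        for i in range(num_turns)
    ]
```

Under this reading, the default turn_discount of 1.0 gives every turn the same reward, while 0.9 over three turns yields roughly 0.81, 0.9, and 1.0 from first to last.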

See Also