refactor the async task submission logic in workflow executor into task runner #473

Merged
rchardx merged 25 commits into main from fw/async-task-runner on Oct 24, 2025

Collaborator

@garrett4wade garrett4wade commented Oct 22, 2025

Description

For the future implementation of RolloutController, we need to separate the task submission logic into an independent class.

Related Issue

Discussion: #469

Issue: #260

Type of Change

  • Bug fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to not
    work as expected)
  • Documentation update
  • Code refactoring (no functional changes)
  • Performance improvement
  • Test coverage improvement

Checklist

  • I have read the Contributing Guide
  • I have run formatting tools (pre-commit or manual)
  • I have run relevant unit tests and they pass
  • I have added tests for new functionality
  • I have updated documentation if needed
  • My branch is up to date with main
  • This PR introduces breaking changes (if yes, fill out details below)
  • If this PR changes documentation, I have built and previewed it locally with
    jb build docs
  • No critical issues raised by AI reviewers (/gemini review)

Breaking Change Details (if applicable):

N/A

Additional Context

Tests passed:

  • test_sglang_engine.py
  • test_vllm_engine.py
  • test_examples.py

Learning curve of gsm8k example:

[Two images: learning curves of the gsm8k example]

@gemini-code-assist
Contributor

Summary of Changes

Hello @garrett4wade, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a significant refactoring to the WorkflowExecutor by extracting its core asynchronous task submission and execution capabilities into a new, standalone AsyncTaskRunner class. This change aims to modularize the system, making the WorkflowExecutor more focused on its domain-specific responsibilities (like staleness management and trajectory validation) and preparing the groundwork for future features such as the RolloutController. The AsyncTaskRunner provides a robust and generic mechanism for running concurrent async Python functions, improving the overall architecture and maintainability of the asynchronous workflow processing.

Highlights

  • New AsyncTaskRunner Class: A new generic AsyncTaskRunner class has been introduced, providing a reusable, thread-based async task executor. It manages an asyncio event loop in a background thread, handling task submission, result collection, and pause/resume functionality independently of AReaL-specific logic.
  • Refactored WorkflowExecutor: The WorkflowExecutor has been refactored to delegate its asynchronous task submission and execution logic to the new AsyncTaskRunner. This significantly simplifies the WorkflowExecutor by removing its direct management of threading, asyncio event loops, and internal task queues.
  • Improved Modularity and Maintainability: By extracting the core async execution into AsyncTaskRunner, the codebase becomes more modular. WorkflowExecutor now focuses solely on AReaL-specific concerns like staleness management and trajectory validation, making it cleaner and easier to maintain.
  • Updated Task Submission and Waiting: The submit and wait methods within WorkflowExecutor have been updated to interact with the AsyncTaskRunner. The wait method now handles filtering None results (representing rejected trajectories) and accumulates results from the runner's output.
  • New Unit Tests for AsyncTaskRunner: Comprehensive unit tests have been added for the AsyncTaskRunner to ensure its basic functionality, pause/resume control, timeout handling, concurrency, error handling, queue size tracking, and result ordering/shuffling work as expected.
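
The runner described in the highlights above can be pictured with a minimal sketch. Everything here is an assumption made for illustration: `MiniTaskRunner`, `double`, and `accept_non_none` are hypothetical names, not code from this PR, and the sketch leaves out pause/resume, queue size limits, and error propagation. It only shows the core idea of an asyncio event loop running in a background thread, with results handed back through a thread-safe queue, plus the `None` filtering mentioned for `wait`.

```python
import asyncio
import queue
import threading
from typing import Any, Awaitable, Callable


class MiniTaskRunner:
    """Hypothetical, simplified runner: one asyncio loop in a daemon thread."""

    def __init__(self) -> None:
        self._loop = asyncio.new_event_loop()
        self._results: "queue.Queue[Any]" = queue.Queue()
        self._thread = threading.Thread(target=self._loop.run_forever, daemon=True)
        self._thread.start()

    def submit(self, fn: Callable[..., Awaitable[Any]], *args: Any) -> None:
        """Schedule fn(*args) on the background loop from any thread."""

        async def _run() -> None:
            # Note: an exception raised by fn is not forwarded in this sketch.
            self._results.put(await fn(*args))

        asyncio.run_coroutine_threadsafe(_run(), self._loop)

    def wait(self, count: int, timeout: float = 5.0) -> list:
        """Collect `count` results in completion order (timeout is per result)."""
        return [self._results.get(timeout=timeout) for _ in range(count)]

    def close(self) -> None:
        """Stop the loop, join the thread, and release the loop."""
        self._loop.call_soon_threadsafe(self._loop.stop)
        self._thread.join()
        self._loop.close()


async def double(x: int) -> int:
    await asyncio.sleep(0.01)
    return 2 * x


def accept_non_none(results: list) -> list:
    """Drop None entries, which stand in for rejected trajectories."""
    return [r for r in results if r is not None]
```

A caller would create the runner once, call `submit` for each task, and then call `wait` with the number of results it needs; results arrive in completion order, not submission order.
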

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request does a great job of refactoring the asynchronous task submission logic from WorkflowExecutor into a new, generic AsyncTaskRunner class. This significantly improves separation of concerns and makes the WorkflowExecutor cleaner and more focused on its domain-specific logic. The new AsyncTaskRunner is well-designed and comes with a comprehensive suite of unit tests.

My review includes a critical suggestion to improve the robustness of the AsyncTaskRunner by handling exceptions from individual tasks without terminating the entire runner. I've also suggested an improvement to the corresponding unit test to verify this resilient behavior. Overall, this is a high-quality refactoring that improves the codebase's maintainability.
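
One way to picture the resilience suggested here is to wrap each task so that a failure becomes a returned value instead of an exception that escapes into the runner. This is a sketch under assumptions, not the change made in this PR: `run_isolated`, `run_all`, `ok_task`, and `bad_task` are hypothetical names.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_isolated(fn: Callable[[], Awaitable[Any]]) -> tuple:
    """Run one task; return (ok, value_or_exception) instead of raising."""
    try:
        return True, await fn()
    except Exception as exc:  # deliberately broad: isolate any task failure
        return False, exc


async def run_all(fns) -> list:
    """Run all tasks concurrently; one failure does not cancel the others."""
    return await asyncio.gather(*(run_isolated(fn) for fn in fns))


async def ok_task() -> str:
    return "done"


async def bad_task() -> str:
    raise ValueError("boom")


outcomes = asyncio.run(run_all([ok_task, bad_task, ok_task]))
```

The caller can then inspect each `(ok, value)` pair and decide whether to log, retry, or surface the failure, while the remaining tasks keep running.
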

@garrett4wade
Collaborator Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request is a significant refactoring that extracts the asynchronous task execution logic from WorkflowExecutor into a new, generic AsyncTaskRunner class. This is a great improvement for modularity and reusability. The new AsyncTaskRunner is well-designed with good error handling and clean shutdown procedures. The WorkflowExecutor is correctly updated to use the new runner, and the addition of comprehensive unit tests for AsyncTaskRunner is excellent.

My main feedback is to improve the exception handling between AsyncTaskRunner and WorkflowExecutor by using a custom exception type instead of relying on string matching of a generic RuntimeError, which will make the code more robust and maintainable.
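
The difference between string matching and a dedicated exception type can be sketched as follows. The review does not name the error condition involved, so `TaskQueueFullError`, `submit`, and `try_submit` are hypothetical names chosen only for illustration.

```python
class TaskQueueFullError(RuntimeError):
    """Hypothetical name: raised when the runner cannot accept more tasks."""


def submit(pending: list, capacity: int, item: object) -> None:
    """Append item, or raise a typed error when the queue is at capacity."""
    if len(pending) >= capacity:
        raise TaskQueueFullError(f"queue full ({capacity} items)")
    pending.append(item)


def try_submit(pending: list, capacity: int, item: object) -> bool:
    """Catch by type, so rewording the error message cannot break the caller."""
    try:
        submit(pending, capacity, item)
        return True
    except TaskQueueFullError:
        return False
```

Subclassing `RuntimeError` is a design choice in this sketch: existing `except RuntimeError` handlers would keep working while new code catches the narrower type.
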

@garrett4wade
Collaborator Author

/gemini review

Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a generic asynchronous task runner and refactors the workflow executor to use it. The new AsyncTaskRunner class provides a reusable, thread-based async task executor with queue management and pause/resume control. The WorkflowExecutor is modified to leverage this new class, replacing its internal thread management with the AsyncTaskRunner. This change improves modularity and reusability of the code. I have identified some areas where improvements can be made, particularly around error handling and queue management.

@garrett4wade garrett4wade marked this pull request as ready for review October 23, 2025 08:07
Collaborator

@rchardx rchardx left a comment

LGTM!

@rchardx rchardx merged commit 04ab601 into main Oct 24, 2025
1 of 4 checks passed
@rchardx rchardx deleted the fw/async-task-runner branch October 24, 2025 06:35
zhshgmail pushed a commit to zhshgmail/AReaL that referenced this pull request Oct 24, 2025
…tecture

This commit integrates the segment-wise decoupled PPO feature (from feature/segment-wise-ppo-v2) with the latest main branch (04ab601), which includes the AsyncTaskRunner refactoring (inclusionAI#473).

## Integration Base
- **Source**: feature/segment-wise-ppo-v2 (8e7edcb7)
- **Target**: origin/main (04ab601)
- **Method**: Manual integration (rebase had conflicts due to AsyncTaskRunner refactoring)

## What's New - Segment-wise Decoupled PPO Feature

### Core Algorithm
Implements async RLHF with per-token policy version tracking to handle staleness in distributed PPO training:
- **Staleness Control**: Automatically filters stale samples when model updates
- **Proximal Recomputation**: Recomputes logprobs for head tokens with old policy
- **Dynamic Capacity**: Adjusts generation capacity based on filtered samples
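
A rough illustration of the staleness-control idea, under stated assumptions: the commit message does not give the actual rule, so the criterion below (drop a sample when its oldest token lags the current policy version by more than `max_staleness`) and the names `Sample`, `is_stale`, and `filter_stale` are hypothetical.

```python
from dataclasses import dataclass


@dataclass
class Sample:
    tokens: list
    versions: list  # policy version that produced each token


def is_stale(sample: Sample, current_version: int, max_staleness: int) -> bool:
    """Assumed rule: stale if the oldest token lags too far behind."""
    return current_version - min(sample.versions) > max_staleness


def filter_stale(samples: list, current_version: int, max_staleness: int) -> list:
    """Keep only the samples that are still fresh enough to train on."""
    return [s for s in samples if not is_stale(s, current_version, max_staleness)]
```

With per-token versions, a single rollout that spans a model update carries more than one version, which is why this sketch checks the minimum over the sample's tokens.
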

### New Files Added (21 files)

**API Layer** (abstractions):
- areal/api/cache_api.py - RolloutCache interface
- areal/api/queue_api.py - RolloutQueue interface
- areal/api/staleness_control.py - StalenessControlStrategy base class
- areal/api/proximal_recomputer.py - ProximalRecomputer interface
- areal/api/workflow_factory.py - Factory for creating WorkflowExecutor with DI

**Core Layer** (implementations):
- areal/core/rollout_cache.py - LocalRolloutCache implementation
- areal/core/rollout_queue.py - LocalRolloutQueue implementation
- areal/core/staleness_strategies.py - StandardPPOStrategy & SegmentWisePPOStrategy
- areal/core/proximal_recomputer.py - Concrete proximal logprob recomputer
- areal/core/capacity_modifier.py - CapacityModifier interface
- areal/core/filtered_capacity_modifier.py - FilteredSamplesCapacityModifier

**Tests** (10 test files, 227 tests total):
- areal/tests/sdp/ - Complete test suite covering all components

**Example**:
- examples/math/gsm8k_grpo_segment_wise.yaml - Example configuration

## Code Quality Improvements

### 1. Removed Unnecessary Type Quotes
Fixed 24+ type annotations (with __future__ annotations, quotes are unnecessary)

### 2. Decoupled Factory Dependencies
Applied Dependency Injection pattern to eliminate tight coupling in RemoteInfEngine and engine classes

### 3. Comprehensive Observability
Added metrics and DEBUG/INFO logging for staleness filtering, capacity management, and system monitoring

## Testing

**Test Results**: 219 passed, 8 skipped (96% pass rate)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>