Skip to content
10 changes: 10 additions & 0 deletions areal/api/cli_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,16 @@ class SchedulingSpec:
exclude: str | None = field(
default=None, metadata={"help": "sbatch/srun's `--exclude` option for slurm."}
)
ray_placement_strategy: str = field(
    default="shared",
    metadata={
        # NOTE: fixed help text — previous wording dropped the verb in the
        # "Separate" sentence ("Separate will 1 placement group ...").
        "help": "Which placement strategy to use for Ray scheduling. "
        "Shared will produce 1 placement group for all workers in the role (training). "
        "Separate will produce 1 placement group per worker (rollout). "
        "Deferred will do the same as separate but defers accelerator scheduling (multinode rollout). ",
        "choices": ["shared", "separate", "deferred"],
    },
)


@dataclass
Expand Down
5 changes: 5 additions & 0 deletions areal/infra/controller/rollout_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,11 @@ def initialize(
sch_spec.mem *= alloc_mode.gen_instance_size
if sch_spec.gpu > 0:
sch_spec.gpu = alloc_mode.gen_instance_size

if sch_spec.ray_placement_strategy == "shared":
# do not support shared placement for rollout
sch_spec.ray_placement_strategy = "separate"

job = Job(
replicas=alloc_mode.gen.dp_size,
tasks=[sch_spec for _ in range(alloc_mode.gen.dp_size)],
Expand Down
199 changes: 49 additions & 150 deletions areal/infra/scheduler/ray.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
import asyncio
import math
import time
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Any

import ray
import ray.exceptions
import torch
from ray.runtime_env import RuntimeEnv
from ray.util.placement_group import (
PlacementGroup,
placement_group,
remove_placement_group,
)
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
Expand All @@ -32,23 +29,18 @@
)
from areal.infra.utils.launcher import get_env_vars, get_thread_env_vars
from areal.infra.utils.ray import get_placement_group_master_ip_and_port
from areal.infra.utils.ray_placement_group import (
DeferredDeviceRayPlacementStrategy,
RayPlacementStrategy,
SeparatedRayPlacementStrategy,
SharedRayPlacementStrategy,
ray_resource_type,
)
from areal.utils import logging

logger = logging.getLogger("RayScheduler")


def ray_resource_type():
    """Return the Ray resource label for this host: "GPU", "NPU", or "CPU".

    CUDA devices take precedence; otherwise the NPU probe from
    ``areal.infra.platforms`` decides between "NPU" and plain "CPU".
    """
    if torch.cuda.is_available():
        return "GPU"

    # Imported lazily so CUDA-only hosts never touch the NPU platform probe.
    from areal.infra.platforms import is_npu_available

    return "NPU" if is_npu_available else "CPU"


@dataclass
class RayWorkerInfo:
worker: Worker
Expand Down Expand Up @@ -96,99 +88,6 @@ def _prepare_worker_specs(
f"schedulings length ({len(schedulings)}) must be 1 or equal to replicas ({num_workers})",
)

def _bundle_spec(self, cpu: int, gpu: int, mem: int) -> dict:
    """Build a single Ray placement-group bundle for one resource request.

    ``mem`` is expressed in GiB and converted to bytes, as Ray expects.
    On CPU-only hosts the bundle carries no device entry; requesting GPUs
    there is an error.
    """
    device = ray_resource_type()
    if gpu > 0 and device == "CPU":
        raise ValueError(
            f"Current detected device is CPU but specified number of GPUs is {gpu}"
        )
    bundle = {
        "CPU": cpu,
        "memory": mem * 1024**3,  # convert gb to bytes
    }
    if device != "CPU":
        bundle[device] = float(gpu)
    return bundle

def _create_bundle_list_gpu(self, cpu: int, gpu: int, mem: int) -> list[dict]:
    """Split a resource request into bundles that each fit on one node.

    GPUs are handed out in chunks of at most
    ``cluster.n_gpus_per_node``; CPU and memory are scaled to each chunk
    proportionally (rounded up, so totals may slightly exceed the request).
    """
    per_node = self.exp_config.cluster.n_gpus_per_node

    if per_node == 0 and gpu > 0:
        raise ValueError(
            f"Requested {gpu} GPUs but number of GPUs per node is {per_node}"
        )

    # Small requests fit in a single bundle on one node.
    if gpu < per_node:
        return [self._bundle_spec(cpu, gpu, mem)]

    bundles: list[dict] = []
    remaining = gpu
    while remaining > 0:
        # do not want to take all gpus in node if we do not need that many
        chunk = min(remaining, per_node)

        # for scaling the amount of cpu and memory relative to gpu in bundle
        share = min(chunk / gpu, 1)
        bundles.append(
            self._bundle_spec(math.ceil(cpu * share), chunk, math.ceil(mem * share))
        )
        remaining -= chunk

    return bundles

def _actor_resource_spec(self, cpu: int, gpu: int, mem: int) -> dict:
    """Build the ``options`` dict describing a Ray actor's resource needs.

    ``mem`` is in GiB (converted to bytes). Accelerators are reserved at
    0.9 of a device so forked helper workers can co-locate on the same
    device; custom accelerators go through the generic ``resources`` map.
    """
    device = ray_resource_type()
    if gpu > 0 and device == "CPU":
        raise ValueError(
            f"Current detected device is CPU but specified number of GPUs is {gpu}"
        )

    mem_bytes = mem * 1024**3
    if device == "CPU":
        return {"num_cpus": cpu, "memory": mem_bytes}

    # Use 0.9 GPUs to allow forked workers
    if device == "GPU":
        return {"num_cpus": cpu, "memory": mem_bytes, "num_gpus": float(gpu) * 0.9}

    return {
        "num_cpus": cpu,
        "resources": {device: float(gpu) * 0.9},
        "memory": mem_bytes,
    }

def _sum_resource_spec(
    self, schedulings: list[SchedulingSpec]
) -> tuple[int, int, int]:
    """Aggregate (cpu, gpu, mem) totals over all scheduling specs."""
    total_cpu = total_gpu = total_mem = 0
    for spec in schedulings:
        total_cpu += spec.cpu
        total_gpu += spec.gpu
        total_mem += spec.mem
    return (total_cpu, total_gpu, total_mem)

def _ping_workers(self, role: str, timeout: float | None = None):
worker_info_list = self._workers[role]
timeout = timeout if timeout is not None else self.startup_timeout
Expand All @@ -215,21 +114,6 @@ def _ping_workers(self, role: str, timeout: float | None = None):
failed_worker = ref_to_worker[ref]
raise WorkerFailedError(failed_worker.worker.id, -1)

def _create_placement_group(self, role: str, bundles: list[dict]) -> PlacementGroup:
    """Create a PACK placement group and block until it is ready.

    The group is recorded in ``self._placement_groups`` so cleanup can
    remove it later.

    Raises
    ------
    ray.exceptions.GetTimeoutError
        If the group is not ready within ``self.startup_timeout``.
    """
    pg = placement_group(bundles=bundles, strategy="PACK")
    try:
        ray.get(pg.ready(), timeout=self.startup_timeout)
    except ray.exceptions.GetTimeoutError:
        # Separate the diagnostic fields: the previous implicit f-string
        # concatenation fused ray.nodes() output directly into "bundles:".
        logger.error(
            f"Ray placement group timeout for role {role}\n"
            f"ray.nodes(): {ray.nodes()}\n"
            f"bundles: {bundles}"
        )
        raise
    self._placement_groups.append(pg)
    return pg

def _build_env_vars(self, spec: SchedulingSpec) -> dict[str, str]:
"""Helper to build environment variables for a worker."""
additional_envs_str = None
Expand All @@ -243,6 +127,27 @@ def _build_env_vars(self, spec: SchedulingSpec) -> dict[str, str]:
env.update(thread_env)
return env

def _get_placement_strategy(
    self, schedulings: list[SchedulingSpec]
) -> RayPlacementStrategy:
    """Resolve the single Ray placement strategy shared by all specs.

    Every spec in ``schedulings`` must request the same
    ``ray_placement_strategy``; mixed or unknown modes raise.

    Raises
    ------
    RuntimeError
        If ``schedulings`` is empty, the specs disagree, or the mode is
        not one of "shared" / "separate" / "deferred".
    """
    if not schedulings:
        # Previously an empty list raised a bare IndexError; fail loudly.
        raise RuntimeError("Cannot determine placement strategy: no scheduling specs given")

    strategies = {spec.ray_placement_strategy for spec in schedulings}
    if len(strategies) > 1:
        raise RuntimeError(
            f"Not every placement strategy in scheduling spec is the same: "
            f"{[spec.ray_placement_strategy for spec in schedulings]}"
        )

    mode = next(iter(strategies))

    if mode == "deferred":
        return DeferredDeviceRayPlacementStrategy()
    elif mode == "separate":
        return SeparatedRayPlacementStrategy()
    elif mode == "shared":
        return SharedRayPlacementStrategy()
    else:
        raise RuntimeError(f"Ray scheduling mode {mode} is not supported")

def _create_ray_workers(
self, role: str, schedulings: list[SchedulingSpec]
) -> tuple[list[RayWorkerInfo], list[str]]:
Expand All @@ -255,38 +160,24 @@ def _create_ray_workers(
worker_info_list: list[RayWorkerInfo] = []
worker_ids: list[str] = []

# Create one PG per worker with explicit bundle_index=0
placement_groups = []
bundle_indices: list[int] = []
for spec in schedulings:
bundles = [self._bundle_spec(spec.cpu, spec.gpu, spec.mem)]
pg = self._create_placement_group(role, bundles)
placement_groups.append(pg)
bundle_indices.append(0) # Always use bundle_index=0
placement_strategy = self._get_placement_strategy(schedulings)
placement_groups = placement_strategy.create_placement_group(
role, schedulings, self.exp_config.cluster.n_gpus_per_node
)

master_ip, master_port = get_placement_group_master_ip_and_port(
placement_groups[0], placement_group_bundle_index=0
)

for idx, (spec, pg, bundle_idx) in enumerate(
zip(schedulings, placement_groups, bundle_indices)
):
for idx, spec in enumerate(schedulings):
options, pg_scheduling_strategy = placement_strategy.actor_resources(spec)
worker_id = f"{role}/{idx}"
env = self._build_env_vars(spec)
options = self._actor_resource_spec(spec.cpu, spec.gpu, spec.mem)

# Build scheduling strategy with explicit bundle index
strategy_kwargs: dict[str, Any] = {
"placement_group": pg,
"placement_group_capture_child_tasks": True,
"placement_group_bundle_index": bundle_idx, # Always 0
}

actor = RayRPCServer.options(
**options,
name=worker_id,
runtime_env=RuntimeEnv(env_vars=env),
scheduling_strategy=PlacementGroupSchedulingStrategy(**strategy_kwargs),
scheduling_strategy=pg_scheduling_strategy,
).remote()

# 0 needed to pad the list as the trainer takes index 1 for ports
Expand All @@ -299,8 +190,8 @@ def _create_ray_workers(
worker=worker,
actor=actor,
role=role,
placement_group=pg,
bundle_index=bundle_idx,
placement_group=pg_scheduling_strategy.placement_group,
bundle_index=pg_scheduling_strategy.placement_group_bundle_index,
created_at=time.time(),
env_vars=env,
)
Expand Down Expand Up @@ -340,6 +231,7 @@ def _create_forked_workers_internal(
list[str]
List of forked worker IDs
"""

worker_info_list: list[RayWorkerInfo] = []
worker_ids: list[str] = []

Expand All @@ -362,6 +254,10 @@ def _create_forked_workers_internal(
device = ray_resource_type()
additional_options = {}
if spec.gpu > 0:
if spec.gpu > 1:
raise NotImplementedError(
"Colocation of multi-GPU workers together is not supported by Ray"
)
if device == "GPU":
additional_options = dict(num_gpus=0.01)
else:
Expand Down Expand Up @@ -691,10 +587,13 @@ def _cleanup_workers(self, workers: list[RayWorkerInfo]):
# Asynchronously destroy actor
actor.destroy.remote()
except Exception:
logger.warning(
f"Could not destroy remote actor {actor}, force killing actor"
)
ray.kill(actor, no_restart=True)
try:
actor.__ray_terminate__.remote()
except Exception:
logger.warning(
f"Could not destroy remote actor {actor}, force killing actor"
)
ray.kill(actor, no_restart=True)
Comment on lines +594 to +600
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

While actor.destroy.remote() is the standard way to clean up actors, it can sometimes hang. The addition of a fallback to actor.__ray_terminate__.remote() before resorting to ray.kill() is a good improvement for robustness, as __ray_terminate__ provides a more graceful termination.

Suggested change
try:
actor.__ray_terminate__.remote()
except Exception:
logger.warning(
f"Could not destroy remote actor {actor}, force killing actor"
)
ray.kill(actor, no_restart=True)
except Exception:
try:
# Attempt a more graceful termination before force killing.
actor.__ray_terminate__.remote()
except Exception:
logger.warning(
f"Could not destroy remote actor {actor}, force killing actor"
)
ray.kill(actor, no_restart=True)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same code?


# Collect unique placement groups and remove them
unique_pgs = {wi.placement_group for wi in workers}
Expand Down
11 changes: 11 additions & 0 deletions areal/infra/utils/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,14 @@ def _master_ip_and_port():
),
)(_master_ip_and_port).remote()
return ray.get(future)


def create_resource_spec(device, cpu: int, gpu: int, mem_in_bytes: int):
    """Build a Ray actor resource-options dict for the given device type.

    Parameters
    ----------
    device:
        "CPU", "GPU", or a custom accelerator resource name (e.g. "NPU").
    cpu:
        Number of CPUs to reserve (``num_cpus``).
    gpu:
        Number of accelerator devices to reserve; ignored for "CPU".
    mem_in_bytes:
        Memory reservation in bytes (``memory``).
    """
    # BUG FIX: Ray's actor options expect the reservation under the key
    # "memory" (as the custom-accelerator branch below already did); the
    # previous "mem_in_bytes" key was meaningless to Ray and silently
    # dropped the memory reservation on the CPU and GPU paths.
    res = {"num_cpus": cpu, "memory": mem_in_bytes}
    if device == "CPU":
        return res
    if device == "GPU":
        res["num_gpus"] = float(gpu)
        return res

    # Custom accelerators are requested through the generic "resources" map.
    return {"num_cpus": cpu, "resources": {device: float(gpu)}, "memory": mem_in_bytes}
Loading