Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d007c64
feat(core): add _coalesce module skeleton with CoalesceOptions and stub
d-v-b Apr 24, 2026
abac6d3
test(core): add failing tests for coalesced_get basic cases
d-v-b Apr 24, 2026
f65018a
feat(core): implement coalesced_get for basic sequential cases
d-v-b Apr 24, 2026
9e1f1d2
test(core): cover Offset/Suffix/None and mixed-cluster cases in coale…
d-v-b Apr 24, 2026
cd5097b
feat(core): run coalesced fetches concurrently under max_concurrency
d-v-b Apr 24, 2026
3a85488
test(core): cover key-missing (start/mid) and fetch-raises in coalesc…
d-v-b Apr 24, 2026
4553523
test(core): cover max_coalesced_bytes cap in coalesced_get
d-v-b Apr 24, 2026
162dd6d
test(core): add coverage-invariant property test for coalesced_get
d-v-b Apr 24, 2026
401e28b
test(core): drop unused HEAVY_MERGE/NO_MERGE constants
d-v-b Apr 24, 2026
913928c
docs(core): shorten coalesced_get docstring summary line
d-v-b Apr 24, 2026
850b9cd
refactor(core): drop dead invariant checks in merged-group path
d-v-b Apr 24, 2026
b2ec638
test(core): cover key-missing on uncoalescable input
d-v-b Apr 24, 2026
a4c3330
fix(core): cancel pending fetches on early exit and stop-after-miss
d-v-b Apr 24, 2026
5b1d8cd
test(core): cover mid-stream miss with concurrency > 1
d-v-b Apr 24, 2026
6aa6f4b
test(core): verify consumer-break cancels pending fetches
d-v-b Apr 24, 2026
dded848
fix(core): type coalesced_get as AsyncGenerator
d-v-b Apr 24, 2026
865baf0
refactor(test): drop redundant pytestmark asyncio
d-v-b Apr 24, 2026
17d9f75
feat(storage): add private SupportsGetRanges protocol
d-v-b Apr 24, 2026
3ab711d
test(storage): add failing tests for FsspecStore.get_ranges
d-v-b Apr 24, 2026
913be10
feat(storage): add FsspecStore.get_ranges and coalesce_options kwarg
d-v-b Apr 24, 2026
0328e01
test(storage): add SupportsGetRanges conformance tests
d-v-b Apr 24, 2026
e7432c5
chore: add towncrier fragment for get_ranges
d-v-b Apr 24, 2026
349bd9c
docs: drop private-symbol mention from get_ranges changelog
d-v-b Apr 24, 2026
79e9927
test: refactor tests
d-v-b Apr 24, 2026
8754f85
test: skip fsspec-backed get_ranges tests on old fsspec
d-v-b Apr 24, 2026
8d86f05
Merge branch 'main' into feat/get-many
d-v-b Apr 30, 2026
287b00d
Merge branch 'main' into feat/get-many
d-v-b May 1, 2026
1607420
Merge branch 'main' into feat/get-many
d-v-b May 6, 2026
ba6fef2
Merge branch 'main' into feat/get-many
d-v-b May 6, 2026
49289de
Merge branch 'main' into feat/get-many
d-v-b May 7, 2026
f158af3
Update src/zarr/core/_coalesce.py
d-v-b May 7, 2026
ef65b65
Update src/zarr/core/_coalesce.py
d-v-b May 7, 2026
3e1f22c
Update src/zarr/core/_coalesce.py
d-v-b May 7, 2026
a4906cc
Merge branch 'main' into feat/get-many
d-v-b May 7, 2026
2d556d4
chore: lint
d-v-b May 7, 2026
0e5b2c5
refactor: better function design with explicit context
d-v-b May 7, 2026
6e6a1ad
Merge branch 'main' into feat/get-many
d-v-b May 8, 2026
a997792
Merge branch 'main' into feat/get-many
d-v-b May 8, 2026
17f8d64
refactor(coalesce): split coalescing from coalesced get
d-v-b May 8, 2026
5eb25bc
refactor(coalesce): use sequence instead of iterable
d-v-b May 8, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/0000.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `zarr.storage.FsspecStore.get_ranges` for concurrent, coalesced multi-range reads from a single key. A new keyword-only constructor argument `coalesce_options` on `FsspecStore` controls the max gap, max coalesced size, and max concurrency of the underlying requests.
251 changes: 251 additions & 0 deletions src/zarr/core/_coalesce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,251 @@
# src/zarr/core/_coalesce.py
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Literal, NamedTuple, TypedDict

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Awaitable, Callable, Sequence

from zarr.abc.store import ByteRequest, RangeByteRequest
from zarr.core.buffer import Buffer

# NOTE: `Sequence` and `Buffer` are imported only under TYPE_CHECKING, so this
# alias must not be evaluated at import time (a bare module-level union would
# raise NameError the moment the module is imported).  Guarding it here is
# safe: it is referenced exclusively inside annotations, which are lazily
# evaluated thanks to `from __future__ import annotations`.
if TYPE_CHECKING:
    # One entry per finished worker task:
    #   ("ok", ((input_index, buffer), ...)) - fetch succeeded
    #   ("missing", None)                    - the underlying key did not exist
    #   ("error", exc)                       - the fetch raised
    _CompletionEntry = (
        tuple[Literal["ok"], Sequence[tuple[int, Buffer | None]]]
        | tuple[Literal["missing"], None]
        | tuple[Literal["error"], BaseException]
    )


class _WorkerCtx(NamedTuple):
    """Shared state passed to the per-task worker coroutines.

    Bundling these lets the workers declare their dependencies as one
    parameter instead of capturing them implicitly via closure.
    """

    # Reads one byte range (or the whole value for `None`) and returns the
    # resulting Buffer, or `None` if the underlying key does not exist.
    fetch: Callable[[ByteRequest | None], Awaitable[Buffer | None]]
    # Bounds how many fetches are actually in flight at once.
    semaphore: asyncio.Semaphore
    # Completion channel: each worker enqueues exactly one entry when done.
    queue: asyncio.Queue[_CompletionEntry]


async def _fetch_single(ctx: _WorkerCtx, idx: int, req: ByteRequest | None) -> None:
try:
async with ctx.semaphore:
buf = await ctx.fetch(req)
if buf is None:
await ctx.queue.put(("missing", None))
return
await ctx.queue.put(("ok", ((idx, buf),)))
except asyncio.CancelledError:
raise
except Exception as exc:
await ctx.queue.put(("error", exc))


async def _fetch_group(ctx: _WorkerCtx, members: list[tuple[int, RangeByteRequest]]) -> None:
    """Fetch one merged byte range and slice it back into per-input buffers.

    `members` must already be sorted by `start`; callers in this module
    build it from the sorted mergeable list.
    """
    # Local import to avoid cycles at module import time.
    from zarr.abc.store import RangeByteRequest

    try:
        # The merged span covers the first member's start through the
        # furthest end among all members (ranges may overlap).
        span_start = members[0][1].start
        span_end = max(req.end for _, req in members)
        async with ctx.semaphore:
            merged = await ctx.fetch(RangeByteRequest(span_start, span_end))
            if merged is None:
                await ctx.queue.put(("missing", None))
                return
            # Slice each member's bytes back out of the merged buffer,
            # re-basing offsets to the span start.
            parts = tuple(
                (idx, merged[req.start - span_start : req.end - span_start])
                for idx, req in members
            )
            await ctx.queue.put(("ok", parts))
    except asyncio.CancelledError:
        raise
    except Exception as exc:
        # Defer the failure to the consumer instead of crashing the task.
        await ctx.queue.put(("error", exc))


class CoalesceOptions(TypedDict):
    """Knobs for coalescing contiguous byte ranges into fewer I/O requests.

    All fields are required (plain `TypedDict`, no `total=False`). See
    `DEFAULT_COALESCE_OPTIONS` for a sensible default, and `coalesce_ranges`
    for exactly how the first two fields interact when planning merges.
    """

    max_gap_bytes: int
    """Two RangeByteRequests separated by at most this many bytes may be merged into one fetch."""
    max_coalesced_bytes: int
    """Upper bound on the size of a single merged fetch (ignored for an already-oversized single request)."""
    max_concurrency: int
    """Maximum number of merged fetches in flight at once."""


# Defaults chosen as a reasonable starting point for object-store reads;
# tune per deployment via the `coalesce_options` constructor argument.
DEFAULT_COALESCE_OPTIONS: CoalesceOptions = {
    "max_gap_bytes": 1 << 20,  # merge ranges separated by at most 1 MiB
    "max_coalesced_bytes": 16 << 20,  # cap a single merged fetch at 16 MiB
    "max_concurrency": 10,  # at most 10 merged fetches in flight
}


def coalesce_ranges(
    byte_ranges: Sequence[ByteRequest | None],
    *,
    options: CoalesceOptions,
) -> tuple[
    list[list[tuple[int, RangeByteRequest]]],
    list[tuple[int, ByteRequest | None]],
]:
    """Plan a set of byte-range fetches: which inputs merge, which stand alone.

    Pure (no I/O). The result is the I/O plan a caller would execute: each
    group corresponds to one fetch of a coalesced byte range, and each
    uncoalescable item corresponds to one fetch of the original request.

    Parameters
    ----------
    byte_ranges
        Input ranges. `None` means "the whole value".
    options
        Coalescing knobs (see `CoalesceOptions`).

    Returns
    -------
    groups
        List of merged groups. Each group is a list of
        `(input_index, RangeByteRequest)` pairs sorted by `start`. A
        single-element group represents a `RangeByteRequest` that did not
        merge with any neighbor.
    uncoalescable
        List of `(input_index, request)` pairs for inputs that are not
        `RangeByteRequest` (`OffsetByteRequest`, `SuffixByteRequest`,
        `None`). Indices are preserved from the input order.

    Notes
    -----
    Only `RangeByteRequest` inputs participate in coalescing. Two ranges
    merge when both: their gap (next `start` minus current group's running
    `end`) is `<= options["max_gap_bytes"]`, and the resulting merged
    span is `<= options["max_coalesced_bytes"]`.
    """
    # Local import to avoid cycles at module import time.
    from zarr.abc.store import RangeByteRequest

    # Partition inputs, remembering each one's original position.
    mergeable: list[tuple[int, RangeByteRequest]] = []
    uncoalescable: list[tuple[int, ByteRequest | None]] = []
    for index, request in enumerate(byte_ranges):
        if isinstance(request, RangeByteRequest):
            mergeable.append((index, request))
        else:
            uncoalescable.append((index, request))

    # Walk the mergeables in start-offset order, tracking the running
    # start/end of the open group so each merge decision is O(1).
    groups: list[list[tuple[int, RangeByteRequest]]] = []
    span_start = span_end = 0
    for index, request in sorted(mergeable, key=lambda pair: pair[1].start):
        if groups and request.start - span_end <= options["max_gap_bytes"]:
            candidate_end = max(span_end, request.end)
            if candidate_end - span_start <= options["max_coalesced_bytes"]:
                # Extend the current group; overlapping ranges are fine.
                groups[-1].append((index, request))
                span_end = candidate_end
                continue
        # Gap too wide or size cap exceeded: open a fresh group.
        groups.append([(index, request)])
        span_start = request.start
        span_end = request.end

    return groups, uncoalescable


async def coalesced_get(
    fetch: Callable[[ByteRequest | None], Awaitable[Buffer | None]],
    byte_ranges: Sequence[ByteRequest | None],
    *,
    options: CoalesceOptions,
) -> AsyncGenerator[Sequence[tuple[int, Buffer | None]], None]:
    """Read many byte ranges through `fetch` with coalescing and concurrency.

    Nearby ranges are merged into a single underlying I/O (subject to
    `options`), and merged fetches are run concurrently. Each yield
    corresponds to exactly one underlying I/O operation: a sequence of
    `(input_index, result)` tuples for all input ranges served by that I/O.
    Tuples within a yielded sequence are ordered by start offset. Yields across
    groups are in completion order, not input order.

    Parameters
    ----------
    fetch
        Callable that reads one byte range and returns a `Buffer` (or `None`
        if the underlying key does not exist). Typically constructed via
        `functools.partial(store.get, key, prototype)`.
    byte_ranges
        Input ranges. `None` means "the whole value".
    options
        Coalescing knobs.

    Yields
    ------
    Sequence[tuple[int, Buffer | None]]
        Per-I/O batch of `(input_index, result)` tuples.

    Notes
    -----
    - Only `RangeByteRequest` inputs are coalesced. `OffsetByteRequest`,
      `SuffixByteRequest`, and `None` are each treated as uncoalescable
      (one fetch, one single-tuple yield per input).
    - If any fetch returns `None` the iterator stops scheduling further fetches
      and completes without yielding the missing group. Groups completed before
      the miss remain observable.
    - If a fetch raises, the exception propagates on the yield that produced the
      failing group; earlier-completed groups remain observable.
    """
    # Plan first (pure), so the degenerate empty case does no setup at all.
    groups, uncoalescable = coalesce_ranges(byte_ranges, options=options)
    if not groups and not uncoalescable:
        return

    ctx = _WorkerCtx(
        fetch=fetch,
        semaphore=asyncio.Semaphore(options["max_concurrency"]),
        queue=asyncio.Queue(),
    )

    # Launch all work as tasks. The semaphore bounds actual I/O concurrency.
    tasks: set[asyncio.Task[None]] = set()
    for group in groups:
        tasks.add(asyncio.create_task(_fetch_group(ctx, group)))
    for idx, single in uncoalescable:
        tasks.add(asyncio.create_task(_fetch_single(ctx, idx, single)))
    total_work = len(tasks)

    try:
        pending_error: BaseException | None = None
        # Each task enqueues exactly one completion entry, so draining
        # `total_work` entries observes every task (unless we bail early).
        for _ in range(total_work):
            entry = await ctx.queue.get()
            if entry[0] == "ok":
                yield entry[1]
                continue
            # "missing" or "error": stop scheduling and cancel pending work.
            # Late arrivals that raced to enqueue before cancellation took
            # effect sit in the completion queue and are discarded by the
            # finally block (the queue is local and will be garbage-collected).
            for t in tasks:
                if not t.done():
                    t.cancel()
            if entry[0] == "error":
                # Raised below, outside the loop, so the finally-block
                # cleanup runs before the caller sees the exception.
                pending_error = entry[1]
            break
        if pending_error is not None:
            raise pending_error
    finally:
        # Best-effort cancellation for in-flight tasks (covers the consumer
        # break / early-exit case where we did not proactively cancel).
        for t in tasks:
            if not t.done():
                t.cancel()
        if tasks:
            # Reap everything so no task outlives the generator; exceptions
            # (including CancelledError) are swallowed deliberately here.
            await asyncio.gather(*tasks, return_exceptions=True)
33 changes: 32 additions & 1 deletion src/zarr/storage/_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import warnings
from contextlib import suppress
from functools import partial
from typing import TYPE_CHECKING, Any

from packaging.version import parse as parse_version
Expand All @@ -14,18 +15,24 @@
Store,
SuffixByteRequest,
)
from zarr.core._coalesce import (
DEFAULT_COALESCE_OPTIONS,
CoalesceOptions,
coalesced_get,
)
from zarr.core.buffer import Buffer
from zarr.errors import ZarrUserWarning
from zarr.storage._utils import _dereference_path

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable
from collections.abc import AsyncIterator, Iterable, Sequence

from fsspec import AbstractFileSystem
from fsspec.asyn import AsyncFileSystem
from fsspec.mapping import FSMap

from zarr.core.buffer import BufferPrototype
from zarr.storage._protocols import SupportsGetRanges


ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] = (
Expand Down Expand Up @@ -124,11 +131,14 @@ def __init__(
read_only: bool = False,
path: str = "/",
allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
*,
coalesce_options: CoalesceOptions = DEFAULT_COALESCE_OPTIONS,
) -> None:
super().__init__(read_only=read_only)
self.fs = fs
self.path = path
self.allowed_exceptions = allowed_exceptions
self.coalesce_options = coalesce_options

if not self.fs.async_impl:
raise TypeError("Filesystem needs to support async operations.")
Expand Down Expand Up @@ -315,6 +325,22 @@ async def get(
else:
return value

async def get_ranges(
    self,
    key: str,
    byte_ranges: Sequence[ByteRequest | None],
    *,
    prototype: BufferPrototype,
) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]:
    """Read many byte ranges from `key`, coalescing nearby ranges and fetching concurrently.

    See `zarr.storage._protocols.SupportsGetRanges` for the contract and
    `zarr.core._coalesce.coalesced_get` for the full semantics.
    """
    # Bind key and prototype once so the coalescer only has to supply
    # the per-fetch byte range.
    read_range = partial(self.get, key, prototype)
    batches = coalesced_get(read_range, byte_ranges, options=self.coalesce_options)
    async for batch in batches:
        yield batch

async def set(
self,
key: str,
Expand Down Expand Up @@ -440,3 +466,8 @@ async def getsize(self, key: str) -> int:
else:
# fsspec doesn't have typing. We'll need to assume or verify this is true
return int(size)


# Module-level type assertion: FsspecStore structurally satisfies SupportsGetRanges.
# At runtime this merely binds the throwaway name `_` to the class — the
# annotation itself is never evaluated (PEP 563 via `from __future__ import
# annotations`), which also keeps the TYPE_CHECKING-only SupportsGetRanges
# import safe. Its real purpose is static: mypy/pyright complain here if
# FsspecStore.get_ranges drifts from the protocol's declared shape.
_: type[SupportsGetRanges] = FsspecStore
34 changes: 34 additions & 0 deletions src/zarr/storage/_protocols.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# src/zarr/storage/_protocols.py
from __future__ import annotations

from typing import TYPE_CHECKING, Protocol, runtime_checkable

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Sequence

from zarr.abc.store import ByteRequest
from zarr.core.buffer import Buffer, BufferPrototype


@runtime_checkable
class SupportsGetRanges(Protocol):
    """Stores that satisfy this protocol can efficiently read many byte ranges
    from a single key in a single call, typically via coalescing and concurrent fetch.

    Private / unstable. Shape may change before being made public.

    Note: `@runtime_checkable` isinstance checks only verify that a
    `get_ranges` attribute exists, not that its signature matches.
    """

    def get_ranges(
        self,
        key: str,
        byte_ranges: Sequence[ByteRequest | None],
        *,
        prototype: BufferPrototype,
    ) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]:
        """Read many byte ranges from `key`.

        Each yield corresponds to one underlying I/O operation and carries
        `(input_index, result)` tuples for the input ranges served by it;
        `None` entries in `byte_ranges` mean "the whole value".

        See `zarr.core._coalesce.coalesced_get` for full semantics.
        """
        ...
Loading
Loading