Draft
24 commits
8128dfa
feat(core): add cancel_generation() to ModelOutputThunk
planetf1 Apr 27, 2026
f26cce7
feat(stdlib): add stream_with_chunking() with per-chunk validation (#…
planetf1 Apr 27, 2026
93e7587
test(stdlib): add StreamingMockBackend and streaming orchestration tests
planetf1 Apr 27, 2026
a5d358c
docs: add streaming_chunking example (#901)
planetf1 Apr 27, 2026
39f18a4
docs(stdlib): add Args section to StreamChunkingResult class docstring
planetf1 Apr 28, 2026
36173cb
docs(stdlib): add Raises section to stream_with_chunking docstring
planetf1 Apr 28, 2026
ea6bdb0
fix(stdlib): stream_with_chunking passes one chunk per stream_validat…
planetf1 Apr 28, 2026
35df77f
docs(stdlib): fix example for delta semantics and note validator latency
planetf1 Apr 28, 2026
61448a9
feat(stdlib): flush trailing chunk fragment at end of stream
planetf1 Apr 28, 2026
def10b6
fix(stdlib): address review feedback on streaming validation
planetf1 Apr 28, 2026
da41a06
fix(stdlib): address second-round review feedback
planetf1 Apr 28, 2026
74c009d
docs(stdlib): add Args and Returns sections to chunker flush overrides
planetf1 Apr 28, 2026
3fb501e
fix(stdlib): address third-round review feedback
planetf1 Apr 29, 2026
a13d58b
feat(stdlib): add streaming event types, events() iterator, and OTEL …
planetf1 Apr 29, 2026
0f20577
test(stdlib): add event emission and OTEL bridge tests (#902)
planetf1 Apr 29, 2026
9291d5e
docs: add streaming validation sections to how-to and concepts (#902)
planetf1 Apr 29, 2026
b8c1126
docs: update streaming_chunking example to events() API (#902)
planetf1 Apr 29, 2026
240cd48
docs: add streaming/ to examples catalogue index (#902)
planetf1 Apr 29, 2026
e4035e5
docs(stdlib): fix event dataclass docstrings to use Args not Attribut…
planetf1 Apr 29, 2026
ab8dc45
fix(stdlib): address code review feedback on streaming events (#902)
planetf1 Apr 29, 2026
0537272
docs: add case _ fallback to streaming_chunking example match block (…
planetf1 Apr 29, 2026
b93474b
docs: add word, paragraph, and custom chunking examples (#902)
planetf1 Apr 29, 2026
c6d896e
fix(stdlib): address third-round review feedback (CancelledError fina…
planetf1 Apr 29, 2026
bd2a5ae
fix(stdlib): address fourth-round review findings on streaming events…
planetf1 Apr 29, 2026
27 changes: 27 additions & 0 deletions docs/docs/concepts/requirements-system.md
@@ -300,3 +300,30 @@ requirements = [
All requirements are validated after each generation attempt. The repair request lists
every requirement that failed, not just the first one, so the model can address all
issues in a single repair pass.

## Streaming validation

`stream_validate()` is the streaming counterpart to `validate()`. It is called
once per semantic chunk as tokens arrive from the model, before the full output
is available. Requirements that need to detect problems early — too many
sentences, a prohibited keyword in the first paragraph, unexpected JSON
structure mid-output — override `stream_validate()` to express that logic.

`stream_validate()` returns a `PartialValidationResult` with a tri-state `success`
field:

- `"unknown"` — no conclusion yet; the chunk is passed to the consumer and
`validate()` will be called at stream end.
- `"pass"` — the chunk looks valid so far; it is passed to the consumer and
`validate()` is still called at stream end (a streaming pass is informational,
not final).
- `"fail"` — the stream is cancelled immediately; no further chunks reach the
consumer; `validate()` is skipped for this requirement.

State isolation is per-clone: `stream_with_chunking()` copies each requirement
with `copy()` before starting the orchestrator, so the original objects are never
mutated. Requirements that accumulate state across chunks (e.g. a running word
count) should reassign mutable containers rather than mutate in place, since
clones share the original's `__dict__` values at copy time.
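
Why in-place mutation leaks through the clone can be seen with plain `copy.copy` (an illustrative stand-in class, not an actual mellea requirement):

```python
import copy


class CountingReq:
    """Stand-in for a stateful requirement (illustrative, not a mellea class)."""

    def __init__(self) -> None:
        self.seen: list[str] = []  # mutable container


# copy.copy() is a shallow copy: the clone's __dict__ holds the *same*
# list object as the original, as described above.
original = CountingReq()
clone = copy.copy(original)
clone.seen.append("chunk-1")  # in-place mutation leaks into the original
print(original.seen)          # ['chunk-1'] — the original was mutated

# Reassigning a fresh container keeps the clone's state isolated:
original2 = CountingReq()
clone2 = copy.copy(original2)
clone2.seen = clone2.seen + ["chunk-1"]  # rebinds only the clone's attribute
print(original2.seen)                    # [] — the original stays clean
```

This is why the guidance above prefers reassignment over in-place mutation for any per-chunk accumulator.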

> **See also:** [Streaming with per-chunk validation](../how-to/use-async-and-streaming#streaming-with-per-chunk-validation)
1 change: 1 addition & 0 deletions docs/docs/examples/index.md
@@ -32,6 +32,7 @@ to run.
| `context/` | Context inspection, sampling with context trees, parallel context branches |
| `sessions/` | Custom session types and backend selection |
| `async/` | How to utilize basic async capabilities |
| `streaming/` | `stream_with_chunking()` with per-chunk validation, typed event vocabulary, early-exit on fail |

### Data and documents

122 changes: 122 additions & 0 deletions docs/docs/how-to/use-async-and-streaming.md
@@ -174,6 +174,128 @@ asyncio.run(sequential_chat())

For parallel generation, use `SimpleContext`.

## Streaming with per-chunk validation

`stream_with_chunking()` adds per-chunk validation to a streaming generation.
It splits the accumulated text into semantic units (sentences, words, or
paragraphs), calls `stream_validate()` on each chunk in parallel, and can
exit early if any requirement returns `"fail"` — preventing the consumer from
seeing invalid content mid-stream.

The primary way to observe a `stream_with_chunking()` run is via typed
`StreamEvent` objects from `result.events()`:

```python
# Requires: mellea
# Returns: None
import asyncio

from mellea.core.backend import Backend
from mellea.core.base import Context
from mellea.core.requirement import PartialValidationResult, Requirement, ValidationResult
from mellea.stdlib.components import Instruction
from mellea.stdlib.streaming import (
ChunkEvent,
CompletedEvent,
FullValidationEvent,
QuickCheckEvent,
StreamingDoneEvent,
stream_with_chunking,
)


class MaxSentencesReq(Requirement):
"""Fails if the model generates more than *limit* sentences."""

def __init__(self, limit: int) -> None:
super().__init__()
self._limit = limit
self._count = 0

def format_for_llm(self) -> str:
return f"The response must be at most {self._limit} sentences."

async def stream_validate(
self, chunk: str, *, backend: Backend, ctx: Context
) -> PartialValidationResult:
self._count += 1
if self._count > self._limit:
return PartialValidationResult("fail", reason="Too many sentences")
return PartialValidationResult("unknown")

async def validate(
self, backend: Backend, ctx: Context, *, format=None, model_options=None
) -> ValidationResult:
return ValidationResult(result=True)


async def main() -> None:
from mellea.stdlib.session import start_session

m = start_session()
action = Instruction("Write a two-sentence summary of the water cycle.")
req = MaxSentencesReq(limit=3)

result = await stream_with_chunking(
action, m.backend, m.ctx, quick_check_requirements=[req], chunking="sentence"
)

async for event in result.events():
match event:
case ChunkEvent():
print(f" chunk[{event.chunk_index}]: {event.text!r}")
case QuickCheckEvent(passed=False):
print(f" FAIL at chunk {event.chunk_index}: {event.results}")
case StreamingDoneEvent():
print(f" stream done — {len(event.full_text)} chars")
case FullValidationEvent():
print(f" final: {'pass' if event.passed else 'fail'}")
case CompletedEvent():
print(f" completed — success={event.success}")
case _:
pass # ErrorEvent and other future types

await result.acomplete()
print(f"completed={result.completed}, failures={len(result.streaming_failures)}")


asyncio.run(main())
```

If you only need the raw validated text without event metadata, use
`result.astream()` instead:

```python
result = await stream_with_chunking(
action, m.backend, m.ctx, quick_check_requirements=[req], chunking="sentence"
)
async for chunk in result.astream():
print(chunk)
await result.acomplete()
```

Both `astream()` (raw chunks) and `events()` are available on the same result
object. They use independent queues, so you can run them concurrently with
`asyncio.gather`. Both are **single-consumer** — a second iteration on either
will block indefinitely.
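
The independent-queue, single-consumer design can be illustrated with plain `asyncio` (a sketch of the pattern only; the helper names here are hypothetical and this is not mellea's internal implementation):

```python
import asyncio


async def fan_out(items: list[str], q1: asyncio.Queue, q2: asyncio.Queue) -> None:
    # Producer: every item is put on both queues, so each consumer
    # independently sees the complete stream.
    for item in items:
        await q1.put(item)
        await q2.put(item)
    await q1.put(None)  # sentinel marking end of stream
    await q2.put(None)


async def drain(q: asyncio.Queue, out: list[str]) -> None:
    # Single consumer per queue: a second drain() on the same queue would
    # steal items — which is why each iterator needs its own queue.
    while (item := await q.get()) is not None:
        out.append(item)


async def demo() -> tuple[list[str], list[str]]:
    q1, q2 = asyncio.Queue(), asyncio.Queue()
    a: list[str] = []
    b: list[str] = []
    await asyncio.gather(
        fan_out(["one ", "two ", "three"], q1, q2),
        drain(q1, a),
        drain(q2, b),
    )
    return a, b


a, b = asyncio.run(demo())
print(a == b)  # True — both consumers saw the full stream
```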

### The `stream_validate` tri-state

Each call to `stream_validate` returns a `PartialValidationResult` with one of
three values:

| Value | Meaning |
| ----- | ------- |
| `"unknown"` | No conclusion yet — wait for the full output before judging. |
| `"pass"` | This chunk is valid so far (informational; does not skip final `validate()`). |
| `"fail"` | Invalid — cancel the stream immediately and record a streaming failure. |

After a natural stream end, `validate()` is called on every non-`"fail"`
requirement (both `"pass"` and `"unknown"`). This means `"pass"` from
`stream_validate` does **not** replace the final `validate()` call.
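
The table above can be sketched as a simplified control loop (hypothetical helper names; not mellea's actual orchestrator, which is async and runs per requirement):

```python
def run_stream(chunks, stream_check):
    """Simplified sketch of the tri-state semantics (not mellea's code)."""
    delivered = []
    for chunk in chunks:
        verdict = stream_check(chunk)  # "pass" | "unknown" | "fail"
        if verdict == "fail":
            # Cancel immediately: the failing chunk never reaches the consumer.
            return delivered, "cancelled"
        # "pass" and "unknown" both forward the chunk to the consumer.
        delivered.append(chunk)
    # Natural end: final validate() still runs for non-failed requirements.
    return delivered, "validate"


out, status = run_stream(
    ["a. ", "b. ", "???"],
    lambda c: "fail" if "?" in c else "unknown",
)
print(out, status)  # ['a. ', 'b. '] cancelled
```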

> **See also:** [The Requirements System — Streaming validation](../concepts/requirements-system#streaming-validation)

---

**See also:** [Tutorial 02: Streaming and Async](../tutorials/02-streaming-and-async) | [act() and aact()](../how-to/act-and-aact)
183 changes: 183 additions & 0 deletions docs/examples/streaming/custom_chunking.py
@@ -0,0 +1,183 @@
# pytest: ollama, e2e

"""Streaming generation with a custom ChunkingStrategy subclass.

Demonstrates:
- Subclassing :class:`~mellea.stdlib.chunking.ChunkingStrategy` to define a
new splitting boundary
- Implementing ``split()`` (stateless, idempotent) and ``flush()`` (end-of-stream
release of any withheld trailing fragment)
- Using the custom chunker with ``stream_with_chunking()`` in place of a string alias
- Validating line-by-line output from a numbered-list prompt

``LineChunker`` splits on single newlines (``\\n``), emitting one line per
``stream_validate`` call. It sits between :class:`~mellea.stdlib.chunking.WordChunker`
(one word) and :class:`~mellea.stdlib.chunking.SentenceChunker` (one sentence) in
granularity, and is a natural fit for list-formatted model output.

Extension pattern:
1. Subclass ``ChunkingStrategy``.
2. Implement ``split(accumulated_text)`` — return all complete chunks found in
the accumulated text so far; withhold any trailing fragment. The method is
called on every new token delta, so it must be stateless and idempotent.
3. Override ``flush(accumulated_text)`` to release the withheld trailing fragment
when the stream ends naturally. The default base implementation returns ``[]``
(fragment discarded); override it when the trailing fragment is semantically
significant.
"""

import asyncio
import re

from mellea.core.backend import Backend
from mellea.core.base import Context
from mellea.core.requirement import (
PartialValidationResult,
Requirement,
ValidationResult,
)
from mellea.stdlib.chunking import ChunkingStrategy
from mellea.stdlib.components import Instruction
from mellea.stdlib.streaming import (
ChunkEvent,
CompletedEvent,
FullValidationEvent,
QuickCheckEvent,
StreamingDoneEvent,
stream_with_chunking,
)

# Matches a leading list marker like "1." or "1)" followed by whitespace,
# with optional leading indentation — the common numbered-list formats.
_NUMBERED_LINE = re.compile(r"^\s*\d+[\.\)]\s")


class LineChunker(ChunkingStrategy):
"""Splits accumulated text on single newlines, emitting one line per chunk.

The line after the last ``\\n`` is withheld as a trailing fragment until
the stream ends and :meth:`flush` is called. Blank lines are skipped —
they carry no content for a line-level validator.

This chunker is a good fit for numbered-list output, code listings, and
any structured response where the model uses line breaks as separators
rather than sentence-ending punctuation or double newlines.
"""

def split(self, accumulated_text: str) -> list[str]:
"""Return all complete lines (up to the last newline).

Args:
accumulated_text: The full text accumulated so far.

Returns:
Non-empty lines found before the last newline character.
The text after the last newline is withheld as a trailing fragment.
"""
if "\n" not in accumulated_text:
return []
last_nl = accumulated_text.rfind("\n")
complete_section = accumulated_text[:last_nl]
return [line for line in complete_section.split("\n") if line.strip()]

def flush(self, accumulated_text: str) -> list[str]:
"""Release the trailing line fragment at end of stream.

Args:
accumulated_text: The full accumulated text at stream end.

Returns:
The text after the last newline as a single-element list (stripped),
or an empty list if the text ends with a newline or is empty.
"""
if not accumulated_text:
return []
last_nl = accumulated_text.rfind("\n")
trailing = (
accumulated_text if last_nl == -1 else accumulated_text[last_nl + 1 :]
).strip()
return [trailing] if trailing else []


class NumberedLineReq(Requirement):
"""Fails the stream if any line does not start with a list number.

Each ``stream_validate`` call receives one complete line (from
:class:`LineChunker`). This requirement enforces that every line follows
the ``N. item`` format, catching unstructured paragraphs or stray headers
that sneak into what should be a clean numbered list.
"""

def format_for_llm(self) -> str:
return "Every line must begin with a number followed by a period (e.g. '1. ')."

async def stream_validate(
self, chunk: str, *, backend: Backend, ctx: Context
) -> PartialValidationResult:
if not _NUMBERED_LINE.match(chunk):
return PartialValidationResult(
"fail", reason=f"Line does not start with a number: {chunk.strip()!r}"
)
return PartialValidationResult("unknown")

async def validate(
self,
backend: Backend,
ctx: Context,
*,
format: type | None = None,
model_options: dict | None = None,
) -> ValidationResult:
return ValidationResult(result=True)


async def main() -> None:
from mellea.stdlib.session import start_session

m = start_session()
backend = m.backend
ctx = m.ctx

action = Instruction(
"List five world capitals, one per line, numbered 1 through 5. "
"Use the format: '1. City'. Output only the numbered list, nothing else."
)
chunker = LineChunker()
req = NumberedLineReq()

result = await stream_with_chunking(
action, backend, ctx, quick_check_requirements=[req], chunking=chunker
)

print("Streaming events as they arrive (one ChunkEvent per line):")
async for event in result.events():
match event:
case ChunkEvent():
print(f" LINE[{event.chunk_index}]: {event.text!r}")
case QuickCheckEvent(passed=False):
print(
f" QUICK_CHECK[line {event.chunk_index}]: FAIL — "
f"{event.results[0].reason if event.results else 'unknown'}"
)
case QuickCheckEvent():
print(f" QUICK_CHECK[line {event.chunk_index}]: pass")
case StreamingDoneEvent():
print(f" STREAMING_DONE: {len(event.full_text)} chars accumulated")
case FullValidationEvent():
print(f" FULL_VALIDATION: {'PASS' if event.passed else 'FAIL'}")
case CompletedEvent():
print(f" COMPLETED: success={event.success}")
case _:
pass

await result.acomplete()

print(f"\nCompleted normally: {result.completed}")
if result.streaming_failures:
for _req, pvr in result.streaming_failures:
print(f"Streaming failure: {pvr.reason}")
else:
print(f"Full text:\n{result.full_text}")


asyncio.run(main())