Skip to content

Commit 67b5662

Browse files
committed
docs: add streaming_chunking example (#901)
Adds docs/examples/streaming/streaming_chunking.py demonstrating stream_with_chunking() end-to-end: defining a custom stream_validate() override, consuming chunks via astream(), and awaiting acomplete() to inspect final_validations and streaming_failures. Assisted-by: Claude Code Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
1 parent c132854 commit 67b5662

1 file changed

Lines changed: 91 additions & 0 deletions

File tree

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
# pytest: ollama, integration
2+
3+
"""Streaming generation with per-chunk validation using stream_with_chunking().
4+
5+
Demonstrates:
6+
- Subclassing Requirement to override stream_validate() for early-exit checks
7+
- Calling stream_with_chunking() with sentence-level chunking
8+
- Consuming validated chunks via astream() as they arrive
9+
- Awaiting full completion with acomplete() to access final_validations and full_text
10+
"""
11+
12+
import asyncio
13+
14+
from mellea.core.backend import Backend
15+
from mellea.core.base import Context
16+
from mellea.core.requirement import (
17+
PartialValidationResult,
18+
Requirement,
19+
ValidationResult,
20+
)
21+
from mellea.stdlib.components import Instruction
22+
from mellea.stdlib.streaming import stream_with_chunking
23+
24+
25+
class MaxSentencesReq(Requirement):
    """Fail the stream once the model exceeds *limit* sentences.

    Overrides stream_validate() so generation can be rejected early,
    and supplies a trivial final validate() that always passes — the
    interesting checks in this example happen mid-stream.
    """

    def __init__(self, limit: int) -> None:
        """Create the requirement.

        Args:
            limit: Maximum number of sentences allowed before the
                stream is failed.
        """
        # NOTE(review): assumes Requirement.__init__ accepts no
        # required arguments — confirm against the base class.
        super().__init__()
        self._limit = limit
        # Running total of sentence terminators seen so far.
        # stream_validate() is called once per chunk, so the count must
        # be accumulated here: with chunking="sentence" each chunk holds
        # roughly one sentence, and a per-chunk count alone could never
        # exceed the limit (the original left this field unused).
        self._count = 0

    def format_for_llm(self) -> str:
        """Return the natural-language form of this requirement."""
        return f"The response must be at most {self._limit} sentences long."

    async def stream_validate(
        self, chunk: str, *, backend: Backend, ctx: Context
    ) -> PartialValidationResult:
        """Check one streamed chunk; fail early once the limit is exceeded.

        Counts '.', '!' and '?' as sentence terminators — a heuristic,
        so abbreviations such as "e.g." are over-counted.
        """
        self._count += chunk.count(".") + chunk.count("!") + chunk.count("?")
        if self._count > self._limit:
            return PartialValidationResult(
                "fail",
                reason=f"Response exceeded {self._limit} sentence limit mid-stream",
            )
        # Not enough text yet to pass or fail definitively.
        return PartialValidationResult("unknown")

    async def validate(
        self,
        backend: Backend,
        ctx: Context,
        *,
        format: type | None = None,
        model_options: dict | None = None,
    ) -> ValidationResult:
        """Final (post-stream) validation; always passes in this example."""
        return ValidationResult(result=True)
55+
56+
57+
async def main() -> None:
    """Run the streaming example end to end, printing each stage."""
    from mellea.stdlib.session import start_session

    # A session bundles the backend and context that streaming needs.
    session = start_session()

    instruction = Instruction(
        "Write a short paragraph about the water cycle in exactly two sentences."
    )
    requirement = MaxSentencesReq(limit=3)

    # Start generation with sentence-level chunking; the requirement is
    # checked against every chunk as it arrives.
    result = await stream_with_chunking(
        instruction,
        session.backend,
        session.ctx,
        quick_check_requirements=[requirement],
        chunking="sentence",
    )

    print("Streaming chunks as they arrive:")
    async for piece in result.astream():
        print(f" CHUNK: {piece!r}")

    # Wait for the stream to finish so the summary fields are populated.
    await result.acomplete()

    print(f"\nCompleted normally: {result.completed}")
    print(f"Full text: {result.full_text!r}")

    for _requirement, partial in result.streaming_failures or []:
        print(f"Streaming failure: {partial.reason}")

    for verdict in result.final_validations or []:
        print(f"Final validation: {'PASS' if verdict.as_bool() else 'FAIL'}")
89+
90+
91+
# Guard the entry point so importing this module does not trigger a
# model call; running the file as a script still executes the example.
if __name__ == "__main__":
    asyncio.run(main())

0 commit comments

Comments
 (0)