feat(stdlib): add stream_with_chunking() with per-chunk validation (#901) #942

planetf1 wants to merge 14 commits into generative-computing:main
Conversation
44025e4 to 76a3eb9
Addresses issues raised by independent review on top of PR generative-computing#942.

Orchestrator (mellea/stdlib/streaming.py):
- except Exception now calls mot.cancel_generation() before surfacing the exception to the consumer — previously the backend producer was left running, eventually blocking on mot._async_queue (maxsize=20). Cleanup failures are logged via MelleaLogger.warning with a TODO(generative-computing#902) marker; generative-computing#902 replaces the log with a proper ErrorEvent.
- The RuntimeError catch in the astream() loop now re-raises unless mot.is_computed() is true, so only the documented "already computed" race is swallowed.
- The astream() docstring now states the single-consumer contract explicitly; a second iteration blocks on an empty queue with no sentinel to deliver.
- The as_thunk docstring now flags the early-exit case: cancel_generation forces is_computed=True without running post_processing(), so generation.usage and related telemetry fields may be None.

Chunker (mellea/stdlib/chunking.py):
- SentenceChunker.flush switches from .strip() to .rstrip() with a comment explaining why: the loop's lstrip has already removed leading whitespace, and trailing whitespace on a sentence fragment is non-semantic (consistent with split() returning sentences without trailing whitespace).
- ParagraphChunker.flush adds a docstring noting the deliberate asymmetry: paragraph fragments are returned byte-for-byte because internal whitespace (e.g. the trailing \n of a list item) can be semantically meaningful.

Tests (test/stdlib/test_streaming.py):
- test_stream_validate_receives_individual_chunks now uses an exact match on the captured chunk list, which regresses directly if someone reverts to accumulated-text semantics.
- test_multiple_chunks_in_one_batch_with_mid_batch_fail: the response is fed as one large token so split() yields 4 sentences at once; verifies chunk 1 emits, chunk 2 fails (not emitted), and chunks 3 and 4 are neither validated nor emitted.
- test_cancel_generation_invoked_on_fail: spies on ModelOutputThunk.cancel_generation and asserts it was called on the "fail" early-exit path.
- test_exception_in_stream_validate_cancels_generation: a requirement that raises must cause cancel_generation to run and the exception to surface via astream()/acomplete() without hanging.

Telemetry observability (orchestrator-level spans, metrics, span events) remains deferred to generative-computing#902 per the epic, which now has its acceptance criteria updated to cover event emission, the OTEL bridge, and the ErrorEvent type that will replace the MelleaLogger stopgap.

Assisted-by: Claude Code
Adds an async cancel_generation() method that cancels in-progress _generate and _generate_extra tasks, drains the internal async queue to release any blocked put() calls, closes the open telemetry span, and sets _computed=True so the MOT is immediately usable. Required by the stream_with_chunking() orchestrator (generative-computing#901) for clean early exit when a streaming requirement returns "fail".

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
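The drain-then-mark-computed sequence above can be sketched with a toy stand-in. Attribute names (`_generate`, `_async_queue`, `_computed`) follow the commit message, but this is not the real `ModelOutputThunk`:

```python
import asyncio
import contextlib

# Toy sketch of the cancel_generation() behaviour described above; the real
# ModelOutputThunk has more machinery (telemetry span, _generate_extra, etc.).
class MockThunk:
    def __init__(self) -> None:
        self._async_queue: asyncio.Queue = asyncio.Queue(maxsize=20)
        self._generate: asyncio.Task | None = None
        self._computed = False

    async def cancel_generation(self) -> None:
        if self._computed:            # no-op on an already-computed thunk
            return
        if self._generate is not None:
            self._generate.cancel()   # stop the producer task
        # Drain the queue so a producer blocked on put() is released.
        while not self._async_queue.empty():
            self._async_queue.get_nowait()
        self._computed = True         # thunk is immediately usable

async def demo() -> bool:
    mot = MockThunk()

    async def producer() -> None:
        while True:                   # blocks once the queue is full
            await mot._async_queue.put("tok")

    mot._generate = asyncio.create_task(producer())
    await asyncio.sleep(0.01)         # let the producer fill the queue
    await mot.cancel_generation()
    with contextlib.suppress(asyncio.CancelledError):
        await mot._generate           # producer is gone, not leaked
    return mot._computed and mot._async_queue.empty()

print(asyncio.run(demo()))  # True
```

Without the drain step, the cancelled producer could stay parked on a full `put()` until the loop shuts down, which is exactly the leak the commit describes.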
…enerative-computing#901)

Adds stream_with_chunking() — the core streaming orchestration primitive that consumes a ModelOutputThunk's async stream via a background asyncio.Task, applies a ChunkingStrategy to produce semantic chunks, and runs stream_validate() in parallel across all requirements at each chunk boundary.

Key behaviours:
- Early exit: if any requirement returns "fail" during streaming, generation is cancelled immediately via cancel_generation() and StreamChunkingResult.completed is set to False.
- Final validation: after natural completion, validate() is called on all non-failed requirements.
- Clone-per-call: requirements are cloned (copy(req)) before each invocation; originals are never mutated.
- String aliases: "sentence", "word", "paragraph" map to the corresponding ChunkingStrategy subclasses.

StreamChunkingResult exposes:
- astream() — async iterator yielding individual validated chunks
- acomplete() — await full completion including final validation
- as_thunk — wrap full_text as a computed ModelOutputThunk
- completed, full_text, final_validations, streaming_failures

Re-exports StreamChunkingResult and stream_with_chunking from mellea.stdlib for day-to-day use.

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
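The `astream()`/`acomplete()` consumption surface listed above can be illustrated with a toy stand-in — this is a hypothetical replay object, not the real `StreamChunkingResult`:

```python
import asyncio

# Hypothetical consumption pattern for the astream()/acomplete() surface
# described above; a toy that replays fixed chunks instead of a model stream.
class ToyStreamChunkingResult:
    def __init__(self, chunks: list[str]) -> None:
        self._chunks = chunks
        self.completed = False
        self.full_text = ""

    async def astream(self):
        # The real implementation reads a queue fed by a background task and
        # stops on a None sentinel; here we just replay fixed chunks.
        for chunk in self._chunks:
            await asyncio.sleep(0)
            self.full_text += chunk
            yield chunk
        self.completed = True

    async def acomplete(self) -> str:
        async for _ in self.astream():
            pass
        return self.full_text

async def main():
    result = ToyStreamChunkingResult(["One. ", "Two."])
    seen = [chunk async for chunk in result.astream()]
    return seen, result.completed, result.full_text

print(asyncio.run(main()))  # (['One. ', 'Two.'], True, 'One. Two.')
```

Consumers that do not care about per-chunk delivery would call `acomplete()` once and inspect the result fields instead of iterating.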
Adds test/stdlib/test_streaming.py with 9 unit tests covering:
- Normal completion: validate() called at stream end, completed=True
- Early exit on "fail": completed=False, streaming_failures populated
- Clone isolation: originals never mutated across retries
- quick_check_backend routing: validation uses the alternate backend
- Deadlock prevention: early exit with an asyncio.wait_for timeout
- as_thunk correctness: value=full_text, raises before acomplete()
- astream() yields individual chunks (not accumulated text)
- No requirements: streams without validation

StreamingMockBackend subclasses Backend and feeds a fixed response string into a MOT queue char-by-char via asyncio.create_task, following the create_manual_mock_thunk() pattern from test_astream_mock.py.

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Adds docs/examples/streaming/streaming_chunking.py demonstrating stream_with_chunking() end-to-end: defining a custom stream_validate() override, consuming chunks via astream(), and awaiting acomplete() to inspect final_validations and streaming_failures.

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
Fixes the [no_class_args] CI failure — the docs build-and-validate checker requires __init__ parameters to be documented in the class docstring (not in __init__) per the Option C convention.

Assisted-by: Claude Code

Fixes the second [no_raises] CI failure — stream_with_chunking raises ValueError for unknown chunking aliases but had no Raises: section.

Assisted-by: Claude Code
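The alias-to-strategy mapping and the ValueError this commit documents can be sketched as follows; the chunker classes here are empty stand-ins, not the mellea implementations:

```python
# Sketch of string-alias resolution with the documented ValueError; the
# classes and the resolve_chunking helper are illustrative stand-ins.
class SentenceChunker: ...
class WordChunker: ...
class ParagraphChunker: ...

_ALIASES = {
    "sentence": SentenceChunker,
    "word": WordChunker,
    "paragraph": ParagraphChunker,
}

def resolve_chunking(strategy):
    """Accept a chunking-strategy instance or one of the string aliases.

    Raises:
        ValueError: If a string is not one of "sentence", "word", "paragraph".
    """
    if isinstance(strategy, str):
        if strategy not in _ALIASES:
            raise ValueError(f"Unknown chunking alias: {strategy!r}")
        return _ALIASES[strategy]()
    return strategy

print(type(resolve_chunking("sentence")).__name__)  # SentenceChunker
```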
…e call

Aligns the orchestrator with the chunk-at-a-time design set out in the generative-computing#891 epic and generative-computing#900 spec. Previously _orchestrate_streaming passed the full accumulated text to stream_validate, and called it once per batch of new chunks rather than once per chunk. This masked the design intent of the chunking strategy and forced stateful requirements into the self._seen_len workaround.

Behaviour changes:
- stream_validate is called once per complete chunk produced by the chunking strategy (not once per astream() iteration)
- The call receives that single chunk (not the accumulated text)
- Multiple chunks from one astream() iteration are validated in order; early exit on a "fail" prevents later chunks in the same batch from being validated or emitted
- On early exit, the failing chunk is no longer emitted to the consumer; consumers inspect StreamChunkingResult.streaming_failures instead (the previous behaviour emitted whatever the current batch contained)

Test changes:
- FailAfterWordsReq now maintains a running word count on self, since each stream_validate call sees a one-word chunk rather than the growing accumulation
- New test_stream_validate_receives_individual_chunks asserts the per-chunk contract directly by capturing the cloned requirement and checking the chunks it saw

Docstring updated to describe the per-chunk contract, the in-order validation of a batch, the non-emission of failing chunks, and the MOT single-consumer constraint.

Assisted-by: Claude Code
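The accumulate-on-self pattern this commit forces on stateful requirements can be sketched in the style of the FailAfterWordsReq test double (names and the `"pass"`/`"fail"` return convention are taken from this PR's description; the class body is illustrative):

```python
# Sketch of a stateful per-chunk validator in the style of FailAfterWordsReq:
# each stream_validate call sees only one chunk, so the count lives on self.
class FailAfterWordsReq:
    """Fails once it has seen more than max_words across all chunks."""

    def __init__(self, max_words: int) -> None:
        self.max_words = max_words
        self._seen = 0  # running count across calls

    def stream_validate(self, chunk: str) -> str:
        self._seen += len(chunk.split())
        return "fail" if self._seen > self.max_words else "pass"

req = FailAfterWordsReq(max_words=3)
print([req.stream_validate(c) for c in ["one", "two", "three", "four"]])
# ['pass', 'pass', 'pass', 'fail']
```

Under the old accumulated-text semantics the count could be recomputed from scratch on every call; under delta semantics it must persist on the instance, which is why clone-per-call isolation matters.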
Two documentation fixes following the per-chunk semantics correction:
- streaming_chunking.py: MaxSentencesReq previously counted sentence-end punctuation in the chunk, which worked under the old accumulated-text behaviour but returns at most 1 per sentence under delta semantics. Rewritten to increment self._count once per chunk — the canonical pattern for a requirement that needs context beyond a single chunk.
- stream_with_chunking docstring: add a Note that chunks are emitted to the consumer only after every active validator returns for that chunk. A slow stream_validate (e.g. an LLM-based one) therefore adds latency to every chunk. The invariant preserved is that the consumer never sees unvalidated content; a concurrent-emission fast path may be added in future if a concrete use case calls for it.

Assisted-by: Claude Code
ChunkingStrategy.split() withholds the trailing fragment by design (generative-computing#899). Previously the orchestrator discarded it — it appeared in full_text and the final validate() saw it, but it was never yielded to astream() consumers and never seen by stream_validate. For a response that did not end in a chunk terminator (e.g. "Sentence one. Sentence two." with no trailing whitespace under SentenceChunker), the last sentence silently bypassed streaming validation.

Adds ChunkingStrategy.flush(accumulated_text) -> list[str]:
- The default in the ABC returns [] (backward-compatible — external chunkers retain the old discard behaviour until they opt in).
- SentenceChunker, WordChunker, ParagraphChunker each override it to return the withheld trailing fragment as a single-element list.

_orchestrate_streaming calls chunking.flush(accumulated) after the main loop (only when the stream ended naturally, not on early exit — a cancelled stream's trailing fragment is by definition incomplete). Each flushed chunk goes through the same stream_validate / emit path as regular chunks, so the "no unvalidated content reaches the consumer" invariant extends to the trailing fragment, and a fail on the fragment still records a streaming failure and skips the final validate().

Tests:
- 13 new chunker tests covering the default-discard behaviour and each built-in's flush logic (empty input, fragment-present, and already-terminated cases).
- test_trailing_fragment_is_flushed_to_consumer: stream_validate sees the fragment and astream yields it.
- test_early_exit_on_trailing_fragment: a fail on the flushed fragment propagates to streaming_failures and skips final validation.

Assisted-by: Claude Code
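The split()/flush() division of labour can be sketched with a deliberately tiny sentence chunker (the real SentenceChunker is more careful; this only shows the withheld-fragment idea and the rstrip behaviour discussed later in the review):

```python
import re

# Minimal sketch of the split()/flush() pair: split() emits only complete
# sentences (terminator followed by whitespace); flush() returns whatever
# trailing fragment split() withheld.
class TinySentenceChunker:
    _END = re.compile(r"(?<=[.!?])\s+")

    def split(self, text: str) -> list[str]:
        """Return only complete sentences; withhold the trailing fragment."""
        parts = self._END.split(text)
        return parts[:-1]  # the last part may be an unterminated fragment

    def flush(self, accumulated_text: str) -> list[str]:
        """Return the withheld trailing fragment, if any, rstripped."""
        parts = self._END.split(accumulated_text)
        tail = parts[-1].rstrip() if parts else ""
        return [tail] if tail else []

c = TinySentenceChunker()
text = "Sentence one. Sentence two."
print(c.split(text))  # ['Sentence one.']
print(c.flush(text))  # ['Sentence two.']
```

This reproduces the failure mode the commit fixes: without flush(), `"Sentence two."` would reach full_text but never pass through stream_validate.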
Three items from the second independent review:

1. cancel_generation(error=) — accept an optional Exception parameter. When the orchestrator enters the except Exception path, it now passes the caught exception to cancel_generation() so the backend telemetry span records the real cause via set_span_error instead of a generic RuntimeError("Generation cancelled"). The original exception still surfaces to the consumer via astream()/acomplete(); this is purely an OTEL accuracy fix. Backward-compatible: the default None preserves the previous "Generation cancelled" message for the normal fail path.

2. stream_with_chunking docstring — the "After the stream ends (naturally or via early exit), validate() is called" wording overstated behaviour. The orchestrator actually skips the final validate() on early exit (test_early_exit_on_fail verifies final_validations == []). The docstring now correctly says the final validate() runs only on natural completion.

3. test_exception_in_stream_validate_cancels_generation docstring — the test fails on chunk 1, so the queue never actually fills; it verifies the cancel-on-exception path and the no-hang guarantee but does not directly prove the worst-case "producer blocked on full queue" scenario. The docstring now states what it actually covers and points at test/core/ for the cancel_generation drain logic.

Assisted-by: Claude Code
The Docs CI docstring quality gate (the [no_class_args]-equivalent check) requires every documented method with typed params to have an Args section and a Returns section matching the return annotation. SentenceChunker.flush, WordChunker.flush, and ParagraphChunker.flush all took accumulated_text and returned list[str] without those sections. Add both to each override, documenting each flush's specific semantics (rstrip for sentences, a whitespace-split trailing fragment for words, byte-for-byte for paragraphs).

Assisted-by: Claude Code
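The docstring shape the gate expects looks roughly like this — `DocExample` is a stand-in class, and the wording echoes the sentence-flush semantics described in earlier commits:

```python
# Illustrative Args/Returns docstring shape for a typed method; DocExample is
# hypothetical, and the trivial body exists only so the sketch runs.
class DocExample:
    def flush(self, accumulated_text: str) -> list[str]:
        """Return the withheld trailing fragment.

        Args:
            accumulated_text: The full text accumulated so far.

        Returns:
            list[str]: The trailing fragment as a single-element list, or an
            empty list when the text ended on a chunk boundary.
        """
        return []

print(DocExample().flush("abc"))  # []
```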
cb75fa7 to 74c009d
- _orchestrate_streaming: add cancel_generation() in a finally block so the backend producer is stopped even on an external CancelledError (a BaseException bypasses except Exception, leaving _generate hung on a full queue)
- cancel_generation: replace .get + del on _telemetry_span with .pop to prevent a KeyError if two coroutines race before _computed is set
- Example and test doubles: add super().__init__() to Requirement subclasses so description/validation_fn/_output are always initialised
- docs/examples: fix the pytest tier marker integration → e2e (the Ollama example must be e2e per MARKERS_GUIDE; all peer examples use e2e)
- test_quick_check_backend_routing: capture the clone via a __copy__ intercept and assert all seen_backends are val_backend, not just the clone-isolation check

Assisted-by: Claude Code
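The .pop-versus-.get+del point can be shown in miniature — `close_span` is an illustrative helper, not the mellea API:

```python
# Why pop() beats get() + del for one-shot cleanup: pop(key, None) reads and
# removes in one step and tolerates a second caller, so two coroutines racing
# past the _computed check cannot trigger a KeyError.
_meta = {"_telemetry_span": "span-1"}

def close_span(meta: dict) -> str:
    span = meta.pop("_telemetry_span", None)  # None for the second caller
    if span is not None:
        return f"closed {span}"
    return "already closed"

print(close_span(_meta))  # closed span-1
print(close_span(_meta))  # already closed
```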
FYI @jakelorocco — next part of the streaming validation epic is ready for review.
Resolved — all issues addressed across Wave 3 and Wave 4 fix rounds. Ready for review.
AngeloDanducci left a comment:

Few small nits.
The 3rd-to-last acceptance criterion mentions real-backend tests marked @pytest.mark.integration and @pytest.mark.ollama, but neither of these marks exists (missing tests, or needs tagging somewhere? May also just be an outdated criterion.)
Indeed — outdated criteria. The original thought was that they'd be needed to exercise the code, but later in the dev process I added a mock backend for streaming, so the core tests don't need these flags (just the example, which needs ollama/e2e as in your comment above). Thanks for spotting the discrepancy!
Addresses review feedback on `_orchestrate_streaming` cleanup:
- Exceptions caught by the orchestrator were only pushed to the chunk queue, so callers who skipped `astream()` and went straight to `acomplete()` saw the call return silently. Stash the exception on the result and raise it from `acomplete()` with raise-once semantics (cleared by whichever of astream/acomplete reads it first).
- The finally cleanup caught `BaseException`, silently absorbing CancelledError/KeyboardInterrupt/SystemExit. Narrow to `except Exception` and switch the terminator to `put_nowait(None)` + `set()` so the sync ops always run even when the task is being cancelled (otherwise acomplete consumers hang).

Two regression tests:
- test_acomplete_surfaces_exception_without_astream
- test_external_task_cancellation_releases_consumers

Assisted-by: Claude Code
Signed-off-by: Nigel Jones <jonesn@uk.ibm.com>
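The raise-once stash described in the first bullet can be sketched with a toy result object (illustrative names, not the real mellea API):

```python
import asyncio

# Sketch of raise-once error stashing: whichever of astream()/acomplete()
# reads the stashed exception first re-raises it and clears it, so the
# error surfaces exactly once even to callers who skip astream().
class ToyResult:
    def __init__(self) -> None:
        self._error: BaseException | None = None

    def _raise_once(self) -> None:
        if self._error is not None:
            err, self._error = self._error, None  # clear before raising
            raise err

    async def acomplete(self) -> str:
        self._raise_once()
        return "done"

async def main():
    r = ToyResult()
    r._error = ValueError("validator blew up")  # as the orchestrator would set
    try:
        await r.acomplete()
        first = "no error"
    except ValueError as e:
        first = str(e)
    second = await r.acomplete()  # error already consumed
    return first, second

print(asyncio.run(main()))  # ('validator blew up', 'done')
```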
Thanks for the review @psschwei — resolved both points.
> Safe to call at any point during streaming. After this method returns,
> `is_computed()` is `True` and `value` contains whatever text was
> accumulated before cancellation. Calling on an already-computed MOT
> is a no-op.
We should also add a flag indicating that the generation was cancelled.
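The reviewer's suggestion amounts to distinguishing cancellation from natural completion. A minimal sketch, assuming a hypothetical `cancelled` attribute that is not (yet) part of `ModelOutputThunk`:

```python
# Sketch of an explicit cancellation flag, so callers need not infer
# cancellation from is_computed(); CancellableThunk and `cancelled`
# are illustrative, not the real API.
class CancellableThunk:
    def __init__(self) -> None:
        self._computed = False
        self.cancelled = False  # proposed flag

    def cancel_generation(self) -> None:
        if self._computed:      # no-op on an already-computed thunk
            return
        self.cancelled = True   # distinguishes cancellation from completion
        self._computed = True

    def mark_completed(self) -> None:
        self._computed = True   # natural completion leaves cancelled False

mot = CancellableThunk()
mot.cancel_generation()
print(mot._computed, mot.cancelled)  # True True
```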
```python
"ChunkingStrategy",
"ParagraphChunker",
"SentenceChunker",
"StreamChunkingResult",
"WordChunker",
"stream_with_chunking",
```
(This comment is not actionable for the purposes of this PR.)
Some software engineering philosophy:
To make a streaming-based library pleasant to use, the library developers need to provide nice and easy ways to chunk the stream into semantically meaningful bits.
Historically this is the sort of thing that a library would achieve by getting a few "major" methods correct and then providing a... library... of methods that make it easy for devs to build what they need.
I wonder if this is one (sane) way in which coding agents change the way software gets built — instead of shipping all the things, we instead ship a small number of most-common things (this is a reasonable list) together with a skill for writing the chunking strategy for an existing requirement. And in this case writing a really good skill for that would actually be possible because you have a ground truth on the I/O behavior, so the spec for the streaming version of the requirement checker is:
- the streaming requirement implementation should have the same ultimate return value as the non-streaming requirement implementation.
- the streaming requirement implementation should actually do incremental processing (this is a bit more hand-wavy but you could say every X% of tokens or something like that).
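The first bullet of that spec is mechanically checkable. A toy sketch of the check (both validators here are illustrative, not mellea code):

```python
# Toy check of the spec above: the streaming implementation's ultimate
# verdict must equal the non-streaming implementation's verdict, while the
# streaming one processes chunks incrementally and may exit early.
def validate_full(text: str) -> bool:
    return len(text.split()) <= 5           # non-streaming ground truth

def validate_streaming(chunks: list[str]) -> bool:
    seen = 0
    for chunk in chunks:                     # incremental processing
        seen += len(chunk.split())
        if seen > 5:
            return False                     # early exit, same verdict
    return True

cases = [["one two", "three"], ["one two three", "four five six"]]
for chunks in cases:
    assert validate_streaming(chunks) == validate_full(" ".join(chunks))
print("verdicts agree")
```

A skill that generates streaming variants could run exactly this kind of agreement test against the existing non-streaming checker as its oracle.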
```python
    """
    ...

def flush(self, accumulated_text: str) -> list[str]:
```
we'll have to think about what to do about streaming from multi-modal models. I'm okay with that not being supported for now.
After adding a cancellation flag, also test that code path.
Type of PR: Misc PR

Description
Implements `stream_with_chunking()` — the core streaming orchestration primitive for the streaming validation epic (#891), closing issue #901.

Builds on the now-merged #925 (squash-merged as upstream commit `7912a1df`), which added `Requirement.stream_validate()`. This branch has been rebased directly onto `upstream/main`; the 13 Wave 3 commits are `8128dfad..3fb501ef`.

What changed
- `mellea/core/base.py` — adds `ModelOutputThunk.cancel_generation(error=None)`: cancels in-progress `_generate`/`_generate_extra` tasks, drains the internal async queue (to release any blocked `put()` calls), closes the open telemetry span (recording the provided `error` if given, else a generic `RuntimeError("Generation cancelled")`), and marks the MOT as computed. Uses `.pop()` on `_meta["_telemetry_span"]` to prevent a `KeyError` if two coroutines race before `_computed` is set.
- `mellea/stdlib/streaming.py` (new) — `StreamChunkingResult` and `stream_with_chunking()`:
  - A background `asyncio.Task` consumes the MOT's async stream, splits accumulated text via `ChunkingStrategy`, and calls `stream_validate()` once per complete chunk, passing that single chunk (not the accumulation).
  - If one `astream()` iteration produces multiple new chunks, they are validated sequentially in order, so early exit prevents later chunks in the same batch from being validated or emitted to the consumer.
  - On the first `"fail"` result, `cancel_generation()` is called and `StreamChunkingResult.completed` is set to `False`. The failing chunk is not emitted to the consumer; use `streaming_failures` to inspect what failed.
  - The trailing fragment is recovered via the new `ChunkingStrategy.flush()` method and run through `stream_validate` on the same terms as regular chunks. Skipped on early exit (the fragment is mid-token and incomplete).
  - After natural completion, `validate()` is called on all non-failed requirements. Skipped on early exit.
  - `copy(req)` clones each requirement before use; originals are never mutated.
  - `"sentence"`, `"word"`, `"paragraph"` resolve to the corresponding `ChunkingStrategy` subclasses.
  - On an exception in `stream_validate`, the orchestrator calls `cancel_generation(error=exc)` so backend telemetry records the real cause, then surfaces the exception to the consumer via `astream()`/`acomplete()`.
  - A `finally` block calls `cancel_generation()` when not already computed, so the backend producer is stopped even on an external `CancelledError` (which bypasses `except Exception`).
- `mellea/stdlib/chunking.py` — adds `ChunkingStrategy.flush(accumulated_text) -> list[str]` (default returns `[]` — backward-compatible for external chunkers). `SentenceChunker`, `WordChunker`, and `ParagraphChunker` each override it to return the withheld trailing fragment.
- `mellea/stdlib/__init__.py` — re-exports `StreamChunkingResult` and `stream_with_chunking`.
- `docs/examples/streaming/streaming_chunking.py` (new) — end-to-end example with a stateful `MaxSentencesReq` showing the canonical accumulate-on-self pattern. Marked `# pytest: ollama, e2e`.

Spec adherence and deliberate variations
For reviewer attention. Items (a)–(d) are spec-compliant (explicit or, in (d), one of the options the spec names); items (e)–(h) are design decisions taken where the spec is silent.
(a) Chunk semantics — spec-compliant. Addresses @jakelorocco's review on #925 (now approved). `stream_validate` receives a single complete chunk from the chunking strategy, not the accumulated output. Matches the contract in the #891 epic and the #900 spec.

(b) Clone-per-attempt — spec-compliant. `copy(req)` clones each requirement before use; originals are never mutated (per #891 and #901).

(c) Early-exit cancellation — spec-compliant. `cancel_generation()` is called on the first `"fail"` to stop the backend producer before it blocks on `mot._async_queue` (per #891).

(d) End-of-stream flush — spec-compliant design choice. The #899 ABC docstring offers two options for the trailing fragment: "pass to a final validator or discard". This PR takes the first by adding `ChunkingStrategy.flush(accumulated_text)` and running the returned fragment through `stream_validate` on the same terms as regular chunks, so the "no unvalidated content reaches the consumer" invariant extends to the trailing fragment.

(e) Exception handling in `stream_validate` — variation; spec-silent. The spec covers `cancel_generation()` on an explicit `"fail"` but not on unhandled exceptions. We extend the same resource-leak reasoning: any orchestrator exit must stop the producer. The original exception is passed to `cancel_generation(error=...)` so the backend telemetry span records the real cause. If `cancel_generation()` itself raises, we log via `MelleaLogger.warning` with a `TODO(#902)` marker and still surface the original exception to the consumer. The `finally` block additionally covers an external `CancelledError` (which bypasses `except Exception`) for the same reason.

(f) `astream()` single-consumer — variation; spec-silent. #901 does not say whether re-iteration is supported. The current implementation is single-consumer (queue drained to the `None` sentinel). Docstring updated to state the contract explicitly.

(g) Validator latency — variation; spec-silent. Chunks are emitted only after all active validators return for that chunk. A slow `stream_validate` therefore adds latency; the preserved invariant is that the consumer never sees unvalidated content. A concurrent-emission fast path may be added if a concrete use case drives it.

(h) Orchestrator-level OTEL deferred to #902 — per the epic's explicit instruction: "these event types are the equivalent observability mechanism for the streaming path". Backend-layer instrumentation (`mot._meta["_telemetry_span"]`, token/latency/error metrics via plugins) remains intact. Not in this PR: the application-level trace span for `stream_with_chunking`, span events for chunk lifecycle, `record_requirement_check`/`record_requirement_failure`/`record_sampling_outcome`/`record_error` calls, and the `ErrorEvent` that will replace the `MelleaLogger.warning` stopgap. All enumerated in the #902 acceptance criteria.

Testing

- `test/stdlib/test_streaming.py` (new) — 12 unit tests via `StreamingMockBackend` (no Ollama needed) covering normal completion, early exit on fail, clone isolation, `quick_check_backend` routing (asserts the clone saw `val_backend` for every call), deadlock prevention, `as_thunk` correctness, `astream()` chunk granularity, no-requirements passthrough, the per-chunk contract (exact-match capture of individual chunks), trailing-fragment flush, early exit on the trailing fragment, multi-chunk batch with mid-batch fail, cancel-on-fail spy verification, and exception-in-validator cancellation.
- `test/stdlib/test_chunking.py` — 13 new tests covering `ChunkingStrategy.flush` (ABC default + each built-in chunker's fragment logic).
- `test/core/test_stream_validate.py` (on the stacked "feat: add stream_validate() hook to Requirement (#900)" #925) — 9 tests.
- `uv run pytest test/stdlib/test_streaming.py test/stdlib/test_chunking.py test/core/test_astream_mock.py -q`.
- `streaming.py` (16 tests). Remaining uncovered lines are all error-within-error cleanup paths (cancel_generation raising during exception handling, external CancelledError cleanup failing, acomplete re-raising an orchestration exception) and the TOCTOU RuntimeError race — these require fault injection beyond unit scope and are acceptable gaps.
- Ollama example (`granite4:micro`) — runs cleanly; both sentences of a non-terminated response reach the consumer via the flush path.

Attribution
test/stdlib/test_streaming.py(new) — 12 unit tests viaStreamingMockBackend(no Ollama needed) covering normal completion, early exit on fail, clone isolation,quick_check_backendrouting (asserts clone sawval_backendfor every call), deadlock prevention,as_thunkcorrectness,astream()chunk granularity, no-requirements passthrough, per-chunk contract (exact-match capture of individual chunks), trailing-fragment flush, early exit on trailing fragment, multi-chunk batch with mid-batch fail, cancel-on-fail spy verification, and exception-in-validator cancellation.test/stdlib/test_chunking.py— 13 new tests coveringChunkingStrategy.flush(ABC default + each built-in chunker's fragment logic).test/core/test_stream_validate.py(on stacked feat: add stream_validate() hook to Requirement (#900) #925) — 9 tests.uv run pytest test/stdlib/test_streaming.py test/stdlib/test_chunking.py test/core/test_astream_mock.py -q).streaming.py(16 tests). Remaining uncovered lines are all error-within-error cleanup paths (cancel_generation raising during exception handling, external CancelledError cleanup failing, acomplete re-raising an orchestration exception) and the TOCTOU RuntimeError race — these require fault injection beyond unit scope and are acceptable gaps.granite4:micro) — example runs cleanly; both sentences of a non-terminated response reach the consumer via the flush path.Attribution