feat(stdlib): add standard streaming event types #902

@planetf1

What

Add a standard event dataclass hierarchy to mellea/stdlib/streaming.py (the module created in #901).

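```python
from __future__ import annotations  # annotations stay strings; no eager name lookup

import time
from dataclasses import dataclass, field

# PartialValidationResult and ValidationResult are mellea types;
# their imports are omitted here.


@dataclass
class StreamEvent:
    # Excluded from __init__ so callers never pass it and subclasses can
    # still declare required fields; set automatically in __post_init__.
    timestamp: float = field(init=False)

    def __post_init__(self) -> None:
        self.timestamp = time.time()


@dataclass
class ChunkEvent(StreamEvent):
    text: str
    chunk_index: int
    attempt: int


@dataclass
class QuickCheckEvent(StreamEvent):
    chunk_index: int
    attempt: int
    passed: bool
    results: list[PartialValidationResult]


@dataclass
class StreamingDoneEvent(StreamEvent):
    attempt: int
    full_text: str


@dataclass
class FullValidationEvent(StreamEvent):
    attempt: int
    passed: bool
    results: list[ValidationResult]


@dataclass
class RetryEvent(StreamEvent):
    attempt: int
    reason: str


@dataclass
class CompletedEvent(StreamEvent):
    success: bool
    full_text: str
    attempts_used: int
```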

Why

These are the shared vocabulary between the streaming orchestrator and its consumers: UIs, m serve, and plugins. Without a standard set of types in mellea, every consumer either defines its own parallel types or takes a dependency on mellea-partial. The sampling strategy plugin hooks (SAMPLING_LOOP_START, SAMPLING_REPAIR, etc.) do not fire during streaming, so these event types are the Phase 1 observability substitute.

Note that on early exit via cancel_generation(), post_processing() does not run, so mot.generation.usage (token counts) will be None. CompletedEvent should capture whether generation completed normally or was cancelled.

These are plain dataclasses with no logic and no transport dependency — the minimum mellea needs to own. Once this lands, Hendrik can update mellea-partial to import and emit these types directly.

Implementation

Add these to mellea/stdlib/streaming.py. The timestamp field should be auto-populated in __post_init__ using time.time(), so callers do not need to supply it. Export all event types from mellea.stdlib.

Acceptance criteria

  • All seven event types exist in mellea/stdlib/streaming.py
  • StreamEvent.timestamp is auto-populated in __post_init__ — callers do not set it
  • QuickCheckEvent.results typed as list[PartialValidationResult]
  • FullValidationEvent.results typed as list[ValidationResult]
  • All types exported from mellea.stdlib
  • Unit tests in test/stdlib/test_streaming.py covering construction of each event type
  • Google-style docstring on each event type describing when it is emitted

Additional scope: orchestrator emits events and bridges to OTEL

Deferred from #901 per the epic's explicit instruction that streaming observability lives in the event types, not in the sampling plugin hooks. This issue is the right home because event emission and OTEL recording happen at the same call sites.

Event emission from stream_with_chunking

The orchestrator must emit each event type at the appropriate point:

  • ChunkEvent — on each validated chunk yielded to the consumer (post-validation, including the flushed trailing fragment)
  • QuickCheckEvent — after each per-chunk stream_validate batch (one per chunk, covering all active requirements)
  • StreamingDoneEvent — when the raw token stream ends, before final validate()
  • FullValidationEvent — after the final validate() call on non-failed requirements
  • CompletedEvent — on orchestrator exit, including early-exit cases where completed=False
  • New: ErrorEvent, emitted if stream_validate raises or on any other unplanned exit. It replaces the temporary MelleaLogger.warning stopgap in the exception path of _orchestrate_streaming, introduced in #942 (feat(stdlib): add stream_with_chunking() with per-chunk validation (#901)) to satisfy "fail loud" while this issue lands.

RetryEvent stays deferred — v1 retry is caller-driven re-invocation of stream_with_chunking, so there is no orchestrator-side emission point yet. Revisit when an orchestrator-side retry mechanism is introduced.
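The emission points above can be sketched as a single-attempt loop. This is a simplified sketch under several assumptions: it uses the callback delivery option (b) from the open design questions; it assumes a chunk's QuickCheckEvent precedes its ChunkEvent, since a chunk is only yielded once validated; and run_orchestrator, quick_check, and full_check are hypothetical stand-ins for the real orchestrator, stream_validate, and the final validate(). Chunking, the trailing-fragment flush, the results fields, and ErrorEvent are left out.

```python
import time
from dataclasses import dataclass, field
from typing import Callable, Iterable


@dataclass
class StreamEvent:
    timestamp: float = field(init=False)

    def __post_init__(self) -> None:
        self.timestamp = time.time()


# Trimmed copies of the event types (results fields omitted).
@dataclass
class ChunkEvent(StreamEvent):
    text: str
    chunk_index: int
    attempt: int


@dataclass
class QuickCheckEvent(StreamEvent):
    chunk_index: int
    attempt: int
    passed: bool


@dataclass
class StreamingDoneEvent(StreamEvent):
    attempt: int
    full_text: str


@dataclass
class FullValidationEvent(StreamEvent):
    attempt: int
    passed: bool


@dataclass
class CompletedEvent(StreamEvent):
    success: bool
    full_text: str
    attempts_used: int


def run_orchestrator(
    chunks: Iterable[str],
    quick_check: Callable[[str], bool],  # hypothetical stand-in for stream_validate
    full_check: Callable[[str], bool],  # hypothetical stand-in for final validate()
    on_event: Callable[[StreamEvent], None],
    attempt: int = 1,
) -> str:
    """Single-attempt loop emitting events at the documented points."""
    emitted: list[str] = []
    for index, chunk in enumerate(chunks):
        passed = quick_check(chunk)
        # One QuickCheckEvent per chunk, after its validation batch.
        on_event(QuickCheckEvent(chunk_index=index, attempt=attempt, passed=passed))
        if not passed:
            # Early exit: no StreamingDoneEvent or FullValidationEvent.
            partial = "".join(emitted)
            on_event(CompletedEvent(success=False, full_text=partial, attempts_used=attempt))
            return partial
        emitted.append(chunk)
        # ChunkEvent only for chunks that passed validation.
        on_event(ChunkEvent(text=chunk, chunk_index=index, attempt=attempt))

    full_text = "".join(emitted)
    on_event(StreamingDoneEvent(attempt=attempt, full_text=full_text))
    passed = full_check(full_text)
    on_event(FullValidationEvent(attempt=attempt, passed=passed))
    on_event(CompletedEvent(success=passed, full_text=full_text, attempts_used=attempt))
    return full_text


# A capturing subscriber can be as simple as list.append.
captured: list[StreamEvent] = []
run_orchestrator(["Hello, ", "world."], lambda c: True, lambda t: t.endswith("."), captured.append)
print([type(e).__name__ for e in captured])
# ['QuickCheckEvent', 'ChunkEvent', 'QuickCheckEvent', 'ChunkEvent',
#  'StreamingDoneEvent', 'FullValidationEvent', 'CompletedEvent']
```

The same list-append subscriber is one possible shape for the emission-order unit tests in the acceptance criteria.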

OTEL bridge at the event-emission sites

  • Open an application trace span for stream_with_chunking (mellea.application scope); no orchestrator-level span exists today. On early-exit failure, call set_span_error before closing it. This span is separate from the backend span, which cancel_generation() already closes correctly (base.py:413-419).
  • Each event additionally becomes a span event on the application span, with event fields attached as attributes.
  • QuickCheckEvent records record_requirement_check; QuickCheckEvent with passed=False additionally records record_requirement_failure with the fail reason.
  • CompletedEvent records record_sampling_outcome("stream_with_chunking", success=...).
  • ErrorEvent records record_error(error_type, provider, model).
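A sketch of the "event fields attached as attributes" step. OpenTelemetry attribute values must be str, bool, int, float, or sequences of those, so a field such as QuickCheckEvent.results (a list of result objects) cannot be attached as-is. This sketch assumes such lists are reduced to a <field>.count integer, that attribute keys get a hypothetical mellea.event. prefix, and that the event's own timestamp is passed to add_event in nanoseconds. to_span_event and FakeSpan are illustrative only; FakeSpan mimics the add_event(name, attributes, timestamp) call of an OpenTelemetry span so the sketch runs without the SDK.

```python
import time
from dataclasses import dataclass, field, fields
from typing import Any, Optional

_PRIMITIVES = (str, bool, int, float)


@dataclass
class StreamEvent:
    timestamp: float = field(init=False)

    def __post_init__(self) -> None:
        self.timestamp = time.time()


@dataclass
class QuickCheckEvent(StreamEvent):
    chunk_index: int
    attempt: int
    passed: bool
    results: list[Any]  # list[PartialValidationResult] in mellea


def to_span_event(
    event: StreamEvent, prefix: str = "mellea.event."
) -> tuple[str, dict[str, Any], int]:
    """Return (name, attributes, timestamp_ns) for a span's add_event (hypothetical helper)."""
    attributes: dict[str, Any] = {}
    for f in fields(event):
        if f.name == "timestamp":
            continue  # carried as the span event's own timestamp instead
        value = getattr(event, f.name)
        if isinstance(value, _PRIMITIVES):
            attributes[prefix + f.name] = value
        elif isinstance(value, (list, tuple)):
            # Result objects are not valid attribute values; keep a count.
            attributes[prefix + f.name + ".count"] = len(value)
        else:
            attributes[prefix + f.name] = str(value)
    timestamp_ns = int(event.timestamp * 1_000_000_000)  # seconds -> nanoseconds
    return type(event).__name__, attributes, timestamp_ns


class FakeSpan:
    """Records add_event calls; stands in for an OpenTelemetry span."""

    def __init__(self) -> None:
        self.events: list[tuple[str, dict[str, Any], Optional[int]]] = []

    def add_event(
        self,
        name: str,
        attributes: Optional[dict[str, Any]] = None,
        timestamp: Optional[int] = None,
    ) -> None:
        self.events.append((name, dict(attributes or {}), timestamp))


span = FakeSpan()
event = QuickCheckEvent(chunk_index=2, attempt=1, passed=False, results=["r1", "r2"])
name, attrs, ts = to_span_event(event)
span.add_event(name, attributes=attrs, timestamp=ts)
print(span.events[0][:2])
# ('QuickCheckEvent', {'mellea.event.chunk_index': 2, 'mellea.event.attempt': 1,
#  'mellea.event.passed': False, 'mellea.event.results.count': 2})
```

How much of each result (fail reasons, requirement names) belongs on the span event versus in record_requirement_failure is left open here.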

Open design questions to resolve during implementation

  • Delivery mechanism. Two candidates: (a) a StreamChunkingResult.events() async iterator mirroring the existing astream(); (b) stream_with_chunking(..., on_event=callback). (a) is more consistent with the existing API but requires a second queue and single-consumer discipline. (b) is simpler to bridge synchronously into mellea-partial UI consumers. Pick whichever is cleaner to bridge to mellea-partial.
  • RetryEvent scope. Confirmed deferred here because v1 retry is caller-driven. When/if an orchestrator-side retry lands (likely a future issue, not in the current epic), decide at that point whether RetryEvent fires from within the orchestrator or from the caller that drives the retry loop.
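For option (a), a minimal single-consumer channel can be built on asyncio.Queue with an end-of-stream sentinel. EventStream, emit, close, and events are hypothetical names, and strings stand in for event objects. The second-consumer check makes the single-consumer discipline explicit instead of silently splitting events between two readers.

```python
import asyncio
from typing import AsyncIterator, Generic, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the end of the event stream


class EventStream(Generic[T]):
    """Single-consumer async event channel (hypothetical helper for option (a))."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._consumed = False

    def emit(self, event: T) -> None:
        # Same shape as an on_event callback, i.e. option (b).
        self._queue.put_nowait(event)

    def close(self) -> None:
        self._queue.put_nowait(_DONE)

    async def events(self) -> AsyncIterator[T]:
        if self._consumed:
            raise RuntimeError("events() supports a single consumer")
        self._consumed = True
        while True:
            item = await self._queue.get()
            if item is _DONE:
                return
            yield item


async def demo() -> list[str]:
    stream: EventStream[str] = EventStream()  # created inside the running loop

    async def producer() -> None:
        for name in ["ChunkEvent", "StreamingDoneEvent", "CompletedEvent"]:
            stream.emit(name)
            await asyncio.sleep(0)  # yield control to the consumer
        stream.close()

    task = asyncio.create_task(producer())
    received = [event async for event in stream.events()]
    await task
    return received


print(asyncio.run(demo()))  # ['ChunkEvent', 'StreamingDoneEvent', 'CompletedEvent']
```

Because emit already has the shape of an on_event callback, one object like this could back both options; under that assumption the decision is mostly about which surface to expose and document.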

Additional acceptance criteria

  • Add ErrorEvent dataclass with exception_type: str and detail: str fields (in addition to inherited timestamp)
  • stream_with_chunking emits ChunkEvent, QuickCheckEvent, StreamingDoneEvent, FullValidationEvent, CompletedEvent, and ErrorEvent at the documented points
  • Exception cleanup in _orchestrate_streaming replaces the MelleaLogger.warning stopgap with an ErrorEvent emission (remove the # TODO(#902) comment at the same time)
  • Application trace span opened and closed by the orchestrator, with set_span_error called on early-exit failure
  • record_requirement_check, record_requirement_failure, record_sampling_outcome, record_error called at the documented points
  • Unit tests for event emission order and content via a capturing subscriber
  • OTEL side-effect tests via existing metric/span test fixtures
  • Delivery mechanism (async iterator vs. callback) decided and documented in the orchestrator docstring
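For the exception path, a sketch of ErrorEvent with the two fields listed above, emitted from an except block where the MelleaLogger.warning stopgap used to be. Assumptions: validate_chunks_or_report and stream_validate are hypothetical stand-ins for _orchestrate_streaming and the real per-chunk validation; a CompletedEvent(success=False) still follows the ErrorEvent, per the "including early-exit cases" rule for CompletedEvent; and the exception is not re-raised, since the issue does not say whether it should be.

```python
import time
from dataclasses import dataclass, field
from typing import Callable


@dataclass
class StreamEvent:
    timestamp: float = field(init=False)

    def __post_init__(self) -> None:
        self.timestamp = time.time()


@dataclass
class ErrorEvent(StreamEvent):
    exception_type: str
    detail: str


@dataclass
class CompletedEvent(StreamEvent):
    success: bool
    full_text: str
    attempts_used: int


def validate_chunks_or_report(
    chunks: list[str],
    stream_validate: Callable[[str], bool],  # hypothetical stand-in
    on_event: Callable[[StreamEvent], None],
) -> bool:
    """Hypothetical stand-in for the exception path of _orchestrate_streaming."""
    emitted: list[str] = []
    try:
        for chunk in chunks:
            if not stream_validate(chunk):
                on_event(CompletedEvent(success=False, full_text="".join(emitted), attempts_used=1))
                return False
            emitted.append(chunk)
    except Exception as exc:
        # Emit an event instead of only logging a warning.
        on_event(ErrorEvent(exception_type=type(exc).__name__, detail=str(exc)))
        on_event(CompletedEvent(success=False, full_text="".join(emitted), attempts_used=1))
        return False
    on_event(CompletedEvent(success=True, full_text="".join(emitted), attempts_used=1))
    return True


def flaky_validator(chunk: str) -> bool:
    if chunk == "boom":
        raise ValueError("validator crashed")
    return True


captured: list[StreamEvent] = []
validate_chunks_or_report(["ok", "boom"], flaky_validator, captured.append)
print([type(e).__name__ for e in captured])  # ['ErrorEvent', 'CompletedEvent']
print(captured[0].exception_type, captured[0].detail)  # ValueError validator crashed
```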

Blocked by #901
Part of #891

Metadata

Labels: enhancement (New feature or request)
Type: No type
Projects: No projects
Relationships: None yet
Development: No branches or pull requests