Skip to content

Commit 731336f

Browse files
fsecada01claude
andcommitted
feat: add SSE streaming endpoint for long-running component operations
Closes #12 — StreamingComponent subclass with async generator handlers that yield intermediate renders as Server-Sent Events: - core/streaming.py: StreamingComponent + async_stream_dispatch + format_sse_frame - FastAPI adapter: POST /components/{name}/stream endpoint - Litestar adapter: POST /components/{name}/stream endpoint - Non-generator handlers gracefully produce a single SSE frame - 21 new tests (12 core + 5 FastAPI + 4 Litestar) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent e207a5b commit 731336f

7 files changed

Lines changed: 690 additions & 6 deletions

File tree

src/component_framework/adapters/fastapi.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,14 @@
66
try:
77
from fastapi import HTTPException, Request
88
from fastapi.responses import JSONResponse
9+
from starlette.responses import StreamingResponse
910
except ImportError as e:
1011
from . import _require_extra
1112

1213
raise _require_extra("fastapi", "fastapi") from e
1314

1415
from ..core import StateSerializer, registry
16+
from ..core.streaming import StreamingComponent, format_sse_frame
1517

1618
logger = logging.getLogger(__name__)
1719

@@ -84,9 +86,74 @@ async def component_endpoint(name: str, request: Request) -> JSONResponse:
8486
raise HTTPException(status_code=500, detail="Internal server error")
8587

8688

89+
async def stream_component_endpoint(name: str, request: Request) -> StreamingResponse:
90+
"""
91+
SSE streaming endpoint for long-running component operations.
92+
93+
POST /components/{name}/stream
94+
Body: same as component_endpoint
95+
96+
Returns: text/event-stream with one ``data:`` frame per intermediate render.
97+
"""
98+
try:
99+
component_cls = registry.get(name)
100+
if not component_cls:
101+
raise HTTPException(status_code=404, detail=f"Component '{name}' not found")
102+
103+
if not issubclass(component_cls, StreamingComponent):
104+
raise HTTPException(
105+
status_code=400,
106+
detail=f"Component '{name}' does not support streaming",
107+
)
108+
109+
try:
110+
data = await request.json()
111+
except Exception as e:
112+
raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")
113+
114+
params = data.get("params", {})
115+
event = data.get("event")
116+
payload_raw = data.get("payload", {})
117+
state_str = data.get("state")
118+
119+
if isinstance(payload_raw, str):
120+
try:
121+
payload = json.loads(payload_raw)
122+
except (json.JSONDecodeError, ValueError):
123+
payload = {}
124+
else:
125+
payload = payload_raw
126+
127+
state = None
128+
if state_str:
129+
try:
130+
state = StateSerializer.deserialize(state_str)
131+
except Exception as e:
132+
raise HTTPException(status_code=400, detail=f"Invalid state: {e}")
133+
134+
component = component_cls(**params)
135+
136+
async def event_generator():
137+
async for frame in component.async_stream_dispatch(
138+
event=event, payload=payload, state=state
139+
):
140+
frame["state"] = StateSerializer.serialize(frame["state"])
141+
yield format_sse_frame(frame)
142+
143+
return StreamingResponse(event_generator(), media_type="text/event-stream")
144+
145+
except HTTPException:
146+
raise
147+
except Exception:
148+
logger.exception(f"Error processing streaming component '{name}'")
149+
raise HTTPException(status_code=500, detail="Internal server error")
150+
151+
87152
def create_component_routes(app):
88153
"""
89-
Add component endpoint to FastAPI app.
154+
Add component endpoints to FastAPI app.
155+
156+
Registers both the standard POST endpoint and the SSE streaming endpoint.
90157
91158
Usage:
92159
from fastapi import FastAPI
@@ -99,3 +166,9 @@ def create_component_routes(app):
99166
methods=["POST"],
100167
name="component_endpoint",
101168
)
169+
app.add_api_route(
170+
"/components/{name}/stream",
171+
stream_component_endpoint,
172+
methods=["POST"],
173+
name="stream_component_endpoint",
174+
)

src/component_framework/adapters/litestar.py

Lines changed: 69 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,14 @@
66
try:
77
from litestar import Request, post
88
from litestar.exceptions import HTTPException
9-
from litestar.response import Response
9+
from litestar.response import Response, Stream
1010
except ImportError as e:
1111
from . import _require_extra
1212

1313
raise _require_extra("litestar", "litestar") from e
1414

1515
from ..core import StateSerializer, registry
16+
from ..core.streaming import StreamingComponent, format_sse_frame
1617

1718
logger = logging.getLogger(__name__)
1819

@@ -86,9 +87,68 @@ async def component_endpoint(name: str, request: Request) -> Response:
8687
raise HTTPException(status_code=500, detail="Internal server error")
8788

8889

90+
@post("/components/{name:str}/stream")
91+
async def stream_component_endpoint(name: str, request: Request) -> Stream:
92+
"""
93+
SSE streaming endpoint for long-running component operations.
94+
95+
POST /components/{name}/stream
96+
Body: same as component_endpoint
97+
98+
Returns: text/event-stream with one ``data:`` frame per intermediate render.
99+
"""
100+
component_cls = registry.get(name)
101+
if not component_cls:
102+
raise HTTPException(status_code=404, detail=f"Component '{name}' not found")
103+
104+
if not issubclass(component_cls, StreamingComponent):
105+
raise HTTPException(
106+
status_code=400,
107+
detail=f"Component '{name}' does not support streaming",
108+
)
109+
110+
try:
111+
data = await request.json()
112+
except Exception as e:
113+
raise HTTPException(status_code=400, detail=f"Invalid JSON: {e}")
114+
115+
params = data.get("params", {})
116+
event = data.get("event")
117+
payload_raw = data.get("payload", {})
118+
state_str = data.get("state")
119+
120+
if isinstance(payload_raw, str):
121+
try:
122+
payload = json.loads(payload_raw)
123+
except (json.JSONDecodeError, ValueError):
124+
payload = {}
125+
else:
126+
payload = payload_raw
127+
128+
state = None
129+
if state_str:
130+
try:
131+
state = StateSerializer.deserialize(state_str)
132+
except Exception as e:
133+
raise HTTPException(status_code=400, detail=f"Invalid state: {e}")
134+
135+
component = component_cls(**params)
136+
137+
async def event_generator():
138+
async for frame in component.async_stream_dispatch(
139+
event=event, payload=payload, state=state
140+
):
141+
frame["state"] = StateSerializer.serialize(frame["state"])
142+
yield format_sse_frame(frame)
143+
144+
return Stream(event_generator(), media_type="text/event-stream", status_code=200)
145+
146+
89147
def create_component_routes(app):
90148
"""
91-
Register the component endpoint handler with a Litestar app.
149+
Register component endpoint handlers with a Litestar app.
150+
151+
Registers both the standard POST endpoint and the SSE streaming endpoint.
92152
93153
Usage:
94154
from litestar import Litestar
@@ -97,9 +157,13 @@ def create_component_routes(app):
97157
app = Litestar(route_handlers=[])
98158
create_component_routes(app)
99159
100-
Alternatively, pass the handler directly at app creation:
101-
from component_framework.adapters.litestar import component_endpoint
160+
Alternatively, pass the handlers directly at app creation:
161+
from component_framework.adapters.litestar import (
162+
component_endpoint,
163+
stream_component_endpoint,
164+
)
102165
103-
app = Litestar(route_handlers=[component_endpoint])
166+
app = Litestar(route_handlers=[component_endpoint, stream_component_endpoint])
104167
"""
105168
app.register(component_endpoint)
169+
app.register(stream_component_endpoint)

src/component_framework/core/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from .registry import ComponentRegistry, registry
1515
from .renderer import Renderer
1616
from .state import InMemoryStateStore, StateStore
17+
from .streaming import StreamingComponent, format_sse_frame
1718
from .websocket import ComponentWebSocketManager, WebSocketConnection, ws_manager
1819

1920
__all__ = [
@@ -36,6 +37,8 @@
3637
"Renderer",
3738
"StateStore",
3839
"InMemoryStateStore",
40+
"StreamingComponent",
41+
"format_sse_frame",
3942
"ComponentWebSocketManager",
4043
"WebSocketConnection",
4144
"ws_manager",
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
"""Streaming component support for SSE-based progressive rendering."""
2+
3+
import inspect
4+
import json
5+
import logging
6+
from collections.abc import AsyncGenerator
7+
8+
from .component import Component, ComponentError, EventNotFoundError
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class StreamingComponent(Component):
14+
"""
15+
Component subclass that supports streaming intermediate renders via SSE.
16+
17+
Event handlers written as async generators will yield intermediate frames.
18+
Each ``yield`` triggers a render and emits the current state as an SSE frame.
19+
20+
Example::
21+
22+
@registry.register("rag_query")
23+
class RagQueryComponent(StreamingComponent):
24+
template_name = "rag_query.html"
25+
26+
async def on_analyze(self, query: str):
27+
async for step in rag_service.stream(query):
28+
self.state["step"] = step
29+
yield # emit intermediate render
30+
self.state["done"] = True
31+
32+
Non-generator handlers (sync or async) are also supported and produce a
33+
single frame, making the streaming endpoint backward-compatible.
34+
"""
35+
36+
async def async_stream_dispatch(
37+
self,
38+
event: str | None = None,
39+
payload: dict | None = None,
40+
state: dict | None = None,
41+
) -> AsyncGenerator[dict, None]:
42+
"""
43+
Async generator entry point for streaming component execution.
44+
45+
Yields one dict per intermediate render, plus a final frame with
46+
``stream_done=True``. Each frame has the same shape as ``dispatch()``
47+
output, with an added ``stream_done`` key.
48+
49+
Args:
50+
event: Event name to handle.
51+
payload: Event data.
52+
state: Serialized state to restore.
53+
54+
Yields:
55+
Dict with ``html``, ``state``, ``component_id``, ``slots``, and
56+
``stream_done`` keys.
57+
"""
58+
try:
59+
# Lifecycle: mount or hydrate
60+
if state:
61+
self.hydrate(state)
62+
else:
63+
self.mount()
64+
65+
if not event:
66+
# No event — just render once
67+
yield self._render_frame(stream_done=True)
68+
return
69+
70+
handler = getattr(self, f"on_{event}", None)
71+
if not handler:
72+
raise EventNotFoundError(f"No handler for event: {event}")
73+
74+
if inspect.isasyncgenfunction(handler):
75+
# Async generator handler — yield intermediate frames
76+
try:
77+
async for _ in handler(**(payload or {})):
78+
yield self._render_frame(stream_done=False)
79+
except TypeError as e:
80+
raise ComponentError(f"Invalid payload for {event}: {e}") from e
81+
except ComponentError:
82+
raise
83+
except Exception as e:
84+
logger.exception(f"Error handling {event} in {self.__class__.__name__}")
85+
raise ComponentError(f"Error handling {event}") from e
86+
87+
# Final frame after generator exhaustion
88+
yield self._render_frame(stream_done=True)
89+
90+
else:
91+
# Non-generator handler (sync or async) — single frame
92+
try:
93+
result = handler(**(payload or {}))
94+
if inspect.isawaitable(result):
95+
await result
96+
except TypeError as e:
97+
raise ComponentError(f"Invalid payload for {event}: {e}") from e
98+
except ComponentError:
99+
raise
100+
except Exception as e:
101+
logger.exception(f"Error handling {event} in {self.__class__.__name__}")
102+
raise ComponentError(f"Error handling {event}") from e
103+
104+
yield self._render_frame(stream_done=True)
105+
106+
except Exception:
107+
logger.exception(f"Error in {self.__class__.__name__}.async_stream_dispatch()")
108+
raise
109+
110+
def _render_frame(self, *, stream_done: bool) -> dict:
111+
"""Render the component and return a frame dict."""
112+
self.before_render()
113+
html = self.render()
114+
return {
115+
"html": html,
116+
"state": self.dehydrate(),
117+
"component_id": self.id,
118+
"slots": self.render_slots(),
119+
"stream_done": stream_done,
120+
}
121+
122+
123+
def format_sse_frame(frame: dict) -> str:
124+
"""Format a frame dict as an SSE ``data:`` line.
125+
126+
Args:
127+
frame: The frame dict from ``async_stream_dispatch``.
128+
129+
Returns:
130+
A string ending with two newlines (SSE message terminator).
131+
"""
132+
return f"data: {json.dumps(frame, default=str)}\n\n"

0 commit comments

Comments
 (0)