Skip to content

Commit 6cb2289

Browse files
giles17Copilot
andauthored
Auto-finalize ResponseStream on iteration completion (#4478)
* Add multi-turn streaming sample and rename multi-turn samples - Rename 03_multi_turn.py to 03a_multi_turn.py - Add 03b_multi_turn_streaming.py showing streaming with session history - The new sample demonstrates calling get_final_response() after iterating the stream to persist conversation history - Update READMEs to reflect the new file names Closes #4447 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Auto-finalize ResponseStream on iteration completion When a ResponseStream is fully consumed via async iteration, automatically trigger finalization (finalizer + result hooks). This ensures session history is persisted in streaming multi-turn conversations without requiring an explicit get_final_response() call. - Add auto-finalize call in __anext__ on StopAsyncIteration - Guard inner stream finalization to prevent double-execution - Re-check _finalized after iteration in get_final_response() - Add tests for auto-finalization and streaming session history - Revert sample file renames from previous commit Closes #4447 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * README fix * Fix SIM102 lint: combine nested if statements Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 2aaca50 commit 6cb2289

5 files changed

Lines changed: 122 additions & 31 deletions

File tree

python/packages/core/agent_framework/_types.py

Lines changed: 34 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2776,6 +2776,7 @@ async def __anext__(self) -> UpdateT:
27762776
except StopAsyncIteration:
27772777
self._consumed = True
27782778
await self._run_cleanup_hooks()
2779+
await self.get_final_response()
27792780
raise
27802781
except Exception:
27812782
await self._run_cleanup_hooks()
@@ -2825,34 +2826,38 @@ async def get_final_response(self) -> FinalT:
28252826
await self._get_stream()
28262827
if self._inner_stream is None:
28272828
raise RuntimeError("Inner stream not available")
2828-
if not self._finalized:
2829+
if not self._finalized and not self._consumed:
28292830
# Consume outer stream (which delegates to inner) if not already consumed
2830-
if not self._consumed:
2831-
async for _ in self:
2832-
pass
2831+
async for _ in self:
2832+
pass
28332833

2834-
# First, finalize the inner stream and run its result hooks
2834+
# Re-check: __anext__ auto-finalization may have already finalized this stream
2835+
if not self._finalized:
28352836
# This ensures inner post-processing (e.g., context provider notifications) runs
2836-
inner_stream = self._inner_stream
2837-
inner_result: Any
2838-
if inner_stream._finalizer is not None:
2839-
inner_finalizer = inner_stream._finalizer
2840-
inner_result = inner_finalizer(inner_stream._updates)
2841-
if isawaitable(inner_result):
2842-
inner_result = await inner_result
2837+
# Skip if inner stream was already finalized (e.g., via auto-finalization on iteration)
2838+
if not self._inner_stream._finalized:
2839+
inner_stream = self._inner_stream
2840+
inner_result: Any
2841+
if inner_stream._finalizer is not None:
2842+
inner_finalizer = inner_stream._finalizer
2843+
inner_result = inner_finalizer(inner_stream._updates)
2844+
if isawaitable(inner_result):
2845+
inner_result = await inner_result
2846+
else:
2847+
inner_result = list(inner_stream._updates)
2848+
2849+
# Run inner stream's result hooks
2850+
inner_hooks = cast(list[Callable[[Any], Any | Awaitable[Any] | None]], inner_stream._result_hooks)
2851+
for hook in inner_hooks:
2852+
hooked_result = hook(inner_result)
2853+
if isawaitable(hooked_result):
2854+
hooked_result = await hooked_result
2855+
if hooked_result is not None:
2856+
inner_result = hooked_result
2857+
inner_stream._final_result = inner_result
2858+
inner_stream._finalized = True
28432859
else:
2844-
inner_result = list(inner_stream._updates)
2845-
2846-
# Run inner stream's result hooks
2847-
inner_hooks = cast(list[Callable[[Any], Any | Awaitable[Any] | None]], inner_stream._result_hooks)
2848-
for hook in inner_hooks:
2849-
hooked_result = hook(inner_result)
2850-
if isawaitable(hooked_result):
2851-
hooked_result = await hooked_result
2852-
if hooked_result is not None:
2853-
inner_result = hooked_result
2854-
inner_stream._final_result = inner_result
2855-
inner_stream._finalized = True
2860+
inner_result = self._inner_stream._final_result
28562861

28572862
# Now finalize the outer stream with its own finalizer
28582863
# If outer has no finalizer, use inner's result (preserves from_awaitable behavior)
@@ -2877,12 +2882,12 @@ async def get_final_response(self) -> FinalT:
28772882
self._finalized = True
28782883
return self._final_result # type: ignore[return-value]
28792884

2880-
if not self._finalized:
2881-
if not self._consumed:
2882-
async for _ in self:
2883-
pass
2885+
if not self._finalized and not self._consumed:
2886+
async for _ in self:
2887+
pass
28842888

2885-
# Use finalizer if configured, otherwise return collected updates
2889+
# Re-check: __anext__ auto-finalization may have already finalized this stream
2890+
if not self._finalized:
28862891
result: Any
28872892
if self._finalizer is not None:
28882893
result = self._finalizer(self._updates)

python/packages/core/tests/core/test_agents.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,40 @@ async def test_chat_client_agent_streaming_session_id_set_without_get_final_resp
357357
assert session.service_session_id == "resp_123"
358358

359359

360+
async def test_chat_client_agent_streaming_session_history_saved_without_get_final_response(
361+
chat_client_base: SupportsChatGetResponse,
362+
) -> None:
363+
"""Test that session history is saved after streaming iteration without get_final_response().
364+
365+
Auto-finalization on iteration completion should trigger after_run providers,
366+
persisting conversation history to the session.
367+
"""
368+
from agent_framework._sessions import InMemoryHistoryProvider
369+
370+
chat_client_base.streaming_responses = [
371+
[
372+
ChatResponseUpdate(
373+
contents=[Content.from_text("Hello Alice!")],
374+
role="assistant",
375+
response_id="resp_1",
376+
finish_reason="stop",
377+
),
378+
]
379+
]
380+
381+
agent = Agent(client=chat_client_base)
382+
session = agent.create_session()
383+
384+
# Only iterate — do NOT call get_final_response()
385+
async for _ in agent.run("My name is Alice", session=session, stream=True):
386+
pass
387+
388+
chat_messages: list[Message] = session.state.get(InMemoryHistoryProvider.DEFAULT_SOURCE_ID, {}).get("messages", [])
389+
assert len(chat_messages) == 2
390+
assert chat_messages[0].text == "My name is Alice"
391+
assert chat_messages[1].text == "Hello Alice!"
392+
393+
360394
async def test_chat_client_agent_update_session_messages(client: SupportsChatGetResponse) -> None:
361395
from agent_framework._sessions import InMemoryHistoryProvider
362396

python/packages/core/tests/core/test_types.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2666,6 +2666,58 @@ async def test_updates_property_returns_collected(self) -> None:
26662666
assert stream.updates[0].text == "update_0"
26672667
assert stream.updates[1].text == "update_1"
26682668

2669+
async def test_auto_finalize_on_iteration_completion(self) -> None:
2670+
"""Stream auto-finalizes when async iteration completes."""
2671+
stream = ResponseStream(_generate_updates(2), finalizer=_combine_updates)
2672+
2673+
async for _ in stream:
2674+
pass
2675+
2676+
assert stream._finalized is True
2677+
assert stream._final_result is not None
2678+
assert stream._final_result.text == "update_0update_1"
2679+
2680+
async def test_auto_finalize_runs_result_hooks(self) -> None:
2681+
"""Result hooks run automatically when iteration completes."""
2682+
hook_called = {"value": False}
2683+
2684+
def tracking_hook(response: ChatResponse) -> ChatResponse:
2685+
hook_called["value"] = True
2686+
response.additional_properties["auto_finalized"] = True
2687+
return response
2688+
2689+
stream = ResponseStream(
2690+
_generate_updates(2),
2691+
finalizer=_combine_updates,
2692+
result_hooks=[tracking_hook],
2693+
)
2694+
2695+
async for _ in stream:
2696+
pass
2697+
2698+
assert hook_called["value"] is True
2699+
final = await stream.get_final_response()
2700+
assert final.additional_properties["auto_finalized"] is True
2701+
2702+
async def test_get_final_response_idempotent_after_auto_finalize(self) -> None:
2703+
"""get_final_response returns cached result after auto-finalization."""
2704+
call_count = {"value": 0}
2705+
2706+
def counting_finalizer(updates: list[ChatResponseUpdate]) -> ChatResponse:
2707+
call_count["value"] += 1
2708+
return _combine_updates(updates)
2709+
2710+
stream = ResponseStream(_generate_updates(2), finalizer=counting_finalizer)
2711+
2712+
async for _ in stream:
2713+
pass
2714+
2715+
final1 = await stream.get_final_response()
2716+
final2 = await stream.get_final_response()
2717+
2718+
assert call_count["value"] == 1
2719+
assert final1.text == final2.text
2720+
26692721

26702722
class TestResponseStreamTransformHooks:
26712723
"""Tests for transform hooks (per-update processing)."""

python/samples/01-get-started/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ export AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME="gpt-4o" # optional, defaults to
2222
|---|------|-------------------|
2323
| 1 | [01_hello_agent.py](01_hello_agent.py) | Create your first agent and run it (streaming and non-streaming). |
2424
| 2 | [02_add_tools.py](02_add_tools.py) | Define a function tool with `@tool` and attach it to an agent. |
25-
| 3 | [03_multi_turn.py](03_multi_turn.py) | Keep conversation history across turns with `AgentThread`. |
25+
| 3 | [03_multi_turn.py](03_multi_turn.py) | Keep conversation history across turns with `AgentSession`. |
2626
| 4 | [04_memory.py](04_memory.py) | Add dynamic context with a custom `ContextProvider`. |
2727
| 5 | [05_first_workflow.py](05_first_workflow.py) | Chain executors into a workflow with edges. |
2828
| 6 | [06_host_your_agent.py](06_host_your_agent.py) | Host a single agent with Azure Functions. |

python/samples/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ Start with `01-get-started/` and work through the numbered files:
1818

1919
1. **[01_hello_agent.py](./01-get-started/01_hello_agent.py)** — Create and run your first agent
2020
2. **[02_add_tools.py](./01-get-started/02_add_tools.py)** — Add function tools with `@tool`
21-
3. **[03_multi_turn.py](./01-get-started/03_multi_turn.py)** — Multi-turn conversations with `AgentThread`
21+
3. **[03_multi_turn.py](./01-get-started/03_multi_turn.py)** — Multi-turn conversations with `AgentSession`
2222
4. **[04_memory.py](./01-get-started/04_memory.py)** — Agent memory with `ContextProvider`
2323
5. **[05_first_workflow.py](./01-get-started/05_first_workflow.py)** — Build a workflow with executors and edges
2424
6. **[06_host_your_agent.py](./01-get-started/06_host_your_agent.py)** — Host your agent via Azure Functions

0 commit comments

Comments
 (0)