Skip to content

Commit c5b7070

Browse files
committed
add asyncio timeout() context manager
This is an implementation of Python's `asyncio.timeout()` which was introduced in 3.11. We strongly encourage the use of the following approach instead of `asyncio.wait_for()`. async with timeout(delay): # Your async code In Python 3.12, `asyncio.wait_for()` is implemented in terms of `asyncio.timeout()` which fixed a bunch of issues. However, this was never backported (because of the lack of `async.timeout()`) and there are still many remainig issues, specially in Python 3.10, in `async.wait_for()`. See python/cpython#98518
1 parent 5286591 commit c5b7070

29 files changed

Lines changed: 184 additions & 72 deletions

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
99

1010
### Added
1111

12+
- Added `pipecat.utils.timeout.timeout()` asynchronous context manager. This
13+
should be preferred over `asyncio.wait_for()`.
14+
1215
- Allow passing custom pipeline sink and source processors to a
1316
`Pipeline`. Pipeline source and sink processors are used to know and control
1417
what's coming in and out of a `Pipeline` processor.
@@ -46,6 +49,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
4649

4750
### Fixed
4851

52+
- Replaced `asyncio.wait_for()` for `async with timeout(delay):`. In Python
53+
3.10, `asyncio.wait_for()` has issues regarding task cancellation
54+
(i.e. cancellation is never propagated).
55+
See https://bugs.python.org/issue42130
56+
4957
- Fixed an `AudioBufferProcessor` issues that would cause audio overlap when
5058
setting a max buffer size.
5159

scripts/evals/eval.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
from pipecat.services.llm_service import FunctionCallParams
4444
from pipecat.services.openai.llm import OpenAILLMService
4545
from pipecat.transports.services.daily import DailyParams, DailyTransport
46+
from pipecat.utils.asyncio.timeout import timeout
4647

4748
SCRIPT_DIR = Path(__file__).resolve().parent
4849

@@ -122,7 +123,8 @@ async def run_eval(self, example_file: str, prompt: EvalPrompt, eval: Optional[s
122123
logger.error(f"ERROR: Unable to run {example_file}: {e}")
123124

124125
try:
125-
result = await asyncio.wait_for(self._queue.get(), timeout=1.0)
126+
async with timeout(1.0):
127+
result = await self._queue.get()
126128
except asyncio.TimeoutError:
127129
result = False
128130

src/pipecat/pipeline/task.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
TaskManager,
5252
TaskManagerParams,
5353
)
54+
from pipecat.utils.asyncio.timeout import timeout
5455
from pipecat.utils.asyncio.watchdog_queue import WatchdogQueue
5556
from pipecat.utils.tracing.setup import is_tracing_available
5657
from pipecat.utils.tracing.turn_trace_observer import TurnTraceObserver
@@ -654,7 +655,8 @@ async def _heartbeat_monitor_handler(self):
654655
wait_time = HEARTBEAT_MONITOR_SECONDS
655656
while True:
656657
try:
657-
frame = await asyncio.wait_for(self._heartbeat_queue.get(), timeout=wait_time)
658+
async with timeout(wait_time):
659+
frame = await self._heartbeat_queue.get()
658660
process_time = (self._clock.get_time() - frame.timestamp) / 1_000_000_000
659661
logger.trace(f"{self}: heartbeat frame processed in {process_time} seconds")
660662
self._heartbeat_queue.task_done()
@@ -677,9 +679,8 @@ async def _idle_monitor_handler(self):
677679

678680
while running:
679681
try:
680-
frame = await asyncio.wait_for(
681-
self._idle_queue.get(), timeout=self._idle_timeout_secs
682-
)
682+
async with timeout(self._idle_timeout_secs):
683+
frame = await self._idle_queue.get()
683684

684685
if not isinstance(frame, InputAudioRawFrame):
685686
frame_buffer.append(frame)

src/pipecat/processors/aggregators/dtmf_aggregator.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
TranscriptionFrame,
2626
)
2727
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
28+
from pipecat.utils.asyncio.timeout import timeout
2829
from pipecat.utils.time import time_now_iso8601
2930

3031

@@ -136,7 +137,8 @@ async def _aggregation_task_handler(self):
136137
"""Background task that handles timeout-based flushing."""
137138
while True:
138139
try:
139-
await asyncio.wait_for(self._digit_event.wait(), timeout=self._idle_timeout)
140+
async with timeout(self._idle_timeout):
141+
await self._digit_event.wait()
140142
self._digit_event.clear()
141143
except asyncio.TimeoutError:
142144
self.reset_watchdog()

src/pipecat/processors/aggregators/llm_response.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
OpenAILLMContextFrame,
6161
)
6262
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
63+
from pipecat.utils.asyncio.timeout import timeout
6364
from pipecat.utils.time import time_now_iso8601
6465

6566

@@ -660,17 +661,18 @@ async def _aggregation_task_handler(self):
660661
# consistent user experience between real VAD detection and
661662
# emulated VAD scenarios.
662663
if not self._emulating_vad:
663-
timeout = self._params.aggregation_timeout
664+
delay = self._params.aggregation_timeout
664665
elif self._turn_params:
665-
timeout = self._params.turn_emulated_vad_timeout
666+
delay = self._params.turn_emulated_vad_timeout
666667
else:
667668
# Use VAD stop_secs when no turn analyzer is present, fallback if no VAD params
668-
timeout = (
669+
delay = (
669670
self._vad_params.stop_secs
670671
if self._vad_params
671672
else self._params.turn_emulated_vad_timeout
672673
)
673-
await asyncio.wait_for(self._aggregation_event.wait(), timeout)
674+
async with timeout(delay):
675+
await self._aggregation_event.wait()
674676
await self._maybe_emulate_user_speaking()
675677
except asyncio.TimeoutError:
676678
if not self._user_speaking:

src/pipecat/processors/idle_frame_processor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from pipecat.frames.frames import Frame, StartFrame
1313
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
14+
from pipecat.utils.asyncio.timeout import timeout
1415
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
1516

1617

@@ -85,7 +86,8 @@ async def _idle_task_handler(self):
8586
"""Handle idle timeout monitoring and callback execution."""
8687
while True:
8788
try:
88-
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
89+
async with timeout(self._timeout):
90+
await self._idle_event.wait()
8991
except asyncio.TimeoutError:
9092
await self._callback(self)
9193
finally:

src/pipecat/processors/user_idle_processor.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
UserStoppedSpeakingFrame,
2323
)
2424
from pipecat.processors.frame_processor import FrameDirection, FrameProcessor
25+
from pipecat.utils.asyncio.timeout import timeout
2526
from pipecat.utils.asyncio.watchdog_event import WatchdogEvent
2627

2728

@@ -191,7 +192,8 @@ async def _idle_task_handler(self) -> None:
191192
"""
192193
while True:
193194
try:
194-
await asyncio.wait_for(self._idle_event.wait(), timeout=self._timeout)
195+
async with timeout(self._timeout):
196+
await self._idle_event.wait()
195197
except asyncio.TimeoutError:
196198
if not self._interrupted:
197199
self._retry_count += 1

src/pipecat/services/anthropic/llm.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
)
5454
from pipecat.processors.frame_processor import FrameDirection
5555
from pipecat.services.llm_service import FunctionCallFromLLM, LLMService
56+
from pipecat.utils.asyncio.timeout import timeout
5657
from pipecat.utils.asyncio.watchdog_async_iterator import WatchdogAsyncIterator
5758
from pipecat.utils.tracing.service_decorators import traced_llm
5859

@@ -185,9 +186,8 @@ async def _create_message_stream(self, api_call, params):
185186
"""
186187
if self._retry_on_timeout:
187188
try:
188-
response = await asyncio.wait_for(
189-
api_call(**params), timeout=self._retry_timeout_secs
190-
)
189+
async with timeout(self._retry_timeout_secs):
190+
response = await api_call(**params)
191191
return response
192192
except (APITimeoutError, asyncio.TimeoutError):
193193
# Retry, this time without a timeout so we get a response

src/pipecat/services/assemblyai/stt.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
from pipecat.processors.frame_processor import FrameDirection
3232
from pipecat.services.stt_service import STTService
3333
from pipecat.transcriptions.language import Language
34+
from pipecat.utils.asyncio.timeout import timeout
3435
from pipecat.utils.time import time_now_iso8601
3536
from pipecat.utils.tracing.service_decorators import traced_stt
3637

@@ -219,10 +220,8 @@ async def _disconnect(self):
219220
await self._websocket.send(json.dumps({"type": "Terminate"}))
220221

221222
try:
222-
await asyncio.wait_for(
223-
self._termination_event.wait(),
224-
timeout=5.0,
225-
)
223+
async with timeout(5.0):
224+
await self._termination_event.wait()
226225
except asyncio.TimeoutError:
227226
logger.warning("Timed out waiting for termination message from server")
228227

@@ -247,7 +246,8 @@ async def _receive_task_handler(self):
247246
try:
248247
while self._connected:
249248
try:
250-
message = await asyncio.wait_for(self._websocket.recv(), timeout=1.0)
249+
async with timeout(1.0):
250+
message = await self._websocket.recv()
251251
data = json.loads(message)
252252
await self._handle_message(data)
253253
except asyncio.TimeoutError:

src/pipecat/services/aws/llm.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
)
5353
from pipecat.processors.frame_processor import FrameDirection
5454
from pipecat.services.llm_service import LLMService
55+
from pipecat.utils.asyncio.timeout import timeout
5556
from pipecat.utils.tracing.service_decorators import traced_llm
5657

5758
try:
@@ -801,9 +802,8 @@ async def _create_converse_stream(self, client, request_params):
801802
"""
802803
if self._retry_on_timeout:
803804
try:
804-
response = await asyncio.wait_for(
805-
client.converse_stream(**request_params), timeout=self._retry_timeout_secs
806-
)
805+
async with timeout(self._retry_timeout_secs):
806+
response = await client.converse_stream(**request_params)
807807
return response
808808
except (ReadTimeoutError, asyncio.TimeoutError) as e:
809809
# Retry, this time without a timeout so we get a response

0 commit comments

Comments
 (0)