Skip to content

Fix deadlock in AsyncWrapper thread pool shutdown#38427

Open
shunping wants to merge 3 commits intoapache:masterfrom
shunping:fix-async-dofn
Open

Fix deadlock in AsyncWrapper thread pool shutdown#38427
shunping wants to merge 3 commits intoapache:masterfrom
shunping:fix-async-dofn

Conversation

@shunping
Copy link
Copy Markdown
Collaborator

@shunping shunping commented May 9, 2026

During test execution or pipeline teardown, calling reset_state on AsyncWrapper intermittently hangs the pipeline indefinitely, leading to test runner timeouts (https://github.com/apache/beam/actions/runs/25590690572/job/75127752557?pr=38425).

The traceback of timeout is shown below:

________________________ AsyncTest_1.test_duplicates __________________________
[gw0] darwin -- Python 3.13.13 /Users/runner/work/beam/beam/sdks/python/target/.tox/py313-macos/bin/python

self = <apache_beam.transforms.async_dofn_test.AsyncTest_1 testMethod=test_duplicates>

    def setUp(self):
      super().setUp()
>     async_lib.AsyncWrapper.reset_state()

apache_beam/transforms/async_dofn_test.py:102: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox/py313-macos/lib/python3.13/site-packages/apache_beam/transforms/async_dofn.py:169: in reset_state
    pool.acquire(AsyncWrapper.initialize_pool(1)).shutdown(
/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/concurrent/futures/thread.py:239: in shutdown
    t.join()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <Thread(ThreadPoolExecutor-180_0, stopped 14025928704)>, timeout = None

    def join(self, timeout=None):
        """Wait until the thread terminates.
    
        This blocks the calling thread until the thread whose join() method is
        called terminates -- either normally or through an unhandled exception
        or until the optional timeout occurs.
    
        When the timeout argument is present and not None, it should be a
        floating-point number specifying a timeout for the operation in seconds
        (or fractions thereof). As join() always returns None, you must call
        is_alive() after join() to decide whether a timeout happened -- if the
        thread is still alive, the join() call timed out.
    
        When the timeout argument is not present or None, the operation will
        block until the thread terminates.
    
        A thread can be join()ed many times.
    
        join() raises a RuntimeError if an attempt is made to join the current
        thread as that would cause a deadlock. It is also an error to join() a
        thread before it has been started and attempts to do so raises the same
        exception.
    
        """
        if not self._initialized:
            raise RuntimeError("Thread.__init__() not called")
        if not self._started.is_set():
            raise RuntimeError("cannot join thread before it is started")
        if self is current_thread():
            raise RuntimeError("cannot join current thread")
    
        # the behavior of a negative timeout isn't documented, but
    self._bootstrap_inner()
  File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 1044, in _bootstrap_inner
    self.run()
  File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 995, in run
    self._target(*self._args, **self._kwargs)
  File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/multiprocessing/managers.py", line 179, in serve_forever
    self.stop_event.wait(1)
  File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 660, in wait
    signaled = self._cond.wait(timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.13/lib/python3.13/threading.py", line 363, in wait
    gotit = waiter.acquire(True, timeout)

The deadlock occurs in the following order:

  • The main thread executing reset_state acquires AsyncWrapper._lock and calls pool.shutdown(wait=True). It remains blocked waiting for active worker threads to finish while holding the lock.
  • When an active worker thread finishes processing an item, the underlying future completes and tries to invoke its done callbacks on that same worker thread. The registered callback decrement_items_in_buffer, however, attempts to acquire AsyncWrapper._lock to decrement the buffer count. This leads to a deadlock.

To fix it, we move the shutdown call outside of the lock scope, so while the main thread is waiting for worker threads to complete, it won't hold the lock.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request addresses an intermittent deadlock issue occurring during the teardown of AsyncWrapper thread pools. By decoupling the shutdown process from the internal state lock, the main thread can now safely wait for worker threads to complete without blocking the callbacks that require the same lock. Additionally, defensive checks and a robust reproduction test have been added to ensure stability.

Highlights

  • Deadlock Resolution: Refactored the reset_state method in AsyncWrapper to perform thread pool shutdowns outside of the internal lock scope, preventing deadlocks during cleanup.
  • Safety Improvements: Added a safety check in decrement_items_in_buffer to ensure the UUID exists in the buffer before attempting to decrement, avoiding potential key errors.
  • Regression Testing: Introduced a new test case test_reset_state_hang_reproduction that uses a separate process to verify that reset_state no longer hangs the pipeline.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@github-actions github-actions Bot added the python label May 9, 2026
@shunping
Copy link
Copy Markdown
Collaborator Author

shunping commented May 9, 2026

r: @dustin12

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request addresses a deadlock issue in AsyncWrapper.reset_state by moving the pool shutdown process outside of the global lock and adds a regression test to detect such hangs. A review comment correctly identifies that the pool.acquire() call remains inside the lock, which could still cause a deadlock if the pool is at capacity, and provides a suggestion to move it outside the lock scope.

Comment thread sdks/python/apache_beam/transforms/async_dofn.py Outdated
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 9, 2026

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant