Skip to content

[Python] Python] Bound the memory used for fnapi outbound data messages and receiving messages.#38407

Open
scwhittle wants to merge 6 commits into
apache:masterfrom
scwhittle:queue_limit
Open

[Python] Python] Bound the memory used for fnapi outbound data messages and receiving messages.#38407
scwhittle wants to merge 6 commits into
apache:masterfrom
scwhittle:queue_limit

Conversation

@scwhittle
Copy link
Copy Markdown
Contributor

@scwhittle scwhittle commented May 7, 2026

Previously an unbounded queue was used for pending data outputs to
be sent over the fnapi to the runner. If outputs were being generated
faster than the runner was consuming them, this would lead to memory
growth and possible OOMs. This PR introduces a byte-limited queue
data structure that is used instead to limit the # of bytes in the
queue. This was preferred to just using a queue with max number of
elements because the size of elements can vary greatly. For batch
pipelines they are likely large while for stremaing pipelines there
may be more small outputs.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a capacity constraint on the data transmission queue within the gRPC data channel. By limiting the queue size, the system can better manage backpressure when the SDK produces data faster than the runner can consume it, preventing unbounded memory growth.

Highlights

  • Queue Capacity Limit: Updated the _to_send queue in _GrpcDataChannel to enforce a maximum size of 10 elements.
New Features

🧠 You can now enable Memory (public preview) to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a fixed size for the _to_send queue in the _GrpcDataChannel class. Feedback suggests that the hardcoded maxsize=10 should be made configurable to avoid potential performance bottlenecks in high-throughput scenarios and notes a discrepancy between the PR description and the implementation of the receive queue size.

Comment thread sdks/python/apache_beam/runners/worker/data_plane.py Outdated
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 7, 2026

Assigning reviewers:

R: @tvalentyn for label python.

Note: If you would like to opt out of this review, comment assign to next reviewer.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@tvalentyn tvalentyn marked this pull request as draft May 7, 2026 19:11
@tvalentyn
Copy link
Copy Markdown
Contributor

thanks, marking as draft for now

@tvalentyn
Copy link
Copy Markdown
Contributor

waiting on author

@scwhittle scwhittle changed the title Minimal change to push back to dofn generating output if the sdk is producing faster than runner is consuming. [Python] Python] Bound the memory used for fnapi outbound data messages and receiving messages. May 8, 2026
@scwhittle scwhittle force-pushed the queue_limit branch 3 times, most recently from 2a8a8d6 to e8f5bf7 Compare May 8, 2026 13:56
Previously an unbounded queue was used for pending data outputs to
be sent over the fnapi to the runner. If outputs were being generated
faster than the runner was consuming them, this would lead to memory
growth and possible OOMs. This PR introduces a byte-limited queue
data structure that is used instead to limit the # of bytes in the
queue. This was preferred to just using a queue with max number of
elements because the size of elements can vary greatly.  For batch
pipelines they are likely large while for stremaing pipelines there
may be more small outputs.
@scwhittle scwhittle marked this pull request as ready for review May 8, 2026 14:10
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a ByteLimitedQueue to the Apache Beam Python SDK to limit queue capacity by both element count and total byte weight, which is then utilized in the _GrpcDataChannel to manage memory usage. The review feedback highlights several critical improvements for the new queue implementation: using time.monotonic() instead of time.time() for more robust timeout handling, ensuring compatibility with Python 3.13's queue.Queue.shutdown() to prevent deadlocks, and switching from notify() to notify_all() in the _get method to avoid potential producer starvation caused by heterogeneous item weights.

Comment thread sdks/python/apache_beam/utils/byte_limited_queue.py Outdated
Comment thread sdks/python/apache_beam/utils/byte_limited_queue.py Outdated
@scwhittle
Copy link
Copy Markdown
Contributor Author

The failures I looked through were unrelated. Are the tests flaky?

@tvalentyn
Copy link
Copy Markdown
Contributor

tvalentyn commented May 8, 2026

When backpressure happens, would that look to users as a processing lull? If so, that is less than ideal, since we probably should be throttling reading inputs, and we might mislead users that processing is slow. Ideally the Runner would be smart enough to not send them if it cannot consume outputs fast enough...

Have you considered tracking unwritten data at the Data Channel? Something like #38422 perhaps (gemini assisted). I am a bit hesitant with us rewriting queue internals, but I can take a closer look if you think this approach would be better.

Copy link
Copy Markdown
Contributor

@tvalentyn tvalentyn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we want to be conservative we could use regular queue and limit by 10k elements if that resolves this failure mode for the customer. but it might be helpful to have this utility class.

return True
return False

def put(self, item, block=True, timeout=None):
Copy link
Copy Markdown
Contributor

@tvalentyn tvalentyn May 8, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may be worth calling out in the docstring that we don't guarantee that the element will land as soon as enough space opens up, since https://github.com/python/cpython/blob/45c47d26c230086163ac1ef0aa9f955f794fb69c/Lib/queue.py#L214-L215 will wake up one random thread that is waiting, which might not be the one that can fit. this is fine as long as we are continuously emptying the queue

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the queue to be fair as I was concerned that if we are limiting we could end up starving a bundle producing large items if there was a bundle producing small items continuously.

Comment thread sdks/python/apache_beam/utils/byte_limited_queue.py
"""A queue.Queue that limits by both element count and total weight.

A single element is allowed to exceed the maxweight to avoid deadlock.
Note that shutdown is only supported after there are no more put calls.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this because it is simpler or because we don't want to shutdown earlier to avoid data loss?
we could also raise NotImplemented error on shutdown if we don't want to implement this contract.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed to not subclass queue.Queue. We weren't using the interface and relying overriding some of internals was messy, in particular with shutdown which was added within the range of python versions beam supports. I also wanted the queue to be fair for reasons above.

I removed shutdown and changed the test to use a poison pill like data_plane does.

@scwhittle
Copy link
Copy Markdown
Contributor Author

When backpressure happens, would that look to users as a processing lull? If so, that is less than ideal, since we probably should be throttling reading inputs, and we might mislead users that processing is slow. Ideally the Runner would be smart enough to not send them if it cannot consume outputs fast enough...

Yes it does show in the logs as long bundle processing. However perhaps that is correct in that the bundle is slow to process due to this. I think that instead of trying to hide that with the runner we should just improve the lull logger to note that is the reason specifically instead of the stack trace.

Have you considered tracking unwritten data at the Data Channel? Something like #38422 perhaps (gemini assisted). I am a bit hesitant with us rewriting queue internals, but I can take a closer look if you think this approach would be better.

A downside of throttling the input is that it isn't sufficient if a single input which is already being processed is outputting a lot of elements. I think that can be the case if we are processing a state-backed iterable as the input element would be KV<k, iterable> where the iterable may be large. Or the input could be filenames and outputs are the file contents etc.

So I think blocking here is ok but we can try to improve the logging.

@scwhittle
Copy link
Copy Markdown
Contributor Author

/gemini review

gemini-code-assist[bot]

This comment was marked as outdated.

@scwhittle
Copy link
Copy Markdown
Contributor Author

If we want to be conservative we could use regular queue and limit by 10k elements if that resolves this failure mode for the customer. but it might be helpful to have this utility class.

Since the size of the items can vary a lot, I think that it is better to try to count the bytes.

@scwhittle
Copy link
Copy Markdown
Contributor Author

/gemini review

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces ByteLimitedQueue, a thread-safe queue that limits capacity based on both element count and total byte size, and integrates it into the _GrpcDataChannel to manage memory more effectively. The implementation includes a Cython interface and comprehensive unit tests. Feedback was provided regarding the use of int in the Cython definition, suggesting Py_ssize_t to prevent potential overflow issues when handling large data volumes.

Comment thread sdks/python/apache_beam/utils/byte_limited_queue.pxd Outdated
@scwhittle scwhittle requested a review from tvalentyn May 11, 2026 15:22
@scwhittle
Copy link
Copy Markdown
Contributor Author

@tvalentyn Could you take another look?

I changed it to no-longer subclass since that was a little gross and tricky to support shutdown properly as it was added within the python version range beam supports. I made publishing fair to deal with the wakeup issue you noticed as well as preventing starvation if we have a large output bundle and small output bundle continually outputting. Since it's no longer a subclass, a pxd file was added to help performance.

@scwhittle
Copy link
Copy Markdown
Contributor Author

errors appear unrelated.

raise ValueError('Unexpected output element type %s' % type(stream))
yield beam_fn_api_pb2.Elements(data=data_stream, timers=timer_stream)

def _get_element_bytes(self, element):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_get_element_bytes sounds as though it returns raw bytes, let's name _get_element_size_bytes ?

if self.max_weight > 0 and self._byte_size + item_size > self.max_weight:
return True
return False
def put(self, item, item_bytes, block=True, timeout=None):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: consider marking block and timeout as keyword-only args.

def put(self, item, item_bytes, *, block=True, timeout=None)

self.assertIn(('t4', 'success'), status)
self.assertIn(('t5', 'timeout'), status)


Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you could also have a test for get with a timeout

with self._mutex:
return len(self._queue)

def _is_full_locked(self, item_bytes):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: since it is used negated in the code, consider instead naming it _can_fit, reversing the logic below and removing negations.


# cython: overflowcheck=True

cdef class ByteLimitedQueue(object):
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to add the .py counterpart to

extensions = cythonize([
for the cythonized extension to be built.

you can also test performance diffs with regular queue to make sure no surprises there.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants