This repository was archived by the owner on Mar 26, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 78
chore: increase async rpc performance #1755
Merged
Merged
Changes from 20 commits
Commits
Show all changes
47 commits
Select commit
Hold shift + click to select a range
3b4feff
added pre-wrapping to async client
daniel-sanche 4e9cc05
copied over optimization for input proto
daniel-sanche 90d41ec
avoid compiling regex on each call
daniel-sanche 8001304
Revert "avoid compiling regex on each call"
daniel-sanche 6a73c13
Merge branch 'main' into async-optimizations
daniel-sanche 1ac92d5
Merge branch 'main' into async-optimizations
daniel-sanche 6113869
use AsyncRetry
daniel-sanche 2ce9300
added missing imports
daniel-sanche 0b3daa0
use async_retries
daniel-sanche 7bdcee2
added missing if block
daniel-sanche 266fbb4
updated goldens
daniel-sanche b6835fe
Merge branch 'main' into async-optimizations
daniel-sanche 9e3bee5
added test for wrappers
daniel-sanche 6477533
regenerated goldens
daniel-sanche e7d6363
add fix for operation rpcs
daniel-sanche 25636ff
only generate rest test on supported clients
daniel-sanche af85299
updated goldens
daniel-sanche 0a402ad
updated method name calculation for tests
daniel-sanche 6cfe1fa
addressed comments
daniel-sanche 8f72ea4
updated goldens
daniel-sanche ac43d7f
Merge branch 'main' into async-optimizations
daniel-sanche bbddf0a
add bullets
daniel-sanche ce68134
improved indentation
daniel-sanche f0b3466
update docstring
daniel-sanche e26a910
fix comment
daniel-sanche a5bd36e
only add operation comment on operation tests
daniel-sanche 2c04216
changed sync comments to match async
daniel-sanche 5aebfb3
updated goldens
daniel-sanche 153d0b7
added asyncio mark to test
daniel-sanche 11366ca
fixed formatting
daniel-sanche 85b1645
added extended lro check
daniel-sanche 0796c82
improving async tests
daniel-sanche 8b9aa4c
remove full_extended_lro
daniel-sanche 95e51ac
trying different async mock method
daniel-sanche 7862e6c
changed async mock
daniel-sanche 61a23bf
capture args
daniel-sanche 4a80a44
updated goldens
daniel-sanche 8c5f860
call mock
daniel-sanche db060fb
use awaitable subclass
daniel-sanche 880432c
removed type hint
daniel-sanche 3336a01
simplified method calling
daniel-sanche 60b5fce
moved test
daniel-sanche f895f7d
use proper method names
daniel-sanche b0068f7
moved test
daniel-sanche 10fb69f
moved test back
daniel-sanche 7ecbe6f
updated goldens
daniel-sanche 73e006e
Merge branch 'main' into async-optimizations
daniel-sanche File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -7,6 +7,8 @@ from typing import Awaitable, Callable, Dict, Optional, Sequence, Tuple, Union | |||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| from google.api_core import gapic_v1 | ||||||||||||||||||||||||||||||||||||||||||||
| from google.api_core import grpc_helpers_async | ||||||||||||||||||||||||||||||||||||||||||||
| from google.api_core import exceptions as core_exceptions | ||||||||||||||||||||||||||||||||||||||||||||
| from google.api_core import retry_async as retries | ||||||||||||||||||||||||||||||||||||||||||||
| {% if service.has_lro %} | ||||||||||||||||||||||||||||||||||||||||||||
| from google.api_core import operations_v1 | ||||||||||||||||||||||||||||||||||||||||||||
| {% endif %} | ||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -378,6 +380,32 @@ class {{ service.grpc_asyncio_transport_name }}({{ service.name }}Transport): | |||||||||||||||||||||||||||||||||||||||||||
| return self._stubs["test_iam_permissions"] | ||||||||||||||||||||||||||||||||||||||||||||
| {% endif %} | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| def _prep_wrapped_messages(self, client_info): | ||||||||||||||||||||||||||||||||||||||||||||
| # Precompute the wrapped methods. | ||||||||||||||||||||||||||||||||||||||||||||
| # override base class to use async wrappers | ||||||||||||||||||||||||||||||||||||||||||||
|
daniel-sanche marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||||||||
| self._wrapped_methods = { | ||||||||||||||||||||||||||||||||||||||||||||
| {% for method in service.methods.values() %} | ||||||||||||||||||||||||||||||||||||||||||||
| self.{{ method.transport_safe_name|snake_case }}: gapic_v1.method_async.wrap_method( | ||||||||||||||||||||||||||||||||||||||||||||
| self.{{ method.transport_safe_name|snake_case }}, | ||||||||||||||||||||||||||||||||||||||||||||
| {% if method.retry %} | ||||||||||||||||||||||||||||||||||||||||||||
| default_retry=retries.AsyncRetry( | ||||||||||||||||||||||||||||||||||||||||||||
| {% if method.retry.initial_backoff %}initial={{ method.retry.initial_backoff }},{% endif %} | ||||||||||||||||||||||||||||||||||||||||||||
| {% if method.retry.max_backoff %}maximum={{ method.retry.max_backoff }},{% endif %} | ||||||||||||||||||||||||||||||||||||||||||||
| {% if method.retry.backoff_multiplier %}multiplier={{ method.retry.backoff_multiplier }},{% endif %} | ||||||||||||||||||||||||||||||||||||||||||||
| predicate=retries.if_exception_type( | ||||||||||||||||||||||||||||||||||||||||||||
| {% for ex in method.retry.retryable_exceptions|sort(attribute='__name__') %} | ||||||||||||||||||||||||||||||||||||||||||||
| core_exceptions.{{ ex.__name__ }}, | ||||||||||||||||||||||||||||||||||||||||||||
| {% endfor %} | ||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||
| deadline={{ method.timeout }}, | ||||||||||||||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Nit: to fix indentation in generated code (see my comment on the generated file), maybe something like this?
Suggested change
And maybe do this in |
||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||
| {% endif %} | ||||||||||||||||||||||||||||||||||||||||||||
| default_timeout={{ method.timeout }}, | ||||||||||||||||||||||||||||||||||||||||||||
| client_info=client_info, | ||||||||||||||||||||||||||||||||||||||||||||
| ), | ||||||||||||||||||||||||||||||||||||||||||||
| {% endfor %} {# precomputed wrappers loop #} | ||||||||||||||||||||||||||||||||||||||||||||
|
daniel-sanche marked this conversation as resolved.
Outdated
|
||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
| def close(self): | ||||||||||||||||||||||||||||||||||||||||||||
| return self.grpc_channel.close() | ||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -123,6 +123,89 @@ def test_{{ method_name }}_empty_call(): | |
| {% endif %} | ||
| {% endif %} | ||
|
|
||
| def test_{{ method_name }}_use_cached_wrapped_rpc(): | ||
| # Clients should use _prep_wrapped_messages to create cached wrapped rpcs, | ||
| # instead of constructing them on each call | ||
| with mock.patch("google.api_core.gapic_v1.method.wrap_method") as wrapper_fn: | ||
| client = {{ service.client_name }}( | ||
| credentials=ga_credentials.AnonymousCredentials(), | ||
| transport="grpc", | ||
| ) | ||
|
|
||
| # Should wrap all calls on client creation | ||
| assert wrapper_fn.call_count > 0 | ||
| wrapper_fn.reset_mock() | ||
|
|
||
| # Ensure method has been cached | ||
| assert client._transport.{{method.transport_safe_name|snake_case}} in client._transport._wrapped_methods | ||
|
|
||
| # Replace cached wrapped function with mock | ||
| mock_rpc = mock.Mock() | ||
| client._transport._wrapped_methods[client._transport.{{method.transport_safe_name|snake_case}}] = mock_rpc | ||
|
|
||
| request = {} | ||
| {% if method.client_streaming %} | ||
| requests = [request] | ||
| client.{{ method.safe_name|snake_case }}(iter(requests)) | ||
| {% else %} | ||
| client.{{ method_name }}(request) | ||
| {% endif %} | ||
|
|
||
| # Establish that the underlying gRPC stub method was called. | ||
| assert mock_rpc.call_count == 1 | ||
|
|
||
| # Operation methods build a cached wrapper on first rpc call | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we make this stanza conditional on the method in fact being an Operation method? |
||
| # subsequent calls should use the cached wrapper | ||
| wrapper_fn.reset_mock() | ||
| {% if method.client_streaming %} | ||
| client.{{ method.safe_name|snake_case }}(iter(requests)) | ||
| {% else %} | ||
| client.{{ method_name }}(request) | ||
| {% endif %} | ||
|
|
||
| # Establish that a new wrapper was not created for this call | ||
| assert wrapper_fn.call_count == 0 | ||
| assert mock_rpc.call_count == 2 | ||
|
|
||
|
|
||
| async def test_{{ method_name }}_async_use_cached_wrapped_rpc(transport: str = "grpc_asyncio"): | ||
| # Clients should use _prep_wrapped_messages to create cached wrapped rpcs, | ||
| # instead of constructing them on each call | ||
| with mock.patch("google.api_core.gapic_v1.method_async.wrap_method") as wrapper_fn: | ||
| client = {{ service.async_client_name }}( | ||
| credentials=ga_credentials.AnonymousCredentials(), | ||
| transport=transport, | ||
| ) | ||
|
|
||
| # Should wrap all calls on client creation | ||
| assert wrapper_fn.call_count > 0 | ||
| wrapper_fn.reset_mock() | ||
|
|
||
| # Ensure method has been cached | ||
| assert client._client._transport.{{method.transport_safe_name|snake_case}} in client._client._transport._wrapped_methods | ||
|
|
||
| # Replace cached wrapped function with mock | ||
| mock_rpc = mock.AsyncMock() | ||
| client._client._transport._wrapped_methods[client._client._transport.{{method.transport_safe_name|snake_case}}] = mock_rpc | ||
|
|
||
| request = {} | ||
| {% if method.client_streaming %} | ||
| requests = [request] | ||
| {% endif %} | ||
| {% if method.client_streaming and method.server_streaming %} | ||
| await client.{{ method.name|snake_case }}(iter(requests)) | ||
| {% elif method.client_streaming and not method.server_streaming %} | ||
| await (await client.{{ method.name|snake_case }}(iter(requests))) | ||
| {% else %} | ||
| await client.{{ method_name }}(request) | ||
| {% endif %} | ||
|
|
||
| # Establish that the underlying gRPC stub method was called. | ||
| assert mock_rpc.call_count == 1 | ||
|
|
||
| # Establish that a new wrapper was not created for this call | ||
| assert wrapper_fn.call_count == 0 | ||
|
|
||
| {% if not full_extended_lro %} | ||
| @pytest.mark.asyncio | ||
| async def test_{{ method_name }}_async(transport: str = 'grpc_asyncio', request_type={{ method.input.ident }}): | ||
|
|
@@ -1069,6 +1152,50 @@ def test_{{ method_name }}_rest(request_type): | |
| {% endfor %} | ||
| {% endif %} | ||
|
|
||
| def test_{{ method_name }}_rest_use_cached_wrapped_rpc(): | ||
| # Clients should use _prep_wrapped_messages to create cached wrapped rpcs, | ||
| # instead of constructing them on each call | ||
| with mock.patch("google.api_core.gapic_v1.method.wrap_method") as wrapper_fn: | ||
| client = {{ service.client_name }}( | ||
| credentials=ga_credentials.AnonymousCredentials(), | ||
| transport="rest", | ||
| ) | ||
|
|
||
| # Should wrap all calls on client creation | ||
| assert wrapper_fn.call_count > 0 | ||
| wrapper_fn.reset_mock() | ||
|
|
||
| # Ensure method has been cached | ||
| assert client._transport.{{method.transport_safe_name|snake_case}} in client._transport._wrapped_methods | ||
|
|
||
| # Replace cached wrapped function with mock | ||
| mock_rpc = mock.Mock() | ||
| client._transport._wrapped_methods[client._transport.{{method.transport_safe_name|snake_case}}] = mock_rpc | ||
|
|
||
| request = {} | ||
| {% if method.client_streaming %} | ||
| requests = [request] | ||
| client.{{ method.safe_name|snake_case }}(iter(requests)) | ||
| {% else %} | ||
| client.{{ method_name }}(request) | ||
| {% endif %} | ||
|
|
||
| # Establish that the underlying gRPC stub method was called. | ||
| assert mock_rpc.call_count == 1 | ||
|
|
||
| # Operation methods build a cached wrapper on first rpc call | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we make this stanza conditional on the method in fact being an Operation method? |
||
| # subsequent calls should use the cached wrapper | ||
| wrapper_fn.reset_mock() | ||
| {% if method.client_streaming %} | ||
| client.{{ method.safe_name|snake_case }}(iter(requests)) | ||
| {% else %} | ||
| client.{{ method_name }}(request) | ||
| {% endif %} | ||
|
|
||
| # Establish that a new wrapper was not created for this call | ||
| assert wrapper_fn.call_count == 0 | ||
| assert mock_rpc.call_count == 2 | ||
|
|
||
|
|
||
| {% if method.input.required_fields %} | ||
| def test_{{ method_name }}_rest_required_fields(request_type={{ method.input.ident }}): | ||
|
|
||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have the
-bullet to signify this is a sub-point of "Create or coerce a protobuf request object"Could you add similar bullets to "Quick check" and "The request isn't ..." above (for the same reason)?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, but keep in mind this was taken directly from the sync version. Should we update this there too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I went ahead and changed the sync version to match. Let me know if that works