feat: add pooled grpc transport #748
Changes from 92 commits
```diff
@@ -0,0 +1,3 @@
+[submodule "gapic-generator-fork"]
+	path = gapic-generator-fork
+	url = git@github.com:googleapis/gapic-generator-python.git
```
```diff
@@ -15,12 +15,28 @@
 from __future__ import annotations

-from typing import Any, AsyncIterable, TYPE_CHECKING
+from typing import cast, Any, Optional, AsyncIterable, Set, TYPE_CHECKING

-from google.cloud.client import ClientWithProject
+import asyncio
+import grpc
+import time
+import warnings
+import sys
+
+from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta
+from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient
+from google.cloud.bigtable_v2.services.bigtable.async_client import DEFAULT_CLIENT_INFO
+from google.cloud.bigtable_v2.services.bigtable.transports.pooled_grpc_asyncio import (
+    PooledBigtableGrpcAsyncIOTransport,
+)
+from google.cloud.client import _ClientProjectMixin
+from google.api_core.exceptions import GoogleAPICallError

 import google.auth.credentials
 import google.auth._default
 from google.api_core import client_options as client_options_lib

 if TYPE_CHECKING:
     from google.cloud.bigtable.mutations import Mutation, BulkMutationsEntry
```
|
```diff
@@ -32,7 +48,7 @@
 from google.cloud.bigtable.read_modify_write_rules import ReadModifyWriteRule


-class BigtableDataClient(ClientWithProject):
+class BigtableDataClient(BigtableAsyncClient, _ClientProjectMixin):
     def __init__(
         self,
         *,
```
|
```diff
@@ -47,6 +63,8 @@ def __init__(
         """
         Create a client instance for the Bigtable Data API

+        Client must be created within an async run loop context
+
         Args:
             project: the project which the client acts on behalf of.
                 If not passed, falls back to the default inferred
```
|
|
```diff
@@ -62,29 +80,227 @@ def __init__(
             client_options: Client options used to set user options
                 on the client. API Endpoint should be set through client_options.
             metadata: a list of metadata headers to be attached to all calls with this client
+        Raises:
+            - RuntimeError if called outside of an async run loop context
+            - ValueError if pool_size is less than 1
         """
-        raise NotImplementedError
+        # set up transport in registry
+        transport_str = f"pooled_grpc_asyncio_{pool_size}"
+        transport = PooledBigtableGrpcAsyncIOTransport.with_fixed_size(pool_size)
+        BigtableClientMeta._transport_registry[transport_str] = transport
```
**Comment on lines +87 to +89**

**Contributor:** Please avoid making the pool fixed size. It can have negative effects on customers with fluctuating traffic:

**Author:** Do we have any examples of libraries that implement dynamic pooling like you're thinking? What sort of algorithm do you have in mind? From what I saw, it seems the other libraries generally use fixed pool sizes, so I was hoping to keep it simple for now, and then revisit dynamic sizing after the core client is complete.

**Contributor:** You can take a look at the java implementation: https://github.com/googleapis/gapic-generator-java/blob/main/gax-java/gax-grpc/src/main/java/com/google/api/gax/grpc/ChannelPool.java#L248 However I think it's ok to have a fixed-size pool to start with, just make it extensible so that in the future we can make it dynamic?
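The reviewer's suggestion (start fixed-size, but leave the sizing policy extensible) could be sketched as follows. This is an illustrative stand-in, not the PR's `PooledBigtableGrpcAsyncIOTransport` API: `ChannelPool`, `next_channel`, and `resize` are hypothetical names.

```python
import itertools

class ChannelPool:
    """Hypothetical sketch of an extensible channel pool: round-robin
    selection today, with resizing isolated behind resize() so a dynamic
    sizing policy could be added later without changing callers."""

    def __init__(self, channel_factory, size: int):
        if size < 1:
            raise ValueError("pool_size must be at least 1")
        self._factory = channel_factory
        self._channels = [channel_factory() for _ in range(size)]
        self._index = itertools.cycle(range(size))

    def next_channel(self):
        # round-robin; a dynamic pool could consult per-channel load here
        return self._channels[next(self._index)]

    def resize(self, new_size: int) -> None:
        # extension point: grow or shrink without changing the call sites
        if new_size < 1:
            raise ValueError("pool_size must be at least 1")
        while len(self._channels) < new_size:
            self._channels.append(self._factory())
        del self._channels[new_size:]
        self._index = itertools.cycle(range(new_size))

pool = ChannelPool(object, 3)
```

A dynamic implementation (like the Java `ChannelPool` linked above) would replace the body of `resize` with a policy driven by outstanding-RPC counts.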
```diff
+        # set up client info headers for veneer library
+        client_info = DEFAULT_CLIENT_INFO
+        client_info.client_library_version = client_info.gapic_version
+        # parse client options
+        if type(client_options) is dict:
+            client_options = client_options_lib.from_dict(client_options)
+        client_options = cast(
+            Optional[client_options_lib.ClientOptions], client_options
+        )
+        mixin_args = {"project": project, "credentials": credentials}
+        # support google-api-core <=1.5.0, which does not have credentials
+        if "credentials" not in _ClientProjectMixin.__init__.__code__.co_varnames:
+            mixin_args.pop("credentials")
+        # initialize client
+        _ClientProjectMixin.__init__(self, **mixin_args)
+        # raises RuntimeError if called outside of an async run loop context
+        BigtableAsyncClient.__init__(
+            self,
+            transport=transport_str,
+            credentials=credentials,
+            client_options=client_options,
+            client_info=client_info,
+        )
+        self.metadata = metadata or []
+        # keep track of active instances for warmup on channel refresh
+        self._active_instances: Set[str] = set()
+        # attempt to start background tasks
+        self._channel_init_time = time.time()
+        self._channel_refresh_tasks: list[asyncio.Task[None]] = []
+        try:
+            self.start_background_channel_refresh()
```
|
**Contributor:** it would be nice to await all of the channels to be warm before letting end users use the client

**Author:** The problem is we don't have any instances to warm when the client is created, only when we start creating tables.
```diff
+        except RuntimeError:
+            warnings.warn(
+                f"{self.__class__.__name__} should be started in an "
+                "asyncio event loop. Channel refresh will not be started",
+                RuntimeWarning,
+            )
```
|
|
||
```diff
+    def start_background_channel_refresh(self) -> None:
+        """
+        Starts a background task to ping and warm each channel in the pool
+        Raises:
+            - RuntimeError if not called in an asyncio event loop
+        """
+        if not self._channel_refresh_tasks:
+            # raise RuntimeError if there is no event loop
+            asyncio.get_running_loop()
+            for channel_idx in range(len(self.transport.channel_pool)):
+                refresh_task = asyncio.create_task(self._manage_channel(channel_idx))
+                if sys.version_info >= (3, 8):
+                    refresh_task.set_name(
+                        f"{self.__class__.__name__} channel refresh {channel_idx}"
+                    )
+                self._channel_refresh_tasks.append(refresh_task)
```
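The loop-detection idiom this method relies on is worth noting: `asyncio.get_running_loop()` raises `RuntimeError` when no event loop is running, which the constructor above converts into a warning instead of a crash. A minimal sketch of that pattern (`in_event_loop` is an illustrative name):

```python
import asyncio

def in_event_loop() -> bool:
    """Sketch of the detection used by start_background_channel_refresh:
    get_running_loop() raises RuntimeError outside an event loop, so the
    caller can choose to warn and skip background tasks instead of failing."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return False
    return True

async def main() -> bool:
    return in_event_loop()
```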
|
|
||
```diff
+    @property
+    def transport(self) -> PooledBigtableGrpcAsyncIOTransport:
+        """Returns the transport used by the client instance.
+        Returns:
+            BigtableTransport: The transport used by the client instance.
+        """
+        return cast(PooledBigtableGrpcAsyncIOTransport, self._client.transport)
```
|
**Contributor:** should this be exposed to end users?

**Author:** This is part of the Client spec used by all GCP libraries. The reason I override it here was to change the type annotation.
|
|
||
```diff
+    async def close(self, timeout: float = 2.0):
+        """
+        Cancel all background tasks
+        """
+        for task in self._channel_refresh_tasks:
+            task.cancel()
+        group = asyncio.gather(*self._channel_refresh_tasks, return_exceptions=True)
+        await asyncio.wait_for(group, timeout=timeout)
+        await self.transport.close()
+        self._channel_refresh_tasks = []

+    async def __aexit__(self, exc_type, exc_val, exc_tb):
+        """
+        Cleanly close context manager on exit
+        """
+        await self.close()
```
|
|
||
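The shutdown sequence in `close()` follows a standard asyncio pattern: cancel each task, then gather with `return_exceptions=True` so the resulting `CancelledError`s are collected rather than raised, all bounded by `wait_for`. A self-contained sketch (`shutdown` and `refresh_loop` are illustrative names, not the PR's API):

```python
import asyncio

async def shutdown(tasks, timeout: float = 2.0):
    """Sketch of the close() pattern: cancel every background task, then
    gather with return_exceptions=True so CancelledError is collected as
    a result instead of propagating, with an overall timeout."""
    for task in tasks:
        task.cancel()
    group = asyncio.gather(*tasks, return_exceptions=True)
    return await asyncio.wait_for(group, timeout=timeout)

async def demo():
    async def refresh_loop():
        # stand-in for a channel-refresh task that runs forever
        while True:
            await asyncio.sleep(3600)
    tasks = [asyncio.create_task(refresh_loop()) for _ in range(3)]
    return await shutdown(tasks)

results = asyncio.run(demo())
```

Without `return_exceptions=True`, the first `CancelledError` would propagate out of `gather` and the remaining results would be lost.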
```diff
+    async def _ping_and_warm_instances(
+        self, channel: grpc.aio.Channel
+    ) -> list[GoogleAPICallError | None]:
+        """
+        Prepares the backend for requests on a channel
+
+        Pings each Bigtable instance registered in `_active_instances` on the client
+
+        Args:
+            channel: grpc channel to ping
+        Returns:
+            - sequence of results or exceptions from the ping requests
+        """
+        ping_rpc = channel.unary_unary(
+            "/google.bigtable.v2.Bigtable/PingAndWarmChannel"
+        )
+        tasks = [ping_rpc({"name": n}) for n in self._active_instances]
+        return await asyncio.gather(*tasks, return_exceptions=True)
```
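The fan-out here means one failing instance does not abort warming of the others: each exception is returned in the results list in order. A sketch of that behavior, with `ping_all` and the local `ping` coroutine as stand-ins for the `PingAndWarmChannel` RPC:

```python
import asyncio

async def ping_all(instance_names, ping):
    """Sketch of the fan-out in _ping_and_warm_instances: one ping per
    registered instance, gathered with return_exceptions=True so a
    single failure is reported in-place rather than cancelling the rest."""
    return await asyncio.gather(
        *(ping(name) for name in instance_names), return_exceptions=True
    )

async def demo():
    async def ping(name):
        # stand-in RPC: one instance is unreachable
        if name == "bad-instance":
            raise RuntimeError("unreachable")
        return f"warmed {name}"
    return await ping_all(["a", "bad-instance", "b"], ping)

results = asyncio.run(demo())
```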
|
|
||
```diff
+    async def _manage_channel(
+        self,
+        channel_idx: int,
+        refresh_interval: float = 60 * 45,
+        grace_period: float = 60 * 15,
+    ) -> None:
+        """
+        Background coroutine that periodically refreshes and warms a grpc channel
+
+        The backend will automatically close channels after 60 minutes, so
+        `refresh_interval` + `grace_period` should be < 60 minutes
+
+        Runs continuously until the client is closed
+
+        Args:
+            channel_idx: index of the channel in the transport's channel pool
+            refresh_interval: interval before initiating refresh process in seconds
+            grace_period: time to allow previous channel to serve existing
+                requests before closing, in seconds
+        """
+        first_refresh = self._channel_init_time + refresh_interval
+        next_sleep = max(first_refresh - time.time(), 0)
+        if next_sleep > 0:
+            # warm the current channel immediately
+            channel = self.transport.channel_pool[channel_idx]
+            await self._ping_and_warm_instances(channel)
+        # continuously refresh the channel every `refresh_interval` seconds
+        while True:
+            await asyncio.sleep(next_sleep)
+            # prepare new channel for use
+            new_channel = self.transport.create_channel(
+                self.transport._host,
+                credentials=self.transport._credentials,
+                scopes=self.transport._scopes,
+                ssl_credentials=self.transport._ssl_channel_credentials,
+                quota_project_id=self.transport._quota_project_id,
+                options=[
+                    ("grpc.max_send_message_length", -1),
+                    ("grpc.max_receive_message_length", -1),
+                ],
+            )
+            await self._ping_and_warm_instances(new_channel)
+            # cycle channel out of use, with long grace window before closure
+            start_timestamp = time.time()
+            await self.transport.replace_channel(channel_idx, grace_period, new_channel)
+            # subtract the time spent waiting for the channel to be replaced
+            next_sleep = refresh_interval - (time.time() - start_timestamp)
```
|
|
||
```diff
+    async def register_instance(self, instance_id: str):
+        """
+        Registers an instance with the client, and warms the channel pool
+        for the instance
+        The client will periodically refresh grpc channel pool used to make
+        requests, and new channels will be warmed for each registered instance
+        Channels will not be refreshed unless at least one instance is registered
+        """
+        instance_name = self.instance_path(self.project, instance_id)
+        if instance_name not in self._active_instances:
+            self._active_instances.add(instance_name)
+            if self._channel_refresh_tasks:
+                # refresh tasks already running
+                # call ping and warm on all existing channels
+                for channel in self.transport.channel_pool:
+                    await self._ping_and_warm_instances(channel)
+            else:
+                # refresh tasks aren't active. start them as background tasks
+                self.start_background_channel_refresh()
```
|
|
||
```diff
+    async def remove_instance_registration(self, instance_id: str) -> bool:
+        """
+        Removes an instance from the client's registered instances, to prevent
+        warming new channels for the instance
+
+        If instance_id is not registered, returns False
+
+        Args:
+            instance_id: id of the instance to remove
+        Returns:
+            - True if instance was removed
+        """
+        instance_name = self.instance_path(self.project, instance_id)
+        try:
+            self._active_instances.remove(instance_name)
+            return True
+        except KeyError:
+            return False
```
|
|
||
```diff
     def get_table(
-        self, instance_id: str, table_id: str, app_profile_id: str | None = None
+        self,
+        instance_id: str,
+        table_id: str,
+        app_profile_id: str | None = None,
+        metadata: list[tuple[str, str]] | None = None,
     ) -> Table:
         """
-        Return a Table instance to make API requests for a specific table.
+        Returns a table instance for making data API requests

         Args:
-            instance_id: The ID of the instance that owns the table.
+            instance_id: The Bigtable instance ID to associate with this client
+                instance_id is combined with the client's project to fully
+                specify the instance
             table_id: The ID of the table.
             app_profile_id: (Optional) The app profile to associate with requests.
                 https://cloud.google.com/bigtable/docs/app-profiles
+            metadata: a list of metadata headers to be attached to all calls with this client
         """
-        raise NotImplementedError
+        return Table(self, instance_id, table_id, app_profile_id, metadata)
```
|
|
||
|
|
||
```diff
 class Table:
     """
     Main Data API surface

-    Table object maintains instance_id, table_id, and app_profile_id context, and passes them with
+    Table object maintains table_id, and app_profile_id context, and passes them with
     each call
     """
```
|
|
||
|
|
```diff
@@ -94,8 +310,40 @@ def __init__(
         instance_id: str,
         table_id: str,
         app_profile_id: str | None = None,
+        metadata: list[tuple[str, str]] | None = None,
     ):
-        raise NotImplementedError
+        """
+        Initialize a Table instance
+
+        Must be created within an async run loop context
+
+        Args:
+            instance_id: The Bigtable instance ID to associate with this client
+                instance_id is combined with the client's project to fully
+                specify the instance
+            table_id: The ID of the table.
+            app_profile_id: (Optional) The app profile to associate with requests.
+                https://cloud.google.com/bigtable/docs/app-profiles
+            metadata: a list of metadata headers to be attached to all calls with this client
+        Raises:
+            - RuntimeError if called outside of an async run loop context
+        """
+        self.client = client
+        self.instance = instance_id
+        self.table_id = table_id
+        self.app_profile_id = app_profile_id
+        self.metadata = metadata or []
+        # raises RuntimeError if called outside of an async run loop context
+        try:
+            self._register_instance_task = asyncio.create_task(
+                self.client.register_instance(instance_id)
```
|
**Contributor:** should there a deregister call as well?

**Author:** I'm not sure I understand. This task is needed because we can't call await from …
```diff
+            )
+        except RuntimeError:
+            warnings.warn(
+                "Table should be created in an asyncio event loop."
+                " Instance will not be registered with client for refresh",
+                RuntimeWarning,
+            )
```
|
**Contributor:** why not bubble up this error?

**Author:** I've gone back and forth on it. The question comes down to whether we should support creating these classes outside async context: Let me know if you have thoughts.
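The constructor pattern under discussion, scheduling async registration from a synchronous `__init__` via `create_task`, can be sketched with stand-ins (this `Table` and `FakeClient` are simplified illustrations, not the PR's real classes):

```python
import asyncio

class Table:
    """Sketch of Table.__init__'s pattern: a synchronous constructor
    cannot await, so it schedules async instance registration as a
    background task (requires a running event loop)."""

    def __init__(self, client, instance_id):
        self.client = client
        self.instance = instance_id
        # raises RuntimeError if no event loop is running
        self._register_instance_task = asyncio.create_task(
            client.register_instance(instance_id)
        )

class FakeClient:
    """Stand-in client that records registrations."""

    def __init__(self):
        self.registered = set()

    async def register_instance(self, instance_id):
        self.registered.add(instance_id)

async def demo():
    client = FakeClient()
    table = Table(client, "my-instance")
    # awaiting the task makes the registration deterministic for the demo
    await table._register_instance_task
    return client.registered

registered = asyncio.run(demo())
```

The trade-off debated in the thread: outside a loop, `create_task` raises `RuntimeError`, which the PR downgrades to a `RuntimeWarning` rather than bubbling up.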
|
|
||
| async def read_rows_stream( | ||
| self, | ||
|
|
||
```diff
@@ -53,6 +53,7 @@
 from .transports.base import BigtableTransport, DEFAULT_CLIENT_INFO
 from .transports.grpc import BigtableGrpcTransport
 from .transports.grpc_asyncio import BigtableGrpcAsyncIOTransport
+from .transports.pooled_grpc_asyncio import PooledBigtableGrpcAsyncIOTransport
 from .transports.rest import BigtableRestTransport


@@ -67,6 +68,7 @@ class BigtableClientMeta(type):
     _transport_registry = OrderedDict()  # type: Dict[str, Type[BigtableTransport]]
     _transport_registry["grpc"] = BigtableGrpcTransport
     _transport_registry["grpc_asyncio"] = BigtableGrpcAsyncIOTransport
+    _transport_registry["pooled_grpc_asyncio"] = PooledBigtableGrpcAsyncIOTransport
     _transport_registry["rest"] = BigtableRestTransport

     def get_transport_class(
```
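The string-keyed registry this hunk extends is what lets the new client pass `transport=f"pooled_grpc_asyncio_{pool_size}"` as a plain string. A sketch of that lookup mechanism with illustrative names (not the real gapic metaclass):

```python
from collections import OrderedDict

class TransportRegistry:
    """Sketch of BigtableClientMeta's registry: transport classes are
    stored under string labels so a client constructor can accept either
    a label or a transport instance."""

    def __init__(self):
        self._registry = OrderedDict()

    def register(self, label, transport_cls):
        self._registry[label] = transport_cls

    def get_transport_class(self, label):
        return self._registry[label]

class GrpcTransport: ...
class PooledGrpcTransport: ...

registry = TransportRegistry()
registry.register("grpc", GrpcTransport)
# the PR registers one entry per pool size, e.g. "pooled_grpc_asyncio_{pool_size}"
registry.register("pooled_grpc_asyncio_3", PooledGrpcTransport)
```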
|
|
```diff
@@ -380,6 +382,9 @@ def __init__(
             transport (Union[str, BigtableTransport]): The
                 transport to use. If set to None, a transport is chosen
                 automatically.
+                NOTE: "rest" transport functionality is currently in a
+                beta state (preview). We welcome your feedback via an
+                issue in this library's source repository.
```
|
**Contributor:** Is there any way to opt out of rest for the data client?

**Author:** Yes, I believe it's configured in the yaml somewhere. We can address that in a future PR.

**Contributor:** @igorbernstein2 Can you say more about that? We have gotten some customer feedback in other product areas that REST transport is important to them in some use cases, so we've typically preferred to include the option when we can and let users choose which they require with sensible defaults. Are you suggesting we disallow it entirely? (I see the previous cl/458022604 where this was disallowed, but I'm wondering if those are still relevant.)

**Author:** @meredithslota the rest transport doesn't support asyncio, so I think it makes sense to hard-code the transport to use our pooled transport for at least the async layer. When it comes time to add the synchronous layer, we could support rest if customers find it useful, but it just wouldn't be able to make use of the same channel management optimizations that we're adding to the grpc side. Maybe @igorbernstein2 has some more context though.
```diff
         client_options (Optional[Union[google.api_core.client_options.ClientOptions, dict]]): Custom options for the
             client. It won't take effect if a ``transport`` instance is provided.
             (1) The ``api_endpoint`` property can be used to override the
```
|
|
||