forked from googleapis/python-bigtable
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_mutate_rows.py
More file actions
234 lines (217 loc) · 9.63 KB
/
_mutate_rows.py
File metadata and controls
234 lines (217 loc) · 9.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations
from typing import Sequence, TYPE_CHECKING
import asyncio
from dataclasses import dataclass
import functools
from google.api_core import exceptions as core_exceptions
from google.api_core import retry_async as retries
import google.cloud.bigtable_v2.types.bigtable as types_pb
import google.cloud.bigtable.data.exceptions as bt_exceptions
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _convert_retry_deadline
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
# mutate_rows requests are limited to this number of mutations
from google.cloud.bigtable.data.mutations import _MUTATE_ROWS_REQUEST_MUTATION_LIMIT
if TYPE_CHECKING:
from google.cloud.bigtable_v2.services.bigtable.async_client import (
BigtableAsyncClient,
)
from google.cloud.bigtable.data.mutations import RowMutationEntry
from google.cloud.bigtable.data._async.client import TableAsync
@dataclass
class _EntryWithProto:
    """
    Pairs a RowMutationEntry with its protobuf form, so the proto can be
    built once and reused each time the entry is sent.
    """

    # the user-facing mutation entry
    entry: RowMutationEntry
    # the request-ready proto built from `entry`
    proto: types_pb.MutateRowsRequest.Entry
class _MutateRowsOperationAsync:
    """
    MutateRowsOperation manages the logic of sending a set of row mutations,
    and retrying on failed entries. It manages this using the _run_attempt
    function, which attempts to mutate all outstanding entries, and raises
    _MutateRowsIncomplete if any retryable errors are encountered.

    Errors are exposed as a MutationsExceptionGroup, which contains a list of
    exceptions organized by the related failed mutation entries.

    State tracked across attempts:
      - mutations: every entry passed in, paired with its proto
      - remaining_indices: indices (into `mutations`) still waiting to be
        sent or re-sent
      - errors: maps a mutation index to the exceptions recorded for it.
        An index is removed again if a later attempt succeeds for it, so
        only mutations that ultimately failed are reported.
    """

    def __init__(
        self,
        gapic_client: "BigtableAsyncClient",
        table: "TableAsync",
        mutation_entries: list["RowMutationEntry"],
        operation_timeout: float,
        attempt_timeout: float | None,
        retryable_exceptions: Sequence[type[Exception]] = (),
    ):
        """
        Args:
          - gapic_client: the client to use for the mutate_rows call
          - table: the table associated with the request
          - mutation_entries: a list of RowMutationEntry objects to send to the server
          - operation_timeout: the timeout to use for the entire operation, in seconds.
          - attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
              If not specified, the request will run until operation_timeout is reached.
          - retryable_exceptions: exception types that should trigger a retry
              when raised by the rpc. _MutateRowsIncomplete is always treated
              as retryable in addition to these.
        Raises:
          - ValueError: if the entries contain more than
              _MUTATE_ROWS_REQUEST_MUTATION_LIMIT mutations in total
        """
        # check that mutations are within limits
        total_mutations = sum(len(entry.mutations) for entry in mutation_entries)
        if total_mutations > _MUTATE_ROWS_REQUEST_MUTATION_LIMIT:
            raise ValueError(
                "mutate_rows requests can contain at most "
                f"{_MUTATE_ROWS_REQUEST_MUTATION_LIMIT} mutations across "
                f"all entries. Found {total_mutations}."
            )
        # create partial function to pass to trigger rpc call
        metadata = _make_metadata(table.table_name, table.app_profile_id)
        self._gapic_fn = functools.partial(
            gapic_client.mutate_rows,
            table_name=table.table_name,
            app_profile_id=table.app_profile_id,
            metadata=metadata,
            # retries are driven by the AsyncRetry wrapper built below
            retry=None,
        )
        # create predicate for determining which errors are retryable
        self.is_retryable = retries.if_exception_type(
            # RPC level errors
            *retryable_exceptions,
            # Entry level errors
            bt_exceptions._MutateRowsIncomplete,
        )
        # build retryable operation
        retry = retries.AsyncRetry(
            predicate=self.is_retryable,
            timeout=operation_timeout,
            initial=0.01,
            multiplier=2,
            maximum=60,
        )
        retry_wrapped = retry(self._run_attempt)
        self._operation = _convert_retry_deadline(
            retry_wrapped, operation_timeout, is_async=True
        )
        # initialize state
        self.timeout_generator = _attempt_timeout_generator(
            attempt_timeout, operation_timeout
        )
        self.mutations = [_EntryWithProto(m, m._to_pb()) for m in mutation_entries]
        self.remaining_indices = list(range(len(self.mutations)))
        self.errors: dict[int, list[Exception]] = {}

    async def start(self):
        """
        Start the operation, and run until completion

        Raises:
          - MutationsExceptionGroup: if any mutations failed
        """
        try:
            # trigger mutate_rows
            await self._operation()
        except Exception as exc:
            # exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
            # (iterate over a copy: _handle_entry_error may append to remaining_indices)
            incomplete_indices = self.remaining_indices.copy()
            for idx in incomplete_indices:
                self._handle_entry_error(idx, exc)
        finally:
            # raise exception detailing incomplete mutations
            # NOTE: raising from `finally` replaces any exception still in
            # flight (including BaseExceptions not caught above)
            all_errors: list[Exception] = []
            for idx, exc_list in self.errors.items():
                if not exc_list:
                    raise core_exceptions.ClientError(
                        f"Mutation {idx} failed with no associated errors"
                    )
                elif len(exc_list) == 1:
                    cause_exc = exc_list[0]
                else:
                    # several attempts failed; bundle their exceptions together
                    cause_exc = bt_exceptions.RetryExceptionGroup(exc_list)
                entry = self.mutations[idx].entry
                all_errors.append(
                    bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc)
                )
            if all_errors:
                raise bt_exceptions.MutationsExceptionGroup(
                    all_errors, len(self.mutations)
                )

    async def _run_attempt(self):
        """
        Run a single attempt of the mutate_rows rpc.

        Sends every entry listed in remaining_indices, then rebuilds
        remaining_indices from the entries that failed with a retryable error.
        An entry that succeeds has any errors recorded by earlier attempts
        discarded, so it is not reported as failed.

        Raises:
          - _MutateRowsIncomplete: if there are failed mutations eligible for
              retry after the attempt is complete
          - GoogleAPICallError: if the gapic rpc fails
        """
        request_entries = [self.mutations[idx].proto for idx in self.remaining_indices]
        # track mutations in this request that have not been finalized yet
        # maps index within this request -> index within self.mutations
        active_request_indices = dict(enumerate(self.remaining_indices))
        self.remaining_indices = []
        if not request_entries:
            # no more mutations. return early
            return
        # make gapic request
        try:
            result_generator = await self._gapic_fn(
                timeout=next(self.timeout_generator),
                entries=request_entries,
            )
            async for result_list in result_generator:
                for result in result_list.entries:
                    # convert sub-request index to global index
                    orig_idx = active_request_indices[result.index]
                    if result.status.code != 0:
                        # mutation failed; update error list (and remaining_indices if retryable)
                        entry_error = core_exceptions.from_grpc_status(
                            result.status.code,
                            result.status.message,
                            details=result.status.details,
                        )
                        self._handle_entry_error(orig_idx, entry_error)
                    else:
                        # mutation succeeded; drop errors recorded by earlier
                        # attempts so it is not reported as a failure
                        self.errors.pop(orig_idx, None)
                    # remove processed entry from active list
                    del active_request_indices[result.index]
        except asyncio.CancelledError:
            # when retry wrapper timeout expires, the operation is cancelled
            # make sure incomplete indices are tracked,
            # but don't record exception (it will be raised by wrapper)
            # TODO: remove asyncio.wait_for in retry wrapper. Let grpc call handle expiration
            self.remaining_indices.extend(active_request_indices.values())
            raise
        except Exception as exc:
            # add this exception to list for each mutation that wasn't
            # already handled, and update remaining_indices if mutation is retryable
            for idx in active_request_indices.values():
                self._handle_entry_error(idx, exc)
            # bubble up exception to be handled by retry wrapper
            raise
        # check if attempt succeeded, or needs to be retried
        if self.remaining_indices:
            # unfinished work; raise exception to trigger retry
            raise bt_exceptions._MutateRowsIncomplete

    def _handle_entry_error(self, idx: int, exc: Exception):
        """
        Add an exception to the list of exceptions for a given mutation index,
        and add the index to the list of remaining indices if the exception is
        retryable.

        An index is only queued for retry when its entry is idempotent, the
        exception satisfies the is_retryable predicate, and the index is not
        already queued.

        Args:
          - idx: the index of the mutation that failed
          - exc: the exception to add to the list
        """
        entry = self.mutations[idx].entry
        self.errors.setdefault(idx, []).append(exc)
        if (
            entry.is_idempotent()
            and self.is_retryable(exc)
            and idx not in self.remaining_indices
        ):
            self.remaining_indices.append(idx)