Skip to content
This repository was archived by the owner on Apr 1, 2026. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
1d02154
added initial implementation of mutate_rows
daniel-sanche Apr 24, 2023
ab63cba
implemented mutation models
daniel-sanche Apr 24, 2023
cf9daa5
added retries to mutate_row
daniel-sanche Apr 24, 2023
1247da4
return exception group if possible
daniel-sanche Apr 24, 2023
3b3ed8c
check for idempotence
daniel-sanche Apr 24, 2023
5d20037
initial implementation for bulk_mutations
daniel-sanche Apr 24, 2023
3d322a1
include successes in bulk mutation error message
daniel-sanche Apr 24, 2023
a31232b
fixed style checks
daniel-sanche Apr 24, 2023
8da2d65
added basic system tests
daniel-sanche Apr 24, 2023
2b89d9c
added unit tests for mutate_row
daniel-sanche Apr 25, 2023
47c5985
ran blacken
daniel-sanche Apr 25, 2023
38fdcd7
improved exceptions
daniel-sanche Apr 25, 2023
504d2d8
added bulk_mutate_rows unit tests
daniel-sanche Apr 25, 2023
b16067f
ran blacken
daniel-sanche Apr 25, 2023
3ab1405
support __new___ for exceptions for python3.11+
daniel-sanche Apr 25, 2023
0a6c0c6
added exception unit tests
daniel-sanche Apr 25, 2023
ec043cf
made exceptions tuple
daniel-sanche Apr 26, 2023
518530e
got exceptions to print consistently across versions
daniel-sanche Apr 26, 2023
9624729
added test for 311 rich traceback
daniel-sanche Apr 27, 2023
3087081
moved retryable row mutations to new file
daniel-sanche Apr 27, 2023
9df588f
use index map
daniel-sanche Apr 27, 2023
7ed8be3
added docstring
daniel-sanche Apr 27, 2023
2536cc4
added predicate check to failed mutations
daniel-sanche Apr 27, 2023
1f6875c
added _mutate_rows tests
daniel-sanche Apr 27, 2023
1ea24e6
improved client tests
daniel-sanche Apr 27, 2023
25ca2d2
refactored to loop by raising exception
daniel-sanche Apr 28, 2023
c0787db
refactored retry deadline logic into shared wrapper
daniel-sanche Apr 28, 2023
3ed5c3d
ran black
daniel-sanche Apr 28, 2023
a91fbcb
pulled in table default timeouts
daniel-sanche Apr 28, 2023
df8a058
added tests for shared deadline parsing function
daniel-sanche Apr 28, 2023
b866b57
added tests for mutation models
daniel-sanche Apr 28, 2023
54a4d43
fixed linter errors
daniel-sanche Apr 28, 2023
bd51dc4
added tests for BulkMutationsEntry
daniel-sanche Apr 28, 2023
921b05a
improved mutations documentation
daniel-sanche Apr 28, 2023
82ea61f
refactored mutate_rows logic into helper function
daniel-sanche May 2, 2023
fa42b86
implemented callbacks for mutate_rows
daniel-sanche May 2, 2023
01a16f3
made exceptions into a tuple
daniel-sanche May 5, 2023
6140acb
remove aborted from retryable errors
daniel-sanche May 22, 2023
36ba2b6
improved SetCell mutation
daniel-sanche May 22, 2023
b3c9017
fixed mutations tests
daniel-sanche May 22, 2023
cac9e2d
SetCell timestamps use millisecond precision
daniel-sanche May 22, 2023
34b051f
renamed BulkMutationsEntry to RowMutationEntry
daniel-sanche May 22, 2023
63ac35c
Merge branch 'v3' into mutate_rows
daniel-sanche May 24, 2023
a51201c
added metadata to mutate rows and bulk mutate rows
daniel-sanche May 25, 2023
a21bebf
moved _convert_retry_deadline wrapper from exceptions into _helpers
daniel-sanche May 25, 2023
4ca89d9
fixed system tests
daniel-sanche May 25, 2023
b240ee1
only handle precision adjustment when creating timestamp
daniel-sanche May 26, 2023
cb0e951
added _from_dict for mutation models
daniel-sanche May 26, 2023
a9cf385
rpc timeouts adjust when approaching operation_timeout
daniel-sanche May 26, 2023
eddc1c9
pass table instead of request dict
daniel-sanche May 26, 2023
f8b26aa
refactoring mutate rows
daniel-sanche May 27, 2023
5b80dc5
made on_terminal_state into coroutine
daniel-sanche May 27, 2023
9e5b80a
fixed style issues
daniel-sanche May 27, 2023
f7539f6
moved callback rewriting into retryable attempt
daniel-sanche May 27, 2023
e77a4fa
fixed tests
daniel-sanche May 27, 2023
4e19ed0
pop successful mutations from error dict
daniel-sanche May 30, 2023
920e4b7
removed unneeded check
daniel-sanche May 30, 2023
725f5ff
refactoring
daniel-sanche May 30, 2023
1054bc4
pass list of exceptions in callback
daniel-sanche May 30, 2023
f39a891
raise error in unexpected state
daniel-sanche May 30, 2023
1d97135
removed callback
daniel-sanche May 31, 2023
88e2bf5
refactoring mutation attempt into class
daniel-sanche May 31, 2023
a3c0166
use partial function
daniel-sanche May 31, 2023
70c35ef
renamed class
daniel-sanche May 31, 2023
e00f592
added comments
daniel-sanche Jun 1, 2023
18af78a
added tests
daniel-sanche Jun 1, 2023
23e84f5
improved helpers
daniel-sanche Jun 1, 2023
56fdf7c
refactored operation into class only
daniel-sanche Jun 5, 2023
aca31f0
restructured how remaining indices are tracked
daniel-sanche Jun 5, 2023
5a5d541
fixed tests
daniel-sanche Jun 5, 2023
afed731
added docstrings
daniel-sanche Jun 5, 2023
2396ec8
moved index deletion to end of block
daniel-sanche Jun 6, 2023
3d441a2
added comment to exception types
daniel-sanche Jun 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions google/cloud/bigtable/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from google.cloud.bigtable.mutations_batcher import MutationsBatcher
from google.cloud.bigtable.mutations import Mutation
from google.cloud.bigtable.mutations import BulkMutationsEntry
from google.cloud.bigtable.mutations import RowMutationEntry
from google.cloud.bigtable.mutations import SetCell
from google.cloud.bigtable.mutations import DeleteRangeFromColumn
from google.cloud.bigtable.mutations import DeleteAllFromFamily
Expand All @@ -47,7 +47,7 @@
"RowRange",
"MutationsBatcher",
"Mutation",
"BulkMutationsEntry",
"RowMutationEntry",
"SetCell",
"DeleteRangeFromColumn",
"DeleteAllFromFamily",
Expand Down
112 changes: 112 additions & 0 deletions google/cloud/bigtable/_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

from typing import Callable, Any
from inspect import iscoroutinefunction
import time

from google.api_core import exceptions as core_exceptions
from google.cloud.bigtable.exceptions import RetryExceptionGroup

"""
Helper functions used in various places in the library.
"""


def _make_metadata(
table_name: str | None, app_profile_id: str | None
Comment thread
daniel-sanche marked this conversation as resolved.
Outdated
) -> list[tuple[str, str]]:
"""
Create properly formatted gRPC metadata for requests.
"""
params = []
if table_name is not None:
params.append(f"table_name={table_name}")
if app_profile_id is not None:
params.append(f"app_profile_id={app_profile_id}")
params_str = ",".join(params)
return [("x-goog-request-params", params_str)]


def _attempt_timeout_generator(
per_request_timeout: float | None, operation_timeout: float
):
"""
Generator that yields the timeout value for each attempt of a retry loop.

Will return per_request_timeout until the operation_timeout is approached,
at which point it will return the remaining time in the operation_timeout.

Args:
- per_request_timeout: The timeout value to use for each request. If None,
the operation_timeout will be used for each request.
- operation_timeout: The timeout value to use for the entire operation.
Yields:
- The timeout value to use for the next request.
"""
Comment thread
daniel-sanche marked this conversation as resolved.
per_request_timeout = (
per_request_timeout if per_request_timeout is not None else operation_timeout
)
deadline = operation_timeout + time.monotonic()
while True:
yield max(0, min(per_request_timeout, deadline - time.monotonic()))


def _convert_retry_deadline(
func: Callable[..., Any],
timeout_value: float | None = None,
retry_errors: list[Exception] | None = None,
):
"""
Decorator to convert RetryErrors raised by api_core.retry into
DeadlineExceeded exceptions, indicating that the underlying retries have
exhaused the timeout value.
Optionally attaches a RetryExceptionGroup to the DeadlineExceeded.__cause__,
detailing the failed exceptions associated with each retry.

Supports both sync and async function wrapping.

Args:
- func: The function to decorate
- timeout_value: The timeout value to display in the DeadlineExceeded error message
- retry_errors: An optional list of exceptions to attach as a RetryExceptionGroup to the DeadlineExceeded.__cause__
"""
timeout_str = f" of {timeout_value:.1f}s" if timeout_value is not None else ""
error_str = f"operation_timeout{timeout_str} exceeded"

def handle_error():
new_exc = core_exceptions.DeadlineExceeded(
error_str,
)
source_exc = None
if retry_errors:
source_exc = RetryExceptionGroup(retry_errors)
new_exc.__cause__ = source_exc
raise new_exc from source_exc

# separate wrappers for async and sync functions
async def wrapper_async(*args, **kwargs):
try:
return await func(*args, **kwargs)
except core_exceptions.RetryError:
handle_error()

def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except core_exceptions.RetryError:
handle_error()

return wrapper_async if iscoroutinefunction(func) else wrapper
239 changes: 239 additions & 0 deletions google/cloud/bigtable/_mutate_rows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import annotations

from typing import Iterator, Callable, Any, Coroutine, TYPE_CHECKING

from google.api_core import exceptions as core_exceptions
from google.api_core import retry_async as retries
import google.cloud.bigtable.exceptions as bt_exceptions
from google.cloud.bigtable._helpers import _make_metadata
from google.cloud.bigtable._helpers import _convert_retry_deadline
from google.cloud.bigtable._helpers import _attempt_timeout_generator

if TYPE_CHECKING:
from google.cloud.bigtable_v2.services.bigtable.async_client import (
BigtableAsyncClient,
)
from google.cloud.bigtable.client import Table
from google.cloud.bigtable.mutations import RowMutationEntry


class _MutateRowsIncomplete(RuntimeError):
"""
Exception raised when a mutate_rows call has unfinished work.
"""

pass


async def _mutate_rows_operation(
    gapic_client: "BigtableAsyncClient",
    table: "Table",
    mutation_entries: list["RowMutationEntry"],
    operation_timeout: float,
    per_request_timeout: float | None,
    on_terminal_state: Callable[
        [int, "RowMutationEntry", list[Exception] | None], Coroutine[None, None, None]
    ]
    | None = None,
):
    """
    Helper function for managing a single mutate_rows operation, end-to-end.

    Runs retryable attempts under an AsyncRetry wrapper, accumulating per-entry
    errors across attempts, and raises a MutationsExceptionGroup if any entries
    end in a failed state.

    Args:
    - gapic_client: the client to use for the mutate_rows call
    - table: the table targeted by the request; supplies table_name and
      app_profile_id for routing
    - mutation_entries: a list of RowMutationEntry objects to send to the server
    - operation_timeout: the timeout to use for the entire operation, in seconds.
    - per_request_timeout: the timeout to use for each mutate_rows attempt, in seconds.
        If not specified, the request will run until operation_timeout is reached.
    - on_terminal_state: If given, this coroutine will be awaited as soon as a
      mutation entry reaches a terminal state (success or failure), with
      (index, entry, error-list-or-None-on-success).
    Raises:
    - MutationsExceptionGroup: if any mutation entry ultimately fails
    """
    # map of original entry index -> entry, for all entries not yet finalized;
    # attempts pop entries from this dict as they reach a terminal state
    mutations_dict: dict[int, RowMutationEntry] = {
        idx: mut for idx, mut in enumerate(mutation_entries)
    }
    # map of original entry index -> errors seen so far; successful entries
    # are popped from this dict by the attempt's patched callback
    error_dict: dict[int, list[Exception]] = {idx: [] for idx in mutations_dict.keys()}

    # _MutateRowsIncomplete is included so unfinished-but-retryable attempts
    # re-enter the retry loop
    predicate = retries.if_exception_type(
        core_exceptions.DeadlineExceeded,
        core_exceptions.ServiceUnavailable,
        _MutateRowsIncomplete,
    )

    def on_error_fn(exc):
        # called by AsyncRetry before sleeping on a retryable attempt failure
        if predicate(exc) and not isinstance(exc, _MutateRowsIncomplete):
            # add this exception to list for each active mutation
            for idx in error_dict.keys():
                error_dict[idx].append(exc)
            # remove non-idempotent mutations from mutations_dict, so they are not retried
            for idx, mut in list(mutations_dict.items()):
                if not mut.is_idempotent():
                    mutations_dict.pop(idx)

    retry = retries.AsyncRetry(
        predicate=predicate,
        on_error=on_error_fn,
        timeout=operation_timeout,
        initial=0.01,
        multiplier=2,
        maximum=60,
    )
    # use generator to lower per-attempt timeout as we approach operation_timeout deadline
    attempt_timeout_gen = _attempt_timeout_generator(
        per_request_timeout, operation_timeout
    )
    # wrap attempt in retry logic
    retry_wrapped = retry(_mutate_rows_retryable_attempt)
    # convert RetryErrors from retry wrapper into DeadlineExceeded errors
    deadline_wrapped = _convert_retry_deadline(retry_wrapped, operation_timeout)
    try:
        # trigger mutate_rows
        await deadline_wrapped(
            gapic_client,
            table,
            attempt_timeout_gen,
            mutations_dict,
            error_dict,
            predicate,
            on_terminal_state,
        )
    except Exception as exc:
        # exceptions raised by retryable are added to the list of exceptions for all unfinalized mutations
        for error_list in error_dict.values():
            error_list.append(exc)
    finally:
        # runs on both success and failure paths: surface any entries that
        # remain in error_dict (i.e. were never finalized as successful)
        all_errors = []
        for idx, exc_list in error_dict.items():
            if len(exc_list) == 0:
                # an unfinalized entry with no recorded errors indicates an
                # internal bookkeeping inconsistency — fail loudly
                raise core_exceptions.ClientError(
                    f"Mutation {idx} failed with no associated errors"
                )
            elif len(exc_list) == 1:
                cause_exc = exc_list[0]
            else:
                # multiple attempts failed; bundle them into one cause
                cause_exc = bt_exceptions.RetryExceptionGroup(exc_list)
            entry = mutation_entries[idx]
            all_errors.append(
                bt_exceptions.FailedMutationEntryError(idx, entry, cause_exc)
            )
            # call on_terminal_state for each unreported failed mutation
            if on_terminal_state:
                await on_terminal_state(idx, entry, exc_list)
        if all_errors:
            raise bt_exceptions.MutationsExceptionGroup(
                all_errors, len(mutation_entries)
            )


async def _mutate_rows_retryable_attempt(
    gapic_client: "BigtableAsyncClient",
    table: "Table",
    timeout_generator: Iterator[float],
    active_dict: dict[int, "RowMutationEntry"],
    error_dict: dict[int, list[Exception]],
    predicate: Callable[[Exception], bool],
    on_terminal_state: Callable[
        [int, "RowMutationEntry", list[Exception] | None],
        Coroutine[None, None, None],
    ]
    | None = None,
):
    """
    Helper function for managing a single mutate_rows attempt.

    If one or more retryable mutations remain incomplete at the end of the function,
    _MutateRowsIncomplete will be raised to trigger a retry

    This function is intended to be wrapped in an api_core.retry.AsyncRetry object, which will handle
    timeouts and retrying raised exceptions.

    Args:
    - gapic_client: the client to use for the mutate_rows call
    - table: the table targeted by the request; supplies table_name and
      app_profile_id for the request dict and routing metadata
    - timeout_generator: yields the timeout to apply to this attempt's rpc
    - active_dict: a dictionary tracking unfinalized mutations. At the start of the request,
        all entries are outstanding. As mutations are finalized, they are removed from the dict.
    - error_dict: a dictionary tracking errors associated with each entry index.
        Each retry will append a new error. Successful mutations will remove their index from the dict.
    - predicate: a function that takes an exception and returns True if the exception is retryable.
    - on_terminal_state: If given, this coroutine will be awaited as soon as a mutation entry
        reaches a terminal state (success or failure).
    Raises:
    - _MutateRowsIncomplete: if one or more retryable mutations remain incomplete at the end of the function
    - GoogleAPICallError: if the server returns an error on the grpc call
    """
    # update on_terminal_state to remove finalized mutations from active_dict,
    # and successful mutations from error_dict
    input_callback = on_terminal_state

    async def on_terminal_patched(
        idx: int, entry: "RowMutationEntry", exc_list: list[Exception] | None
    ):
        # finalize bookkeeping before notifying the caller-supplied callback
        active_dict.pop(idx)
        if exc_list is None:
            # exc_list of None signals success; drop the entry's error slot
            error_dict.pop(idx)
        if input_callback is not None:
            await input_callback(idx, entry, exc_list)

    on_terminal_state = on_terminal_patched
    # keep map between sub-request indices and global mutation_dict indices
    index_map: dict[int, int] = {}
    request_entries: list[dict[str, Any]] = []
    for request_idx, (global_idx, m) in enumerate(active_dict.items()):
        index_map[request_idx] = global_idx
        request_entries.append(m._to_dict())
    # make gapic request
    metadata = _make_metadata(table.table_name, table.app_profile_id)
    async for result_list in await gapic_client.mutate_rows(
        request={
            "table_name": table.table_name,
            "app_profile_id": table.app_profile_id,
            "entries": request_entries,
        },
        timeout=next(timeout_generator),
        metadata=metadata,
    ):
        for result in result_list.entries:
            # convert sub-request index to global index
            idx = index_map[result.index]
            if idx not in active_dict:
                # server reported a result for an entry we already finalized;
                # treat as an internal inconsistency
                raise core_exceptions.ClientError(
                    f"Received result for already finalized mutation at index {idx}"
                )
            entry = active_dict[idx]
            exc = None
            if result.status.code == 0:
                # mutation succeeded
                await on_terminal_state(idx, entry, None)
            else:
                # mutation failed; record the error for this entry
                exc = core_exceptions.from_grpc_status(
                    result.status.code,
                    result.status.message,
                    details=result.status.details,
                )
                error_dict[idx].append(exc)
                # if mutation is non-idempotent or the error is not retryable,
                # mark the mutation as terminal
                if not predicate(exc) or not entry.is_idempotent():
                    await on_terminal_state(idx, entry, error_dict[idx])
    # check if attempt succeeded, or needs to be retried
    if active_dict:
        # unfinished work; raise exception to trigger retry
        raise _MutateRowsIncomplete()
Loading