-
Notifications
You must be signed in to change notification settings - Fork 65
feat: add test proxy #836
feat: add test proxy #836
Changes from 1 commit
aa1e40e
b059139
3ed2168
5199839
4e14bf3
cbb95c9
f43aac1
91fc1e6
06e5276
62b8e48
237e051
868ff2e
21a5077
02f0c09
456caba
f3627c1
bcc02d7
5e7c156
604d3d8
14f359d
858c57d
36a3153
07b39b1
5d90478
b69da5a
df3ea47
8dcd444
94a8684
72b8d1b
8496211
71ba0ea
94e98db
320d157
5064870
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,98 @@ | ||
| # CBT Python Test Proxy | ||
|
|
||
| The CBT test proxy is intended for running conformance tests for the Cloud Bigtable Python Client. | ||
|
|
||
| ## Start test proxy | ||
|
|
||
| #### running the proxy with nox | ||
|
|
||
| You can launch the test proxy directly using `nox`, which will handle dependency management | ||
|
|
||
| ``` | ||
| cd python-bigtable/test_proxy | ||
| nox -s run_proxy | ||
| ``` | ||
|
|
||
| The port can be configured using the `PROXY_SERVER_PORT` environment variable | ||
|
|
||
| ``` | ||
| cd python-bigtable/test_proxy | ||
| PROXY_SERVER_PORT=8080 nox -s run_proxy | ||
| ``` | ||
|
|
||
| #### running the proxy script manually | ||
|
|
||
| You can also run the `test_proxy.py` file directly | ||
|
|
||
| ``` | ||
| cd python-bigtable/test_proxy | ||
| python test_proxy.py | ||
| ``` | ||
|
|
||
| The port can be set by passing the `--port` flag | ||
|
|
||
| ``` | ||
| cd python-bigtable/test_proxy | ||
| python test_proxy.py --port 8080 | ||
| ``` | ||
|
|
||
| ## Run the test cases | ||
|
|
||
| Prerequisites: | ||
| - If you have not already done so, [install golang](https://go.dev/doc/install). | ||
| - Before running tests, [launch an instance of the test proxy](#start-test-proxy) | ||
| in a separate shell session, and make note of the port | ||
|
|
||
| #### running the test cases with nox | ||
|
|
||
| You can trigger the tests directly using `nox`, which will clone the test repo locally if it doesn't exist | ||
|
|
||
| ``` | ||
| cd python-bigtable/test_proxy | ||
| nox -s conformance_tests | ||
| ``` | ||
|
|
||
| The port can be configured using the `PROXY_SERVER_PORT` environment variable | ||
|
|
||
| ``` | ||
| cd python-bigtable/test_proxy | ||
| PROXY_SERVER_PORT=8080 nox -s conformance_tests | ||
| ``` | ||
|
|
||
| #### running the test cases manually | ||
|
|
||
| Clone and navigate to the go test library: | ||
|
|
||
| ``` | ||
| git clone https://github.com/googleapis/cloud-bigtable-clients-test.git | ||
| cd cloud-bigtable-clients-test/tests | ||
| ``` | ||
|
|
||
|
|
||
| Launch the tests | ||
|
|
||
| ``` | ||
| go test -v -proxy_addr=:50055 | ||
| ``` | ||
|
|
||
| ## Test a released client | ||
|
|
||
| You can run the test proxy against a released version of the library with `nox` | ||
| by setting the `PROXY_CLIENT_VERSION` environment variable: | ||
|
|
||
| ``` | ||
| PROXY_CLIENT_VERSION=3.0.0 nox -s run_proxy | ||
| ``` | ||
|
|
||
| If unset, it will default to installing the library from source. | ||
|
|
||
| ## Test the legacy client | ||
|
|
||
| You can run the test proxy against the previous `v2` client by running it with the `--legacy-client` flag: | ||
|
|
||
| ``` | ||
| python test_proxy.py --legacy-client | ||
| ``` | ||
Large diffs are not rendered by default.
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
| @@ -0,0 +1,211 @@ | ||||
| # Copyright 2023 Google LLC | ||||
| # | ||||
| # Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| # you may not use this file except in compliance with the License. | ||||
| # You may obtain a copy of the License at | ||||
| # | ||||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, software | ||||
| # distributed under the License is distributed on an "AS IS" BASIS, | ||||
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| # See the License for the specific language governing permissions and | ||||
| # limitations under the License. | ||||
| """ | ||||
| This module contains the client handler process for proxy_server.py. | ||||
| """ | ||||
| import os | ||||
|
|
||||
| from google.cloud.environment_vars import BIGTABLE_EMULATOR | ||||
| from google.cloud.bigtable.data import BigtableDataClientAsync | ||||
|
|
||||
|
|
||||
def error_safe(func):
    """
    Decorator for TestProxyClientHandler methods.

    Catches any exception raised by the wrapped coroutine and returns it as an
    encoded dict (via ``encode_exception``) so the error can be passed back to
    the grpc_server_process instead of propagating. Also rejects requests made
    after the handler has been closed.

    Args:
        func: an async method taking ``self`` as its first argument.

    Returns:
        An async wrapper with the same call signature as ``func``.
    """
    async def wrapper(self, *args, **kwargs):
        try:
            if self.closed:
                raise RuntimeError("client is closed")
            return await func(self, *args, **kwargs)
        except Exception as e:
            # NotImplementedError is already a subclass of Exception (via
            # RuntimeError), so the original (Exception, NotImplementedError)
            # tuple was redundant; a single Exception clause is equivalent.
            # exceptions should be raised in grpc_server_process
            return encode_exception(e)

    return wrapper
|
|
||||
def encode_exception(exc):
    """
    Recursively serialize an exception — along with its cause chain and any
    grouped sub-exceptions — into a plain dict for the grpc_handler.

    The resulting dict always carries an ``"error"`` message and, when one can
    be determined, a grpc status ``"code"``.
    """
    from google.api_core.exceptions import GoogleAPICallError

    encoded = {"error": f"{type(exc).__name__}: {exc}"}
    if exc.__cause__:
        encoded["cause"] = encode_exception(exc.__cause__)
    if hasattr(exc, "exceptions"):
        # exception groups: encode each retried/partial failure individually
        encoded["subexceptions"] = [encode_exception(sub) for sub in exc.exceptions]
    if hasattr(exc, "index"):
        encoded["index"] = exc.index
    if isinstance(exc, GoogleAPICallError):
        if exc.grpc_status_code is not None:
            encoded["code"] = exc.grpc_status_code.value[0]
        elif exc.code is not None:
            encoded["code"] = int(exc.code)
        else:
            encoded["code"] = -1
    elif encoded.get("cause", {}).get("code"):
        # no direct code on this exception; inherit the cause's code
        encoded["code"] = encoded["cause"]["code"]
    elif encoded.get("subexceptions"):
        # inherit a code from the sub-exceptions (last one with a code wins)
        for sub in encoded["subexceptions"]:
            if sub.get("code"):
                encoded["code"] = sub["code"]
    return encoded
|
|
||||
|
|
||||
class TestProxyClientHandler:
    """
    Implements the same methods as the grpc server, but handles the client
    library side of the request.

    Requests received in TestProxyGrpcServer are converted to a dictionary,
    and supplied to the TestProxyClientHandler methods as kwargs.
    The client response is then returned back to the TestProxyGrpcServer
    """

    def __init__(
        self,
        data_target=None,
        project_id=None,
        instance_id=None,
        app_profile_id=None,
        per_operation_timeout=None,
        **kwargs,
    ):
        self.closed = False
        # use emulator: point the client at the proxy's data target
        os.environ[BIGTABLE_EMULATOR] = data_target
        self.client = BigtableDataClientAsync(project=project_id)
        self.instance_id = instance_id
        self.app_profile_id = app_profile_id
        # default operation timeout in seconds (float or None)
        self.per_operation_timeout = per_operation_timeout

    def close(self):
        # subsequent requests are rejected by the @error_safe wrapper
        self.closed = True

    def _table_from_request(self, request):
        """Resolve a table object from a request dict's fully-qualified table_name."""
        table_id = request["table_name"].split("/")[-1]
        profile_id = self.app_profile_id or request.get("app_profile_id", None)
        return self.client.get_table(self.instance_id, table_id, profile_id)

    def _apply_timeout(self, kwargs):
        """Fill in operation_timeout from the instance default, falling back to 20s."""
        kwargs["operation_timeout"] = (
            kwargs.get("operation_timeout", self.per_operation_timeout) or 20
        )

    @error_safe
    async def ReadRows(self, request, **kwargs):
        table = self._table_from_request(request)
        self._apply_timeout(kwargs)
        rows = await table.read_rows(request, **kwargs)
        # pack results back into protobuf-parsable format
        return [row.to_dict() for row in rows]

    @error_safe
    async def ReadRow(self, row_key, **kwargs):
        # table_name arrives in kwargs for this RPC; remove it before the call.
        # NOTE: app_profile_id is read with get() (not pop) and so is still
        # forwarded to read_row in kwargs — preserved from the original.
        table_id = kwargs.pop("table_name").split("/")[-1]
        profile_id = self.app_profile_id or kwargs.get("app_profile_id", None)
        table = self.client.get_table(self.instance_id, table_id, profile_id)
        self._apply_timeout(kwargs)
        row = await table.read_row(row_key, **kwargs)
        # pack results back into protobuf-parsable format
        return row.to_dict() if row else "None"

    @error_safe
    async def MutateRow(self, request, **kwargs):
        from google.cloud.bigtable.data.mutations import Mutation
        table = self._table_from_request(request)
        self._apply_timeout(kwargs)
        mutations = [Mutation._from_dict(d) for d in request["mutations"]]
        await table.mutate_row(request["row_key"], mutations, **kwargs)
        return "OK"

    @error_safe
    async def BulkMutateRows(self, request, **kwargs):
        from google.cloud.bigtable.data.mutations import RowMutationEntry
        table = self._table_from_request(request)
        self._apply_timeout(kwargs)
        entries = [RowMutationEntry._from_dict(entry) for entry in request["entries"]]
        await table.bulk_mutate_rows(entries, **kwargs)
        return "OK"

    @error_safe
    async def CheckAndMutateRow(self, request, **kwargs):
        from google.cloud.bigtable.data.mutations import Mutation, SetCell
        table = self._table_from_request(request)
        self._apply_timeout(kwargs)

        def parse_mutations(dict_list):
            # add default values for incomplete dicts, so they can still be parsed to objects
            parsed = []
            for mut_dict in dict_list:
                try:
                    parsed.append(Mutation._from_dict(mut_dict))
                except ValueError:
                    # invalid mutation type. Conformance test may be sending generic empty request
                    parsed.append(SetCell("", "", "", -1))
            return parsed

        result = await table.check_and_mutate_row(
            request["row_key"],
            request.get("predicate_filter", None),
            true_case_mutations=parse_mutations(request.get("true_mutations", [])),
            false_case_mutations=parse_mutations(request.get("false_mutations", [])),
            **kwargs,
        )
        return result

    @error_safe
    async def ReadModifyWriteRow(self, request, **kwargs):
        from google.cloud.bigtable.data.read_modify_write_rules import IncrementRule
        from google.cloud.bigtable.data.read_modify_write_rules import AppendValueRule
        table = self._table_from_request(request)
        self._apply_timeout(kwargs)
        rules = []
        for rule_dict in request.get("rules", []):
            family = rule_dict["family_name"]
            qualifier = rule_dict["column_qualifier"]
            if "append_value" in rule_dict:
                rules.append(AppendValueRule(family, qualifier, rule_dict["append_value"]))
            else:
                rules.append(IncrementRule(family, qualifier, rule_dict["increment_amount"]))
        result = await table.read_modify_write_row(request["row_key"], rules, **kwargs)
        # pack results back into protobuf-parsable format
        return result.to_dict() if result else "None"

    @error_safe
    async def SampleRowKeys(self, request, **kwargs):
        table = self._table_from_request(request)
        self._apply_timeout(kwargs)
        return await table.sample_row_keys(**kwargs)
Uh oh!
There was an error while loading. Please reload this page.