From d8bb5207886b4e53f4734e835342dfe7456de7d7 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 11 Mar 2026 20:47:51 +0000 Subject: [PATCH 01/10] fix: Respect remote function config changes even if logic unchanged --- .../compile/ibis_compiler/ibis_compiler.py | 1 + .../ibis_compiler/scalar_op_registry.py | 8 +- bigframes/core/compile/sqlglot/compiler.py | 1 + .../core/compile/sqlglot/sql/__init__.py | 2 - bigframes/core/compile/sqlglot/sql/base.py | 23 - bigframes/core/rewrite/__init__.py | 2 + bigframes/core/rewrite/udfs.py | 88 ++++ bigframes/core/sql/__init__.py | 29 ++ bigframes/dataframe.py | 5 +- bigframes/functions/_function_client.py | 370 ++++++++-------- bigframes/functions/_function_session.py | 90 +--- bigframes/functions/_utils.py | 56 +-- bigframes/functions/function.py | 69 +-- bigframes/functions/function_template.py | 45 +- bigframes/functions/udf_def.py | 412 +++++++++++++++--- bigframes/operations/remote_function_ops.py | 6 +- bigframes/series.py | 2 - .../large/functions/test_remote_function.py | 49 ++- .../sqlglot/expressions/test_generic_ops.py | 54 +-- tests/unit/functions/test_remote_function.py | 54 --- .../functions/test_remote_function_utils.py | 62 --- 21 files changed, 792 insertions(+), 636 deletions(-) create mode 100644 bigframes/core/rewrite/udfs.py diff --git a/bigframes/core/compile/ibis_compiler/ibis_compiler.py b/bigframes/core/compile/ibis_compiler/ibis_compiler.py index 8d40a9eb740..3802a57e02d 100644 --- a/bigframes/core/compile/ibis_compiler/ibis_compiler.py +++ b/bigframes/core/compile/ibis_compiler/ibis_compiler.py @@ -88,6 +88,7 @@ def _replace_unsupported_ops(node: nodes.BigFrameNode): node = nodes.bottom_up(node, rewrites.rewrite_slice) node = nodes.bottom_up(node, rewrites.rewrite_timedelta_expressions) node = nodes.bottom_up(node, rewrites.rewrite_range_rolling) + node = nodes.bottom_up(node, rewrites.lower_udfs) return node diff --git a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py index dd275874332..2b7b336badc 100644 --- a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py +++ b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py @@ -1037,7 +1037,7 @@ def timedelta_floor_op_impl(x: ibis_types.NumericValue): @scalar_op_compiler.register_unary_op(ops.RemoteFunctionOp, pass_op=True) def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp): udf_sig = op.function_def.signature - ibis_py_sig = (udf_sig.py_input_types, udf_sig.py_output_type) + ibis_py_sig = ((arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) @ibis_udf.scalar.builtin( name=str(op.function_def.routine_ref), signature=ibis_py_sig @@ -1056,7 +1056,7 @@ def binary_remote_function_op_impl( x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp ): udf_sig = op.function_def.signature - ibis_py_sig = (udf_sig.py_input_types, udf_sig.py_output_type) + ibis_py_sig = ((arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) @ibis_udf.scalar.builtin( name=str(op.function_def.routine_ref), signature=ibis_py_sig @@ -1073,8 +1073,8 @@ def nary_remote_function_op_impl( *operands: ibis_types.Value, op: ops.NaryRemoteFunctionOp ): udf_sig = op.function_def.signature - ibis_py_sig = (udf_sig.py_input_types, udf_sig.py_output_type) - arg_names = tuple(arg.name for arg in udf_sig.input_types) + ibis_py_sig = ((arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) + arg_names = tuple(arg.name for arg in udf_sig.inputs) @ibis_udf.scalar.builtin( name=str(op.function_def.routine_ref), diff --git a/bigframes/core/compile/sqlglot/compiler.py b/bigframes/core/compile/sqlglot/compiler.py index a86a192a9e1..ce9ed6ce377 100644 --- a/bigframes/core/compile/sqlglot/compiler.py +++ b/bigframes/core/compile/sqlglot/compiler.py @@ -369,4 +369,5 @@ def compile_aggregate( def _replace_unsupported_ops(node: nodes.BigFrameNode): node = nodes.bottom_up(node, rewrite.rewrite_slice) node = nodes.bottom_up(node, rewrite.rewrite_range_rolling) + node = nodes.bottom_up(node, rewrite.lower_udfs) return node diff --git a/bigframes/core/compile/sqlglot/sql/__init__.py b/bigframes/core/compile/sqlglot/sql/__init__.py index 6d2dbd65a69..e68285b34af 100644 --- a/bigframes/core/compile/sqlglot/sql/__init__.py +++ b/bigframes/core/compile/sqlglot/sql/__init__.py @@ -15,7 +15,6 @@ from bigframes.core.compile.sqlglot.sql.base import ( cast, - escape_chars, identifier, is_null_literal, literal, @@ -27,7 +26,6 @@ __all__ = [ # From base.py "cast", - "escape_chars", "identifier", "is_null_literal", "literal", diff --git a/bigframes/core/compile/sqlglot/sql/base.py b/bigframes/core/compile/sqlglot/sql/base.py index 6e888fdf5e8..d287b2cac9d 100644 --- a/bigframes/core/compile/sqlglot/sql/base.py +++ b/bigframes/core/compile/sqlglot/sql/base.py @@ -136,29 +136,6 @@ def table(table: bigquery.TableReference) -> sge.Table: ) -def escape_chars(value: str): - """Escapes all special characters""" - # TODO: Reuse literal's escaping logic instead of re-implementing it here. - # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#string_and_bytes_literals - trans_table = str.maketrans( - { - "\a": r"\a", - "\b": r"\b", - "\f": r"\f", - "\n": r"\n", - "\r": r"\r", - "\t": r"\t", - "\v": r"\v", - "\\": r"\\", - "?": r"\?", - '"': r"\"", - "'": r"\'", - "`": r"\`", - } - ) - return value.translate(trans_table) - - def is_null_literal(expr: sge.Expression) -> bool: """Checks if the given expression is a NULL literal.""" if isinstance(expr, sge.Null): diff --git a/bigframes/core/rewrite/__init__.py b/bigframes/core/rewrite/__init__.py index 5279418f5fb..6b00e9b2f12 100644 --- a/bigframes/core/rewrite/__init__.py +++ b/bigframes/core/rewrite/__init__.py @@ -27,6 +27,7 @@ from bigframes.core.rewrite.select_pullup import defer_selection from bigframes.core.rewrite.slices import pull_out_limit, pull_up_limits, rewrite_slice from bigframes.core.rewrite.timedeltas import rewrite_timedelta_expressions +from bigframes.core.rewrite.udfs import lower_udfs from bigframes.core.rewrite.windows import ( pull_out_window_order, rewrite_range_rolling, @@ -53,4 +54,5 @@ "pull_out_window_order", "defer_selection", "simplify_complex_windows", + "lower_udfs", ] diff --git a/bigframes/core/rewrite/udfs.py b/bigframes/core/rewrite/udfs.py new file mode 100644 index 00000000000..3a2f3c9539c --- /dev/null +++ b/bigframes/core/rewrite/udfs.py @@ -0,0 +1,88 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import dataclasses + +from bigframes.core import bigframe_node, expression +from bigframes.core.rewrite import op_lowering +import bigframes.functions.udf_def as udf_def +import bigframes.operations as ops + + +@dataclasses.dataclass +class LowerRemoteFunctionRule(op_lowering.OpLoweringRule): + @property + def op(self) -> type[ops.ScalarOp]: + return ops.RemoteFunctionOp + + def lower(self, expr: expression.OpExpression) -> expression.Expression: + assert isinstance(expr.op, ops.RemoteFunctionOp) + func_def = expr.op.function_def + if isinstance(func_def.signature.output, udf_def.DirectScalarType): + return expr.op.as_expr(*expr.children) + assert isinstance(func_def.signature.output, udf_def.VirtualListTypeV1) + devirtualized_expr = ops.RemoteFunctionOp( + func_def.with_devirtualize(), + apply_on_null=expr.op.apply_on_null, + ).as_expr(*expr.children) + return func_def.signature.output.out_expr(devirtualized_expr) + + +@dataclasses.dataclass +class LowerBinaryRemoteFunctionRule(op_lowering.OpLoweringRule): + @property + def op(self) -> type[ops.ScalarOp]: + return ops.BinaryRemoteFunctionOp + + def lower(self, expr: expression.OpExpression) -> expression.Expression: + assert isinstance(expr.op, ops.BinaryRemoteFunctionOp) + func_def = expr.op.function_def + + if isinstance(func_def.signature.output, udf_def.DirectScalarType): + return expr.op.as_expr(*expr.children) + assert isinstance(func_def.signature.output, udf_def.VirtualListTypeV1) + devirtualized_expr = ops.BinaryRemoteFunctionOp( + func_def.with_devirtualize(), + ).as_expr(*expr.children) + return func_def.signature.output.out_expr(devirtualized_expr) + + +@dataclasses.dataclass +class LowerNaryRemoteFunctionRule(op_lowering.OpLoweringRule): + @property + def op(self) -> type[ops.ScalarOp]: + return ops.NaryRemoteFunctionOp + + def lower(self, expr: expression.OpExpression) -> expression.Expression: + assert isinstance(expr.op, ops.NaryRemoteFunctionOp) + func_def = expr.op.function_def + if isinstance(func_def.signature.output, udf_def.DirectScalarType): + return expr.op.as_expr(*expr.children) + assert isinstance(func_def.signature.output, udf_def.VirtualListTypeV1) + devirtualized_expr = ops.NaryRemoteFunctionOp( + func_def.with_devirtualize(), + ).as_expr(*expr.children) + return func_def.signature.output.out_expr(devirtualized_expr) + + +UDF_LOWERING_RULES = ( + LowerRemoteFunctionRule(), + LowerBinaryRemoteFunctionRule(), + LowerNaryRemoteFunctionRule(), +) + + +def lower_udfs(root: bigframe_node.BigFrameNode) -> bigframe_node.BigFrameNode: + return op_lowering.lower_ops(root, rules=UDF_LOWERING_RULES) diff --git a/bigframes/core/sql/__init__.py b/bigframes/core/sql/__init__.py index 8c9a093802c..6441fd60c9d 100644 --- a/bigframes/core/sql/__init__.py +++ b/bigframes/core/sql/__init__.py @@ -48,6 +48,35 @@ to_wkt = dumps +def identifier(name: str) -> str: + if len(name) > 63: + raise ValueError("Identifier must be less than 64 characters") + return f"`{escape_chars(name)}`" + + +def escape_chars(value: str): + """Escapes all special characters""" + # TODO: Reuse literal's escaping logic instead of re-implementing it here. + # https://cloud.google.com/bigquery/docs/reference/standard-sql/lexical#string_and_bytes_literals + trans_table = str.maketrans( + { + "\a": r"\a", + "\b": r"\b", + "\f": r"\f", + "\n": r"\n", + "\r": r"\r", + "\t": r"\t", + "\v": r"\v", + "\\": r"\\", + "?": r"\?", + '"': r"\"", + "'": r"\'", + "`": r"\`", + } + ) + return value.translate(trans_table) + + def multi_literal(*values: Any): literal_strings = [sql.to_sql(sql.literal(i)) for i in values] return "(" + ", ".join(literal_strings) + ")" diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 25cedda8f4a..08c2c85e646 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -4748,7 +4748,9 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): # compatible with the data types of the input params. # 3. The order of the columns in the dataframe must correspond # to the order of the input params in the function. - udf_input_dtypes = func.udf_def.signature.bf_input_types + udf_input_dtypes = tuple( + arg.bf_type for arg in func.udf_def.signature.inputs + ) if not args and len(udf_input_dtypes) != len(self.columns): raise ValueError( f"Parameter count mismatch: BigFrames BigQuery function" @@ -4793,7 +4795,6 @@ def apply(self, func, *, axis=0, args: typing.Tuple = (), **kwargs): ) result_series.name = None - result_series = func._post_process_series(result_series) return result_series # At this point column-wise or element-wise bigquery function operation will diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index be9ff0956ef..5e303eb6b5b 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -24,7 +24,7 @@ import tempfile import textwrap import types -from typing import Any, cast, Optional, Sequence, Tuple, TYPE_CHECKING +from typing import Any, cast, Optional, Sequence, TYPE_CHECKING import warnings import requests @@ -32,6 +32,7 @@ import bigframes.exceptions as bfe import bigframes.formatting_helpers as bf_formatting import bigframes.functions.function_template as bff_template +import bigframes.functions.udf_def as udf_def if TYPE_CHECKING: from bigframes.session import Session @@ -40,7 +41,12 @@ import google.api_core.retry from google.cloud import bigquery, functions_v2 -from . import _utils +from bigframes.functions import _utils +from bigframes.functions._utils import ( + _BIGFRAMES_FUNCTION_PREFIX, + _BQ_FUNCTION_NAME_SEPERATOR, + _GCF_FUNCTION_NAME_SEPERATOR, +) logger = logging.getLogger(__name__) @@ -162,13 +168,8 @@ def _format_function_options(self, function_options: dict) -> str: def create_bq_remote_function( self, - input_args: Sequence[str], - input_types: Sequence[str], - output_type: str, - endpoint: str, - bq_function_name: str, - max_batching_rows: int, - metadata: str, + name: str, + udf_def: udf_def.RemoteFunctionConfig, ): """Create a BigQuery remote function given the artifacts of a user defined function and the http endpoint of a corresponding cloud function.""" @@ -176,29 +177,32 @@ def create_bq_remote_function( # Create BQ function # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2 - bq_function_args = [] - bq_function_return_type = output_type - - # We are expecting the input type annotations to be 1:1 with the input args - for name, type_ in zip(input_args, input_types): - bq_function_args.append(f"{name} {type_}") + bq_function_return_type = udf_def.signature.output.sql_type remote_function_options = { - "endpoint": endpoint, - "max_batching_rows": max_batching_rows, + "endpoint": udf_def.endpoint, + "max_batching_rows": udf_def.max_batching_rows, } - if metadata: + if udf_def.bq_metadata: # We are using the description field to store this structured # bigframes specific metadata for the lack of a better option - remote_function_options["description"] = metadata + remote_function_options["description"] = udf_def.bq_metadata remote_function_options_str = self._format_function_options( remote_function_options ) + import bigframes.core.sql + import bigframes.core.utils + + # removes anything that isn't letter, number or underscore + sql_func_legal_name = bigframes.core.utils.label_to_identifier( + name, strict=True + ) + bq_function_name_escaped = bigframes.core.sql.identifier(sql_func_legal_name) create_function_ddl = f""" - CREATE OR REPLACE FUNCTION `{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name}({','.join(bq_function_args)}) + CREATE OR REPLACE FUNCTION `{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name_escaped}({udf_def.signature.to_sql_input_signature()}) RETURNS {bq_function_return_type} REMOTE WITH CONNECTION `{self._gcp_project_id}.{self._bq_location}.{self._bq_connection_id}` OPTIONS ({remote_function_options_str})""" @@ -268,14 +272,15 @@ def provision_bq_managed_function( managed_function_options ) - session_id = None if name else self._session.session_id bq_function_name = name if not bq_function_name: # Compute a unique hash representing the user code. function_hash = _utils.get_hash(func, packages) - bq_function_name = _utils.get_bigframes_function_name( + bq_function_name = _utils.get_managed_function_name( function_hash, - session_id, + # session-scope in absensce of name from user + # name indicates permanent allocation + None if name else self._session.session_id, ) persistent_func_id = ( @@ -337,7 +342,7 @@ def get_remote_function_fully_qualilfied_name(self, name): "Get the fully qualilfied name for a BQ remote function." return f"{self._gcp_project_id}.{self._bq_dataset}.{name}" - def get_cloud_function_endpoint(self, name): + def get_cloud_function_endpoint(self, name) -> str | None: """Get the http endpoint of a cloud function if it exists.""" fully_qualified_name = self.get_cloud_function_fully_qualified_name(name) try: @@ -351,29 +356,24 @@ def get_cloud_function_endpoint(self, name): def generate_cloud_function_code( self, - def_, + code_def: udf_def.CodeDef, directory, *, - input_types: Tuple[str], - output_type: str, - package_requirements=None, - is_row_processor=False, + udf_signature: udf_def.UdfSignature, ): """Generate the cloud function code for a given user defined function.""" # requirements.txt - if package_requirements: + if code_def.package_requirements: requirements_txt = os.path.join(directory, "requirements.txt") with open(requirements_txt, "w") as f: - f.write("\n".join(package_requirements)) + f.write("\n".join(code_def.package_requirements)) # main.py entry_point = bff_template.generate_cloud_function_main_code( - def_, + code_def, directory, - input_types=input_types, - output_type=output_type, - is_row_processor=is_row_processor, + udf_signature=udf_signature, ) return entry_point @@ -393,35 +393,19 @@ def _get_cloud_function_endpoint_with_retry(self, name): def create_cloud_function( self, - def_, - *, - random_name, - input_types: Tuple[str], - output_type: str, - package_requirements=None, - timeout_seconds=600, - max_instance_count=None, - is_row_processor=False, - vpc_connector=None, - vpc_connector_egress_settings="private-ranges-only", - memory_mib=None, - cpus=None, - ingress_settings="internal-only", - workers=None, - threads=None, - concurrency=None, - ): + name: str, + func_def: udf_def.CloudRunFunctionConfig, + ) -> str: """Create a cloud function from the given user defined function.""" + config = func_def + # Build and deploy folder structure containing cloud function with tempfile.TemporaryDirectory() as directory: entry_point = self.generate_cloud_function_code( - def_, + config.code, directory, - package_requirements=package_requirements, - input_types=input_types, - output_type=output_type, - is_row_processor=is_row_processor, + udf_signature=config.signature, ) archive_path = shutil.make_archive(directory, "zip", directory) @@ -461,9 +445,9 @@ def create_cloud_function( create_function_request.parent = ( self.get_cloud_function_fully_qualified_parent() ) - create_function_request.function_id = random_name + create_function_request.function_id = name function = functions_v2.Function() - function.name = self.get_cloud_function_fully_qualified_name(random_name) + function.name = self.get_cloud_function_fully_qualified_name(name) function.build_config = functions_v2.BuildConfig() function.build_config.runtime = python_version function.build_config.entry_point = entry_point @@ -490,33 +474,34 @@ def create_cloud_function( ) function.service_config = functions_v2.ServiceConfig() - if memory_mib is not None: - function.service_config.available_memory = f"{memory_mib}Mi" - if cpus is not None: - function.service_config.available_cpu = str(cpus) - if timeout_seconds is not None: - if timeout_seconds > 1200: + if config.memory_mib is not None: + function.service_config.available_memory = f"{config.memory_mib}Mi" + if config.cpus is not None: + function.service_config.available_cpu = str(config.cpus) + if config.timeout_seconds is not None: + if config.timeout_seconds > 1200: raise bf_formatting.create_exception_with_feedback_link( ValueError, "BigQuery remote function can wait only up to 20 minutes" ", see for more details " "https://cloud.google.com/bigquery/quotas#remote_function_limits.", ) - function.service_config.timeout_seconds = timeout_seconds - if max_instance_count is not None: - function.service_config.max_instance_count = max_instance_count - if vpc_connector is not None: - function.service_config.vpc_connector = vpc_connector - if vpc_connector_egress_settings is None: + function.service_config.timeout_seconds = config.timeout_seconds + if config.max_instance_count is not None: + function.service_config.max_instance_count = config.max_instance_count + if config.vpc_connector is not None: + function.service_config.vpc_connector = config.vpc_connector + vpc_connector_egress_settings = config.vpc_connector_egress_settings + if config.vpc_connector_egress_settings is None: msg = bfe.format_message( "The 'vpc_connector_egress_settings' was not specified. Defaulting to 'private-ranges-only'.", ) warnings.warn(msg, category=UserWarning) vpc_connector_egress_settings = "private-ranges-only" - if vpc_connector_egress_settings not in _VPC_EGRESS_SETTINGS_MAP: + if config.vpc_connector_egress_settings not in _VPC_EGRESS_SETTINGS_MAP: raise bf_formatting.create_exception_with_feedback_link( ValueError, - f"'{vpc_connector_egress_settings}' is not one of the supported vpc egress settings values: {list(_VPC_EGRESS_SETTINGS_MAP)}", + f"'{config.vpc_connector_egress_settings}' is not one of the supported vpc egress settings values: {list(_VPC_EGRESS_SETTINGS_MAP)}", ) function.service_config.vpc_connector_egress_settings = cast( functions_v2.ServiceConfig.VpcConnectorEgressSettings, @@ -525,28 +510,30 @@ def create_cloud_function( function.service_config.service_account_email = ( self._cloud_function_service_account ) - if concurrency: - function.service_config.max_instance_request_concurrency = concurrency + if config.concurrency: + function.service_config.max_instance_request_concurrency = ( + config.concurrency + ) # Functions framework use environment variables to pass config to gunicorn # See https://github.com/GoogleCloudPlatform/functions-framework-python/issues/241 # Code: https://github.com/GoogleCloudPlatform/functions-framework-python/blob/v3.10.1/src/functions_framework/_http/gunicorn.py#L37-L43 env_vars = {} - if workers: - env_vars["WORKERS"] = str(workers) - if threads: - env_vars["THREADS"] = str(threads) + if config.workers: + env_vars["WORKERS"] = str(config.workers) + if config.threads: + env_vars["THREADS"] = str(config.threads) if env_vars: function.service_config.environment_variables = env_vars - if ingress_settings not in _INGRESS_SETTINGS_MAP: + if config.ingress_settings not in _INGRESS_SETTINGS_MAP: raise bf_formatting.create_exception_with_feedback_link( ValueError, - f"'{ingress_settings}' not one of the supported ingress settings values: {list(_INGRESS_SETTINGS_MAP)}", + f"'{config.ingress_settings}' not one of the supported ingress settings values: {list(_INGRESS_SETTINGS_MAP)}", ) function.service_config.ingress_settings = cast( functions_v2.ServiceConfig.IngressSettings, - _INGRESS_SETTINGS_MAP[ingress_settings], + _INGRESS_SETTINGS_MAP[config.ingress_settings], ) function.kms_key_name = self._cloud_function_kms_key_name create_function_request.function = function @@ -577,68 +564,38 @@ def create_cloud_function( # Fetch the endpoint with retries if it wasn't returned by the operation if not endpoint: try: - endpoint = self._get_cloud_function_endpoint_with_retry(random_name) + endpoint = self._get_cloud_function_endpoint_with_retry(name) except Exception as e: raise bf_formatting.create_exception_with_feedback_link( ValueError, f"Couldn't fetch the http endpoint: {e}" ) - logger.info( - f"Successfully created cloud function {random_name} with uri ({endpoint})" - ) + logger.info(f"Successfully created cloud function {name} with uri ({endpoint})") return endpoint def provision_bq_remote_function( self, def_, - input_types, - output_type, - reuse, - name, - package_requirements, - max_batching_rows, - cloud_function_timeout, - cloud_function_max_instance_count, - is_row_processor, - cloud_function_vpc_connector, - cloud_function_vpc_connector_egress_settings, - cloud_function_memory_mib, - cloud_function_cpus, - cloud_function_ingress_settings, - bq_metadata, + func_signature: udf_def.UdfSignature, + reuse: bool, + name: str | None, + package_requirements: tuple[str, ...], + max_batching_rows: int, + cloud_function_timeout: int | None, + cloud_function_max_instance_count: int | None, + cloud_function_vpc_connector: str | None, + cloud_function_vpc_connector_egress_settings: str | None, + cloud_function_memory_mib: int | None, + cloud_function_cpus: float | None, + cloud_function_ingress_settings: str | None, ): """Provision a BigQuery remote function.""" # Augment user package requirements with any internal package # requirements package_requirements = _utils.get_updated_package_requirements( - package_requirements, is_row_processor + package_requirements, func_signature.is_row_processor ) - # Compute a unique hash representing the user code - function_hash = _utils.get_hash(def_, package_requirements) - - # If reuse of any existing function with the same name (indicated by the - # same hash of its source code) is not intended, then attach a unique - # suffix to the intended function name to make it unique. - uniq_suffix = None - if not reuse: - # use 4 digits as a unique suffix which should suffice for - # uniqueness per session - uniq_suffix = "".join( - random.choices(string.ascii_lowercase + string.digits, k=4) - ) - - # Derive the name of the cloud function underlying the intended BQ - # remote function. Use the session id to identify the GCF for unnamed - # functions. The named remote functions are treated as a persistant - # artifacts, so let's keep them independent of session id, which also - # makes their naming more stable for the same udf code - session_id = None if name else self._session.session_id - cloud_function_name = _utils.get_cloud_function_name( - function_hash, session_id, uniq_suffix - ) - cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) - if cloud_function_memory_mib is None: cloud_function_memory_mib = _DEFAULT_FUNCTION_MEMORY_MIB @@ -654,71 +611,85 @@ def provision_bq_remote_function( # max concurrency==1 for vcpus < 1 hard limit from cloud run concurrency = (workers * threads) if (expected_milli_cpus >= 1000) else 1 + cloud_func_spec = udf_def.CloudRunFunctionConfig( + code=udf_def.CodeDef.from_func(def_, package_requirements), + signature=func_signature, + timeout_seconds=cloud_function_timeout, + max_instance_count=cloud_function_max_instance_count, + vpc_connector=cloud_function_vpc_connector, + vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings + or "private-ranges-only", + memory_mib=cloud_function_memory_mib, + cpus=cloud_function_cpus, + ingress_settings=cloud_function_ingress_settings or "internal_only", + workers=workers, + threads=threads, + concurrency=concurrency, + ) + + # If reuse of any existing function with the same name (indicated by the + # same hash of its source code and config) is not intended, then attach a unique + # suffix to the intended function name to make it unique. + random_suffix = "".join( + random.choices(string.ascii_lowercase + string.digits, k=4) + ) + # Derive the name of the cloud function underlying the intended BQ + # remote function. Use the session id to identify the GCF for unnamed + # functions. The named remote functions are treated as a persistant + # artifacts, so let's keep them independent of session id, which also + # makes their naming more stable for the same udf code + cloud_function_name = get_cloud_function_name( + cloud_func_spec, + session_id=self._session.session_id if (name is None) else None, + uniq_suffix=random_suffix if reuse is False else None, + ) + + cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) # Create the cloud function if it does not exist if not cf_endpoint: cf_endpoint = self.create_cloud_function( - def_, - random_name=cloud_function_name, - input_types=input_types, - output_type=output_type, - package_requirements=package_requirements, - timeout_seconds=cloud_function_timeout, - max_instance_count=cloud_function_max_instance_count, - is_row_processor=is_row_processor, - vpc_connector=cloud_function_vpc_connector, - vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, - memory_mib=cloud_function_memory_mib, - cpus=cloud_function_cpus, - ingress_settings=cloud_function_ingress_settings, - workers=workers, - threads=threads, - concurrency=concurrency, + cloud_function_name, cloud_func_spec ) else: logger.info(f"Cloud function {cloud_function_name} already exists.") - # Derive the name of the remote function - remote_function_name = name - if not remote_function_name: - remote_function_name = _utils.get_bigframes_function_name( - function_hash, self._session.session_id, uniq_suffix - ) - rf_endpoint, rf_conn = self.get_remote_function_specs(remote_function_name) - - # Create the BQ remote function in following circumstances: - # 1. It does not exist - # 2. It exists but the existing remote function has different - # configuration than intended - created_new = False - if not rf_endpoint or ( - rf_endpoint != cf_endpoint or rf_conn != self._bq_connection_id - ): - input_args = inspect.getargs(def_.__code__).args - if len(input_args) != len(input_types): - raise bf_formatting.create_exception_with_feedback_link( - ValueError, - "Exactly one type should be provided for every input arg.", - ) - self.create_bq_remote_function( - input_args, - input_types, - output_type, - cf_endpoint, - remote_function_name, - max_batching_rows, - bq_metadata, - ) + intended_rf_spec = udf_def.RemoteFunctionConfig( + endpoint=cf_endpoint, + connection_id=self._bq_connection_id, + max_batching_rows=max_batching_rows, + signature=func_signature, + ) + # Override value-derived name if reuse is False, or if name is explicitly provided. + # If reuse is False, then attach a unique suffix to the intended + # function name to make it unique. + rf_name = get_bigframes_function_name( + intended_rf_spec, + self._session.session_id if (name is None) else None, + random_suffix if reuse is False else None, + ) - created_new = True + if reuse: + existing_rf_spec = self.get_remote_function_specs(rf_name) + # Create the BQ remote function in following circumstances: + # 1. It does not exist + # 2. It exists but the existing remote function has different + # configuration than intended + created_new = False + if not existing_rf_spec or (existing_rf_spec != intended_rf_spec): + self.create_bq_remote_function(rf_name, intended_rf_spec) + created_new = True + else: + logger.info(f"Remote function {rf_name} already exists.") + + return rf_name, cloud_function_name, created_new else: - logger.info(f"Remote function {remote_function_name} already exists.") - - return remote_function_name, cloud_function_name, created_new + self.create_bq_remote_function(rf_name, intended_rf_spec) + return rf_name, cloud_function_name, True - def get_remote_function_specs(self, remote_function_name): + def get_remote_function_specs( + self, remote_function_name: str + ) -> udf_def.RemoteFunctionConfig | None: """Check whether a remote function already exists for the udf.""" - http_endpoint = None - bq_connection = None routines = self._bq_client.list_routines( f"{self._gcp_project_id}.{self._bq_dataset}" ) @@ -726,18 +697,35 @@ def get_remote_function_specs(self, remote_function_name): for routine in routines: routine = cast(bigquery.Routine, routine) if routine.reference.routine_id == remote_function_name: - rf_options = routine.remote_function_options - if rf_options: - http_endpoint = rf_options.endpoint - bq_connection = rf_options.connection - if bq_connection: - bq_connection = os.path.basename(bq_connection) - break + return udf_def.RemoteFunctionConfig.from_bq_routine(routine) except google.api_core.exceptions.NotFound: - # The dataset might not exist, in which case the http_endpoint doesn't, either. + # The dataset might not exist, in which case the remote function doesn't, either. # Note: list_routines doesn't make an API request until we iterate on the response object. pass - return (http_endpoint, bq_connection) + return None + + +def get_cloud_function_name( + function_def: udf_def.CloudRunFunctionConfig, session_id=None, uniq_suffix=None +): + "Get a name for the cloud function for the given user defined function." + parts = [_BIGFRAMES_FUNCTION_PREFIX] + if session_id: + parts.append(session_id) + parts.append(function_def.stable_hash().hex()) + if uniq_suffix: + parts.append(uniq_suffix) + return _GCF_FUNCTION_NAME_SEPERATOR.join(parts) + + +def get_bigframes_function_name( + function: udf_def.RemoteFunctionConfig, session_id, uniq_suffix=None +): + "Get a name for the bigframes function for the given user defined function." + parts = [_BIGFRAMES_FUNCTION_PREFIX, session_id, function.stable_hash().hex()] + if uniq_suffix: + parts.append(uniq_suffix) + return _BQ_FUNCTION_NAME_SEPERATOR.join(parts) def _infer_milli_cpus_from_memory(memory_mib: int) -> int: diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index 7541936ede3..b23a29a0854 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -24,7 +24,6 @@ Any, cast, Dict, - get_origin, Literal, Mapping, Optional, @@ -51,7 +50,6 @@ if TYPE_CHECKING: from bigframes.session import Session -import pandas from bigframes.functions import _function_client, _utils @@ -241,7 +239,7 @@ def remote_function( cloud_function_service_account: str, cloud_function_kms_key_name: Optional[str] = None, cloud_function_docker_repository: Optional[str] = None, - max_batching_rows: Optional[int] = 1000, + max_batching_rows: Optional[int] = None, cloud_function_timeout: Optional[int] = 600, cloud_function_max_instances: Optional[int] = None, cloud_function_vpc_connector: Optional[str] = None, @@ -580,13 +578,6 @@ def wrapper(func): warnings.warn(msg, category=bfe.FunctionConflictTypeHintWarning) py_sig = py_sig.replace(return_annotation=output_type) - # The function will actually be receiving a pandas Series, but allow - # both BigQuery DataFrames and pandas object types for compatibility. - is_row_processor = False - if new_sig := _convert_row_processor_sig(py_sig): - py_sig = new_sig - is_row_processor = True - remote_function_client = _function_client.FunctionClient( dataset_ref.project, bq_location, @@ -605,24 +596,6 @@ def wrapper(func): session=session, # type: ignore ) - # resolve the output type that can be supported in the bigframes, - # ibis, BQ remote functions and cloud functions integration. - bqrf_metadata = None - post_process_routine = None - if get_origin(py_sig.return_annotation) is list: - # TODO(b/284515241): remove this special handling to support - # array output types once BQ remote functions support ARRAY. - # Until then, use json serialized strings at the cloud function - # and BQ level, and parse that to the intended output type at - # the bigframes level. - bqrf_metadata = _utils.get_bigframes_metadata( - python_output_type=py_sig.return_annotation - ) - post_process_routine = _utils.build_unnest_post_routine( - py_sig.return_annotation - ) - py_sig = py_sig.replace(return_annotation=str) - udf_sig = udf_def.UdfSignature.from_py_signature(py_sig) ( @@ -631,21 +604,18 @@ def wrapper(func): created_new, ) = remote_function_client.provision_bq_remote_function( func, - input_types=udf_sig.sql_input_types, - output_type=udf_sig.sql_output_type, - reuse=reuse, + func_signature=udf_sig, + reuse=reuse or False, name=name, - package_requirements=packages, - max_batching_rows=max_batching_rows, + package_requirements=tuple(packages) if packages else tuple(), + max_batching_rows=max_batching_rows or 1000, cloud_function_timeout=cloud_function_timeout, cloud_function_max_instance_count=cloud_function_max_instances, - is_row_processor=is_row_processor, cloud_function_vpc_connector=cloud_function_vpc_connector, cloud_function_vpc_connector_egress_settings=cloud_function_vpc_connector_egress_settings, cloud_function_memory_mib=cloud_function_memory_mib, cloud_function_cpus=cloud_function_cpus, cloud_function_ingress_settings=cloud_function_ingress_settings, - bq_metadata=bqrf_metadata, ) bigframes_cloud_function = ( @@ -676,12 +646,13 @@ def wrapper(func): signature=udf_sig, ) decorator = functools.wraps(func) - if is_row_processor: + if udf_sig.is_row_processor: + msg = bfe.format_message("input_types=Series is in preview.") + warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) return decorator( bq_functions.BigqueryCallableRowRoutine( udf_definition, session, - post_routine=post_process_routine, cloud_function_ref=bigframes_cloud_function, local_func=func, is_managed=False, @@ -692,7 +663,6 @@ def wrapper(func): bq_functions.BigqueryCallableRoutine( udf_definition, session, - post_routine=post_process_routine, cloud_function_ref=bigframes_cloud_function, local_func=func, is_managed=False, @@ -892,10 +862,6 @@ def wrapper(func): # The function will actually be receiving a pandas Series, but allow # both BigQuery DataFrames and pandas object types for compatibility. - is_row_processor = False - if new_sig := _convert_row_processor_sig(py_sig): - py_sig = new_sig - is_row_processor = True udf_sig = udf_def.UdfSignature.from_py_signature(py_sig) @@ -911,14 +877,14 @@ def wrapper(func): bq_function_name = managed_function_client.provision_bq_managed_function( func=func, - input_types=udf_sig.sql_input_types, - output_type=udf_sig.sql_output_type, + input_types=tuple(arg.sql_type for arg in udf_sig.inputs), + output_type=udf_sig.output.sql_type, name=name, packages=packages, max_batching_rows=max_batching_rows, container_cpu=container_cpu, container_memory=container_memory, - is_row_processor=is_row_processor, + is_row_processor=udf_sig.is_row_processor, bq_connection_id=bq_connection_id, ) full_rf_name = ( @@ -936,7 +902,9 @@ def wrapper(func): self._update_temp_artifacts(full_rf_name, "") decorator = functools.wraps(func) - if is_row_processor: + if udf_sig.is_row_processor: + msg = bfe.format_message("input_types=Series is in preview.") + warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) return decorator( bq_functions.BigqueryCallableRowRoutine( udf_definition, session, local_func=func, is_managed=True @@ -979,33 +947,3 @@ def deploy_udf( # TODO(tswast): If we update udf to defer deployment, update this method # to deploy immediately. return self.udf(**kwargs)(func) - - -def _convert_row_processor_sig( - signature: inspect.Signature, -) -> Optional[inspect.Signature]: - import bigframes.series as bf_series - - if len(signature.parameters) >= 1: - first_param = next(iter(signature.parameters.values())) - param_type = first_param.annotation - # Type hints for Series inputs should use pandas.Series because the - # underlying serialization process converts the input to a string - # representation of a pandas Series (not bigframes Series). Using - # bigframes Series will lead to TypeError when creating the function - # remotely. See more from b/445182819. - if param_type == bf_series.Series: - raise bf_formatting.create_exception_with_feedback_link( - TypeError, - "Argument type hint must be Pandas Series, not BigFrames Series.", - ) - if param_type == pandas.Series: - msg = bfe.format_message("input_types=Series is in preview.") - warnings.warn(msg, stacklevel=1, category=bfe.PreviewWarning) - return signature.replace( - parameters=[ - p.replace(annotation=str) if i == 0 else p - for i, p in enumerate(signature.parameters.values()) - ] - ) - return None diff --git a/bigframes/functions/_utils.py b/bigframes/functions/_utils.py index b6dedeac504..3af989f27e7 100644 --- a/bigframes/functions/_utils.py +++ b/bigframes/functions/_utils.py @@ -183,6 +183,24 @@ def clean_up_by_session_id( pass +def routine_ref_to_string_for_query(routine_ref: bigquery.RoutineReference) -> str: + return f"`{routine_ref.project}.{routine_ref.dataset_id}`.{routine_ref.routine_id}" + + +def get_managed_function_name( + function_hash: str, + session_id: str | None = None, +): + """Get a name for the bigframes managed function for the given user defined function.""" + # TODO: Move over to logic used by remote functions + parts = [_BIGFRAMES_FUNCTION_PREFIX] + if session_id: + parts.append(session_id) + parts.append(function_hash) + return _BQ_FUNCTION_NAME_SEPERATOR.join(parts) + + +# Deprecated: Use CodeDef.stable_hash() instead. def get_hash(def_, package_requirements=None): "Get hash (32 digits alphanumeric) of a function." # There is a known cell-id sensitivity of the cloudpickle serialization in @@ -208,46 +226,28 @@ def get_hash(def_, package_requirements=None): return hashlib.md5(def_repr).hexdigest() -def routine_ref_to_string_for_query(routine_ref: bigquery.RoutineReference) -> str: - return f"`{routine_ref.project}.{routine_ref.dataset_id}`.{routine_ref.routine_id}" - - -def get_cloud_function_name(function_hash, session_id=None, uniq_suffix=None): - "Get a name for the cloud function for the given user defined function." - parts = [_BIGFRAMES_FUNCTION_PREFIX] - if session_id: - parts.append(session_id) - parts.append(function_hash) - if uniq_suffix: - parts.append(uniq_suffix) - return _GCF_FUNCTION_NAME_SEPERATOR.join(parts) - - -def get_bigframes_function_name(function_hash, session_id, uniq_suffix=None): - "Get a name for the bigframes function for the given user defined function." - parts = [_BIGFRAMES_FUNCTION_PREFIX, session_id, function_hash] - if uniq_suffix: - parts.append(uniq_suffix) - return _BQ_FUNCTION_NAME_SEPERATOR.join(parts) - - -def get_python_output_type_from_bigframes_metadata( +def get_python_output_type_str_from_bigframes_metadata( metadata_text: str, -) -> Optional[type]: +) -> Optional[str]: try: metadata_dict = json.loads(metadata_text) except (TypeError, json.decoder.JSONDecodeError): return None - try: - output_type = metadata_dict["value"]["python_array_output_type"] + return metadata_dict["value"]["python_array_output_type"] except KeyError: return None + +def get_python_output_type_from_bigframes_metadata( + metadata_text: str, +) -> Optional[type]: + output_type_str = get_python_output_type_str_from_bigframes_metadata(metadata_text) + for ( python_output_array_type ) in function_typing.RF_SUPPORTED_ARRAY_OUTPUT_PYTHON_TYPES: - if python_output_array_type.__name__ == output_type: + if python_output_array_type.__name__ == output_type_str: return list[python_output_array_type] # type: ignore return None diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py index 4e06cb16633..d2b508b9c65 100644 --- a/bigframes/functions/function.py +++ b/bigframes/functions/function.py @@ -15,7 +15,7 @@ from __future__ import annotations import logging -from typing import Callable, cast, get_origin, Optional, TYPE_CHECKING +from typing import Callable, Optional, TYPE_CHECKING if TYPE_CHECKING: from bigframes.session import Session @@ -26,7 +26,7 @@ import bigframes.formatting_helpers as bf_formatting from bigframes.functions import _function_session as bff_session -from bigframes.functions import _utils, function_typing, udf_def +from bigframes.functions import function_typing, udf_def logger = logging.getLogger(__name__) @@ -82,16 +82,9 @@ def _try_import_routine( routine: bigquery.Routine, session: bigframes.Session ) -> BigqueryCallableRoutine: udf_def = _routine_as_udf_def(routine) - override_type = _get_output_type_override(routine) is_remote = ( hasattr(routine, "remote_function_options") and routine.remote_function_options ) - if override_type is not None: - return BigqueryCallableRoutine( - udf_def, - session, - post_routine=_utils.build_unnest_post_routine(override_type), - ) return BigqueryCallableRoutine(udf_def, session, is_managed=not is_remote) @@ -99,16 +92,9 @@ def _try_import_row_routine( routine: bigquery.Routine, session: bigframes.Session ) -> BigqueryCallableRowRoutine: udf_def = _routine_as_udf_def(routine) - override_type = _get_output_type_override(routine) is_remote = ( hasattr(routine, "remote_function_options") and routine.remote_function_options ) - if override_type is not None: - return BigqueryCallableRowRoutine( - udf_def, - session, - post_routine=_utils.build_unnest_post_routine(override_type), - ) return BigqueryCallableRowRoutine(udf_def, session, is_managed=not is_remote) @@ -126,30 +112,6 @@ def _routine_as_udf_def(routine: bigquery.Routine) -> udf_def.BigqueryUdf: ) -def _get_output_type_override(routine: bigquery.Routine) -> Optional[type[list]]: - if routine.description is not None and isinstance(routine.description, str): - if python_output_type := _utils.get_python_output_type_from_bigframes_metadata( - routine.description - ): - bq_return_type = cast(bigquery.StandardSqlDataType, routine.return_type) - - if bq_return_type is None or bq_return_type.type_kind != "STRING": - raise bf_formatting.create_exception_with_feedback_link( - TypeError, - "An explicit output_type should be provided only for a BigQuery function with STRING output.", - ) - if get_origin(python_output_type) is list: - return python_output_type - else: - raise bf_formatting.create_exception_with_feedback_link( - TypeError, - "Currently only list of " - "a type is supported as python output type.", - ) - - return None - - # TODO(b/399894805): Support managed function. def read_gbq_function( function_name: str, @@ -250,23 +212,16 @@ def bigframes_cloud_function(self) -> Optional[str]: @property def input_dtypes(self): - return self.udf_def.signature.bf_input_types + return tuple(arg.bf_type for arg in self.udf_def.signature.inputs) @property def output_dtype(self): - return self.udf_def.signature.bf_output_type + return self.udf_def.signature.output.bf_type @property def bigframes_bigquery_function_output_dtype(self): return self.output_dtype - def _post_process_series( - self, series: bigframes.series.Series - ) -> bigframes.series.Series: - if self._post_routine is not None: - return self._post_routine(series) - return series - class BigqueryCallableRowRoutine: """ @@ -282,14 +237,11 @@ def __init__( *, local_func: Optional[Callable] = None, cloud_function_ref: Optional[str] = None, - post_routine: Optional[ - Callable[[bigframes.series.Series], bigframes.series.Series] - ] = None, is_managed: bool = False, ): + assert self.udf_def.signature.is_row_processor self._udf_def = udf_def self._session = session - self._post_routine = post_routine self._local_fun = local_func self._cloud_function = cloud_function_ref self._is_managed = is_managed @@ -334,19 +286,12 @@ def bigframes_cloud_function(self) -> Optional[str]: @property def input_dtypes(self): - return self.udf_def.signature.bf_input_types + return tuple(arg.bf_type for arg in self.udf_def.signature.inputs) @property def output_dtype(self): - return self.udf_def.signature.bf_output_type + return self.udf_def.signature.output.bf_type @property def bigframes_bigquery_function_output_dtype(self): return self.output_dtype - - def _post_process_series( - self, series: bigframes.series.Series - ) -> bigframes.series.Series: - if self._post_routine is not None: - return self._post_routine(series) - return series diff --git a/bigframes/functions/function_template.py b/bigframes/functions/function_template.py index e48ffda8ed1..cb0268e6254 100644 --- a/bigframes/functions/function_template.py +++ b/bigframes/functions/function_template.py @@ -19,21 +19,17 @@ import os import re import textwrap -from typing import Tuple import cloudpickle -logger = logging.getLogger(__name__) - +from bigframes.functions import udf_def -# Protocol version 4 is available in python version 3.4 and above -# https://docs.python.org/3/library/pickle.html#data-stream-format -_pickle_protocol_version = 4 +logger = logging.getLogger(__name__) # Placeholder variables for testing. -input_types = ("STRING",) -output_type = "STRING" +input_sql_types = ("STRING",) +output_sql_type = "STRING" # Convert inputs to BigQuery JSON. See: @@ -156,7 +152,7 @@ def udf(*args): # } # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#input_format def udf_http(request): - global input_types, output_type + global input_sql_types, output_sql_type import json import traceback @@ -168,7 +164,7 @@ def udf_http(request): replies = [] for call in calls: reply = convert_to_bq_json( - output_type, udf(*convert_call(input_types, call)) + output_sql_type, udf(*convert_call(input_sql_types, call)) ) if type(reply) is list: # Since the BQ remote function does not support array yet, @@ -182,7 +178,7 @@ def udf_http(request): def udf_http_row_processor(request): - global output_type + global output_sql_type import json import math import traceback @@ -196,7 +192,7 @@ def udf_http_row_processor(request): replies = [] for call in calls: reply = convert_to_bq_json( - output_type, udf(get_pd_series(call[0]), *call[1:]) + output_sql_type, udf(get_pd_series(call[0]), *call[1:]) ) if type(reply) is list: # Since the BQ remote function does not support array yet, @@ -228,38 +224,35 @@ def udf_http_row_processor(request): return jsonify({"errorMessage": traceback.format_exc()}), 400 -def generate_udf_code(def_, directory): +def generate_udf_code(code_def: udf_def.CodeDef, directory: str): """Generate serialized code using cloudpickle given a udf.""" udf_code_file_name = "udf.py" udf_pickle_file_name = "udf.cloudpickle" # original code, only for debugging purpose - udf_code = textwrap.dedent(inspect.getsource(def_)) udf_code_file_path = os.path.join(directory, udf_code_file_name) with open(udf_code_file_path, "w") as f: - f.write(udf_code) + f.write(code_def.function_source) # serialized udf udf_pickle_file_path = os.path.join(directory, udf_pickle_file_name) # TODO(b/345433300): try io.BytesIO to avoid writing to the file system with open(udf_pickle_file_path, "wb") as f: - cloudpickle.dump(def_, f, protocol=_pickle_protocol_version) + f.write(code_def.pickled_code) return udf_code_file_name, udf_pickle_file_name def generate_cloud_function_main_code( - def_, - directory, + code_def: udf_def.CodeDef, + directory: str, *, - input_types: Tuple[str], - output_type: str, - is_row_processor=False, + udf_signature: udf_def.UdfSignature, ): """Get main.py code for the cloud function for the given user defined function.""" # Pickle the udf with all its dependencies - udf_code_file, udf_pickle_file = generate_udf_code(def_, directory) + udf_code_file, udf_pickle_file = generate_udf_code(code_def, directory) code_blocks = [ f"""\ @@ -270,15 +263,15 @@ def generate_cloud_function_main_code( with open("{udf_pickle_file}", "rb") as f: udf = cloudpickle.load(f) -input_types = {repr(input_types)} -output_type = {repr(output_type)} +input_types = {repr(input_sql_types)} +output_type = {repr(output_sql_type)} """ ] # For converting scalar outputs to the correct type. code_blocks.append(inspect.getsource(convert_to_bq_json)) - if is_row_processor: + if udf_signature.is_row_processor: code_blocks.append(inspect.getsource(get_pd_series)) handler_func_name = "udf_http_row_processor" code_blocks.append(inspect.getsource(udf_http_row_processor)) @@ -308,8 +301,6 @@ def generate_managed_function_code( # This code path ensures that if the udf body contains any # references to variables and/or imports outside the body, they are # captured as well. - import cloudpickle - pickled = cloudpickle.dumps(def_) func_code = textwrap.dedent( f""" diff --git a/bigframes/functions/udf_def.py b/bigframes/functions/udf_def.py index 078e45f32d4..d04d25fdfcc 100644 --- a/bigframes/functions/udf_def.py +++ b/bigframes/functions/udf_def.py @@ -14,110 +14,272 @@ from __future__ import annotations import dataclasses +import functools +import hashlib import inspect -from typing import cast, Optional +import io +import os +import textwrap +from typing import Any, cast, get_args, get_origin, Sequence, Type import warnings +import cloudpickle from google.cloud import bigquery +import pandas as pd import bigframes.dtypes import bigframes.exceptions as bfe import bigframes.formatting_helpers as bf_formatting from bigframes.functions import function_typing +# Protocol version 4 is available in python version 3.4 and above +# https://docs.python.org/3/library/pickle.html#data-stream-format +_pickle_protocol_version = 4 + class ReturnTypeMissingError(ValueError): pass @dataclasses.dataclass(frozen=True) -class UdfField: +class UdfArg: name: str = dataclasses.field() - dtype: bigquery.StandardSqlDataType = dataclasses.field(hash=False, compare=False) + dtype: DirectScalarType | RowSeriesInputFieldV1 + + def __post_init__(self): + assert isinstance(self.name, str) + assert isinstance(self.dtype, (DirectScalarType, RowSeriesInputFieldV1)) + + @classmethod + def from_py_param(cls, param: inspect.Parameter) -> UdfArg: + if param.annotation == pd.Series: + return cls(param.name, RowSeriesInputFieldV1()) + return cls(param.name, DirectScalarType(param.annotation)) @classmethod - def from_sdk(cls, arg: bigquery.RoutineArgument) -> UdfField: + def from_sdk(cls, arg: bigquery.RoutineArgument) -> UdfArg: assert arg.name is not None - assert arg.data_type is not None - return cls(arg.name, arg.data_type) + + if arg.data_type is None: + msg = bfe.format_message( + "The function has one or more missing input data types. BigQuery DataFrames " + f"will assume default data type {function_typing.DEFAULT_RF_TYPE} for them." + ) + warnings.warn(msg, category=bfe.UnknownDataTypeWarning) + sdk_type = function_typing.DEFAULT_RF_TYPE + else: + sdk_type = arg.data_type + return cls(arg.name, DirectScalarType.from_sdk_type(sdk_type)) + + @property + def py_type(self) -> type: + return self.dtype.py_type + + @property + def bf_type(self) -> bigframes.dtypes.Dtype: + return self.dtype.bf_type + + @property + def sql_type(self) -> str: + return self.dtype.sql_type + + def stable_hash(self) -> bytes: + hash_val = hashlib.md5() + hash_val.update(self.name.encode()) + hash_val.update(self.dtype.stable_hash()) + return hash_val.digest() @dataclasses.dataclass(frozen=True) -class UdfSignature: - input_types: tuple[UdfField, ...] = dataclasses.field() - output_bq_type: bigquery.StandardSqlDataType = dataclasses.field( - hash=False, compare=False - ) +class DirectScalarType: + _py_type: type + + @property + def py_type(self) -> type: + return self._py_type @property - def bf_input_types(self) -> tuple[bigframes.dtypes.Dtype, ...]: - return tuple( - function_typing.sdk_type_to_bf_type(arg.dtype) for arg in self.input_types + def bf_type(self) -> bigframes.dtypes.Dtype: + return function_typing.sdk_type_to_bf_type( + function_typing.sdk_type_from_python_type(self._py_type) ) @property - def bf_output_type(self) -> bigframes.dtypes.Dtype: - return function_typing.sdk_type_to_bf_type(self.output_bq_type) + def sql_type(self) -> str: + return function_typing.sdk_type_from_python_type(self._py_type).type_kind.name + + def stable_hash(self) -> bytes: + hash_val = hashlib.md5() + hash_val.update(self._py_type.__name__.encode()) + return hash_val.digest() + + @classmethod + def from_sdk_type(cls, sdk_type: bigquery.StandardSqlDataType) -> DirectScalarType: + return cls(function_typing.sdk_type_to_py_type(sdk_type)) + + +@dataclasses.dataclass(frozen=True) +class VirtualListTypeV1: + _PROTOCOL_ID = "virtual_list_v1" + + inner_dtype: DirectScalarType @property - def py_input_types(self) -> tuple[type, ...]: - return tuple( - function_typing.sdk_type_to_py_type(arg.dtype) for arg in self.input_types + def py_type(self) -> Type[list[Any]]: + return list[function_typing.sdk_type_to_py_type(self.inner_dtype)] # type: ignore + + # TODO: Specify emulating type and mapping expressions between said types + @property + def bf_type(self) -> bigframes.dtypes.Dtype: + return bigframes.dtypes.list_type( + function_typing.sdk_type_to_bf_type(self.inner_dtype) ) @property - def py_output_type(self) -> type: - return function_typing.sdk_type_to_py_type(self.output_bq_type) + def emulating_type(self) -> DirectScalarType: + # Regardless of list inner type, string is used to emulate the list in the remote function. + return DirectScalarType(str) + + def out_expr( + self, expr: bigframes.core.expression.Expression + ) -> bigframes.core.expression.Expression: + import bigframes.operations as ops + + # convert json string to array of underlying type + return ops.JSONValueArray(json_path="$").as_expr(expr) @property - def sql_input_types(self) -> tuple[str, ...]: - return tuple( - function_typing.sdk_type_to_sql_string(arg.dtype) - for arg in self.input_types + def sql_type(self) -> str: + return f"ARRAY<{self.inner_dtype.sql_type}>" + + def stable_hash(self) -> bytes: + hash_val = hashlib.md5() + hash_val.update(self._PROTOCOL_ID.encode()) + hash_val.update(self.inner_dtype.stable_hash()) + return hash_val.digest() + + +@dataclasses.dataclass(frozen=True) +class RowSeriesInputFieldV1: + """ + Used to handle functions that logically take a series as an input, but handled via a string protocol in the remote function. + """ + + _PROTOCOL_ID = "row_series_input_v1" + + @property + def py_type(self) -> type: + return pd.Series + + @property + def bf_type(self) -> bigframes.dtypes.Dtype: + # Code paths shouldn't hit this. + raise ValueError("Series does not have a corresponding BigFrames type.") + + @property + def sql_type(self) -> str: + return "STRING" + + def stable_hash(self) -> bytes: + hash_val = hashlib.md5() + hash_val.update(self._PROTOCOL_ID.encode()) + return hash_val.digest() + + +@dataclasses.dataclass(frozen=True) +class UdfSignature: + """ + Represents the mapping of input types from bigframes to sql to python and back. + """ + + inputs: tuple[UdfArg, ...] = dataclasses.field() + output: DirectScalarType | VirtualListTypeV1 + + def __post_init__(self): + if any(isinstance(arg, RowSeriesInputFieldV1) for arg in self.inputs): + if len(self.inputs) != 1: + raise ValueError("Row processor functions must have exactly one input.") + assert all(isinstance(arg, UdfArg) for arg in self.inputs) + assert isinstance(self.output, (DirectScalarType, VirtualListTypeV1)) + + def to_sql_input_signature(self) -> str: + return ",".join(f"{field.name} {field.sql_type}" for field in self.inputs) + + @property + def protocol_metadata(self) -> str: + import bigframes.functions._utils + + # TODO: The output field itself should handle this, to handle protocol versioning. + return bigframes.functions._utils.get_bigframes_metadata( + python_output_type=self.output.py_type ) @property - def sql_output_type(self) -> str: - return function_typing.sdk_type_to_sql_string(self.output_bq_type) + def is_row_processor(self) -> bool: + return any(isinstance(arg, RowSeriesInputFieldV1) for arg in self.inputs) + + def with_devirtualize(self) -> UdfSignature: + if isinstance(self.output, DirectScalarType): + return self + assert isinstance(self.output, VirtualListTypeV1) + return UdfSignature( + inputs=self.inputs, + output=self.output.emulating_type, + ) @classmethod def from_routine(cls, routine: bigquery.Routine) -> UdfSignature: + import bigframes.functions._utils + + ## Handle return type if routine.return_type is None: raise ReturnTypeMissingError + bq_return_type = cast(bigquery.StandardSqlDataType, routine.return_type) + return_type: DirectScalarType | VirtualListTypeV1 = ( + DirectScalarType.from_sdk_type(bq_return_type) + ) + if python_output_type := bigframes.functions._utils.get_python_output_type_from_bigframes_metadata( + routine.description + ): + if routine.return_type is None or bq_return_type.type_kind != "STRING": + raise bf_formatting.create_exception_with_feedback_link( + TypeError, + "An explicit output_type should be provided only for a BigQuery function with STRING output.", + ) + + if get_origin(python_output_type) is list: + inner_type = get_args(python_output_type)[0] + return_type = VirtualListTypeV1(DirectScalarType(inner_type)) + else: + raise bf_formatting.create_exception_with_feedback_link( + TypeError, + "Currently only list of " + "a type is supported as python output type.", + ) + if ( - bq_return_type.type_kind is None - or bq_return_type.type_kind + return_type.sql_type not in function_typing.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS ): raise ValueError( f"Remote function must have one of the following supported output types: {function_typing.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS}" ) + ## Handle input types udf_fields = [] for argument in routine.arguments: - if argument.data_type is None: - msg = bfe.format_message( - "The function has one or more missing input data types. BigQuery DataFrames " - f"will assume default data type {function_typing.DEFAULT_RF_TYPE} for them." - ) - warnings.warn(msg, category=bfe.UnknownDataTypeWarning) - assert argument.name is not None - udf_fields.append( - UdfField(argument.name, function_typing.DEFAULT_RF_TYPE) - ) - else: - udf_fields.append(UdfField.from_sdk(argument)) + udf_fields.append(UdfArg.from_sdk(argument)) return cls( - input_types=tuple(udf_fields), - output_bq_type=bq_return_type, + inputs=tuple(udf_fields), + output=return_type, ) @classmethod def from_py_signature(cls, signature: inspect.Signature): - input_types: list[UdfField] = [] + input_types: list[UdfArg] = [] for parameter in signature.parameters.values(): if parameter.annotation is inspect.Signature.empty: raise bf_formatting.create_exception_with_feedback_link( @@ -126,8 +288,8 @@ def from_py_signature(cls, signature: inspect.Signature): f"'{parameter.name}' is missing a type annotation. " "Types are required to use @remote_function.", ) - bq_type = function_typing.sdk_type_from_python_type(parameter.annotation) - input_types.append(UdfField(parameter.name, bq_type)) + + input_types.append(UdfArg.from_py_param(parameter)) if signature.return_annotation is inspect.Signature.empty: raise bf_formatting.create_exception_with_feedback_link( @@ -136,26 +298,38 @@ def from_py_signature(cls, signature: inspect.Signature): "return type annotation. Types are required to use " "@remote_function.", ) - output_bq_type = function_typing.sdk_type_from_python_type( - signature.return_annotation, - allow_lists=True, - ) - return cls(tuple(input_types), output_bq_type) + + if get_origin(signature.return_annotation) is list: + inner_py_type = get_args(signature.return_annotation)[0] + virtual_list_output_type = VirtualListTypeV1( + DirectScalarType(inner_py_type) + ) + return cls(tuple(input_types), virtual_list_output_type) + else: + direct_output_type = DirectScalarType(signature.return_annotation) + return cls(tuple(input_types), direct_output_type) + + def stable_hash(self) -> bytes: + hash_val = hashlib.md5() + for input_type in self.inputs: + hash_val.update(input_type.stable_hash()) + hash_val.update(self.output.stable_hash()) + return hash_val.digest() @dataclasses.dataclass(frozen=True) class BigqueryUdf: + """ + Represents the information needed to call a BigQuery remote function - not a full spec. + """ + routine_ref: bigquery.RoutineReference = dataclasses.field() signature: UdfSignature - # Used to provide alternative interpretations of output bq type, eg interpret int as timestamp - output_type_override: Optional[bigframes.dtypes.Dtype] = dataclasses.field( - default=None - ) - @property - def bigframes_output_type(self) -> bigframes.dtypes.Dtype: - return self.output_type_override or function_typing.sdk_type_to_bf_type( - self.signature.output_bq_type + def with_devirtualize(self) -> BigqueryUdf: + return BigqueryUdf( + routine_ref=self.routine_ref, + signature=self.signature.with_devirtualize(), ) @classmethod @@ -163,11 +337,127 @@ def from_routine(cls, routine: bigquery.Routine) -> BigqueryUdf: signature = UdfSignature.from_routine(routine) if ( - signature.output_bq_type.type_kind is None - or signature.output_bq_type.type_kind + signature.output.sql_type is None + or signature.output.sql_type not in function_typing.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS ): raise ValueError( f"Remote function must have one of the following supported output types: {function_typing.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS}" ) return cls(routine.reference, signature=signature) + + +@dataclasses.dataclass(frozen=True) +class CodeDef: + # Produced by cloudpickle, not compatible across python versions + pickled_code: bytes + # This is just the function itself, and does not include referenced objects/functions/modules + function_source: str + package_requirements: tuple[str, ...] + + @classmethod + def from_func(cls, func, package_requirements: Sequence[str] | None = None): + bytes_io = io.BytesIO() + cloudpickle.dump(func, bytes_io, protocol=_pickle_protocol_version) + # this is hacky, but works for some nested functions + source = textwrap.dedent(inspect.getsource(func)) + return cls( + pickled_code=bytes_io.getvalue(), + function_source=source, + package_requirements=tuple(package_requirements or []), + ) + + @functools.cache + def stable_hash(self) -> bytes: + # There is a known cell-id sensitivity of the cloudpickle serialization in + # notebooks https://github.com/cloudpipe/cloudpickle/issues/538. Because of + # this, if a cell contains a udf decorated with @remote_function, a unique + # cloudpickle code is generated every time the cell is run, creating new + # cloud artifacts every time. This is slow and wasteful. + # A workaround of the same can be achieved by replacing the filename in the + # code object to a static value + # https://github.com/cloudpipe/cloudpickle/issues/120#issuecomment-338510661. + # + # To respect the user code/environment let's make this modification on a + # copy of the udf, not on the original udf itself. + def_copy = cloudpickle.loads(self.pickled_code) + def_copy.__code__ = def_copy.__code__.replace( + co_filename="bigframes_place_holder_filename" + ) + + normalized_pickled_code = cloudpickle.dumps( + def_copy, protocol=_pickle_protocol_version + ) + + hash_val = hashlib.md5() + hash_val.update(normalized_pickled_code) + + if self.package_requirements: + for p in sorted(self.package_requirements): + hash_val.update(p.encode()) + + return hash_val.digest() + + +@dataclasses.dataclass(frozen=True) +class CloudRunFunctionConfig: + code: CodeDef + signature: UdfSignature + timeout_seconds: int | None + max_instance_count: int | None + vpc_connector: str | None + vpc_connector_egress_settings: str + memory_mib: int | None + cpus: float | None + ingress_settings: str + workers: int | None + threads: int | None + concurrency: int | None + + def stable_hash(self) -> bytes: + hash_val = hashlib.md5() + hash_val.update(self.code.stable_hash()) + hash_val.update(self.signature.stable_hash()) + hash_val.update(str(self.timeout_seconds).encode()) + hash_val.update(str(self.max_instance_count).encode()) + hash_val.update(str(self.vpc_connector).encode()) + hash_val.update(str(self.vpc_connector_egress_settings).encode()) + hash_val.update(str(self.memory_mib).encode()) + hash_val.update(str(self.cpus).encode()) + hash_val.update(str(self.ingress_settings).encode()) + hash_val.update(str(self.workers).encode()) + hash_val.update(str(self.threads).encode()) + hash_val.update(str(self.concurrency).encode()) + return hash_val.digest() + + +@dataclasses.dataclass(frozen=True) +class RemoteFunctionConfig: + """ + Represents the information needed to create a BigQuery remote function. + """ + + endpoint: str + signature: UdfSignature + connection_id: str + max_batching_rows: int + bq_metadata: str | None = None + + @classmethod + def from_bq_routine(cls, routine: bigquery.Routine) -> RemoteFunctionConfig: + return cls( + endpoint=routine.remote_function_options.endpoint, + connection_id=os.path.basename(routine.remote_function_options.connection), + signature=UdfSignature.from_routine(routine), + max_batching_rows=routine.remote_function_options.max_batching_rows, + bq_metadata=routine.description, + ) + + def stable_hash(self) -> bytes: + hash_val = hashlib.md5() + hash_val.update(self.endpoint.encode()) + hash_val.update(self.signature.stable_hash()) + hash_val.update(self.connection_id.encode()) + hash_val.update(str(self.max_batching_rows).encode()) + hash_val.update(str(self.bq_metadata).encode()) + return hash_val.digest() diff --git a/bigframes/operations/remote_function_ops.py b/bigframes/operations/remote_function_ops.py index e610ce61d6e..9c51210df0e 100644 --- a/bigframes/operations/remote_function_ops.py +++ b/bigframes/operations/remote_function_ops.py @@ -31,7 +31,7 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - return self.function_def.bigframes_output_type + return self.function_def.signature.output.bf_type @dataclasses.dataclass(frozen=True) @@ -44,7 +44,7 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - return self.function_def.bigframes_output_type + return self.function_def.signature.output.bf_type @dataclasses.dataclass(frozen=True) @@ -57,4 +57,4 @@ def expensive(self) -> bool: return True def output_type(self, *input_types): - return self.function_def.bigframes_output_type + return self.function_def.signature.output.bf_type diff --git a/bigframes/series.py b/bigframes/series.py index 23799a0a43c..7eb30beb826 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -2042,7 +2042,6 @@ def apply( result_series = self._apply_unary_op( ops.RemoteFunctionOp(function_def=func.udf_def, apply_on_null=True) ) - result_series = func._post_process_series(result_series) return result_series @@ -2095,7 +2094,6 @@ def combine( result_series = self._apply_binary_op( other, ops.BinaryRemoteFunctionOp(function_def=func.udf_def) ) - result_series = func._post_process_series(result_series) return result_series bf_op = python_ops.python_callable_to_op(func) diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 114b600d9de..446c01e616d 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -20,6 +20,7 @@ import shutil import tempfile import textwrap +import uuid import warnings import google.api_core.exceptions @@ -529,9 +530,7 @@ def add_one(x): # Expected cloud function name for the unique udf package_requirements = bff_utils.get_updated_package_requirements() add_one_uniq_hash = bff_utils.get_hash(add_one_uniq, package_requirements) - add_one_uniq_cf_name = bff_utils.get_cloud_function_name( - add_one_uniq_hash, session.session_id - ) + add_one_uniq_cf_name = f"bff_{add_one_uniq_hash}_{session.session_id}" # There should be no cloud function yet for the unique udf cloud_functions = list( @@ -1693,6 +1692,50 @@ def square(x): ) +@pytest.mark.flaky(retries=2, delay=120) +def test_remote_function_reflects_config_change_with_reuse(session): + square_remote = None + square_remote_2 = None + try: + + def square(x): + return x * x + + deploy_name = str(uuid.uuid4()) + square_remote = session.remote_function( + input_types=[int], + name=deploy_name, + output_type=int, + reuse=True, + cloud_function_service_account="default", + cloud_function_cpus=1, + )(square) + square_remote_2 = session.remote_function( + input_types=[int], + name=deploy_name, + output_type=int, + reuse=True, + cloud_function_service_account="default", + cloud_function_cpus=2, + )(square) + + # Assert that the GCF is created with the intended max instance count + gcf = session.cloudfunctionsclient.get_function( + name=square_remote_2.bigframes_cloud_function + ) + assert gcf.service_config.available_cpu == 2.0 + finally: + # clean up the gcp assets created for the remote function + if square_remote is not None: + cleanup_function_assets( + square_remote, session.bqclient, session.cloudfunctionsclient + ) + if square_remote_2 is not None: + cleanup_function_assets( + square_remote_2, session.bqclient, session.cloudfunctionsclient + ) + + @pytest.mark.flaky(retries=2, delay=120) def test_df_apply_axis_1(session, scalars_dfs): columns = ["bool_col", "int64_col", "int64_too", "float64_col", "string_col"] diff --git a/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py b/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py index 2667e482c88..da5baea5248 100644 --- a/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py +++ b/tests/unit/core/compile/sqlglot/expressions/test_generic_ops.py @@ -178,17 +178,13 @@ def test_remote_function_op(scalar_types_df: bpd.DataFrame, snapshot): "my_project.my_dataset.my_routine" ), signature=udf_def.UdfSignature( - input_types=( - udf_def.UdfField( + inputs=( + udf_def.UdfArg( "x", - bigquery.StandardSqlDataType( - type_kind=bigquery.StandardSqlTypeNames.INT64 - ), + udf_def.DirectScalarType(int), ), ), - output_bq_type=bigquery.StandardSqlDataType( - type_kind=bigquery.StandardSqlTypeNames.FLOAT64 - ), + output=udf_def.DirectScalarType(float), ), ) ops_map = { @@ -211,23 +207,17 @@ def test_binary_remote_function_op(scalar_types_df: bpd.DataFrame, snapshot): "my_project.my_dataset.my_routine" ), signature=udf_def.UdfSignature( - input_types=( - udf_def.UdfField( + inputs=( + udf_def.UdfArg( "x", - bigquery.StandardSqlDataType( - type_kind=bigquery.StandardSqlTypeNames.INT64 - ), + udf_def.DirectScalarType(int), ), - udf_def.UdfField( + udf_def.UdfArg( "y", - bigquery.StandardSqlDataType( - type_kind=bigquery.StandardSqlTypeNames.FLOAT64 - ), + udf_def.DirectScalarType(float), ), ), - output_bq_type=bigquery.StandardSqlDataType( - type_kind=bigquery.StandardSqlTypeNames.FLOAT64 - ), + output=udf_def.DirectScalarType(float), ), ) ) @@ -244,29 +234,21 @@ def test_nary_remote_function_op(scalar_types_df: bpd.DataFrame, snapshot): "my_project.my_dataset.my_routine" ), signature=udf_def.UdfSignature( - input_types=( - udf_def.UdfField( + inputs=( + udf_def.UdfArg( "x", - bigquery.StandardSqlDataType( - type_kind=bigquery.StandardSqlTypeNames.INT64 - ), + udf_def.DirectScalarType(int), ), - udf_def.UdfField( + udf_def.UdfArg( "y", - bigquery.StandardSqlDataType( - type_kind=bigquery.StandardSqlTypeNames.FLOAT64 - ), + udf_def.DirectScalarType(float), ), - udf_def.UdfField( + udf_def.UdfArg( "z", - bigquery.StandardSqlDataType( - type_kind=bigquery.StandardSqlTypeNames.STRING - ), + udf_def.DirectScalarType(str), ), ), - output_bq_type=bigquery.StandardSqlDataType( - type_kind=bigquery.StandardSqlTypeNames.FLOAT64 - ), + output=udf_def.DirectScalarType(float), ), ) ) diff --git a/tests/unit/functions/test_remote_function.py b/tests/unit/functions/test_remote_function.py index e9e0d0df677..bfb6192a2c4 100644 --- a/tests/unit/functions/test_remote_function.py +++ b/tests/unit/functions/test_remote_function.py @@ -12,36 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import re - -import pandas import pytest -import bigframes.exceptions import bigframes.functions.function as bff from bigframes.testing import mocks -def test_series_input_types_to_str(): - """Check that is_row_processor=True uses str as the input type to serialize a row.""" - session = mocks.create_bigquery_session() - remote_function_decorator = bff.remote_function( - session=session, cloud_function_service_account="default" - ) - - with pytest.warns( - bigframes.exceptions.PreviewWarning, - match=re.escape("input_types=Series is in preview."), - ): - - @remote_function_decorator - def axis_1_function(myparam: pandas.Series) -> str: # type: ignore - return "Hello, " + myparam["str_col"] + "!" # type: ignore - - # Still works as a normal function. - assert axis_1_function(pandas.Series({"str_col": "World"})) == "Hello, World!" - - def test_missing_input_types(): session = mocks.create_bigquery_session() remote_function_decorator = bff.remote_function( @@ -78,36 +54,6 @@ def function_without_return_annotation(myparam: int): remote_function_decorator(function_without_return_annotation) -def test_deploy_remote_function(): - session = mocks.create_bigquery_session() - - def my_remote_func(x: int) -> int: - return x * 2 - - deployed = session.deploy_remote_function( - my_remote_func, cloud_function_service_account="test_sa@example.com" - ) - - # Test that the function would have been deployed somewhere. - assert deployed.bigframes_bigquery_function - - -def test_deploy_remote_function_with_name(): - session = mocks.create_bigquery_session() - - def my_remote_func(x: int) -> int: - return x * 2 - - deployed = session.deploy_remote_function( - my_remote_func, - name="my_custom_name", - cloud_function_service_account="test_sa@example.com", - ) - - # Test that the function would have been deployed somewhere. - assert "my_custom_name" in deployed.bigframes_bigquery_function - - def test_deploy_udf(): session = mocks.create_bigquery_session() diff --git a/tests/unit/functions/test_remote_function_utils.py b/tests/unit/functions/test_remote_function_utils.py index e200e7c12a1..a2687761270 100644 --- a/tests/unit/functions/test_remote_function_utils.py +++ b/tests/unit/functions/test_remote_function_utils.py @@ -41,68 +41,6 @@ def test_get_remote_function_locations( assert cf_region == expected_cf_region -@pytest.mark.parametrize( - "func_hash, session_id, uniq_suffix, expected_name", - [ - ( - "hash123", - None, - None, - "bigframes-hash123", - ), - ( - "hash456", - "session789", - None, - "bigframes-session789-hash456", - ), - ( - "hash123", - None, - "suffixABC", - "bigframes-hash123-suffixABC", - ), - ( - "hash456", - "session789", - "suffixDEF", - "bigframes-session789-hash456-suffixDEF", - ), - ], -) -def test_get_cloud_function_name(func_hash, session_id, uniq_suffix, expected_name): - """Tests the construction of the cloud function name from its parts.""" - result = _utils.get_cloud_function_name(func_hash, session_id, uniq_suffix) - - assert result == expected_name - - -@pytest.mark.parametrize( - "function_hash, session_id, uniq_suffix, expected_name", - [ - ( - "hash123", - "session456", - None, - "bigframes_session456_hash123", - ), - ( - "hash789", - "sessionABC", - "suffixDEF", - "bigframes_sessionABC_hash789_suffixDEF", - ), - ], -) -def test_get_bigframes_function_name( - function_hash, session_id, uniq_suffix, expected_name -): - """Tests the construction of the BigQuery function name from its parts.""" - result = _utils.get_bigframes_function_name(function_hash, session_id, uniq_suffix) - - assert result == expected_name - - def test_get_updated_package_requirements_no_extra_package(): """Tests with no extra package.""" result = _utils.get_updated_package_requirements(capture_references=False) From a66c4dbc16613218dad13fd78abc322268166fd3 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 13 Mar 2026 04:52:28 +0000 Subject: [PATCH 02/10] fix issues --- .../ibis_compiler/scalar_op_registry.py | 6 ++--- bigframes/functions/_function_client.py | 23 ++++++++--------- bigframes/functions/udf_def.py | 25 +++---------------- 3 files changed, 17 insertions(+), 37 deletions(-) diff --git a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py index 2b7b336badc..80bd9de3d39 100644 --- a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py +++ b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py @@ -1037,7 +1037,7 @@ def timedelta_floor_op_impl(x: ibis_types.NumericValue): @scalar_op_compiler.register_unary_op(ops.RemoteFunctionOp, pass_op=True) def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp): udf_sig = op.function_def.signature - ibis_py_sig = ((arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) + ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) @ibis_udf.scalar.builtin( name=str(op.function_def.routine_ref), signature=ibis_py_sig @@ -1056,7 +1056,7 @@ def binary_remote_function_op_impl( x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp ): udf_sig = op.function_def.signature - ibis_py_sig = ((arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) + ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) @ibis_udf.scalar.builtin( name=str(op.function_def.routine_ref), signature=ibis_py_sig @@ -1073,7 +1073,7 @@ def nary_remote_function_op_impl( *operands: ibis_types.Value, op: ops.NaryRemoteFunctionOp ): udf_sig = op.function_def.signature - ibis_py_sig = ((arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) + ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) arg_names = tuple(arg.name for arg in udf_sig.inputs) @ibis_udf.scalar.builtin( diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 5e303eb6b5b..80ec816b872 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -641,7 +641,7 @@ def provision_bq_remote_function( cloud_function_name = get_cloud_function_name( cloud_func_spec, session_id=self._session.session_id if (name is None) else None, - uniq_suffix=random_suffix if reuse is False else None, + uniq_suffix=random_suffix if (not reuse) else None, ) cf_endpoint = self.get_cloud_function_endpoint(cloud_function_name) @@ -659,32 +659,29 @@ def provision_bq_remote_function( max_batching_rows=max_batching_rows, signature=func_signature, ) - # Override value-derived name if reuse is False, or if name is explicitly provided. - # If reuse is False, then attach a unique suffix to the intended - # function name to make it unique. - rf_name = get_bigframes_function_name( + remote_function_name = name or get_bigframes_function_name( intended_rf_spec, - self._session.session_id if (name is None) else None, - random_suffix if reuse is False else None, + self._session.session_id, + random_suffix if (not reuse) else None, ) if reuse: - existing_rf_spec = self.get_remote_function_specs(rf_name) + existing_rf_spec = self.get_remote_function_specs(remote_function_name) # Create the BQ remote function in following circumstances: # 1. It does not exist # 2. It exists but the existing remote function has different # configuration than intended created_new = False if not existing_rf_spec or (existing_rf_spec != intended_rf_spec): - self.create_bq_remote_function(rf_name, intended_rf_spec) + self.create_bq_remote_function(remote_function_name, intended_rf_spec) created_new = True else: - logger.info(f"Remote function {rf_name} already exists.") + logger.info(f"Remote function {remote_function_name} already exists.") - return rf_name, cloud_function_name, created_new + return remote_function_name, cloud_function_name, created_new else: - self.create_bq_remote_function(rf_name, intended_rf_spec) - return rf_name, cloud_function_name, True + self.create_bq_remote_function(remote_function_name, intended_rf_spec) + return remote_function_name, cloud_function_name, True def get_remote_function_specs( self, remote_function_name: str diff --git a/bigframes/functions/udf_def.py b/bigframes/functions/udf_def.py index d04d25fdfcc..0d7d9b8cbbe 100644 --- a/bigframes/functions/udf_def.py +++ b/bigframes/functions/udf_def.py @@ -106,7 +106,9 @@ def bf_type(self) -> bigframes.dtypes.Dtype: @property def sql_type(self) -> str: - return function_typing.sdk_type_from_python_type(self._py_type).type_kind.name + type_kind = function_typing.sdk_type_from_python_type(self._py_type) + assert type_kind is not None + return type_kind.name def stable_hash(self) -> bytes: hash_val = hashlib.md5() @@ -131,9 +133,7 @@ def py_type(self) -> Type[list[Any]]: # TODO: Specify emulating type and mapping expressions between said types @property def bf_type(self) -> bigframes.dtypes.Dtype: - return bigframes.dtypes.list_type( - function_typing.sdk_type_to_bf_type(self.inner_dtype) - ) + return bigframes.dtypes.list_type(self.inner_dtype.bf_type) @property def emulating_type(self) -> DirectScalarType: @@ -259,14 +259,6 @@ def from_routine(cls, routine: bigquery.Routine) -> UdfSignature: "a type is supported as python output type.", ) - if ( - return_type.sql_type - not in function_typing.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS - ): - raise ValueError( - f"Remote function must have one of the following supported output types: {function_typing.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS}" - ) - ## Handle input types udf_fields = [] for argument in routine.arguments: @@ -335,15 +327,6 @@ def with_devirtualize(self) -> BigqueryUdf: @classmethod def from_routine(cls, routine: bigquery.Routine) -> BigqueryUdf: signature = UdfSignature.from_routine(routine) - - if ( - signature.output.sql_type is None - or signature.output.sql_type - not in function_typing.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS - ): - raise ValueError( - f"Remote function must have one of the following supported output types: {function_typing.RF_SUPPORTED_IO_BIGQUERY_TYPEKINDS}" - ) return cls(routine.reference, signature=signature) From 2efc89e4f21b9bd8a88378b38b29b859ac523236 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 13 Mar 2026 06:33:17 +0000 Subject: [PATCH 03/10] fix some more stuff --- bigframes/functions/_function_client.py | 7 ++++++- bigframes/functions/function.py | 4 ---- bigframes/functions/function_template.py | 19 +++++++++++-------- bigframes/functions/udf_def.py | 10 ++++++---- 4 files changed, 23 insertions(+), 17 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 80ec816b872..ef4221cb44e 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -694,7 +694,12 @@ def get_remote_function_specs( for routine in routines: routine = cast(bigquery.Routine, routine) if routine.reference.routine_id == remote_function_name: - return udf_def.RemoteFunctionConfig.from_bq_routine(routine) + try: + return udf_def.RemoteFunctionConfig.from_bq_routine(routine) + except udf_def.ReturnTypeMissingError: + # The remote function exists, but it's missing a return type. + # Something is wrong with the function, so we should replace it. + return None except google.api_core.exceptions.NotFound: # The dataset might not exist, in which case the remote function doesn't, either. # Note: list_routines doesn't make an API request until we iterate on the response object. diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py index d2b508b9c65..0bcc3d11482 100644 --- a/bigframes/functions/function.py +++ b/bigframes/functions/function.py @@ -160,14 +160,10 @@ def __init__( *, local_func: Optional[Callable] = None, cloud_function_ref: Optional[str] = None, - post_routine: Optional[ - Callable[[bigframes.series.Series], bigframes.series.Series] - ] = None, is_managed: bool = False, ): self._udf_def = udf_def self._session = session - self._post_routine = post_routine self._local_fun = local_func self._cloud_function = cloud_function_ref self._is_managed = is_managed diff --git a/bigframes/functions/function_template.py b/bigframes/functions/function_template.py index cb0268e6254..31b5b20520d 100644 --- a/bigframes/functions/function_template.py +++ b/bigframes/functions/function_template.py @@ -28,8 +28,8 @@ # Placeholder variables for testing. -input_sql_types = ("STRING",) -output_sql_type = "STRING" +input_types = ("STRING",) +output_type = "STRING" # Convert inputs to BigQuery JSON. See: @@ -152,7 +152,7 @@ def udf(*args): # } # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#input_format def udf_http(request): - global input_sql_types, output_sql_type + global input_types, output_type import json import traceback @@ -164,7 +164,7 @@ def udf_http(request): replies = [] for call in calls: reply = convert_to_bq_json( - output_sql_type, udf(*convert_call(input_sql_types, call)) + output_type, udf(*convert_call(input_types, call)) ) if type(reply) is list: # Since the BQ remote function does not support array yet, @@ -178,7 +178,7 @@ def udf_http(request): def udf_http_row_processor(request): - global output_sql_type + global output_type import json import math import traceback @@ -192,7 +192,7 @@ def udf_http_row_processor(request): replies = [] for call in calls: reply = convert_to_bq_json( - output_sql_type, udf(get_pd_series(call[0]), *call[1:]) + output_type, udf(get_pd_series(call[0]), *call[1:]) ) if type(reply) is list: # Since the BQ remote function does not support array yet, @@ -254,6 +254,9 @@ def generate_cloud_function_main_code( # Pickle the udf with all its dependencies udf_code_file, udf_pickle_file = generate_udf_code(code_def, directory) + input_types = tuple(arg.sql_type for arg in udf_signature.inputs) + output_type = udf_signature.output.sql_type + code_blocks = [ f"""\ import cloudpickle @@ -263,8 +266,8 @@ def generate_cloud_function_main_code( with open("{udf_pickle_file}", "rb") as f: udf = cloudpickle.load(f) -input_types = {repr(input_sql_types)} -output_type = {repr(output_sql_type)} +input_types = {repr(input_types)} +output_type = {repr(output_type)} """ ] diff --git a/bigframes/functions/udf_def.py b/bigframes/functions/udf_def.py index 0d7d9b8cbbe..b065ec55c96 100644 --- a/bigframes/functions/udf_def.py +++ b/bigframes/functions/udf_def.py @@ -106,9 +106,9 @@ def bf_type(self) -> bigframes.dtypes.Dtype: @property def sql_type(self) -> str: - type_kind = function_typing.sdk_type_from_python_type(self._py_type) - assert type_kind is not None - return type_kind.name + sdk_type = function_typing.sdk_type_from_python_type(self._py_type) + assert sdk_type.type_kind is not None + return sdk_type.type_kind.name def stable_hash(self) -> bytes: hash_val = hashlib.md5() @@ -233,7 +233,9 @@ def from_routine(cls, routine: bigquery.Routine) -> UdfSignature: ## Handle return type if routine.return_type is None: - raise ReturnTypeMissingError + raise ReturnTypeMissingError( + f"Routine {routine} has no return type. Routine properties: {routine._properties}" + ) bq_return_type = cast(bigquery.StandardSqlDataType, routine.return_type) From 33f5727423cd1813280114a88c98877e38ad8e6a Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 13 Mar 2026 22:01:24 +0000 Subject: [PATCH 04/10] fixes --- .../ibis_compiler/scalar_op_registry.py | 3 + bigframes/core/rewrite/udfs.py | 25 +++--- bigframes/functions/_function_client.py | 4 +- bigframes/functions/_function_session.py | 4 +- bigframes/functions/function.py | 2 +- bigframes/functions/function_typing.py | 2 +- bigframes/functions/udf_def.py | 77 +++++++++++++------ .../small/functions/test_remote_function.py | 40 ++++------ 8 files changed, 94 insertions(+), 63 deletions(-) diff --git a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py index 80bd9de3d39..10698366fee 100644 --- a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py +++ b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py @@ -1037,6 +1037,7 @@ def timedelta_floor_op_impl(x: ibis_types.NumericValue): @scalar_op_compiler.register_unary_op(ops.RemoteFunctionOp, pass_op=True) def remote_function_op_impl(x: ibis_types.Value, op: ops.RemoteFunctionOp): udf_sig = op.function_def.signature + assert not udf_sig.is_virtual # should have been devirtualized in lowering pass ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) @ibis_udf.scalar.builtin( @@ -1056,6 +1057,7 @@ def binary_remote_function_op_impl( x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp ): udf_sig = op.function_def.signature + assert not udf_sig.is_virtual # should have been devirtualized in lowering pass ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) @ibis_udf.scalar.builtin( @@ -1073,6 +1075,7 @@ def nary_remote_function_op_impl( *operands: ibis_types.Value, op: ops.NaryRemoteFunctionOp ): udf_sig = op.function_def.signature + assert not udf_sig.is_virtual # should have been devirtualized in lowering pass ibis_py_sig = (tuple(arg.py_type for arg in udf_sig.inputs), udf_sig.output.py_type) arg_names = tuple(arg.name for arg in udf_sig.inputs) diff --git a/bigframes/core/rewrite/udfs.py b/bigframes/core/rewrite/udfs.py index 3a2f3c9539c..f9aa330247b 100644 --- a/bigframes/core/rewrite/udfs.py +++ b/bigframes/core/rewrite/udfs.py @@ -30,14 +30,14 @@ def op(self) -> type[ops.ScalarOp]: def lower(self, expr: expression.OpExpression) -> expression.Expression: assert isinstance(expr.op, ops.RemoteFunctionOp) func_def = expr.op.function_def - if isinstance(func_def.signature.output, udf_def.DirectScalarType): - return expr.op.as_expr(*expr.children) - assert isinstance(func_def.signature.output, udf_def.VirtualListTypeV1) devirtualized_expr = ops.RemoteFunctionOp( func_def.with_devirtualize(), apply_on_null=expr.op.apply_on_null, ).as_expr(*expr.children) - return func_def.signature.output.out_expr(devirtualized_expr) + if isinstance(func_def.signature.output, udf_def.VirtualListTypeV1): + return func_def.signature.output.out_expr(devirtualized_expr) + else: + return devirtualized_expr @dataclasses.dataclass @@ -49,14 +49,13 @@ def op(self) -> type[ops.ScalarOp]: def lower(self, expr: expression.OpExpression) -> expression.Expression: assert isinstance(expr.op, ops.BinaryRemoteFunctionOp) func_def = expr.op.function_def - - if isinstance(func_def.signature.output, udf_def.DirectScalarType): - return expr.op.as_expr(*expr.children) - assert isinstance(func_def.signature.output, udf_def.VirtualListTypeV1) devirtualized_expr = ops.BinaryRemoteFunctionOp( func_def.with_devirtualize(), ).as_expr(*expr.children) - return func_def.signature.output.out_expr(devirtualized_expr) + if isinstance(func_def.signature.output, udf_def.VirtualListTypeV1): + return func_def.signature.output.out_expr(devirtualized_expr) + else: + return devirtualized_expr @dataclasses.dataclass @@ -68,13 +67,13 @@ def op(self) -> type[ops.ScalarOp]: def lower(self, expr: expression.OpExpression) -> expression.Expression: assert isinstance(expr.op, ops.NaryRemoteFunctionOp) func_def = expr.op.function_def - if isinstance(func_def.signature.output, udf_def.DirectScalarType): - return expr.op.as_expr(*expr.children) - assert isinstance(func_def.signature.output, udf_def.VirtualListTypeV1) devirtualized_expr = ops.NaryRemoteFunctionOp( func_def.with_devirtualize(), ).as_expr(*expr.children) - return func_def.signature.output.out_expr(devirtualized_expr) + if isinstance(func_def.signature.output, udf_def.VirtualListTypeV1): + return func_def.signature.output.out_expr(devirtualized_expr) + else: + return devirtualized_expr UDF_LOWERING_RULES = ( diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index ef4221cb44e..d63cc571dcb 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -177,7 +177,6 @@ def create_bq_remote_function( # Create BQ function # https://cloud.google.com/bigquery/docs/reference/standard-sql/remote-functions#create_a_remote_function_2 - bq_function_return_type = udf_def.signature.output.sql_type remote_function_options = { "endpoint": udf_def.endpoint, @@ -203,7 +202,7 @@ def create_bq_remote_function( bq_function_name_escaped = bigframes.core.sql.identifier(sql_func_legal_name) create_function_ddl = f""" CREATE OR REPLACE FUNCTION `{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name_escaped}({udf_def.signature.to_sql_input_signature()}) - RETURNS {bq_function_return_type} + RETURNS {udf_def.signature.with_devirtualize().output.sql_type} REMOTE WITH CONNECTION `{self._gcp_project_id}.{self._bq_location}.{self._bq_connection_id}` OPTIONS ({remote_function_options_str})""" @@ -658,6 +657,7 @@ def provision_bq_remote_function( connection_id=self._bq_connection_id, max_batching_rows=max_batching_rows, signature=func_signature, + bq_metadata=func_signature.protocol_metadata, ) remote_function_name = name or get_bigframes_function_name( intended_rf_spec, diff --git a/bigframes/functions/_function_session.py b/bigframes/functions/_function_session.py index b23a29a0854..85753a71ce3 100644 --- a/bigframes/functions/_function_session.py +++ b/bigframes/functions/_function_session.py @@ -596,7 +596,9 @@ def wrapper(func): session=session, # type: ignore ) - udf_sig = udf_def.UdfSignature.from_py_signature(py_sig) + udf_sig = udf_def.UdfSignature.from_py_signature( + py_sig + ).to_remote_function_compatible() ( rf_name, diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py index 0bcc3d11482..5d02c269d7e 100644 --- a/bigframes/functions/function.py +++ b/bigframes/functions/function.py @@ -235,7 +235,7 @@ def __init__( cloud_function_ref: Optional[str] = None, is_managed: bool = False, ): - assert self.udf_def.signature.is_row_processor + assert udf_def.signature.is_row_processor self._udf_def = udf_def self._session = session self._local_fun = local_func diff --git a/bigframes/functions/function_typing.py b/bigframes/functions/function_typing.py index 30804f317c4..43ccfe9b25b 100644 --- a/bigframes/functions/function_typing.py +++ b/bigframes/functions/function_typing.py @@ -81,7 +81,7 @@ def __init__(self, type_, supported_types): def sdk_type_from_python_type( - t: type, allow_lists: bool = False + t: type, allow_lists: bool = True ) -> bigquery.StandardSqlDataType: if (get_origin(t) is list) and allow_lists: return sdk_array_output_type_from_python_type(t) diff --git a/bigframes/functions/udf_def.py b/bigframes/functions/udf_def.py index b065ec55c96..54966c0fce7 100644 --- a/bigframes/functions/udf_def.py +++ b/bigframes/functions/udf_def.py @@ -92,6 +92,12 @@ def stable_hash(self) -> bytes: @dataclasses.dataclass(frozen=True) class DirectScalarType: + """ + Represents a scalar value that is passed directly to the remote function. + + For these values, BigQuery handles the serialization and deserialization without any additional processing. + """ + _py_type: type @property @@ -119,16 +125,26 @@ def stable_hash(self) -> bytes: def from_sdk_type(cls, sdk_type: bigquery.StandardSqlDataType) -> DirectScalarType: return cls(function_typing.sdk_type_to_py_type(sdk_type)) + @property + def emulating_type(self) -> DirectScalarType: + return self + @dataclasses.dataclass(frozen=True) class VirtualListTypeV1: + """ + Represents a list of scalar values that is emulated as a JSON array string in the remote function. + + Only works as output paramter right now where array -> string in function runtime, and then string -> array in SQL post-processing (defined in out_expr()). + """ + _PROTOCOL_ID = "virtual_list_v1" inner_dtype: DirectScalarType @property def py_type(self) -> Type[list[Any]]: - return list[function_typing.sdk_type_to_py_type(self.inner_dtype)] # type: ignore + return list[self.inner_dtype.py_type] # type: ignore # TODO: Specify emulating type and mapping expressions between said types @property @@ -163,6 +179,8 @@ def stable_hash(self) -> bytes: class RowSeriesInputFieldV1: """ Used to handle functions that logically take a series as an input, but handled via a string protocol in the remote function. + + For these, the serialization is dependent on index metadata, which must be provided by the caller. """ _PROTOCOL_ID = "row_series_input_v1" @@ -180,6 +198,11 @@ def bf_type(self) -> bigframes.dtypes.Dtype: def sql_type(self) -> str: return "STRING" + @property + def emulating_type(self) -> DirectScalarType: + # Regardless of list inner type, string is used to emulate the list in the remote function. + return DirectScalarType(str) + def stable_hash(self) -> bytes: hash_val = hashlib.md5() hash_val.update(self._PROTOCOL_ID.encode()) @@ -196,14 +219,14 @@ class UdfSignature: output: DirectScalarType | VirtualListTypeV1 def __post_init__(self): - if any(isinstance(arg, RowSeriesInputFieldV1) for arg in self.inputs): - if len(self.inputs) != 1: - raise ValueError("Row processor functions must have exactly one input.") assert all(isinstance(arg, UdfArg) for arg in self.inputs) assert isinstance(self.output, (DirectScalarType, VirtualListTypeV1)) def to_sql_input_signature(self) -> str: - return ",".join(f"{field.name} {field.sql_type}" for field in self.inputs) + return ",".join( + f"{field.name} {field.sql_type}" + for field in self.with_devirtualize().inputs + ) @property def protocol_metadata(self) -> str: @@ -214,16 +237,20 @@ def protocol_metadata(self) -> str: python_output_type=self.output.py_type ) + @property + def is_virtual(self) -> bool: + dtypes = (self.output,) + tuple(arg.dtype for arg in self.inputs) + return not all(isinstance(dtype, DirectScalarType) for dtype in dtypes) + @property def is_row_processor(self) -> bool: - return any(isinstance(arg, RowSeriesInputFieldV1) for arg in self.inputs) + return any(isinstance(arg.dtype, RowSeriesInputFieldV1) for arg in self.inputs) def with_devirtualize(self) -> UdfSignature: - if isinstance(self.output, DirectScalarType): - return self - assert isinstance(self.output, VirtualListTypeV1) return UdfSignature( - inputs=self.inputs, + inputs=tuple( + UdfArg(arg.name, arg.dtype.emulating_type) for arg in self.inputs + ), output=self.output.emulating_type, ) @@ -245,7 +272,7 @@ def from_routine(cls, routine: bigquery.Routine) -> UdfSignature: if python_output_type := bigframes.functions._utils.get_python_output_type_from_bigframes_metadata( routine.description ): - if routine.return_type is None or bq_return_type.type_kind != "STRING": + if bq_return_type.type_kind != "STRING": raise bf_formatting.create_exception_with_feedback_link( TypeError, "An explicit output_type should be provided only for a BigQuery function with STRING output.", @@ -280,7 +307,7 @@ def from_py_signature(cls, signature: inspect.Signature): ValueError, "'input_types' was not set and parameter " f"'{parameter.name}' is missing a type annotation. " - "Types are required to use @remote_function.", + "Types are required to use udfs.", ) input_types.append(UdfArg.from_py_param(parameter)) @@ -290,18 +317,22 @@ def from_py_signature(cls, signature: inspect.Signature): ValueError, "'output_type' was not set and function is missing a " "return type annotation. Types are required to use " - "@remote_function.", + "udfs.", ) - if get_origin(signature.return_annotation) is list: - inner_py_type = get_args(signature.return_annotation)[0] - virtual_list_output_type = VirtualListTypeV1( - DirectScalarType(inner_py_type) - ) - return cls(tuple(input_types), virtual_list_output_type) - else: - direct_output_type = DirectScalarType(signature.return_annotation) - return cls(tuple(input_types), direct_output_type) + output_type = DirectScalarType(signature.return_annotation) + return cls(tuple(input_types), output_type) + + def to_remote_function_compatible(self) -> UdfSignature: + # need to virtualize list outputs + if isinstance(self.output, DirectScalarType): + if get_origin(self.output.py_type) is list: + inner_py_type = get_args(self.output.py_type)[0] + return UdfSignature( + inputs=self.inputs, + output=VirtualListTypeV1(DirectScalarType(inner_py_type)), + ) + return self def stable_hash(self) -> bytes: hash_val = hashlib.md5() @@ -321,6 +352,8 @@ class BigqueryUdf: signature: UdfSignature def with_devirtualize(self) -> BigqueryUdf: + if not self.signature.is_virtual: + return self return BigqueryUdf( routine_ref=self.routine_ref, signature=self.signature.with_devirtualize(), diff --git a/tests/system/small/functions/test_remote_function.py b/tests/system/small/functions/test_remote_function.py index 1ee60dafd66..d6de1772197 100644 --- a/tests/system/small/functions/test_remote_function.py +++ b/tests/system/small/functions/test_remote_function.py @@ -34,7 +34,11 @@ from bigframes.functions import _utils as bff_utils from bigframes.functions import function as bff import bigframes.session._io.bigquery -from bigframes.testing.utils import assert_frame_equal, get_function_name +from bigframes.testing.utils import ( + assert_frame_equal, + assert_series_equal, + get_function_name, +) _prefixer = test_utils.prefixer.Prefixer("bigframes", "") @@ -617,7 +621,7 @@ def bytes_to_hex(mybytes: bytes) -> bytes: )(bytes_to_hex) bf_result = scalars_df.bytes_col.map(remote_bytes_to_hex).to_pandas() - pd.testing.assert_series_equal( + assert_series_equal( bf_result, pd_result, ) @@ -785,7 +789,7 @@ def test_read_gbq_function_runs_existing_udf_array_output(session, routine_id_un pd_result = pd_s.apply(func) bf_result = bf_s.apply(func) assert bigframes.dtypes.is_array_string_like(bf_result.dtype) - pd.testing.assert_series_equal( + assert_series_equal( pd_result, bf_result.to_pandas(), check_dtype=False, check_index_type=False ) @@ -826,7 +830,7 @@ def test_read_gbq_function_runs_existing_udf_2_params_array_output( pd_result = pd_df["col0"].combine(pd_df["col1"], func) bf_result = bf_df["col0"].combine(bf_df["col1"], func) assert bigframes.dtypes.is_array_string_like(bf_result.dtype) - pd.testing.assert_series_equal( + assert_series_equal( pd_result, bf_result.to_pandas(), check_dtype=False, check_index_type=False ) @@ -881,7 +885,7 @@ def test_read_gbq_function_runs_existing_udf_4_params_array_output( ) bf_result = bf_df.apply(func, axis=1) assert bigframes.dtypes.is_array_string_like(bf_result.dtype) - pd.testing.assert_series_equal( + assert_series_equal( pd_result, bf_result.to_pandas(), check_dtype=False, check_index_type=False ) @@ -1060,9 +1064,7 @@ def test_read_gbq_function_respects_python_output_type( actual = s.apply(func).to_pandas() # ignore type disparities, e.g. "int64" in pandas v/s "Int64" in bigframes - pd.testing.assert_series_equal( - expected, actual, check_dtype=False, check_index_type=False - ) + assert_series_equal(expected, actual, check_dtype=False, check_index_type=False) @pytest.mark.parametrize( @@ -1200,9 +1202,7 @@ def add_ints(row: pandas.Series) -> int: # bf_result.to_numpy() produces an array of numpy.float64's # (in system_prerelease tests), while pd_result.to_numpy() produces an # array of ints, ignore this mismatch by using check_exact=False. - pd.testing.assert_series_equal( - pd_result, bf_result, check_dtype=False, check_exact=False - ) + assert_series_equal(pd_result, bf_result, check_dtype=False, check_exact=False) # Read back the deployed BQ remote function using read_gbq_function. func_ref = session.read_gbq_function( @@ -1215,9 +1215,7 @@ def add_ints(row: pandas.Series) -> int: assert func_ref.bigframes_remote_function == func_ref.bigframes_bigquery_function # type: ignore bf_result_gbq = scalars_df[columns].apply(func_ref, axis=1).to_pandas() - pd.testing.assert_series_equal( - pd_result, bf_result_gbq, check_dtype=False, check_exact=False - ) + assert_series_equal(pd_result, bf_result_gbq, check_dtype=False, check_exact=False) @pytest.mark.flaky(retries=2, delay=120) @@ -1253,9 +1251,7 @@ def add_ints(row: pandas.Series) -> int: # bf_result.to_numpy() produces an array of numpy.float64's # (in system_prerelease tests), while pd_result.to_numpy() produces an # array of ints, ignore this mismatch by using check_exact=False. - pd.testing.assert_series_equal( - pd_result, bf_result, check_dtype=False, check_exact=False - ) + assert_series_equal(pd_result, bf_result, check_dtype=False, check_exact=False) @pytest.mark.flaky(retries=2, delay=120) @@ -1286,9 +1282,7 @@ def add_numbers(row): # bf_result.index[0].dtype is 'string[pyarrow]' while # pd_result.index[0].dtype is 'object', ignore this mismatch by using # check_index_type=False. - pd.testing.assert_series_equal( - pd_result, bf_result, check_dtype=False, check_index_type=False - ) + assert_series_equal(pd_result, bf_result, check_dtype=False, check_index_type=False) def test_df_apply_axis_1_unsupported_callable(scalars_dfs): @@ -1452,7 +1446,7 @@ def is_odd(x: int) -> bool: bf_result = bf_method(is_odd_remote).to_pandas() # ignore any dtype difference - pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) @@ -1501,7 +1495,7 @@ def add(x: int, y: int) -> int: ) # ignore any dtype difference - pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.flaky(retries=2, delay=120) @@ -1563,7 +1557,7 @@ def add_pandas(s: pd.Series) -> float: bf_result = bf_df[bf_filter].apply(add_remote, axis=1).to_pandas() # ignore any dtype difference - pd.testing.assert_series_equal(pd_result, bf_result, check_dtype=False) + assert_series_equal(pd_result, bf_result, check_dtype=False) @pytest.mark.parametrize( From e2de96bca7689210700a7ff614299eeb13cd5b6e Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 16 Mar 2026 20:06:27 +0000 Subject: [PATCH 05/10] fix test issues --- bigframes/functions/_function_client.py | 4 ++-- bigframes/functions/udf_def.py | 13 +++++++------ tests/system/large/ml/test_ensemble.py | 2 +- tests/system/large/ml/test_llm.py | 2 +- .../system/small/functions/test_remote_function.py | 2 +- 5 files changed, 12 insertions(+), 11 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index d63cc571dcb..42a22a6e5c2 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -586,7 +586,7 @@ def provision_bq_remote_function( cloud_function_vpc_connector_egress_settings: str | None, cloud_function_memory_mib: int | None, cloud_function_cpus: float | None, - cloud_function_ingress_settings: str | None, + cloud_function_ingress_settings: str, ): """Provision a BigQuery remote function.""" # Augment user package requirements with any internal package @@ -620,7 +620,7 @@ def provision_bq_remote_function( or "private-ranges-only", memory_mib=cloud_function_memory_mib, cpus=cloud_function_cpus, - ingress_settings=cloud_function_ingress_settings or "internal_only", + ingress_settings=cloud_function_ingress_settings, workers=workers, threads=threads, concurrency=concurrency, diff --git a/bigframes/functions/udf_def.py b/bigframes/functions/udf_def.py index 54966c0fce7..0f3dc322c09 100644 --- a/bigframes/functions/udf_def.py +++ b/bigframes/functions/udf_def.py @@ -113,8 +113,7 @@ def bf_type(self) -> bigframes.dtypes.Dtype: @property def sql_type(self) -> str: sdk_type = function_typing.sdk_type_from_python_type(self._py_type) - assert sdk_type.type_kind is not None - return sdk_type.type_kind.name + return function_typing.sdk_type_to_sql_string(sdk_type) def stable_hash(self) -> bytes: hash_val = hashlib.md5() @@ -229,13 +228,15 @@ def to_sql_input_signature(self) -> str: ) @property - def protocol_metadata(self) -> str: + def protocol_metadata(self) -> str | None: import bigframes.functions._utils # TODO: The output field itself should handle this, to handle protocol versioning. - return bigframes.functions._utils.get_bigframes_metadata( - python_output_type=self.output.py_type - ) + if isinstance(self.output, VirtualListTypeV1): + return bigframes.functions._utils.get_bigframes_metadata( + python_output_type=self.output.py_type + ) + return None @property def is_virtual(self) -> bool: diff --git a/tests/system/large/ml/test_ensemble.py b/tests/system/large/ml/test_ensemble.py index c2e9036eed7..eabd36ab387 100644 --- a/tests/system/large/ml/test_ensemble.py +++ b/tests/system/large/ml/test_ensemble.py @@ -155,7 +155,7 @@ def test_xgbclassifier_default_params(penguins_df_default_index, dataset_id): ) -# @pytest.mark.flaky(retries=2) +@pytest.mark.flaky(retries=2) def test_xgbclassifier_dart_booster_multiple_params( penguins_df_default_index, dataset_id ): diff --git a/tests/system/large/ml/test_llm.py b/tests/system/large/ml/test_llm.py index 6e2695b1b53..5d780b7615e 100644 --- a/tests/system/large/ml/test_llm.py +++ b/tests/system/large/ml/test_llm.py @@ -63,7 +63,7 @@ def test_create_load_gemini_text_generator_model( "gemini-2.5-flash-lite", ), ) -# @pytest.mark.flaky(retries=2) +@pytest.mark.flaky(retries=2) def test_gemini_text_generator_predict_default_params_success( llm_text_df, model_name, session, bq_connection ): diff --git a/tests/system/small/functions/test_remote_function.py b/tests/system/small/functions/test_remote_function.py index d6de1772197..643f503c052 100644 --- a/tests/system/small/functions/test_remote_function.py +++ b/tests/system/small/functions/test_remote_function.py @@ -106,7 +106,7 @@ def get_bq_connection_id_path_format(connection_id_dot_format): return f"projects/{fields[0]}/locations/{fields[1]}/connections/{fields[2]}" -# @pytest.mark.flaky(retries=2, delay=120) +@pytest.mark.flaky(retries=2, delay=120) def test_remote_function_direct_no_session_param( bigquery_client, bigqueryconnection_client, From 2196527fb85a755107d03339ea08dc4435fdc6ec Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 16 Mar 2026 22:23:10 +0000 Subject: [PATCH 06/10] fix mores tests --- bigframes/functions/_function_client.py | 22 +++++++--------- bigframes/functions/function.py | 12 ++++++--- bigframes/functions/udf_def.py | 26 ++++++++++++++++--- .../large/functions/test_remote_function.py | 2 +- 4 files changed, 43 insertions(+), 19 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 42a22a6e5c2..79f79432d4d 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -687,19 +687,17 @@ def get_remote_function_specs( self, remote_function_name: str ) -> udf_def.RemoteFunctionConfig | None: """Check whether a remote function already exists for the udf.""" - routines = self._bq_client.list_routines( - f"{self._gcp_project_id}.{self._bq_dataset}" - ) try: - for routine in routines: - routine = cast(bigquery.Routine, routine) - if routine.reference.routine_id == remote_function_name: - try: - return udf_def.RemoteFunctionConfig.from_bq_routine(routine) - except udf_def.ReturnTypeMissingError: - # The remote function exists, but it's missing a return type. - # Something is wrong with the function, so we should replace it. - return None + routine = self._bq_client.get_routine( + f"{self._gcp_project_id}.{self._bq_dataset}.{remote_function_name}" + ) + if routine.reference.routine_id == remote_function_name: + try: + return udf_def.RemoteFunctionConfig.from_bq_routine(routine) + except udf_def.ReturnTypeMissingError: + # The remote function exists, but it's missing a return type. + # Something is wrong with the function, so we should replace it. + return None except google.api_core.exceptions.NotFound: # The dataset might not exist, in which case the remote function doesn't, either. # Note: list_routines doesn't make an API request until we iterate on the response object. diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py index 5d02c269d7e..69e59327c6a 100644 --- a/bigframes/functions/function.py +++ b/bigframes/functions/function.py @@ -91,16 +91,21 @@ def _try_import_routine( def _try_import_row_routine( routine: bigquery.Routine, session: bigframes.Session ) -> BigqueryCallableRowRoutine: - udf_def = _routine_as_udf_def(routine) + udf_def = _routine_as_udf_def(routine, is_row_processor=True) + is_remote = ( hasattr(routine, "remote_function_options") and routine.remote_function_options ) return BigqueryCallableRowRoutine(udf_def, session, is_managed=not is_remote) -def _routine_as_udf_def(routine: bigquery.Routine) -> udf_def.BigqueryUdf: +def _routine_as_udf_def( + routine: bigquery.Routine, is_row_processor: bool = False +) -> udf_def.BigqueryUdf: try: - return udf_def.BigqueryUdf.from_routine(routine) + return udf_def.BigqueryUdf.from_routine( + routine, is_row_processor=is_row_processor + ) except udf_def.ReturnTypeMissingError: raise bf_formatting.create_exception_with_feedback_link( ValueError, "Function return type must be specified." @@ -140,6 +145,7 @@ def read_gbq_function( ValueError, f"Unknown function '{routine_ref}'." ) + # TODO(493293086): Deprecate is_row_processor. if is_row_processor: return _try_import_row_routine(routine, session) else: diff --git a/bigframes/functions/udf_def.py b/bigframes/functions/udf_def.py index 0f3dc322c09..5f9e649619a 100644 --- a/bigframes/functions/udf_def.py +++ b/bigframes/functions/udf_def.py @@ -255,8 +255,11 @@ def with_devirtualize(self) -> UdfSignature: output=self.output.emulating_type, ) + # TODO(493293086): Deprecate is_row_processor. @classmethod - def from_routine(cls, routine: bigquery.Routine) -> UdfSignature: + def from_routine( + cls, routine: bigquery.Routine, is_row_processor: bool = False + ) -> UdfSignature: import bigframes.functions._utils ## Handle return type @@ -291,7 +294,20 @@ def from_routine(cls, routine: bigquery.Routine) -> UdfSignature: ## Handle input types udf_fields = [] + + if is_row_processor: + if len(routine.arguments) != 1: + raise ValueError( + "Row processor functions must have exactly one input argument." + ) + for argument in routine.arguments: + if is_row_processor: + if argument.data_type.type_kind != "STRING": + raise ValueError( + "Row processor functions must have STRING input type." + ) + udf_fields.append(UdfArg(argument.name, RowSeriesInputFieldV1())) udf_fields.append(UdfArg.from_sdk(argument)) return cls( @@ -361,8 +377,12 @@ def with_devirtualize(self) -> BigqueryUdf: ) @classmethod - def from_routine(cls, routine: bigquery.Routine) -> BigqueryUdf: - signature = UdfSignature.from_routine(routine) + def from_routine( + cls, routine: bigquery.Routine, is_row_processor: bool = False + ) -> BigqueryUdf: + signature = UdfSignature.from_routine( + routine, is_row_processor=is_row_processor + ) return cls(routine.reference, signature=signature) diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 446c01e616d..7f78149b89a 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -1723,7 +1723,7 @@ def square(x): gcf = session.cloudfunctionsclient.get_function( name=square_remote_2.bigframes_cloud_function ) - assert gcf.service_config.available_cpu == 2.0 + assert float(gcf.service_config.available_cpu) == 2.0 finally: # clean up the gcp assets created for the remote function if square_remote is not None: From 79986ddf62fe3e0e52311a81cbe0294ad981ee90 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Tue, 17 Mar 2026 20:24:25 +0000 Subject: [PATCH 07/10] fix tests --- bigframes/core/sql/__init__.py | 4 +-- bigframes/functions/_function_client.py | 29 +++++++++++++------ bigframes/functions/_utils.py | 21 +++++++------- bigframes/functions/function.py | 4 +-- bigframes/functions/udf_def.py | 23 ++++++++------- bigframes/testing/utils.py | 2 +- .../large/functions/test_remote_function.py | 27 +++++------------ .../functions/test_remote_function_utils.py | 16 +--------- 8 files changed, 56 insertions(+), 70 deletions(-) diff --git a/bigframes/core/sql/__init__.py b/bigframes/core/sql/__init__.py index 6441fd60c9d..69a74b15ced 100644 --- a/bigframes/core/sql/__init__.py +++ b/bigframes/core/sql/__init__.py @@ -49,8 +49,8 @@ def identifier(name: str) -> str: - if len(name) > 63: - raise ValueError("Identifier must be less than 64 characters") + if len(name) > 256: + raise ValueError("Identifier must be less than 256 characters") return f"`{escape_chars(name)}`" diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 79f79432d4d..90c94a3fbc8 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -19,6 +19,7 @@ import logging import os import random +import re import shutil import string import tempfile @@ -196,10 +197,8 @@ def create_bq_remote_function( import bigframes.core.utils # removes anything that isn't letter, number or underscore - sql_func_legal_name = bigframes.core.utils.label_to_identifier( - name, strict=True - ) - bq_function_name_escaped = bigframes.core.sql.identifier(sql_func_legal_name) + _validate_routine_name(name) + bq_function_name_escaped = bigframes.core.sql.identifier(name) create_function_ddl = f""" CREATE OR REPLACE FUNCTION `{self._gcp_project_id}.{self._bq_dataset}`.{bq_function_name_escaped}({udf_def.signature.to_sql_input_signature()}) RETURNS {udf_def.signature.with_devirtualize().output.sql_type} @@ -263,7 +262,10 @@ def provision_bq_managed_function( # Augment user package requirements with any internal package # requirements. packages = _utils.get_updated_package_requirements( - packages, is_row_processor, capture_references, ignore_package_version=True + packages or [], + is_row_processor, + capture_references, + ignore_package_version=True, ) if packages: managed_function_options["packages"] = packages @@ -579,7 +581,7 @@ def provision_bq_remote_function( reuse: bool, name: str | None, package_requirements: tuple[str, ...], - max_batching_rows: int, + max_batching_rows: int | None, cloud_function_timeout: int | None, cloud_function_max_instance_count: int | None, cloud_function_vpc_connector: str | None, @@ -591,7 +593,7 @@ def provision_bq_remote_function( """Provision a BigQuery remote function.""" # Augment user package requirements with any internal package # requirements - package_requirements = _utils.get_updated_package_requirements( + full_package_requirements = _utils.get_updated_package_requirements( package_requirements, func_signature.is_row_processor ) @@ -611,7 +613,7 @@ def provision_bq_remote_function( concurrency = (workers * threads) if (expected_milli_cpus >= 1000) else 1 cloud_func_spec = udf_def.CloudRunFunctionConfig( - code=udf_def.CodeDef.from_func(def_, package_requirements), + code=udf_def.CodeDef.from_func(def_, full_package_requirements), signature=func_signature, timeout_seconds=cloud_function_timeout, max_instance_count=cloud_function_max_instance_count, @@ -655,7 +657,7 @@ def provision_bq_remote_function( intended_rf_spec = udf_def.RemoteFunctionConfig( endpoint=cf_endpoint, connection_id=self._bq_connection_id, - max_batching_rows=max_batching_rows, + max_batching_rows=max_batching_rows or 1000, signature=func_signature, bq_metadata=func_signature.protocol_metadata, ) @@ -728,6 +730,15 @@ def get_bigframes_function_name( return _BQ_FUNCTION_NAME_SEPERATOR.join(parts) +def _validate_routine_name(name: str) -> None: + """Validate that the given name is a valid BigQuery routine name.""" + # Routine IDs can contain only letters (a-z, A-Z), numbers (0-9), or underscores (_) + if not re.match(r"^[a-zA-Z0-9_]+$", name): + raise ValueError( + "Routine ID can contain only letters (a-z, A-Z), numbers (0-9), or underscores (_)" + ) + + def _infer_milli_cpus_from_memory(memory_mib: int) -> int: # observed values, not formally documented by cloud run functions if memory_mib < 128: diff --git a/bigframes/functions/_utils.py b/bigframes/functions/_utils.py index 3af989f27e7..46762c12f2a 100644 --- a/bigframes/functions/_utils.py +++ b/bigframes/functions/_utils.py @@ -75,12 +75,12 @@ def _package_existed(package_requirements: list[str], package: str) -> bool: def get_updated_package_requirements( - package_requirements=None, - is_row_processor=False, - capture_references=True, - ignore_package_version=False, -): - requirements = [] + package_requirements: Sequence[str] = (), + is_row_processor: bool = False, + capture_references: bool = True, + ignore_package_version: bool = False, +) -> Sequence[str]: + requirements: list[str] = [] if capture_references: requirements.append(f"cloudpickle=={cloudpickle.__version__}") @@ -110,13 +110,12 @@ def get_updated_package_requirements( if not requirements: return package_requirements - if not package_requirements: - package_requirements = [] + result = list(package_requirements) for package in requirements: - if not _package_existed(package_requirements, package): - package_requirements.append(package) + if not _package_existed(result, package): + result.append(package) - return sorted(package_requirements) + return sorted(result) def clean_up_by_session_id( diff --git a/bigframes/functions/function.py b/bigframes/functions/function.py index 69e59327c6a..18a000c722f 100644 --- a/bigframes/functions/function.py +++ b/bigframes/functions/function.py @@ -222,7 +222,7 @@ def output_dtype(self): @property def bigframes_bigquery_function_output_dtype(self): - return self.output_dtype + return self.udf_def.signature.output.emulating_type.bf_type class BigqueryCallableRowRoutine: @@ -296,4 +296,4 @@ def output_dtype(self): @property def bigframes_bigquery_function_output_dtype(self): - return self.output_dtype + return self.udf_def.signature.output.emulating_type.bf_type diff --git a/bigframes/functions/udf_def.py b/bigframes/functions/udf_def.py index 5f9e649619a..c3e22c1c558 100644 --- a/bigframes/functions/udf_def.py +++ b/bigframes/functions/udf_def.py @@ -295,19 +295,14 @@ def from_routine( ## Handle input types udf_fields = [] - if is_row_processor: - if len(routine.arguments) != 1: - raise ValueError( - "Row processor functions must have exactly one input argument." - ) - - for argument in routine.arguments: - if is_row_processor: - if argument.data_type.type_kind != "STRING": + for i, argument in enumerate(routine.arguments): + if is_row_processor and i == 0: + if argument.data_type.type_kind == "STRING": + udf_fields.append(UdfArg(argument.name, RowSeriesInputFieldV1())) + else: raise ValueError( - "Row processor functions must have STRING input type." + "Row processor functions must have STRING input type as first argument." ) - udf_fields.append(UdfArg(argument.name, RowSeriesInputFieldV1())) udf_fields.append(UdfArg.from_sdk(argument)) return cls( @@ -317,6 +312,8 @@ def from_routine( @classmethod def from_py_signature(cls, signature: inspect.Signature): + import bigframes.series + input_types: list[UdfArg] = [] for parameter in signature.parameters.values(): if parameter.annotation is inspect.Signature.empty: @@ -326,6 +323,10 @@ def from_py_signature(cls, signature: inspect.Signature): f"'{parameter.name}' is missing a type annotation. " "Types are required to use udfs.", ) + if parameter.annotation is bigframes.series.Series: + raise TypeError( + "Argument type hint must be Pandas Series, not BigFrames Series." + ) input_types.append(UdfArg.from_py_param(parameter)) diff --git a/bigframes/testing/utils.py b/bigframes/testing/utils.py index 26a944d760a..5f4a8d26276 100644 --- a/bigframes/testing/utils.py +++ b/bigframes/testing/utils.py @@ -513,7 +513,7 @@ def get_function_name(func, package_requirements=None, is_row_processor=False): # Augment user package requirements with any internal package # requirements. package_requirements = bff_utils.get_updated_package_requirements( - package_requirements, is_row_processor + package_requirements or [], is_row_processor ) # Compute a unique hash representing the user code. diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index 7346cb80e4f..a0ec847e3bc 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -33,7 +33,6 @@ import bigframes.dataframe import bigframes.dtypes import bigframes.exceptions -import bigframes.functions._utils as bff_utils import bigframes.pandas as bpd import bigframes.series from bigframes.testing.utils import ( @@ -527,22 +526,6 @@ def add_one(x): # Make a unique udf add_one_uniq, add_one_uniq_dir = make_uniq_udf(add_one) - # Expected cloud function name for the unique udf - package_requirements = bff_utils.get_updated_package_requirements() - add_one_uniq_hash = bff_utils.get_hash(add_one_uniq, package_requirements) - add_one_uniq_cf_name = f"bff_{add_one_uniq_hash}_{session.session_id}" - - # There should be no cloud function yet for the unique udf - cloud_functions = list( - get_cloud_functions( - session.cloudfunctionsclient, - session.bqclient.project, - session.bqclient.location, - name=add_one_uniq_cf_name, - ) - ) - assert len(cloud_functions) == 0 - # The first time both the cloud function and the bq remote function don't # exist and would be created remote_add_one = session.remote_function( @@ -554,6 +537,9 @@ def add_one(x): cloud_function_service_account="default", )(add_one_uniq) + assert remote_add_one.bigframes_cloud_function is not None + add_one_uniq_cf_name = remote_add_one.bigframes_cloud_function.split("/")[-1] + # There should have been excactly one cloud function created at this point cloud_functions = list( get_cloud_functions( @@ -1561,7 +1547,9 @@ def square(x): bq_routine = session.bqclient.get_routine( square_remote.bigframes_bigquery_function ) - assert bq_routine.remote_function_options.max_batching_rows == max_batching_rows + assert bq_routine.remote_function_options.max_batching_rows == ( + max_batching_rows or 1000 + ) scalars_df, scalars_pandas_df = scalars_dfs @@ -1690,7 +1678,8 @@ def test_remote_function_reflects_config_change_with_reuse(session): def square(x): return x * x - deploy_name = str(uuid.uuid4()) + # random alphanumeric name + deploy_name = str(uuid.uuid4().hex) square_remote = session.remote_function( input_types=[int], name=deploy_name, diff --git a/tests/unit/functions/test_remote_function_utils.py b/tests/unit/functions/test_remote_function_utils.py index a2687761270..dcf60587675 100644 --- a/tests/unit/functions/test_remote_function_utils.py +++ b/tests/unit/functions/test_remote_function_utils.py @@ -41,20 +41,6 @@ def test_get_remote_function_locations( assert cf_region == expected_cf_region -def test_get_updated_package_requirements_no_extra_package(): - """Tests with no extra package.""" - result = _utils.get_updated_package_requirements(capture_references=False) - - assert result is None - - initial_packages = ["xgboost"] - result = _utils.get_updated_package_requirements( - initial_packages, capture_references=False - ) - - assert result == initial_packages - - @patch("bigframes.functions._utils.numpy.__version__", "1.24.4") @patch("bigframes.functions._utils.pyarrow.__version__", "14.0.1") @patch("bigframes.functions._utils.pandas.__version__", "2.0.3") @@ -100,7 +86,7 @@ def test_get_updated_package_requirements_capture_references_false(): # Case 1: Only capture_references=False. result_1 = _utils.get_updated_package_requirements(capture_references=False) - assert result_1 is None + assert len(result_1) == 0 # Case 2: capture_references=False but is_row_processor=True. expected_2 = ["numpy==1.24.4", "pandas==2.0.3", "pyarrow==14.0.1"] From f38799cb88698a0e71cd65306c4664e129da67f0 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 18 Mar 2026 00:05:06 +0000 Subject: [PATCH 08/10] handle post-processing virtualized array type --- .../ibis_compiler/scalar_op_registry.py | 7 ++++++ .../compile/sqlglot/expressions/array_ops.py | 22 +++++++++++++++++++ bigframes/functions/_utils.py | 14 ------------ bigframes/functions/udf_def.py | 13 +++++++++-- bigframes/operations/__init__.py | 2 ++ bigframes/operations/array_ops.py | 14 ++++++++++++ 6 files changed, 56 insertions(+), 16 deletions(-) diff --git a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py index 10698366fee..5cc6cfcd7b0 100644 --- a/bigframes/core/compile/ibis_compiler/scalar_op_registry.py +++ b/bigframes/core/compile/ibis_compiler/scalar_op_registry.py @@ -1156,6 +1156,13 @@ def array_reduce_op_impl(x: ibis_types.Value, op: ops.ArrayReduceOp): ) +@scalar_op_compiler.register_unary_op(ops.ArrayMapOp, pass_op=True) +def array_map_op_impl(x: ibis_types.Value, op: ops.ArrayMapOp): + return typing.cast(ibis_types.ArrayValue, x).map( + lambda arr_vals: scalar_op_compiler.compile_row_op(op.map_op, (arr_vals,)) + ) + + # JSON Ops @scalar_op_compiler.register_binary_op(ops.JSONSet, pass_op=True) def json_set_op_impl(x: ibis_types.Value, y: ibis_types.Value, op: ops.JSONSet): diff --git a/bigframes/core/compile/sqlglot/expressions/array_ops.py b/bigframes/core/compile/sqlglot/expressions/array_ops.py index eb7582cb168..165bad09960 100644 --- a/bigframes/core/compile/sqlglot/expressions/array_ops.py +++ b/bigframes/core/compile/sqlglot/expressions/array_ops.py @@ -73,6 +73,28 @@ def _(expr: TypedExpr, op: ops.ArrayReduceOp) -> sge.Expression: ) +@register_unary_op(ops.ArrayMapOp, pass_op=True) +def _(expr: TypedExpr, op: ops.ArrayMapOp) -> sge.Expression: + sub_expr = sg.to_identifier("bf_arr_map_uid") + sub_type = dtypes.get_array_inner_type(expr.dtype) + + # TODO: Expression should be provided instead of invoking compiler manually + map_expr = expression_compiler.expression_compiler.compile_row_op( + op.map_op, (TypedExpr(sub_expr, sub_type),) + ) + + return sge.array( + sge.select(map_expr) + .from_( + sge.Unnest( + expressions=[expr.expr], + alias=sge.TableAlias(columns=[sub_expr]), + ) + ) + .subquery() + ) + + @register_unary_op(ops.ArraySliceOp, pass_op=True) def _(expr: TypedExpr, op: ops.ArraySliceOp) -> sge.Expression: if expr.dtype == dtypes.STRING_DTYPE: diff --git a/bigframes/functions/_utils.py b/bigframes/functions/_utils.py index 46762c12f2a..6f7f43e4124 100644 --- a/bigframes/functions/_utils.py +++ b/bigframes/functions/_utils.py @@ -292,20 +292,6 @@ def get_python_version(is_compat: bool = False) -> str: return f"python{major}{minor}" if is_compat else f"python-{major}.{minor}" -def build_unnest_post_routine(py_list_type: type[list]): - sdk_type = function_typing.sdk_array_output_type_from_python_type(py_list_type) - assert sdk_type.array_element_type is not None - inner_sdk_type = sdk_type.array_element_type - result_dtype = function_typing.sdk_type_to_bf_type(inner_sdk_type) - - def post_process(input): - import bigframes.bigquery as bbq - - return bbq.json_extract_string_array(input, value_dtype=result_dtype) - - return post_process - - def has_conflict_input_type( signature: inspect.Signature, input_types: Sequence[Any], diff --git a/bigframes/functions/udf_def.py b/bigframes/functions/udf_def.py index c3e22c1c558..62ea03c662f 100644 --- a/bigframes/functions/udf_def.py +++ b/bigframes/functions/udf_def.py @@ -158,10 +158,19 @@ def emulating_type(self) -> DirectScalarType: def out_expr( self, expr: bigframes.core.expression.Expression ) -> bigframes.core.expression.Expression: + # essentially we are undoing json.dumps in sql import bigframes.operations as ops - # convert json string to array of underlying type - return ops.JSONValueArray(json_path="$").as_expr(expr) + as_str_list = ops.JSONValueArray(json_path="$").as_expr(expr) + if self.inner_dtype.py_type is str: + return as_str_list + elif self.inner_dtype.py_type is bool: + # TODO: hack so we don't need to make ArrayMap support general expressions yet + return ops.ArrayMapOp(ops.IsInOp(values=("true",))).as_expr(as_str_list) + else: + return ops.ArrayMapOp(ops.AsTypeOp(self.inner_dtype.bf_type)).as_expr( + as_str_list + ) @property def sql_type(self) -> str: diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py index a1c7754ab5c..9f585843b84 100644 --- a/bigframes/operations/__init__.py +++ b/bigframes/operations/__init__.py @@ -25,6 +25,7 @@ ) from bigframes.operations.array_ops import ( ArrayIndexOp, + ArrayMapOp, ArrayReduceOp, ArraySliceOp, ArrayToStringOp, @@ -440,4 +441,5 @@ "NUMPY_TO_OP", "ToArrayOp", "ArrayReduceOp", + "ArrayMapOp", ] diff --git a/bigframes/operations/array_ops.py b/bigframes/operations/array_ops.py index 61ada59cc7b..9126e235410 100644 --- a/bigframes/operations/array_ops.py +++ b/bigframes/operations/array_ops.py @@ -88,3 +88,17 @@ def output_type(self, *input_types): assert dtypes.is_array_like(input_type) inner_type = dtypes.get_array_inner_type(input_type) return self.aggregation.output_type(inner_type) + + +@dataclasses.dataclass(frozen=True) +class ArrayMapOp(base_ops.UnaryOp): + name: typing.ClassVar[str] = "array_map" + # TODO: Generalize to chained expressions + map_op: base_ops.UnaryOp + + def output_type(self, *input_types): + input_type = input_types[0] + assert dtypes.is_array_like(input_type) + inner_type = dtypes.get_array_inner_type(input_type) + out_inner_type = self.map_op.output_type(inner_type) + return dtypes.list_type(out_inner_type) From f713798e7d21424fb294fb6f88dde619e28e6366 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Wed, 18 Mar 2026 19:23:47 +0000 Subject: [PATCH 09/10] update routine name restriction --- bigframes/functions/_function_client.py | 3 ++- tests/system/large/functions/test_remote_function.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 90c94a3fbc8..0906c71f84f 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -733,7 +733,8 @@ def get_bigframes_function_name( def _validate_routine_name(name: str) -> None: """Validate that the given name is a valid BigQuery routine name.""" # Routine IDs can contain only letters (a-z, A-Z), numbers (0-9), or underscores (_) - if not re.match(r"^[a-zA-Z0-9_]+$", name): + # must also start with a letter or underscore only + if not re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", name): raise ValueError( "Routine ID can contain only letters (a-z, A-Z), numbers (0-9), or underscores (_)" ) diff --git a/tests/system/large/functions/test_remote_function.py b/tests/system/large/functions/test_remote_function.py index a0ec847e3bc..94fb93db5f3 100644 --- a/tests/system/large/functions/test_remote_function.py +++ b/tests/system/large/functions/test_remote_function.py @@ -1678,8 +1678,8 @@ def test_remote_function_reflects_config_change_with_reuse(session): def square(x): return x * x - # random alphanumeric name - deploy_name = str(uuid.uuid4().hex) + # random alphanumeric name starting with a letter + deploy_name = "a" + str(uuid.uuid4().hex) square_remote = session.remote_function( input_types=[int], name=deploy_name, From 4f982fa0e016eb25d073f2dc1534e5b19eb97f75 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 23 Mar 2026 21:58:43 +0000 Subject: [PATCH 10/10] pr comments --- bigframes/functions/_function_client.py | 1 + bigframes/functions/_utils.py | 1 - bigframes/functions/udf_def.py | 24 ++++++++++++------------ bigframes/operations/array_ops.py | 2 +- setup.py | 1 + 5 files changed, 15 insertions(+), 14 deletions(-) diff --git a/bigframes/functions/_function_client.py b/bigframes/functions/_function_client.py index 0906c71f84f..4b368f48ccf 100644 --- a/bigframes/functions/_function_client.py +++ b/bigframes/functions/_function_client.py @@ -229,6 +229,7 @@ def provision_bq_managed_function( # TODO(b/406283812): Expose the capability to pass down # capture_references=True in the public udf API. + # TODO(b/495508827): Include all config in the value hash. if ( capture_references and (python_version := _utils.get_python_version()) diff --git a/bigframes/functions/_utils.py b/bigframes/functions/_utils.py index 6f7f43e4124..c197ed14fc3 100644 --- a/bigframes/functions/_utils.py +++ b/bigframes/functions/_utils.py @@ -191,7 +191,6 @@ def get_managed_function_name( session_id: str | None = None, ): """Get a name for the bigframes managed function for the given user defined function.""" - # TODO: Move over to logic used by remote functions parts = [_BIGFRAMES_FUNCTION_PREFIX] if session_id: parts.append(session_id) diff --git a/bigframes/functions/udf_def.py b/bigframes/functions/udf_def.py index 62ea03c662f..f02f289ef66 100644 --- a/bigframes/functions/udf_def.py +++ b/bigframes/functions/udf_def.py @@ -15,7 +15,6 @@ import dataclasses import functools -import hashlib import inspect import io import os @@ -25,6 +24,7 @@ import cloudpickle from google.cloud import bigquery +import google_crc32c import pandas as pd import bigframes.dtypes @@ -84,7 +84,7 @@ def sql_type(self) -> str: return self.dtype.sql_type def stable_hash(self) -> bytes: - hash_val = hashlib.md5() + hash_val = google_crc32c.Checksum() hash_val.update(self.name.encode()) hash_val.update(self.dtype.stable_hash()) return hash_val.digest() @@ -116,7 +116,7 @@ def sql_type(self) -> str: return function_typing.sdk_type_to_sql_string(sdk_type) def stable_hash(self) -> bytes: - hash_val = hashlib.md5() + hash_val = google_crc32c.Checksum() hash_val.update(self._py_type.__name__.encode()) return hash_val.digest() @@ -145,7 +145,6 @@ class VirtualListTypeV1: def py_type(self) -> Type[list[Any]]: return list[self.inner_dtype.py_type] # type: ignore - # TODO: Specify emulating type and mapping expressions between said types @property def bf_type(self) -> bigframes.dtypes.Dtype: return bigframes.dtypes.list_type(self.inner_dtype.bf_type) @@ -165,7 +164,8 @@ def out_expr( if self.inner_dtype.py_type is str: return as_str_list elif self.inner_dtype.py_type is bool: - # TODO: hack so we don't need to make ArrayMap support general expressions yet + # hack so we don't need to make ArrayMap support general expressions yet + # with b/495513753 we can map the equality operator instead return ops.ArrayMapOp(ops.IsInOp(values=("true",))).as_expr(as_str_list) else: return ops.ArrayMapOp(ops.AsTypeOp(self.inner_dtype.bf_type)).as_expr( @@ -177,7 +177,7 @@ def sql_type(self) -> str: return f"ARRAY<{self.inner_dtype.sql_type}>" def stable_hash(self) -> bytes: - hash_val = hashlib.md5() + hash_val = google_crc32c.Checksum() hash_val.update(self._PROTOCOL_ID.encode()) hash_val.update(self.inner_dtype.stable_hash()) return hash_val.digest() @@ -212,7 +212,7 @@ def emulating_type(self) -> DirectScalarType: return DirectScalarType(str) def stable_hash(self) -> bytes: - hash_val = hashlib.md5() + hash_val = google_crc32c.Checksum() hash_val.update(self._PROTOCOL_ID.encode()) return hash_val.digest() @@ -227,6 +227,7 @@ class UdfSignature: output: DirectScalarType | VirtualListTypeV1 def __post_init__(self): + # Validate inputs and outputs are of the correct types. assert all(isinstance(arg, UdfArg) for arg in self.inputs) assert isinstance(self.output, (DirectScalarType, VirtualListTypeV1)) @@ -240,7 +241,6 @@ def to_sql_input_signature(self) -> str: def protocol_metadata(self) -> str | None: import bigframes.functions._utils - # TODO: The output field itself should handle this, to handle protocol versioning. if isinstance(self.output, VirtualListTypeV1): return bigframes.functions._utils.get_bigframes_metadata( python_output_type=self.output.py_type @@ -362,7 +362,7 @@ def to_remote_function_compatible(self) -> UdfSignature: return self def stable_hash(self) -> bytes: - hash_val = hashlib.md5() + hash_val = google_crc32c.Checksum() for input_type in self.inputs: hash_val.update(input_type.stable_hash()) hash_val.update(self.output.stable_hash()) @@ -438,7 +438,7 @@ def stable_hash(self) -> bytes: def_copy, protocol=_pickle_protocol_version ) - hash_val = hashlib.md5() + hash_val = google_crc32c.Checksum() hash_val.update(normalized_pickled_code) if self.package_requirements: @@ -464,7 +464,7 @@ class CloudRunFunctionConfig: concurrency: int | None def stable_hash(self) -> bytes: - hash_val = hashlib.md5() + hash_val = google_crc32c.Checksum() hash_val.update(self.code.stable_hash()) hash_val.update(self.signature.stable_hash()) hash_val.update(str(self.timeout_seconds).encode()) @@ -503,7 +503,7 @@ def from_bq_routine(cls, routine: bigquery.Routine) -> RemoteFunctionConfig: ) def stable_hash(self) -> bytes: - hash_val = hashlib.md5() + hash_val = google_crc32c.Checksum() hash_val.update(self.endpoint.encode()) hash_val.update(self.signature.stable_hash()) hash_val.update(self.connection_id.encode()) diff --git a/bigframes/operations/array_ops.py b/bigframes/operations/array_ops.py index 9126e235410..c5694e50baa 100644 --- a/bigframes/operations/array_ops.py +++ b/bigframes/operations/array_ops.py @@ -93,7 +93,7 @@ def output_type(self, *input_types): @dataclasses.dataclass(frozen=True) class ArrayMapOp(base_ops.UnaryOp): name: typing.ClassVar[str] = "array_map" - # TODO: Generalize to chained expressions + # TODO(b/495513753): Generalize to chained expressions map_op: base_ops.UnaryOp def output_type(self, *input_types): diff --git a/setup.py b/setup.py index 2179fe3e964..e22b52442d5 100644 --- a/setup.py +++ b/setup.py @@ -46,6 +46,7 @@ "google-cloud-bigquery-connection >=1.12.0", "google-cloud-resource-manager >=1.10.3", "google-cloud-storage >=2.0.0", + "google-crc32c >=1.0.0,<2.0.0", "grpc-google-iam-v1 >= 0.14.2", "numpy >=1.24.0", "pandas >=1.5.3",