diff --git a/bigframes/_config/bigquery_options.py b/bigframes/_config/bigquery_options.py index e1e8129ca3..f81ee90072 100644 --- a/bigframes/_config/bigquery_options.py +++ b/bigframes/_config/bigquery_options.py @@ -96,6 +96,7 @@ def __init__( Tuple[str, requests.adapters.BaseAdapter] ] = (), enable_polars_execution: bool = False, + enable_datafusion_execution: bool = False, ): self._credentials = credentials self._project = project @@ -119,6 +120,10 @@ def __init__( bigframes._importing.import_polars() self._enable_polars_execution = enable_polars_execution + if enable_datafusion_execution: + bigframes._importing.import_datafusion() + self._enable_datafusion_execution = enable_datafusion_execution + @property def application_name(self) -> Optional[str]: """The application name to amend to the user-agent sent to Google APIs. @@ -503,3 +508,29 @@ def enable_polars_execution(self, value: bool): warnings.warn(msg, category=bfe.PreviewWarning) bigframes._importing.import_polars() self._enable_polars_execution = value + + @property + def enable_datafusion_execution(self) -> bool: + """If True, will use datafusion to execute some simple query plans locally. + + **Examples:** + + >>> import bigframes.pandas as bpd + >>> bpd.options.bigquery.enable_datafusion_execution = True # doctest: +SKIP + + """ + return self._enable_datafusion_execution + + @enable_datafusion_execution.setter + def enable_datafusion_execution(self, value: bool): + if self._session_started and self._enable_datafusion_execution != value: + raise ValueError( + SESSION_STARTED_MESSAGE.format(attribute="enable_datafusion_execution") + ) + if value is True: + msg = bfe.format_message( + "DataFusion execution is an experimental feature, and may not be stable. Must have datafusion installed." 
+ ) + warnings.warn(msg, category=bfe.PreviewWarning) + bigframes._importing.import_datafusion() + self._enable_datafusion_execution = value diff --git a/bigframes/_importing.py b/bigframes/_importing.py index e88bd77fe8..415cadf1f3 100644 --- a/bigframes/_importing.py +++ b/bigframes/_importing.py @@ -33,3 +33,17 @@ def import_polars() -> ModuleType: f"Imported polars version is likely below the minimum version: {POLARS_MIN_VERSION}" ) return polars_module + + +DATAFUSION_MIN_VERSION = version.Version("52.0.0") + + +def import_datafusion() -> ModuleType: + datafusion_module = importlib.import_module("datafusion") + # Add any version checks if necessary, for now just check it imports + df_version = version.Version(datafusion_module.__version__) + if df_version < DATAFUSION_MIN_VERSION: + raise ImportError( + f"Imported datafusion version {df_version} is below the minimum version: {DATAFUSION_MIN_VERSION}" + ) + return datafusion_module diff --git a/bigframes/core/compile/datafusion/__init__.py b/bigframes/core/compile/datafusion/__init__.py new file mode 100644 index 0000000000..de0dab378b --- /dev/null +++ b/bigframes/core/compile/datafusion/__init__.py @@ -0,0 +1,39 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Compiler for BigFrames expression to Apache DataFusion expression. + +Make sure to import all datafusion implementations here so that they get registered. 
+""" +from __future__ import annotations + +import warnings + +import bigframes.core.compile.datafusion.operations.comparison_ops # noqa: F401 + +# The ops imports appear first so that the implementations can be registered. +import bigframes.core.compile.datafusion.operations.generic_ops # noqa: F401 +import bigframes.core.compile.datafusion.operations.numeric_ops # noqa: F401 + +try: + import bigframes._importing + + bigframes._importing.import_datafusion() + + from bigframes.core.compile.datafusion.compiler import DataFusionCompiler + + __all__ = ["DataFusionCompiler"] +except Exception as exc: + msg = f"DataFusion compiler not available as there was an exception importing datafusion. Details: {str(exc)}" + warnings.warn(msg) diff --git a/bigframes/core/compile/datafusion/compiler.py b/bigframes/core/compile/datafusion/compiler.py new file mode 100644 index 0000000000..7442294123 --- /dev/null +++ b/bigframes/core/compile/datafusion/compiler.py @@ -0,0 +1,239 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import dataclasses +import functools +from typing import Type, TYPE_CHECKING + +import pandas as pd +import pyarrow as pa + +import bigframes.core +from bigframes.core import agg_expressions, nodes +import bigframes.core.expression as ex +import bigframes.dtypes +import bigframes.operations as ops + +datafusion_installed = True +if TYPE_CHECKING: + import datafusion +else: + try: + import bigframes._importing + + datafusion = bigframes._importing.import_datafusion() + except Exception: + datafusion_installed = False + + +def register_op(op: Type): + """Register a compilation from BigFrames to DataFusion. + + This decorator can be used, even if DataFusion is not installed. + + Args: + op: The type of the operator the wrapped function compiles. + """ + + def decorator(func): + if datafusion_installed: + return DataFusionExpressionCompiler.compile_op.register(op)(func) # type: ignore + else: + return func + + return decorator + + +if datafusion_installed: + _DTYPE_MAPPING = { + bigframes.dtypes.INT_DTYPE: pa.int64(), + bigframes.dtypes.FLOAT_DTYPE: pa.float64(), + bigframes.dtypes.BOOL_DTYPE: pa.bool_(), + bigframes.dtypes.STRING_DTYPE: pa.string(), + # For now, map numeric to double or decimal if supported + bigframes.dtypes.NUMERIC_DTYPE: pa.decimal128(38, 9), + bigframes.dtypes.BIGNUMERIC_DTYPE: pa.decimal256(76, 38), + bigframes.dtypes.BYTES_DTYPE: pa.binary(), + bigframes.dtypes.DATE_DTYPE: pa.date32(), + bigframes.dtypes.DATETIME_DTYPE: pa.timestamp("us"), + bigframes.dtypes.TIMESTAMP_DTYPE: pa.timestamp("us", tz="UTC"), + bigframes.dtypes.TIME_DTYPE: pa.time64("us"), + bigframes.dtypes.TIMEDELTA_DTYPE: pa.duration("us"), + bigframes.dtypes.GEO_DTYPE: pa.string(), + bigframes.dtypes.JSON_DTYPE: pa.string(), + } + + def _bigframes_dtype_to_arrow_dtype( + dtype: bigframes.dtypes.ExpressionType, + ) -> pa.DataType: + if dtype is None: + return pa.null() + # TODO: Add struct and array handling if needed + return 
_DTYPE_MAPPING[dtype] + + @dataclasses.dataclass(frozen=True) + class DataFusionExpressionCompiler: + """ + Compiler for converting bigframes expressions to datafusion expressions. + """ + + @functools.singledispatchmethod + def compile_expression(self, expression: ex.Expression) -> datafusion.Expr: + raise NotImplementedError(f"Cannot compile expression: {expression}") + + @compile_expression.register + def _( + self, + expression: ex.ScalarConstantExpression, + ) -> datafusion.Expr: + value = expression.value + if not isinstance(value, float) and pd.isna(value): # type: ignore + value = None + if expression.dtype is None: + return datafusion.lit(None) + + # DataFusion lit handles standard types + return datafusion.lit(value) + + @compile_expression.register + def _( + self, + expression: ex.DerefOp, + ) -> datafusion.Expr: + return datafusion.col(expression.id.sql) + + @compile_expression.register + def _( + self, + expression: ex.ResolvedDerefOp, + ) -> datafusion.Expr: + return datafusion.col(expression.id.sql) + + @compile_expression.register + def _( + self, + expression: ex.OpExpression, + ) -> datafusion.Expr: + op = expression.op + args = tuple(map(self.compile_expression, expression.inputs)) + return self.compile_op(op, *args) + + @functools.singledispatchmethod + def compile_op( + self, op: ops.ScalarOp, *args: datafusion.Expr + ) -> datafusion.Expr: + raise NotImplementedError(f"DataFusion compiler hasn't implemented {op}") + + # Add basic ops here, others via register_op + # df expressions overload operators usually + + @dataclasses.dataclass(frozen=True) + class DataFusionAggregateCompiler: + scalar_compiler = DataFusionExpressionCompiler() + + def compile_agg_expr(self, expr: agg_expressions.Aggregation): + # Skeleton for now + raise NotImplementedError("Aggregate compilation not implemented") + + @dataclasses.dataclass(frozen=True) + class DataFusionCompiler: + """ + Compiles BigFrameNode to DataFusion DataFrame. 
+ """ + + expr_compiler = DataFusionExpressionCompiler() + agg_compiler = DataFusionAggregateCompiler() + + def compile(self, plan: nodes.BigFrameNode) -> datafusion.DataFrame: + if not datafusion_installed: + raise ValueError( + "DataFusion is not installed, cannot compile to datafusion engine." + ) + + from bigframes.core.compile.datafusion import lowering + + node = lowering.lower_ops_to_datafusion(plan) + return self.compile_node(node) + + @functools.singledispatchmethod + def compile_node(self, node: nodes.BigFrameNode) -> datafusion.DataFrame: + raise ValueError(f"Can't compile unrecognized node: {node}") + + @compile_node.register + def compile_readlocal(self, node: nodes.ReadLocalNode): + # Need SessionContext, maybe pass it in or create one + ctx = datafusion.SessionContext() + df = ctx.from_arrow(node.local_data_source.data) + + cols_to_read = { + scan_item.source_id: scan_item.id.sql + for scan_item in node.scan_list.items + } + # Rename columns + # DataFusion select can take list of expressions + exprs = [ + datafusion.col(orig).alias(new) for orig, new in cols_to_read.items() + ] + df = df.select(*exprs) + + if node.offsets_col: + # DataFusion has row_number()? 
+ # But ReadLocalNode usually has small data, could just use arrow offsets if needed + # For now, let's just make it error if offsets_col is requested and see + raise NotImplementedError( + "offsets_col in ReadLocalNode not supported yet for DataFusion" + ) + return df + + @compile_node.register + def compile_filter(self, node: nodes.FilterNode): + return self.compile_node(node.child).filter( + self.expr_compiler.compile_expression(node.predicate) + ) + + @compile_node.register + def compile_selection(self, node: nodes.SelectionNode): + df = self.compile_node(node.child) + exprs = [ + datafusion.col(orig.id.sql).alias(new.sql) + for orig, new in node.input_output_pairs + ] + return df.select(*exprs) + + @compile_node.register + def compile_projection(self, node: nodes.ProjectionNode): + df = self.compile_node(node.child) + new_cols = [] + for proj_expr, name in node.assignments: + # bind_schema_fields might be needed + bound_expr = ex.bind_schema_fields(proj_expr, node.child.field_by_id) + new_col = self.expr_compiler.compile_expression(bound_expr).alias( + name.sql + ) + new_cols.append(new_col) + + # with_columns takes dict or list of aliases? 
+ # DF DataFrame has with_column + for col in new_cols: + # df = df.with_column(col) # wait, with_column usually takes name and expr + # let's see df.select(*existing, new) + pass + # Better to use select with existing columns + new columns + new_names = [name.sql for _, name in node.assignments] + filtered_existing = [ + datafusion.col(c) for c in df.schema().names if c not in new_names + ] + return df.select(*(filtered_existing + new_cols)) diff --git a/bigframes/core/compile/datafusion/lowering.py b/bigframes/core/compile/datafusion/lowering.py new file mode 100644 index 0000000000..771b0fefbb --- /dev/null +++ b/bigframes/core/compile/datafusion/lowering.py @@ -0,0 +1,23 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from bigframes.core import bigframe_node + + +def lower_ops_to_datafusion( + root: bigframe_node.BigFrameNode, +) -> bigframe_node.BigFrameNode: + """Lower operations for DataFusion execution.""" + # Skeleton implementation, returns node as-is + return root diff --git a/bigframes/core/compile/datafusion/operations/__init__.py b/bigframes/core/compile/datafusion/operations/__init__.py new file mode 100644 index 0000000000..8b51074086 --- /dev/null +++ b/bigframes/core/compile/datafusion/operations/__init__.py @@ -0,0 +1,16 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""DataFusion implementations for BigFrames operations.""" +from __future__ import annotations diff --git a/bigframes/core/compile/datafusion/operations/comparison_ops.py b/bigframes/core/compile/datafusion/operations/comparison_ops.py new file mode 100644 index 0000000000..230dfb62fa --- /dev/null +++ b/bigframes/core/compile/datafusion/operations/comparison_ops.py @@ -0,0 +1,87 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +BigFrames -> DataFusion compilation for the operations in bigframes.operations.comparison_ops. 
+""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import bigframes.core.compile.datafusion.compiler as df_compiler +from bigframes.operations import comparison_ops + +if TYPE_CHECKING: + import datafusion + + +@df_compiler.register_op(comparison_ops.EqOp) +def eq_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: comparison_ops.EqOp, # type: ignore + left: datafusion.Expr, + right: datafusion.Expr, +) -> datafusion.Expr: + return left == right + + +@df_compiler.register_op(comparison_ops.NeOp) +def ne_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: comparison_ops.NeOp, # type: ignore + left: datafusion.Expr, + right: datafusion.Expr, +) -> datafusion.Expr: + return left != right + + +@df_compiler.register_op(comparison_ops.LtOp) +def lt_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: comparison_ops.LtOp, # type: ignore + left: datafusion.Expr, + right: datafusion.Expr, +) -> datafusion.Expr: + return left < right + + +@df_compiler.register_op(comparison_ops.LeOp) +def le_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: comparison_ops.LeOp, # type: ignore + left: datafusion.Expr, + right: datafusion.Expr, +) -> datafusion.Expr: + return left <= right + + +@df_compiler.register_op(comparison_ops.GtOp) +def gt_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: comparison_ops.GtOp, # type: ignore + left: datafusion.Expr, + right: datafusion.Expr, +) -> datafusion.Expr: + return left > right + + +@df_compiler.register_op(comparison_ops.GeOp) +def ge_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: comparison_ops.GeOp, # type: ignore + left: datafusion.Expr, + right: datafusion.Expr, +) -> datafusion.Expr: + return left >= right diff --git a/bigframes/core/compile/datafusion/operations/generic_ops.py b/bigframes/core/compile/datafusion/operations/generic_ops.py new file mode 100644 index 0000000000..4ef0a47d3c --- /dev/null 
+++ b/bigframes/core/compile/datafusion/operations/generic_ops.py @@ -0,0 +1,59 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import datafusion +import datafusion.functions as f + +import bigframes.core.compile.datafusion.compiler as df_compiler +from bigframes.operations import generic_ops + + +@df_compiler.register_op(generic_ops.IsNullOp) +def isnull_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: generic_ops.IsNullOp, # type: ignore + input: datafusion.Expr, +) -> datafusion.Expr: + return input.is_null() + + +@df_compiler.register_op(generic_ops.NotNullOp) +def notnull_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: generic_ops.NotNullOp, # type: ignore + input: datafusion.Expr, +) -> datafusion.Expr: + return input.is_not_null() + + +@df_compiler.register_op(generic_ops.CoalesceOp) +def coalesce_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: generic_ops.CoalesceOp, # type: ignore + *args: datafusion.Expr, +) -> datafusion.Expr: + return f.coalesce(*args) + + +@df_compiler.register_op(generic_ops.WhereOp) +def where_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: generic_ops.WhereOp, # type: ignore + cond: datafusion.Expr, + value: datafusion.Expr, + other: datafusion.Expr, +) -> datafusion.Expr: + return f.when(cond, value).otherwise(other) diff --git 
a/bigframes/core/compile/datafusion/operations/numeric_ops.py b/bigframes/core/compile/datafusion/operations/numeric_ops.py new file mode 100644 index 0000000000..b712fc7b66 --- /dev/null +++ b/bigframes/core/compile/datafusion/operations/numeric_ops.py @@ -0,0 +1,124 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import datafusion +import datafusion.functions as f + +import bigframes.core.compile.datafusion.compiler as df_compiler +from bigframes.operations import numeric_ops + + +@df_compiler.register_op(numeric_ops.AddOp) +def add_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: numeric_ops.AddOp, # type: ignore + left: datafusion.Expr, + right: datafusion.Expr, +) -> datafusion.Expr: + return left + right + + +@df_compiler.register_op(numeric_ops.SubOp) +def sub_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: numeric_ops.SubOp, # type: ignore + left: datafusion.Expr, + right: datafusion.Expr, +) -> datafusion.Expr: + return left - right + + +@df_compiler.register_op(numeric_ops.MulOp) +def mul_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: numeric_ops.MulOp, # type: ignore + left: datafusion.Expr, + right: datafusion.Expr, +) -> datafusion.Expr: + return left * right + + +@df_compiler.register_op(numeric_ops.DivOp) +def div_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: numeric_ops.DivOp, # type: ignore + 
left: datafusion.Expr, + right: datafusion.Expr, +) -> datafusion.Expr: + return left / right + + +@df_compiler.register_op(numeric_ops.AbsOp) +def abs_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: numeric_ops.AbsOp, # type: ignore + input: datafusion.Expr, +) -> datafusion.Expr: + return f.abs(input) + + +@df_compiler.register_op(numeric_ops.FloorOp) +def floor_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: numeric_ops.FloorOp, # type: ignore + input: datafusion.Expr, +) -> datafusion.Expr: + return f.floor(input) + + +@df_compiler.register_op(numeric_ops.CeilOp) +def ceil_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: numeric_ops.CeilOp, # type: ignore + input: datafusion.Expr, +) -> datafusion.Expr: + return f.ceil(input) + + +@df_compiler.register_op(numeric_ops.SqrtOp) +def sqrt_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: numeric_ops.SqrtOp, # type: ignore + input: datafusion.Expr, +) -> datafusion.Expr: + return f.sqrt(input) + + +@df_compiler.register_op(numeric_ops.ExpOp) +def exp_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: numeric_ops.ExpOp, # type: ignore + input: datafusion.Expr, +) -> datafusion.Expr: + return f.exp(input) + + +@df_compiler.register_op(numeric_ops.LnOp) +def ln_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: numeric_ops.LnOp, # type: ignore + input: datafusion.Expr, +) -> datafusion.Expr: + return f.ln(input) + + +@df_compiler.register_op(numeric_ops.Log10Op) +def log10_op_impl( + compiler: df_compiler.DataFusionExpressionCompiler, + op: numeric_ops.Log10Op, # type: ignore + input: datafusion.Expr, +) -> datafusion.Expr: + return f.log10(input) diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py index 0a2f2db189..dcaf52c6bc 100644 --- a/bigframes/session/__init__.py +++ b/bigframes/session/__init__.py @@ -277,6 +277,7 @@ def __init__( storage_manager=self._temp_storage_manager, 
metrics=self._metrics, enable_polars_execution=context.enable_polars_execution, + enable_datafusion_execution=context.enable_datafusion_execution, publisher=self._publisher, labels=labels, ) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index cf275154ce..c0ef02a8e1 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -78,6 +78,7 @@ def __init__( *, metrics: Optional[bigframes.session.metrics.ExecutionMetrics] = None, enable_polars_execution: bool = False, + enable_datafusion_execution: bool = False, publisher: bigframes.core.events.Publisher, labels: Mapping[str, str] = {}, ): @@ -88,6 +89,7 @@ def __init__( self.loader = loader self.bqstoragereadclient = bqstoragereadclient self._enable_polars_execution = enable_polars_execution + self._enable_datafusion_execution = enable_datafusion_execution self._publisher = publisher self._labels = labels @@ -106,6 +108,13 @@ def __init__( *self._semi_executors, polars_executor.PolarsExecutor(), ) + if enable_datafusion_execution: + from bigframes.session import datafusion_executor + + self._semi_executors = ( + *self._semi_executors, + datafusion_executor.DataFusionExecutor(), + ) self._upload_lock = threading.Lock() def to_sql( diff --git a/bigframes/session/datafusion_executor.py b/bigframes/session/datafusion_executor.py new file mode 100644 index 0000000000..ff162efb91 --- /dev/null +++ b/bigframes/session/datafusion_executor.py @@ -0,0 +1,105 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import annotations + +import itertools +from typing import Optional + +import pyarrow as pa + +from bigframes.core import agg_expressions, bigframe_node, expression, nodes +import bigframes.operations +from bigframes.operations import generic_ops, numeric_ops +from bigframes.session import executor, semi_executor + +_COMPATIBLE_NODES = ( + nodes.ReadLocalNode, + nodes.SelectionNode, + nodes.ProjectionNode, + nodes.FilterNode, +) + +_COMPATIBLE_SCALAR_OPS = ( + generic_ops.IsNullOp, + generic_ops.NotNullOp, + generic_ops.CoalesceOp, + generic_ops.WhereOp, + numeric_ops.AddOp, + numeric_ops.SubOp, + numeric_ops.MulOp, + numeric_ops.DivOp, + numeric_ops.AbsOp, + numeric_ops.FloorOp, + numeric_ops.CeilOp, + numeric_ops.SqrtOp, + numeric_ops.ExpOp, + numeric_ops.LnOp, + numeric_ops.Log10Op, +) + +_COMPATIBLE_AGG_OPS = () + + +def _get_expr_ops(expr: expression.Expression) -> set[bigframes.operations.ScalarOp]: + if isinstance(expr, expression.OpExpression): + return {expr.op} | set(itertools.chain.from_iterable(map(_get_expr_ops, expr.children))) + return set() + + +def _is_node_datafusion_executable(node: nodes.BigFrameNode): + if not isinstance(node, _COMPATIBLE_NODES): + return False + for expr in node._node_expressions: + if isinstance(expr, agg_expressions.Aggregation): + if type(expr.op) not in _COMPATIBLE_AGG_OPS: + return False + if isinstance(expr, expression.Expression): + if not set(map(type, _get_expr_ops(expr))).issubset(_COMPATIBLE_SCALAR_OPS): + return False + return True + + +class DataFusionExecutor(semi_executor.SemiExecutor): + def __init__(self): + from bigframes.core.compile.datafusion import DataFusionCompiler + + self._compiler = DataFusionCompiler() + + def execute( + self, + plan: bigframe_node.BigFrameNode, + ordered: bool, + peek: Optional[int] = None, + ) -> Optional[executor.ExecuteResult]: + if not self._can_execute(plan): + 
return None + # Note: Ignoring ordered flag for now, similar to Polars executor + try: + df = self._compiler.compile(plan) + except Exception: + return None + + if peek is not None: + df = df.limit(peek) + + batches = df.collect() + pa_table = pa.Table.from_batches(batches) + return executor.LocalExecuteResult( + data=pa_table, + bf_schema=plan.schema, + ) + + def _can_execute(self, plan: bigframe_node.BigFrameNode): + return all(_is_node_datafusion_executable(node) for node in plan.unique_nodes()) diff --git a/setup.py b/setup.py index 2179fe3e96..67b520a067 100644 --- a/setup.py +++ b/setup.py @@ -77,6 +77,7 @@ ], # used for local engine "polars": ["polars >= 1.21.0"], + "datafusion": ["datafusion >= 52.0.0"], "scikit-learn": ["scikit-learn>=1.2.2"], # Packages required for basic development flow. "dev": [ diff --git a/tests/system/small/test_datafusion_execution.py b/tests/system/small/test_datafusion_execution.py new file mode 100644 index 0000000000..9c07489891 --- /dev/null +++ b/tests/system/small/test_datafusion_execution.py @@ -0,0 +1,65 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +import bigframes +import bigframes.bigquery +from bigframes.testing.utils import assert_frame_equal + +datafusion = pytest.importorskip("datafusion") + + +@pytest.fixture(scope="module") +def session_w_datafusion(): + context = bigframes.BigQueryOptions(location="US", enable_datafusion_execution=True) + session = bigframes.Session(context=context) + yield session + session.close() + + +def test_datafusion_execution_basic(session_w_datafusion, scalars_pandas_df_index): + execution_count_before = session_w_datafusion._metrics.execution_count + bf_df = session_w_datafusion.read_pandas(scalars_pandas_df_index) + + # Test projection and arithmetic + # int64_too is a column that likely exists, let's verify or use a generic one + # scalars_pandas_df_index usually has 'int64_too' based on polars test + bf_df["int64_plus_one"] = bf_df["int64_too"] + 1 + bf_result = bf_df[["int64_plus_one"]].to_pandas() + + # Pandas result + pd_df = scalars_pandas_df_index.copy() + pd_df["int64_plus_one"] = pd_df["int64_too"] + 1 + pd_result = pd_df[["int64_plus_one"]] + + # Verify execution stayed local (metrics count shouldn't increase for BQ jobs) + assert session_w_datafusion._metrics.execution_count == execution_count_before + assert_frame_equal(bf_result, pd_result) + + +def test_datafusion_execution_filter(session_w_datafusion, scalars_pandas_df_index): + execution_count_before = session_w_datafusion._metrics.execution_count + bf_df = session_w_datafusion.read_pandas(scalars_pandas_df_index) + + # Test filter + bf_filtered = bf_df[bf_df["int64_too"] > 0] + bf_result = bf_filtered[["int64_too"]].to_pandas() + + pd_df = scalars_pandas_df_index.copy() + pd_filtered = pd_df[pd_df["int64_too"] > 0] + pd_result = pd_filtered[["int64_too"]] + + assert session_w_datafusion._metrics.execution_count == execution_count_before + assert_frame_equal(bf_result, pd_result)