From 59ed1a81f1647b4a1fadd0628b933b1f76fa9eff Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Fri, 20 Mar 2026 21:37:18 +0000 Subject: [PATCH 1/3] perf: Make executor data uploads async internally --- bigframes/session/bq_caching_executor.py | 39 +++++++++++++++--------- bigframes/session/loader.py | 13 +++++++- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index cf275154ce..8e432ae473 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -14,6 +14,7 @@ from __future__ import annotations +import concurrent.futures import math import threading from typing import Literal, Mapping, Optional, Sequence, Tuple @@ -28,7 +29,7 @@ from bigframes import exceptions as bfe import bigframes.constants import bigframes.core -from bigframes.core import bq_data, compile, local_data, rewrite +from bigframes.core import bq_data, compile, rewrite from bigframes.core.compile.sqlglot import sql as sg_sql from bigframes.core.compile.sqlglot import sqlglot_ir import bigframes.core.events @@ -514,13 +515,35 @@ def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode): Replace large local sources with the uploaded version of those datasources. """ # Step 1: Upload all previously un-uploaded data + needs_upload = [] for leaf in original_root.unique_nodes(): if isinstance(leaf, nodes.ReadLocalNode): if ( leaf.local_data_source.metadata.total_bytes > bigframes.constants.MAX_INLINE_BYTES ): - self._upload_local_data(leaf.local_data_source) + needs_upload.append(leaf.local_data_source) + + futures = [] + try: + for local_source in needs_upload: + future = self.loader.read_data_async( + local_source, bigframes.core.guid.generate_guid() + ) + future.add_done_callback( + lambda f: self.cache.cache_remote_replacement( + local_source, f.result() + ) + ) + futures.append(future) + concurrent.futures.wait(futures) + for future in futures: + future.result() + except Exception as e: + # cancel all futures + for future in futures: + future.cancel() + raise e # Step 2: Replace local scans with remote scans def map_local_scans(node: nodes.BigFrameNode): @@ -550,18 +573,6 @@ def map_local_scans(node: nodes.BigFrameNode): return original_root.bottom_up(map_local_scans) - def _upload_local_data(self, local_table: local_data.ManagedArrowTable): - if self.cache.get_uploaded_local_data(local_table) is not None: - return - # Lock prevents concurrent repeated work, but slows things down. - # Might be better as a queue and a worker thread - with self._upload_lock: - if self.cache.get_uploaded_local_data(local_table) is None: - uploaded = self.loader.load_data_or_write_data( - local_table, bigframes.core.guid.generate_guid() - ) - self.cache.cache_remote_replacement(local_table, uploaded) - def _execute_plan_gbq( self, plan: nodes.BigFrameNode, diff --git a/bigframes/session/loader.py b/bigframes/session/loader.py index 0944c0dab6..7b5d1bcaf1 100644 --- a/bigframes/session/loader.py +++ b/bigframes/session/loader.py @@ -300,6 +300,17 @@ def __init__( self._session = session self._clock = session_time.BigQuerySyncedClock(bqclient) self._clock.sync() + self._threadpool = concurrent.futures.ThreadPoolExecutor( + max_workers=1, thread_name_prefix="bigframes-loader" + ) + + def read_data_async( + self, local_data: local_data.ManagedArrowTable, offsets_col: str + ) -> concurrent.futures.Future[bq_data.BigqueryDataSource]: + future = self._threadpool.submit( + self._load_data_or_write_data, local_data, offsets_col + ) + return future def read_pandas( self, @@ -350,7 +361,7 @@ def read_managed_data( session=self._session, ) - def load_data_or_write_data( + def _load_data_or_write_data( self, data: local_data.ManagedArrowTable, offsets_col: str, From f17c2d7bced2b3930bc82574fcd89e7cdbd48fa2 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 23 Mar 2026 19:41:16 +0000 Subject: [PATCH 2/3] fix closure bug, use as_completed instead of wait --- bigframes/session/bq_caching_executor.py | 28 +++++++++++++----------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 8e432ae473..2b7de2bad1 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -29,7 +29,7 @@ from bigframes import exceptions as bfe import bigframes.constants import bigframes.core -from bigframes.core import bq_data, compile, rewrite +from bigframes.core import bq_data, compile, local_data, rewrite from bigframes.core.compile.sqlglot import sql as sg_sql from bigframes.core.compile.sqlglot import sqlglot_ir import bigframes.core.events @@ -525,19 +525,21 @@ def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode): needs_upload.append(leaf.local_data_source) futures = [] + for local_source in needs_upload: + future = self.loader.read_data_async( + local_source, bigframes.core.guid.generate_guid() + ) + + def cache_result( + future: concurrent.futures.Future, + local: local_data.ManagedArrowTable = local_source, + ): + self.cache.cache_remote_replacement(local, future.result()) + + future.add_done_callback(cache_result) + futures.append(future) try: - for local_source in needs_upload: - future = self.loader.read_data_async( - local_source, bigframes.core.guid.generate_guid() - ) - future.add_done_callback( - lambda f: self.cache.cache_remote_replacement( - local_source, f.result() - ) - ) - futures.append(future) - concurrent.futures.wait(futures) - for future in futures: + for future in concurrent.futures.as_completed(futures): future.result() except Exception as e: # cancel all futures From bfbb60314dbdb827e9969b164d3c4eabca8b8f66 Mon Sep 17 00:00:00 2001 From: Trevor Bergeron Date: Mon, 23 Mar 2026 21:15:23 +0000 Subject: [PATCH 3/3] stop using callback to cache upload result --- bigframes/session/bq_caching_executor.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index 2b7de2bad1..c5d6fe3e5f 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -524,23 +524,15 @@ def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode): ): needs_upload.append(leaf.local_data_source) - futures = [] + futures: dict[concurrent.futures.Future, local_data.ManagedArrowTable] = dict() for local_source in needs_upload: future = self.loader.read_data_async( local_source, bigframes.core.guid.generate_guid() ) - - def cache_result( - future: concurrent.futures.Future, - local: local_data.ManagedArrowTable = local_source, - ): - self.cache.cache_remote_replacement(local, future.result()) - - future.add_done_callback(cache_result) - futures.append(future) + futures[future] = local_source try: - for future in concurrent.futures.as_completed(futures): - future.result() + for future in concurrent.futures.as_completed(futures.keys()): + self.cache.cache_remote_replacement(futures[future], future.result()) except Exception as e: # cancel all futures for future in futures: