@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import itertools
-import json
 import logging
 import time
 import unittest
@@ -2271,26 +2270,26 @@ def test_to_dataframe_w_bqstorage_logs_session(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_empty_streams(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
         from google.cloud.bigquery_storage_v1beta1 import reader
 
+        arrow_fields = [
+            pyarrow.field("colA", pyarrow.int64()),
+            # Not alphabetical to test column order.
+            pyarrow.field("colC", pyarrow.float64()),
+            pyarrow.field("colB", pyarrow.utf8()),
+        ]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
         session = bigquery_storage_v1beta1.types.ReadSession(
-            streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}]
-        )
-        session.avro_schema.schema = json.dumps(
-            {
-                "fields": [
-                    {"name": "colA"},
-                    # Not alphabetical to test column order.
-                    {"name": "colC"},
-                    {"name": "colB"},
-                ]
-            }
+            streams=[{"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"}],
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
         )
         bqstorage_client.create_read_session.return_value = session
 
@@ -2327,11 +2326,20 @@ def test_to_dataframe_w_bqstorage_empty_streams(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_nonempty(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
         from google.cloud.bigquery_storage_v1beta1 import reader
 
+        arrow_fields = [
+            pyarrow.field("colA", pyarrow.int64()),
+            # Not alphabetical to test column order.
+            pyarrow.field("colC", pyarrow.float64()),
+            pyarrow.field("colB", pyarrow.utf8()),
+        ]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
@@ -2340,16 +2348,9 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
23402348 {"name" : "/projects/proj/dataset/dset/tables/tbl/streams/1234" },
23412349 {"name" : "/projects/proj/dataset/dset/tables/tbl/streams/5678" },
23422350 ]
2343- session = bigquery_storage_v1beta1 .types .ReadSession (streams = streams )
2344- session .avro_schema .schema = json .dumps (
2345- {
2346- "fields" : [
2347- {"name" : "colA" },
2348- # Not alphabetical to test column order.
2349- {"name" : "colC" },
2350- {"name" : "colB" },
2351- ]
2352- }
2351+ session = bigquery_storage_v1beta1 .types .ReadSession (
2352+ streams = streams ,
2353+ arrow_schema = {"serialized_schema" : arrow_schema .serialize ().to_pybytes ()},
23532354 )
23542355 bqstorage_client .create_read_session .return_value = session
23552356
@@ -2400,17 +2401,23 @@ def test_to_dataframe_w_bqstorage_nonempty(self):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
         from google.cloud.bigquery_storage_v1beta1 import reader
 
+        arrow_fields = [pyarrow.field("colA", pyarrow.int64())]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         streams = [
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
             {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
         ]
-        session = bigquery_storage_v1beta1.types.ReadSession(streams=streams)
-        session.avro_schema.schema = json.dumps({"fields": [{"name": "colA"}]})
+        session = bigquery_storage_v1beta1.types.ReadSession(
+            streams=streams,
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
+        )
 
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
@@ -2448,6 +2455,7 @@ def test_to_dataframe_w_bqstorage_multiple_streams_return_unique_index(self):
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
     @unittest.skipIf(tqdm is None, "Requires `tqdm`")
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     @mock.patch("tqdm.tqdm")
     def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
         from google.cloud.bigquery import schema
@@ -2457,6 +2465,9 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
         # Speed up testing.
         mut._PROGRESS_INTERVAL = 0.01
 
+        arrow_fields = [pyarrow.field("testcol", pyarrow.int64())]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
@@ -2466,8 +2477,10 @@ def test_to_dataframe_w_bqstorage_updates_progress_bar(self, tqdm_mock):
24662477 {"name" : "/projects/proj/dataset/dset/tables/tbl/streams/1234" },
24672478 {"name" : "/projects/proj/dataset/dset/tables/tbl/streams/5678" },
24682479 ]
2469- session = bigquery_storage_v1beta1 .types .ReadSession (streams = streams )
2470- session .avro_schema .schema = json .dumps ({"fields" : [{"name" : "testcol" }]})
2480+ session = bigquery_storage_v1beta1 .types .ReadSession (
2481+ streams = streams ,
2482+ arrow_schema = {"serialized_schema" : arrow_schema .serialize ().to_pybytes ()},
2483+ )
24712484 bqstorage_client .create_read_session .return_value = session
24722485
24732486 mock_rowstream = mock .create_autospec (reader .ReadRowsStream )
@@ -2521,6 +2534,7 @@ def blocking_to_dataframe(*args, **kwargs):
     @unittest.skipIf(
         bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
     )
+    @unittest.skipIf(pyarrow is None, "Requires `pyarrow`")
     def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
         from google.cloud.bigquery import schema
         from google.cloud.bigquery import table as mut
@@ -2529,6 +2543,14 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
         # Speed up testing.
         mut._PROGRESS_INTERVAL = 0.01
 
+        arrow_fields = [
+            pyarrow.field("colA", pyarrow.int64()),
+            # Not alphabetical to test column order.
+            pyarrow.field("colC", pyarrow.float64()),
+            pyarrow.field("colB", pyarrow.utf8()),
+        ]
+        arrow_schema = pyarrow.schema(arrow_fields)
+
         bqstorage_client = mock.create_autospec(
             bigquery_storage_v1beta1.BigQueryStorageClient
         )
@@ -2539,10 +2561,8 @@ def test_to_dataframe_w_bqstorage_exits_on_keyboardinterrupt(self):
                 # ends early.
                 {"name": "/projects/proj/dataset/dset/tables/tbl/streams/1234"},
                 {"name": "/projects/proj/dataset/dset/tables/tbl/streams/5678"},
-            ]
-        )
-        session.avro_schema.schema = json.dumps(
-            {"fields": [{"name": "colA"}, {"name": "colB"}, {"name": "colC"}]}
+            ],
+            arrow_schema={"serialized_schema": arrow_schema.serialize().to_pybytes()},
         )
         bqstorage_client.create_read_session.return_value = session
 
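A note on the pattern above: every converted test builds a pyarrow schema, serializes it, and hands the raw bytes to the mocked ReadSession via arrow_schema={"serialized_schema": ...}. A minimal sketch of that serialization round trip, using only pyarrow (the pyarrow.ipc.read_schema call is an assumption about a reasonably recent pyarrow API and is not part of this commit):

import pyarrow

# Same shape as the fixtures above; intentionally not alphabetical.
arrow_schema = pyarrow.schema(
    [
        pyarrow.field("colA", pyarrow.int64()),
        pyarrow.field("colC", pyarrow.float64()),
        pyarrow.field("colB", pyarrow.utf8()),
    ]
)

# These bytes are what the tests place in the session's
# arrow_schema={"serialized_schema": ...} field.
serialized = arrow_schema.serialize().to_pybytes()

# Round trip: the serialized bytes decode back to an equal schema,
# which is how a reader on the other end recovers column names and types.
restored = pyarrow.ipc.read_schema(pyarrow.py_buffer(serialized))
assert restored.equals(arrow_schema)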