|
21 | 21 | import io |
22 | 22 | import operator |
23 | 23 |
|
| 24 | +import google.api_core.retry |
24 | 25 | import pkg_resources |
25 | 26 | import pytest |
26 | 27 | import pytz |
|
41 | 42 | PANDAS_INT64_VERSION = pkg_resources.parse_version("1.0.0") |
42 | 43 |
|
43 | 44 |
|
class MissingDataError(Exception):
    """Signal that a query returned a different number of rows than expected.

    Raised by polling helpers in this module so that a retry predicate can
    match on this exception type and re-run the query.
    """
| 47 | + |
| 48 | + |
44 | 49 | def test_load_table_from_dataframe_w_automatic_schema(bigquery_client, dataset_id): |
45 | 50 | """Test that a DataFrame with dtypes that map well to BigQuery types |
46 | 51 | can be uploaded without specifying a schema. |
@@ -666,27 +671,34 @@ def test_insert_rows_from_dataframe(bigquery_client, dataset_id): |
666 | 671 | ) |
667 | 672 | for errors in chunk_errors: |
668 | 673 | assert not errors |
669 | | - |
670 | | - # Use query to fetch rows instead of listing directly from the table so |
671 | | - # that we get values from the streaming buffer. |
672 | | - rows = list( |
673 | | - bigquery_client.query( |
674 | | - "SELECT * FROM `{}.{}.{}`".format( |
675 | | - table.project, table.dataset_id, table.table_id |
676 | | - ) |
677 | | - ) |
678 | | - ) |
679 | | - |
680 | | - sorted_rows = sorted(rows, key=operator.attrgetter("int_col")) |
681 | | - row_tuples = [r.values() for r in sorted_rows] |
682 | 674 | expected = [ |
683 | 675 | # Pandas often represents NULL values as NaN. Convert to None for |
684 | 676 | # easier comparison. |
685 | 677 | tuple(None if col != col else col for col in data_row) |
686 | 678 | for data_row in dataframe.itertuples(index=False) |
687 | 679 | ] |
688 | 680 |
|
689 | | - assert len(row_tuples) == len(expected) |
| 681 | + # Use query to fetch rows instead of listing directly from the table so |
| 682 | + # that we get values from the streaming buffer "within a few seconds". |
| 683 | + # https://cloud.google.com/bigquery/streaming-data-into-bigquery#dataavailability |
| 684 | + @google.api_core.retry.Retry( |
| 685 | + predicate=google.api_core.retry.if_exception_type(MissingDataError) |
| 686 | + ) |
| 687 | + def get_rows(): |
| 688 | + rows = list( |
| 689 | + bigquery_client.query( |
| 690 | + "SELECT * FROM `{}.{}.{}`".format( |
| 691 | + table.project, table.dataset_id, table.table_id |
| 692 | + ) |
| 693 | + ) |
| 694 | + ) |
| 695 | + if len(rows) != len(expected): |
| 696 | + raise MissingDataError() |
| 697 | + return rows |
| 698 | + |
| 699 | + rows = get_rows() |
| 700 | + sorted_rows = sorted(rows, key=operator.attrgetter("int_col")) |
| 701 | + row_tuples = [r.values() for r in sorted_rows] |
690 | 702 |
|
691 | 703 | for row, expected_row in zip(row_tuples, expected): |
692 | 704 | assert ( |
|
0 commit comments