Skip to content

Commit cf79bed

Browse files
authored
Merge pull request #232 from keboola/zajca/DMD-379/BQ-retry
DMD-379 Handle hanging sessions in BigQuery
2 parents b994bff + b866a99 commit cf79bed

1 file changed

Lines changed: 32 additions & 3 deletions

File tree

src/Connection/Bigquery/BigQueryClientWrapper.php

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,23 @@
99
use Google\Cloud\BigQuery\JobConfigurationInterface;
1010
use Google\Cloud\BigQuery\QueryResults;
1111
use InvalidArgumentException;
12+
use Psr\Log\LoggerInterface;
13+
use Psr\Log\NullLogger;
1214
use Retry\BackOff\BackOffPolicyInterface;
1315
use Retry\BackOff\ExponentialBackOffPolicy;
1416
use Retry\BackOff\ExponentialRandomBackOffPolicy;
1517
use Retry\Policy\SimpleRetryPolicy;
1618
use Retry\RetryProxy;
19+
use Throwable;
1720

1821
class BigQueryClientWrapper extends BigQueryClient
1922
{
2023
private BackOffPolicyInterface $backOffPolicy;
2124

25+
private LoggerInterface $logger;
26+
2227
private readonly QueryTags $queryTags;
28+
2329
/**
2430
* @inheritdoc
2531
* @param array<mixed> $config
@@ -31,6 +37,7 @@ public function __construct(
3137
string $runId = '',
3238
array $queryTags = [],
3339
BackOffPolicyInterface|null $backOffPolicy = null,
40+
LoggerInterface|null $logger = null,
3441
) {
3542
parent::__construct($config);
3643
if ($backOffPolicy === null) {
@@ -47,6 +54,10 @@ public function __construct(
4754
$queryTags[QueryTagKey::RUN_ID->value] = $runId;
4855
}
4956
$this->queryTags = new QueryTags($queryTags);
57+
if ($logger === null) {
58+
$logger = new NullLogger();
59+
}
60+
$this->logger = $logger;
5061
}
5162

5263
/**
@@ -75,9 +86,27 @@ public function runJob(JobConfigurationInterface $config, array $options = []):
7586
assert($options['backOffPolicy'] instanceof BackOffPolicyInterface);
7687
$retryPolicy = new SimpleRetryPolicy($options['retryCount']);
7788
$proxy = new RetryProxy($retryPolicy, $options['backOffPolicy']);
78-
$job = $proxy->call(function () use ($config, $options): Job {
79-
return $this->startJob($config, $options);
80-
});
89+
try {
90+
$job = $proxy->call(function () use ($config, $options): Job {
91+
return $this->startJob($config, $options);
92+
});
93+
} catch (Throwable $e) {
94+
$jobConfig = $config->toArray();
95+
$this->logger->warning('BigQuery job failed to start.', [
96+
'exception' => $e,
97+
'jobConfig' => $jobConfig,
98+
]);
99+
if (str_contains($e->getMessage(), 'Already Exists: Job')) {
100+
// Job with the same ID already exists - get the existing job
101+
if (!isset($jobConfig['jobReference']['jobId'])) {
102+
// this should never happen as ['jobReference']['jobId'] is assigned in the JobConfigurationTrait
103+
throw $e;
104+
}
105+
$job = $this->job($jobConfig['jobReference']['jobId']);
106+
} else {
107+
throw $e;
108+
}
109+
}
81110
assert($job instanceof Job);
82111
$context = $this->backOffPolicy->start();
83112
do {

0 commit comments

Comments
 (0)