Skip to content

Commit e872a5e

Browse files
authored
Merge pull request #185 from keboola/zajca-ct-1842-bq-improvements
CT-1842 add Retry class for BQ
2 parents 69a4b20 + 6cf40c5 commit e872a5e

3 files changed

Lines changed: 375 additions & 0 deletions

File tree

src/Connection/Bigquery/Retry.php

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\TableBackendUtils\Connection\Bigquery;
6+
7+
use Closure;
8+
use GuzzleHttp\Exception\RequestException;
9+
use JsonException;
10+
use Psr\Log\LoggerInterface;
11+
use Psr\Log\LogLevel;
12+
use Throwable;
13+
14+
/**
 * Builds retry-decision callables for the Google BigQuery client.
 *
 * A request is retried when its status code is in a fixed list, when the error text contains one of
 * two known transient messages, or when the JSON error body lists a retryable `reason`.
 */
final class Retry
{
    private const RETRY_MISSING_CREATE_JOB = 'bigquery.jobs.create';
    private const RETRY_SERVICE_ACCOUNT_NOT_EXIST = 'IAM setPolicy failed for Dataset';
    private const RETRY_ON_REASON = [
        'rateLimitExceeded',
        'userRateLimitExceeded',
        'backendError',
        'jobRateLimitExceeded',
    ];
    private const ALWAYS_RETRY_STATUS_CODES = [429, 500, 503];

    /**
     * Helper method to overcome some irregular behavior of the Google BigQuery client.
     *
     * The returned closure has two modes:
     *  - called with exactly two arguments, the first being a Throwable, it returns the retry decision
     *    (bool) for that throwable;
     *  - called any other way, it returns the decider closure itself (see getRetryDecider()).
     *
     * @param bool $includeUnauthorized forwarded to getRetryDecider()
     */
    public static function getRestRetryFunction(LoggerInterface $logger, bool $includeUnauthorized = false): Closure
    {
        return static function () use ($logger, $includeUnauthorized): Closure|bool {
            $decider = self::getRetryDecider($logger, $includeUnauthorized);

            // BigQuery client sometimes calls restRetryFunction directly with the exception as first argument,
            // but in other cases it expects a callable which accepts the exception as first argument.
            if (func_num_args() === 2) {
                $ex = func_get_arg(0);
                if ($ex instanceof Throwable) {
                    return $decider($ex);
                }
            }

            return $decider;
        };
    }

    /**
     * Returns a closure `(Throwable): bool` that answers whether the failed request should be retried.
     *
     * @param bool $includeUnauthorized default false; Google Cloud sometimes returns 401 even when the
     * credentials are correct. Enabling this is a bit tricky: with genuinely invalid credentials it can
     * cause a long waiting loop.
     */
    public static function getRetryDecider(LoggerInterface $logger, bool $includeUnauthorized = false): Closure
    {
        return static function (Throwable $ex) use ($logger, $includeUnauthorized): bool {
            // Throwable::getCode() is not guaranteed to be an int in every subclass; normalise it so the
            // int-typed log helpers cannot raise a TypeError under strict_types.
            $statusCode = (int) $ex->getCode();

            $retryOnStatusCodes = self::ALWAYS_RETRY_STATUS_CODES;
            if ($includeUnauthorized) {
                $retryOnStatusCodes[] = 401;
            }
            if (in_array($statusCode, $retryOnStatusCodes, true)) {
                self::logRetry($statusCode, [], $logger);
                return true;
            }
            if ($statusCode >= 200 && $statusCode < 300) {
                return false;
            }

            // Prefer the response body over the exception message when a response is attached.
            $rawMessage = $ex->getMessage();
            if ($ex instanceof RequestException && $ex->hasResponse()) {
                $rawMessage = (string) $ex->getResponse()?->getBody();
            }

            // Both known transient messages are logged as the raw string: wrapping the body in an array
            // would force a json_encode() that throws on a non-UTF-8 body.
            if (str_contains($rawMessage, self::RETRY_SERVICE_ACCOUNT_NOT_EXIST)) {
                self::logRetry($statusCode, $rawMessage, $logger);
                return true;
            }
            if (str_contains($rawMessage, self::RETRY_MISSING_CREATE_JOB)) {
                self::logRetry($statusCode, $rawMessage, $logger);
                return true;
            }

            try {
                $decoded = json_decode($rawMessage, true, 512, JSON_THROW_ON_ERROR);
            } catch (JsonException) {
                self::logNotRetry($statusCode, $rawMessage, $logger);
                return false;
            }

            // Valid JSON is not necessarily an object: "404", "null" or a quoted string decode to scalars.
            if (!is_array($decoded)) {
                self::logNotRetry($statusCode, $rawMessage, $logger);
                return false;
            }

            // `??` tolerates a missing key as well as a non-array `error` value.
            $errors = $decoded['error']['errors'] ?? null;
            if (!is_array($errors)) {
                self::logNotRetry($statusCode, $decoded, $logger);
                return false;
            }

            foreach ($errors as $error) {
                // Strict comparison: a loose in_array() would let a JSON `true` match every reason.
                if (is_array($error) && in_array($error['reason'] ?? null, self::RETRY_ON_REASON, true)) {
                    self::logRetry($statusCode, $decoded, $logger);
                    return true;
                }
            }

            self::logNotRetry($statusCode, $decoded, $logger);

            return false;
        };
    }

    /**
     * Logs (INFO) that the request is going to be retried.
     *
     * @param array<mixed>|string $message
     * @throws JsonException
     */
    private static function logRetry(int $statusCode, array|string $message, LoggerInterface $logger): void
    {
        self::logDecision('Retrying [%s] request with exception::%s', $statusCode, $message, $logger);
    }

    /**
     * Logs (INFO) that the request is NOT going to be retried.
     *
     * @param array<mixed>|string $message
     * @throws JsonException
     */
    private static function logNotRetry(int $statusCode, string|array $message, LoggerInterface $logger): void
    {
        self::logDecision('Not retrying [%s] request with exception::%s', $statusCode, $message, $logger);
    }

    /**
     * Shared implementation of both log helpers; array messages are JSON-encoded first.
     *
     * @param string $format sprintf() format with two `%s` placeholders: status code, then message
     * @param array<mixed>|string $message
     * @throws JsonException when an array message cannot be encoded
     */
    private static function logDecision(
        string $format,
        int $statusCode,
        array|string $message,
        LoggerInterface $logger,
    ): void {
        if (is_array($message)) {
            $message = json_encode($message, JSON_THROW_ON_ERROR);
        }

        $logger->log(LogLevel::INFO, sprintf($format, $statusCode, $message));
    }
}

tests/Functional/Bigquery/BigqueryBaseCase.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,11 @@
88
use GuzzleHttp\Client;
99
use Keboola\TableBackendUtils\Connection\Bigquery\BigQueryClientHandler;
1010
use Keboola\TableBackendUtils\Connection\Bigquery\BigQueryClientWrapper;
11+
use Keboola\TableBackendUtils\Connection\Bigquery\Retry;
1112
use Keboola\TableBackendUtils\Escaping\Bigquery\BigqueryQuote;
1213
use LogicException;
1314
use PHPUnit\Framework\TestCase;
15+
use Psr\Log\NullLogger;
1416

1517
class BigqueryBaseCase extends TestCase
1618
{
@@ -94,6 +96,7 @@ private function getBigqueryClient(): BigQueryClient
9496
[
9597
'keyFile' => $this->getCredentials(),
9698
'httpHandler' => new BigQueryClientHandler(new Client()),
99+
'restRetryFunction' => Retry::getRestRetryFunction(new NullLogger(), true),
97100
],
98101
'e2e-utils-lib',
99102
);
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Keboola\TableBackendUtils\Unit\Connection\Bigquery;
6+
7+
use Exception;
8+
use Generator;
9+
use GuzzleHttp\Exception\RequestException;
10+
use GuzzleHttp\Psr7\Utils;
11+
use Keboola\TableBackendUtils\Connection\Bigquery\Retry;
12+
use PHPUnit\Framework\TestCase;
13+
use Psr\Http\Message\RequestInterface;
14+
use Psr\Http\Message\ResponseInterface;
15+
use Psr\Log\NullLogger;
16+
use Throwable;
17+
18+
/**
 * Unit tests for the retry decider produced by Retry::getRetryDecider().
 */
class RetryTest extends TestCase
{
    /**
     * Builds a plain exception carrying the given code and message.
     */
    private function getException(int $code, string $message = ''): Throwable
    {
        return new Exception($message, $code);
    }

    /**
     * Builds a Guzzle RequestException whose mocked response body is $message.
     *
     * $code is used both as the mocked response status code and as the code of the previous exception.
     */
    private function getRequestException(int $code, ?string $message = ''): Throwable
    {
        $response = $this->createMock(ResponseInterface::class);
        // Guzzle derives the exception code from the response status, so stub it explicitly.
        $response->method('getStatusCode')->willReturn($code);
        $response->method('getBody')->willReturn(Utils::streamFor($message));
        return new RequestException(
            '',
            $this->createMock(RequestInterface::class),
            $response,
            $this->getException($code),
        );
    }

    /**
     * Status codes in the 2xx range, which must never be retried.
     *
     * @return int[][]
     */
    public function retryCodesProvider(): array
    {
        return [
            [200],
            [210],
            [299],
        ];
    }

    /**
     * @dataProvider retryCodesProvider
     */
    public function testSuccessResponse(int $statusCode): void
    {
        $fn = Retry::getRetryDecider(new NullLogger());
        $ex = $this->getException($statusCode);
        $this->assertFalse($fn($ex));
    }

    /**
     * Status codes that are always retried regardless of the message.
     *
     * @return int[][]
     */
    public function retryCodesErrorProvider(): array
    {
        return [
            [429],
            [500],
            [503],
        ];
    }

    /**
     * @dataProvider retryCodesErrorProvider
     */
    public function testRetryOnCodesResponse(int $statusCode): void
    {
        $fn = Retry::getRetryDecider(new NullLogger());
        $ex = $this->getException($statusCode);
        $this->assertTrue($fn($ex));
    }

    public function testNotJsonResponse(): void
    {
        $fn = Retry::getRetryDecider(new NullLogger());
        $ex = $this->getException(418, 'not json');
        $this->assertFalse($fn($ex));
    }

    public function testNotExpectedContentResponse(): void
    {
        $fn = Retry::getRetryDecider(new NullLogger());
        $ex = $this->getException(418, '{"data" : "test"}');
        $this->assertFalse($fn($ex));
    }

    /**
     * Yields [message/body, expected decision, exception type] triples; every case is produced once for
     * a plain Throwable and once for a RequestException.
     */
    public function responseContentProvider(): Generator
    {
        foreach (['Throwable', 'RequestException'] as $exceptionType) {
            yield 'not error response ' . ' ' . $exceptionType => [
                '{"data" : "test"}',
                false,
                $exceptionType,
            ];

            yield 'errors not array' . ' ' . $exceptionType => [
                '{"error": { "errors" : "test" }}',
                false,
                $exceptionType,
            ];

            yield 'errors empty array' . ' ' . $exceptionType => [
                '{"error": { "errors" : [] }}',
                false,
                $exceptionType,
            ];
            yield 'errors expected errors[0]' . ' ' . $exceptionType => [
                '{"error": { "errors" : [{"test":"test"}] }}',
                false,
                $exceptionType,
            ];

            yield 'errors no reason ' . $exceptionType => [
                '{"error": { "errors" : [{"message":"bigquery.jobs.create"}] }}',
                true,
                $exceptionType,
            ];

            yield 'errors no message ' . $exceptionType => [
                '{"error": { "errors" : [{"reason":"userRateLimitExceeded"}] }}',
                true,
                $exceptionType,
            ];

            yield 'unknown reason and message ' . $exceptionType => [
                '{"error": { "errors" : [{"reason":"unknown","message": "unknown"}] }}',
                false,
                $exceptionType,
            ];

            /**
             * @var array{error:array{
             *     code:int,
             *     message:string,
             *     status:string,
             *     errors:array<array{
             *          message:string,
             *          domain:string,
             *          reason:string
             *     }>
             * }} $json
             */
            $json = json_decode(<<<EOD
                {
                  "error": {
                    "code": 404,
                    "message": "Not found: xxx",
                    "errors": [
                      {
                        "message": "Not found: xxx",
                        "domain": "global",
                        "reason": "notFound"
                      }
                    ],
                    "status": "NOT_FOUND"
                  }
                }
                EOD, true, 512, JSON_THROW_ON_ERROR);

            foreach ([
                'rateLimitExceeded',
                'userRateLimitExceeded',
                'backendError',
                'jobRateLimitExceeded',
            ] as $reason) {
                $json['error']['errors'][0]['reason'] = $reason;
                yield 'retry on ' . $reason . ' ' . $exceptionType => [
                    json_encode($json, JSON_THROW_ON_ERROR),
                    true,
                    $exceptionType,
                ];
            }

            $json['error']['errors'][0]['reason'] = 'unknown';
            yield 'not retry on unknown reason ' . $exceptionType => [
                json_encode($json, JSON_THROW_ON_ERROR),
                false,
                $exceptionType,
            ];

            foreach ([
                'bigquery.jobs.create',
                //phpcs:ignore
                'IAM setPolicy failed for Dataset xxxx:WORKSPACE_11111: Service account xxxx@xxxx.iam.gserviceaccount.com does not exist.',
            ] as $msg) {
                $json['error']['errors'][0]['reason'] = 'unknown';
                $json['error']['errors'][0]['message'] = $msg;
                yield sprintf('retry on message "%s" "%s"', $msg, $exceptionType) => [
                    json_encode($json, JSON_THROW_ON_ERROR),
                    true,
                    $exceptionType,
                ];
            }
        }
    }

    /**
     * @dataProvider responseContentProvider
     */
    public function testResponseContent(string $json, bool $expectToRetry, string $exceptionType): void
    {
        $fn = Retry::getRetryDecider(new NullLogger());
        // The built exception must actually be assigned; otherwise the RequestException data sets
        // silently run against a plain Exception and the response-body path is never exercised.
        $ex = $exceptionType === 'RequestException'
            ? $this->getRequestException(418, $json)
            : $this->getException(418, $json);

        $this->assertSame($expectToRetry, $fn($ex));
    }
}

0 commit comments

Comments
 (0)