-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSnowflakeImportAdapter.php
More file actions
80 lines (70 loc) · 2.35 KB
/
SnowflakeImportAdapter.php
File metadata and controls
80 lines (70 loc) · 2.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
<?php
declare(strict_types=1);
namespace Keboola\Db\ImportExport\Storage\Snowflake;
use Generator;
use Keboola\Db\ImportExport\Backend\ImportState;
use Keboola\Db\ImportExport\ImportOptions;
use Keboola\Db\ImportExport\Backend\Snowflake\SnowflakeImportAdapterInterface;
use Keboola\Db\ImportExport\Storage\DestinationInterface;
use Keboola\Db\ImportExport\Storage\SourceInterface;
use Keboola\SnowflakeDbAdapter\Connection;
use Keboola\SnowflakeDbAdapter\QueryBuilder;
/**
 * Import adapter for a Snowflake table source: builds an INSERT ... SELECT
 * statement that copies the configured columns from the source table into a
 * staging table, and executes such a statement against a connection.
 */
class SnowflakeImportAdapter implements SnowflakeImportAdapterInterface
{
    /**
     * Source whose schema and table name are read when building the SELECT part.
     *
     * @var Table
     */
    private $source;

    /**
     * Stores the source for later use by getCopyCommands().
     *
     * @param Table $source
     */
    public function __construct(SourceInterface $source)
    {
        $this->source = $source;
    }

    /**
     * Runs the generator's current command and reports the staging table row count.
     *
     * The whole operation (copy query plus the follow-up count query) is measured
     * under the 'copyToStaging' timer of the import state.
     *
     * @param Generator $commands generator whose current value is the SQL to execute
     * @param Connection $connection connection used for both queries
     * @param Table $destination supplies the schema in which the staging table is counted
     * @param ImportOptions $importOptions not read by this implementation
     * @param ImportState $importState supplies the timer and the staging table name
     * @return int value of COUNT(*) on the staging table after the command ran
     */
    public function executeCopyCommands(
        Generator $commands,
        Connection $connection,
        DestinationInterface $destination,
        ImportOptions $importOptions,
        ImportState $importState
    ): int {
        $importState->startTimer('copyToStaging');

        // Only the generator's current command is executed; the generator is not advanced.
        $copySql = $commands->current();
        $connection->query($copySql);

        $countSql = sprintf(
            'SELECT COUNT(*) AS "count" FROM %s.%s',
            QueryBuilder::quoteIdentifier($destination->getSchema()),
            QueryBuilder::quoteIdentifier($importState->getStagingTableName())
        );
        $countResult = $connection->fetchAll($countSql);

        $importState->stopTimer('copyToStaging');

        return (int) $countResult[0]['count'];
    }

    /**
     * Yields a single INSERT ... SELECT statement copying the import columns
     * from the source table into the staging table in the destination schema.
     *
     * @param Table $destination supplies the schema of the staging table
     * @param ImportOptions $importOptions supplies the list of columns to copy
     * @param string $stagingTableName name of the staging table to insert into
     * @return Generator yields exactly one SQL string
     */
    public function getCopyCommands(
        DestinationInterface $destination,
        ImportOptions $importOptions,
        string $stagingTableName
    ): Generator {
        // The same quoted column list is used for both the INSERT target and the SELECT.
        $columnList = implode(', ', array_map(
            static function ($columnName) {
                return QueryBuilder::quoteIdentifier($columnName);
            },
            $importOptions->getColumns()
        ));

        $insertClause = sprintf(
            'INSERT INTO %s.%s (%s)',
            QueryBuilder::quoteIdentifier($destination->getSchema()),
            QueryBuilder::quoteIdentifier($stagingTableName),
            $columnList
        );

        $selectClause = sprintf(
            ' SELECT %s FROM %s.%s',
            $columnList,
            QueryBuilder::quoteIdentifier($this->source->getSchema()),
            QueryBuilder::quoteIdentifier($this->source->getTableName())
        );

        yield $insertClause . $selectClause;
    }
}