Skip to content

Commit ac5dede

Browse files
authored
Merge pull request #835 from Altinity/feature/antalya-25.3/object_storage_remote_initiator
25.3 Antalya port of #756 - object storage cluster function
2 parents 1637aa2 + 75f8115 commit ac5dede

21 files changed

Lines changed: 185 additions & 20 deletions

src/Core/Settings.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6135,8 +6135,12 @@ Possible values:
61356135
/** Experimental tsToGrid aggregate function. */ \
61366136
DECLARE(Bool, allow_experimental_ts_to_grid_aggregate_function, false, R"(
61376137
Experimental tsToGrid aggregate function for Prometheus-like timeseries resampling. Cloud only
6138+
)", EXPERIMENTAL) \
6139+
DECLARE(Bool, object_storage_remote_initiator, false, R"(
6140+
Execute request to object storage as remote on one of object_storage_cluster nodes.
61386141
)", EXPERIMENTAL) \
61396142
\
6143+
61406144
/* ####################################################### */ \
61416145
/* ############ END OF EXPERIMENTAL FEATURES ############# */ \
61426146
/* ####################################################### */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
8787
{"allow_experimental_database_unity_catalog", false, false, "Allow experimental database engine DataLakeCatalog with catalog_type = 'unity'"},
8888
{"allow_experimental_database_glue_catalog", false, false, "Allow experimental database engine DataLakeCatalog with catalog_type = 'glue'"},
8989
{"use_page_cache_with_distributed_cache", false, false, "New setting"},
90+
{"object_storage_remote_initiator", false, false, "New setting."},
9091
{"use_iceberg_metadata_files_cache", true, true, "New setting"},
9192
{"use_query_condition_cache", false, false, "New setting."},
9293
{"iceberg_timestamp_ms", 0, 0, "New setting."},

src/IO/ReadBufferFromS3.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,12 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t attempt, si
431431
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}, Offset: {}",
432432
bucket, key, version_id.empty() ? "Latest" : version_id, range_begin);
433433
}
434+
else
435+
{
436+
LOG_TEST(
437+
log, "Read S3 object. Bucket: {}, Key: {}, Version: {}",
438+
bucket, key, version_id.empty() ? "Latest" : version_id);
439+
}
434440

435441
ProfileEvents::increment(ProfileEvents::S3GetObject);
436442
if (client_ptr->isClientForDisk())

src/Storages/IStorageCluster.cpp

Lines changed: 79 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
#include <Storages/IStorageCluster.h>
22

3+
#include <pcg_random.hpp>
4+
#include <Common/randomSeed.h>
5+
36
#include <Common/Exception.h>
47
#include <Core/Settings.h>
58
#include <Core/QueryProcessingStage.h>
@@ -12,6 +15,7 @@
1215
#include <Interpreters/AddDefaultDatabaseVisitor.h>
1316
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
1417
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
18+
#include <Planner/Utils.h>
1519
#include <Processors/Sources/NullSource.h>
1620
#include <Processors/Sources/RemoteSource.h>
1721
#include <QueryPipeline/narrowPipe.h>
@@ -21,6 +25,9 @@
2125
#include <Storages/IStorage.h>
2226
#include <Storages/SelectQueryInfo.h>
2327
#include <Storages/StorageDictionary.h>
28+
#include <Storages/StorageDistributed.h>
29+
#include <TableFunctions/TableFunctionFactory.h>
30+
#include <Storages/extractTableFunctionFromSelectQuery.h>
2431

2532
#include <algorithm>
2633
#include <memory>
@@ -39,6 +46,7 @@ namespace Setting
3946
extern const SettingsString cluster_for_parallel_replicas;
4047
extern const SettingsNonZeroUInt64 max_parallel_replicas;
4148
extern const SettingsUInt64 object_storage_max_nodes;
49+
extern const SettingsBool object_storage_remote_initiator;
4250
}
4351

4452
namespace ErrorCodes
@@ -96,15 +104,16 @@ void IStorageCluster::read(
96104

97105
storage_snapshot->check(column_names);
98106

99-
updateBeforeRead(context);
100-
auto cluster = getClusterImpl(context, cluster_name_from_settings, context->getSettingsRef()[Setting::object_storage_max_nodes]);
107+
const auto & settings = context->getSettingsRef();
108+
109+
auto cluster = getClusterImpl(context, cluster_name_from_settings, settings[Setting::object_storage_max_nodes]);
101110

102111
/// Calculate the header. This is significant, because some columns could be thrown away in some cases like query with count(*)
103112

104113
Block sample_block;
105114
ASTPtr query_to_send = query_info.query;
106115

107-
if (context->getSettingsRef()[Setting::allow_experimental_analyzer])
116+
if (settings[Setting::allow_experimental_analyzer])
108117
{
109118
sample_block = InterpreterSelectQueryAnalyzer::getSampleBlock(query_info.query, context, SelectQueryOptions(processed_stage));
110119
}
@@ -117,6 +126,17 @@ void IStorageCluster::read(
117126

118127
updateQueryToSendIfNeeded(query_to_send, storage_snapshot, context);
119128

129+
if (settings[Setting::object_storage_remote_initiator])
130+
{
131+
auto storage_and_context = convertToRemote(cluster, context, cluster_name_from_settings, query_to_send);
132+
auto src_distributed = std::dynamic_pointer_cast<StorageDistributed>(storage_and_context.storage);
133+
auto modified_query_info = query_info;
134+
modified_query_info.cluster = src_distributed->getCluster();
135+
auto new_storage_snapshot = storage_and_context.storage->getStorageSnapshot(storage_snapshot->metadata, storage_and_context.context);
136+
storage_and_context.storage->read(query_plan, column_names, new_storage_snapshot, modified_query_info, storage_and_context.context, processed_stage, max_block_size, num_streams);
137+
return;
138+
}
139+
120140
RestoreQualifiedNamesVisitor::Data data;
121141
data.distributed_table = DatabaseAndTableWithAlias(*getTableExpression(query_info.query->as<ASTSelectQuery &>(), 0));
122142
data.remote_table.database = context->getCurrentDatabase();
@@ -144,6 +164,62 @@ void IStorageCluster::read(
144164
query_plan.addStep(std::move(reading));
145165
}
146166

167+
IStorageCluster::RemoteCallVariables IStorageCluster::convertToRemote(
168+
ClusterPtr cluster,
169+
ContextPtr context,
170+
const std::string & cluster_name_from_settings,
171+
ASTPtr query_to_send)
172+
{
173+
auto host_addresses = cluster->getShardsAddresses();
174+
if (host_addresses.empty())
175+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty cluster {}", cluster_name_from_settings);
176+
177+
static pcg64 rng(randomSeed());
178+
size_t shard_num = rng() % host_addresses.size();
179+
auto shard_addresses = host_addresses[shard_num];
180+
/// After getClusterImpl each shard must have exactly 1 replica
181+
if (shard_addresses.size() != 1)
182+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Size of shard {} in cluster {} is not equal 1", shard_num, cluster_name_from_settings);
183+
auto host_name = shard_addresses[0].toString();
184+
185+
LOG_INFO(log, "Choose remote initiator '{}'", host_name);
186+
187+
bool secure = shard_addresses[0].secure == Protocol::Secure::Enable;
188+
std::string remote_function_name = secure ? "remoteSecure" : "remote";
189+
190+
/// Clean object_storage_remote_initiator setting to avoid infinite remote call
191+
auto new_context = Context::createCopy(context);
192+
new_context->setSetting("object_storage_remote_initiator", false);
193+
194+
auto * select_query = query_to_send->as<ASTSelectQuery>();
195+
if (!select_query)
196+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expected SELECT query");
197+
198+
auto query_settings = select_query->settings();
199+
if (query_settings)
200+
{
201+
auto & settings_ast = query_settings->as<ASTSetQuery &>();
202+
if (settings_ast.changes.removeSetting("object_storage_remote_initiator") && settings_ast.changes.empty())
203+
{
204+
select_query->setExpression(ASTSelectQuery::Expression::SETTINGS, {});
205+
}
206+
}
207+
208+
ASTTableExpression * table_expression = extractTableExpressionASTPtrFromSelectQuery(query_to_send);
209+
if (!table_expression)
210+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't find table expression");
211+
212+
auto remote_query = makeASTFunction(remote_function_name, std::make_shared<ASTLiteral>(host_name), table_expression->table_function);
213+
214+
table_expression->table_function = remote_query;
215+
216+
auto remote_function = TableFunctionFactory::instance().get(remote_query, new_context);
217+
218+
auto storage = remote_function->execute(query_to_send, new_context, remote_function_name);
219+
220+
return RemoteCallVariables{storage, new_context};
221+
}
222+
147223
SinkToStoragePtr IStorageCluster::write(
148224
const ASTPtr & query,
149225
const StorageMetadataPtr & metadata_snapshot,

src/Storages/IStorageCluster.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,20 @@ class IStorageCluster : public IStorage
5858
virtual String getClusterName(ContextPtr /* context */) const { return getOriginalClusterName(); }
5959

6060
protected:
61-
virtual void updateBeforeRead(const ContextPtr &) {}
6261
virtual void updateQueryToSendIfNeeded(ASTPtr & /*query*/, const StorageSnapshotPtr & /*storage_snapshot*/, const ContextPtr & /*context*/) {}
6362

63+
struct RemoteCallVariables
64+
{
65+
StoragePtr storage;
66+
ContextPtr context;
67+
};
68+
69+
RemoteCallVariables convertToRemote(
70+
ClusterPtr cluster,
71+
ContextPtr context,
72+
const std::string & cluster_name_from_settings,
73+
ASTPtr query_to_send);
74+
6475
virtual void readFallBackToPure(
6576
QueryPlan & /* query_plan */,
6677
const Names & /* column_names */,

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,16 @@ class DataLakeConfiguration : public BaseStorageConfiguration, public std::enabl
8585
return std::nullopt;
8686
}
8787

88+
std::optional<String> tryGetSamplePathFromMetadata() const override
89+
{
90+
if (!current_metadata)
91+
return std::nullopt;
92+
auto data_files = current_metadata->getDataFiles();
93+
if (!data_files.empty())
94+
return data_files[0];
95+
return std::nullopt;
96+
}
97+
8898
std::optional<size_t> totalRows() override
8999
{
90100
if (!current_metadata)
@@ -465,8 +475,14 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
465475
createDynamicStorage(type);
466476
}
467477

478+
std::optional<String> tryGetSamplePathFromMetadata() const override
479+
{
480+
return getImpl().tryGetSamplePathFromMetadata();
481+
}
482+
468483
virtual void assertInitialized() const override { return getImpl().assertInitialized(); }
469484

485+
470486
private:
471487
inline StorageObjectStorage::Configuration & getImpl() const
472488
{

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadata.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class DeltaLakeMetadata final : public IDataLakeMetadata
4141

4242
DeltaLakeMetadata(ObjectStoragePtr object_storage_, ConfigurationObserverPtr configuration_, ContextPtr context_);
4343

44+
Strings getDataFiles() const override { return data_files; }
45+
4446
NamesAndTypesList getTableSchema() const override { return schema; }
4547

4648
DeltaLakePartitionColumns getPartitionColumns() const { return partition_columns; }

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,11 @@ bool DeltaLakeMetadataDeltaKernel::update(const ContextPtr &)
3232
return table_snapshot->update();
3333
}
3434

35+
Strings DeltaLakeMetadataDeltaKernel::getDataFiles() const
36+
{
37+
throwNotImplemented("getDataFiles()");
38+
}
39+
3540
ObjectIterator DeltaLakeMetadataDeltaKernel::iterate(
3641
const ActionsDAG * filter_dag,
3742
FileProgressCallback callback,

src/Storages/ObjectStorage/DataLakes/DeltaLakeMetadataDeltaKernel.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ class DeltaLakeMetadataDeltaKernel final : public IDataLakeMetadata
4040

4141
bool update(const ContextPtr & context) override;
4242

43+
Strings getDataFiles() const override;
44+
4345
NamesAndTypesList getTableSchema() const override;
4446

4547
NamesAndTypesList getReadSchema() const override;

src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,19 +91,19 @@ HudiMetadata::HudiMetadata(ObjectStoragePtr object_storage_, ConfigurationObserv
9191
{
9292
}
9393

94-
Strings HudiMetadata::getDataFiles(const ActionsDAG *) const
94+
Strings HudiMetadata::getDataFiles() const
9595
{
9696
if (data_files.empty())
9797
data_files = getDataFilesImpl();
9898
return data_files;
9999
}
100100

101101
ObjectIterator HudiMetadata::iterate(
102-
const ActionsDAG * filter_dag,
102+
const ActionsDAG * /* filter_dag */,
103103
FileProgressCallback callback,
104104
size_t /* list_batch_size */) const
105105
{
106-
return createKeysIterator(getDataFiles(filter_dag), object_storage, callback);
106+
return createKeysIterator(getDataFiles(), object_storage, callback);
107107
}
108108

109109
}

0 commit comments

Comments
 (0)