Skip to content

Commit ed9b073

Browse files
Enmk authored and ianton-ru committed
Merge pull request #746 from Altinity/feature/fix_configuration_format
Fix format, structure and compression method detection for DataLake
1 parent b92e68f commit ed9b073

21 files changed

Lines changed: 119 additions & 99 deletions

src/Storages/ObjectStorage/Azure/Configuration.cpp

Lines changed: 17 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -149,9 +149,9 @@ void StorageAzureConfiguration::fromNamedCollection(const NamedCollection & coll
149149
if (collection.has("account_key"))
150150
account_key = collection.get<String>("account_key");
151151

152-
structure = collection.getOrDefault<String>("structure", "auto");
153-
format = collection.getOrDefault<String>("format", format);
154-
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
152+
setStructure(collection.getOrDefault<String>("structure", "auto"));
153+
setFormat(collection.getOrDefault<String>("format", getFormat()));
154+
setCompressionMethod(collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto")));
155155

156156
blobs_paths = {blob_path};
157157
connection_params = getConnectionParams(connection_url, container_name, account_name, account_key, context);
@@ -187,12 +187,12 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
187187
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
188188
if (is_format_arg(fourth_arg))
189189
{
190-
format = fourth_arg;
190+
setFormat(fourth_arg);
191191
}
192192
else
193193
{
194194
if (with_structure)
195-
structure = fourth_arg;
195+
setStructure(fourth_arg);
196196
else
197197
throw Exception(
198198
ErrorCodes::BAD_ARGUMENTS,
@@ -204,8 +204,8 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
204204
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
205205
if (is_format_arg(fourth_arg))
206206
{
207-
format = fourth_arg;
208-
compression_method = checkAndGetLiteralArgument<String>(engine_args[4], "compression");
207+
setFormat(fourth_arg);
208+
setCompressionMethod(checkAndGetLiteralArgument<String>(engine_args[4], "compression"));
209209
}
210210
else
211211
{
@@ -220,9 +220,9 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
220220
{
221221
if (with_structure)
222222
{
223-
format = fourth_arg;
224-
compression_method = checkAndGetLiteralArgument<String>(engine_args[4], "compression");
225-
structure = checkAndGetLiteralArgument<String>(engine_args[5], "structure");
223+
setFormat(fourth_arg);
224+
setCompressionMethod(checkAndGetLiteralArgument<String>(engine_args[4], "compression"));
225+
setStructure(checkAndGetLiteralArgument<String>(engine_args[5], "structure"));
226226
}
227227
else
228228
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Format and compression must be last arguments");
@@ -234,12 +234,12 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
234234
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/structure");
235235
if (is_format_arg(sixth_arg))
236236
{
237-
format = sixth_arg;
237+
setFormat(sixth_arg);
238238
}
239239
else
240240
{
241241
if (with_structure)
242-
structure = sixth_arg;
242+
setStructure(sixth_arg);
243243
else
244244
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
245245
}
@@ -258,8 +258,8 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
258258
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
259259
if (!is_format_arg(sixth_arg))
260260
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
261-
format = sixth_arg;
262-
compression_method = checkAndGetLiteralArgument<String>(engine_args[6], "compression");
261+
setFormat(sixth_arg);
262+
setCompressionMethod(checkAndGetLiteralArgument<String>(engine_args[6], "compression"));
263263
}
264264
else if (with_structure && engine_args.size() == 8)
265265
{
@@ -269,9 +269,9 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
269269
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format");
270270
if (!is_format_arg(sixth_arg))
271271
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
272-
format = sixth_arg;
273-
compression_method = checkAndGetLiteralArgument<String>(engine_args[6], "compression");
274-
structure = checkAndGetLiteralArgument<String>(engine_args[7], "structure");
272+
setFormat(sixth_arg);
273+
setCompressionMethod (checkAndGetLiteralArgument<String>(engine_args[6], "compression"));
274+
setStructure(checkAndGetLiteralArgument<String>(engine_args[7], "structure"));
275275
}
276276

277277
blobs_paths = {blob_path};

src/Storages/ObjectStorage/DataLakes/DataLakeConfiguration.h

Lines changed: 8 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -366,6 +366,14 @@ class StorageIcebergConfiguration : public StorageObjectStorage::Configuration,
366366
return getImpl().createArgsWithAccessData();
367367
}
368368

369+
const String & getFormat() const override { return getImpl().getFormat(); }
370+
const String & getCompressionMethod() const override { return getImpl().getCompressionMethod(); }
371+
const String & getStructure() const override { return getImpl().getStructure(); }
372+
373+
void setFormat(const String & format_) override { getImpl().setFormat(format_); }
374+
void setCompressionMethod(const String & compression_method_) override { getImpl().setCompressionMethod(compression_method_); }
375+
void setStructure(const String & structure_) override { getImpl().setStructure(structure_); }
376+
369377
protected:
370378
void fromNamedCollection(const NamedCollection & collection, ContextPtr context) override
371379
{ return getImpl().fromNamedCollection(collection, context); }

src/Storages/ObjectStorage/DataLakes/HudiMetadata.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -44,7 +44,7 @@ Strings HudiMetadata::getDataFilesImpl() const
4444
{
4545
auto configuration_ptr = configuration.lock();
4646
auto log = getLogger("HudiMetadata");
47-
const auto keys = listFiles(*object_storage, *configuration_ptr, "", Poco::toLower(configuration_ptr->format));
47+
const auto keys = listFiles(*object_storage, *configuration_ptr, "", Poco::toLower(configuration_ptr->getFormat()));
4848

4949
using Partition = std::string;
5050
using FileID = std::string;

src/Storages/ObjectStorage/HDFS/Configuration.cpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,23 +111,23 @@ void StorageHDFSConfiguration::fromAST(ASTs & args, ContextPtr context, bool wit
111111

112112
if (args.size() > 1)
113113
{
114-
format = checkAndGetLiteralArgument<String>(args[1], "format_name");
114+
setFormat(checkAndGetLiteralArgument<String>(args[1], "format_name"));
115115
}
116116

117117
if (with_structure)
118118
{
119119
if (args.size() > 2)
120120
{
121-
structure = checkAndGetLiteralArgument<String>(args[2], "structure");
121+
setStructure(checkAndGetLiteralArgument<String>(args[2], "structure"));
122122
}
123123
if (args.size() > 3)
124124
{
125-
compression_method = checkAndGetLiteralArgument<String>(args[3], "compression_method");
125+
setCompressionMethod(checkAndGetLiteralArgument<String>(args[3], "compression_method"));
126126
}
127127
}
128128
else if (args.size() > 2)
129129
{
130-
compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
130+
setCompressionMethod(checkAndGetLiteralArgument<String>(args[2], "compression_method"));
131131
}
132132

133133
setURL(url_str);
@@ -143,10 +143,10 @@ void StorageHDFSConfiguration::fromNamedCollection(const NamedCollection & colle
143143
else
144144
url_str = collection.get<String>("url");
145145

146-
format = collection.getOrDefault<String>("format", "auto");
147-
compression_method = collection.getOrDefault<String>("compression_method",
148-
collection.getOrDefault<String>("compression", "auto"));
149-
structure = collection.getOrDefault<String>("structure", "auto");
146+
setFormat(collection.getOrDefault<String>("format", "auto"));
147+
setCompressionMethod(collection.getOrDefault<String>("compression_method",
148+
collection.getOrDefault<String>("compression", "auto")));
149+
setStructure(collection.getOrDefault<String>("structure", "auto"));
150150

151151
setURL(url_str);
152152
}

src/Storages/ObjectStorage/Local/Configuration.cpp

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
2424
void StorageLocalConfiguration::fromNamedCollection(const NamedCollection & collection, ContextPtr)
2525
{
2626
path = collection.get<String>("path");
27-
format = collection.getOrDefault<String>("format", "auto");
28-
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
29-
structure = collection.getOrDefault<String>("structure", "auto");
27+
setFormat(collection.getOrDefault<String>("format", "auto"));
28+
setCompressionMethod(collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto")));
29+
setStructure(collection.getOrDefault<String>("structure", "auto"));
3030
paths = {path};
3131
}
3232

@@ -46,23 +46,23 @@ void StorageLocalConfiguration::fromAST(ASTs & args, ContextPtr context, bool wi
4646

4747
if (args.size() > 1)
4848
{
49-
format = checkAndGetLiteralArgument<String>(args[1], "format_name");
49+
setFormat(checkAndGetLiteralArgument<String>(args[1], "format_name"));
5050
}
5151

5252
if (with_structure)
5353
{
5454
if (args.size() > 2)
5555
{
56-
structure = checkAndGetLiteralArgument<String>(args[2], "structure");
56+
setStructure(checkAndGetLiteralArgument<String>(args[2], "structure"));
5757
}
5858
if (args.size() > 3)
5959
{
60-
compression_method = checkAndGetLiteralArgument<String>(args[3], "compression_method");
60+
setCompressionMethod(checkAndGetLiteralArgument<String>(args[3], "compression_method"));
6161
}
6262
}
6363
else if (args.size() > 2)
6464
{
65-
compression_method = checkAndGetLiteralArgument<String>(args[2], "compression_method");
65+
setCompressionMethod(checkAndGetLiteralArgument<String>(args[2], "compression_method"));
6666
}
6767
paths = {path};
6868
}

src/Storages/ObjectStorage/ReadBufferIterator.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ ReadBufferIterator::ReadBufferIterator(
3838
, read_keys(read_keys_)
3939
, prev_read_keys_size(read_keys_.size())
4040
{
41-
if (configuration->format != "auto")
42-
format = configuration->format;
41+
if (configuration->getFormat() != "auto")
42+
format = configuration->getFormat();
4343
}
4444

4545
SchemaCache::Key ReadBufferIterator::getKeyForSchemaCache(const ObjectInfo & object_info, const String & format_name) const
@@ -152,7 +152,7 @@ std::unique_ptr<ReadBuffer> ReadBufferIterator::recreateLastReadBuffer()
152152
const auto & path = current_object_info->isArchive() ? current_object_info->getPathToArchive() : current_object_info->getPath();
153153
auto impl = StorageObjectStorageSource::createReadBuffer(*current_object_info, object_storage, context, getLogger("ReadBufferIterator"));
154154

155-
const auto compression_method = chooseCompressionMethod(current_object_info->getFileName(), configuration->compression_method);
155+
const auto compression_method = chooseCompressionMethod(current_object_info->getFileName(), configuration->getCompressionMethod());
156156
const auto zstd_window = static_cast<int>(context->getSettingsRef()[Setting::zstd_window_log_max]);
157157

158158
return wrapReadBufferWithCompressionMethod(std::move(impl), compression_method, zstd_window);
@@ -268,13 +268,13 @@ ReadBufferIterator::Data ReadBufferIterator::next()
268268
using ObjectInfoInArchive = StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive;
269269
if (const auto * object_info_in_archive = dynamic_cast<const ObjectInfoInArchive *>(current_object_info.get()))
270270
{
271-
compression_method = chooseCompressionMethod(filename, configuration->compression_method);
271+
compression_method = chooseCompressionMethod(filename, configuration->getCompressionMethod());
272272
const auto & archive_reader = object_info_in_archive->archive_reader;
273273
read_buf = archive_reader->readFile(object_info_in_archive->path_in_archive, /*throw_on_not_found=*/true);
274274
}
275275
else
276276
{
277-
compression_method = chooseCompressionMethod(filename, configuration->compression_method);
277+
compression_method = chooseCompressionMethod(filename, configuration->getCompressionMethod());
278278
read_buf = StorageObjectStorageSource::createReadBuffer(*current_object_info, object_storage, getContext(), getLogger("ReadBufferIterator"));
279279
}
280280

src/Storages/ObjectStorage/S3/Configuration.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -180,9 +180,9 @@ void StorageS3Configuration::fromNamedCollection(const NamedCollection & collect
180180
auth_settings[S3AuthSetting::expiration_window_seconds] = collection.getOrDefault<UInt64>("expiration_window_seconds", S3::DEFAULT_EXPIRATION_WINDOW_SECONDS);
181181
auth_settings[S3AuthSetting::session_token] = collection.getOrDefault<String>("session_token", "");
182182

183-
format = collection.getOrDefault<String>("format", format);
184-
compression_method = collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto"));
185-
structure = collection.getOrDefault<String>("structure", "auto");
183+
setFormat(collection.getOrDefault<String>("format", getFormat()));
184+
setCompressionMethod(collection.getOrDefault<String>("compression_method", collection.getOrDefault<String>("compression", "auto")));
185+
setStructure(collection.getOrDefault<String>("structure", "auto"));
186186

187187
request_settings = S3::S3RequestSettings(collection, settings, /* validate_settings */true);
188188

@@ -369,14 +369,14 @@ void StorageS3Configuration::fromAST(ASTs & args, ContextPtr context, bool with_
369369
/// Set format to configuration only if it's not 'auto',
370370
/// because we can have default format set in configuration.
371371
if (format_ != "auto")
372-
format = format_;
372+
setFormat(format_);
373373
}
374374

375375
if (engine_args_to_idx.contains("structure"))
376-
structure = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["structure"]], "structure");
376+
setStructure(checkAndGetLiteralArgument<String>(args[engine_args_to_idx["structure"]], "structure"));
377377

378378
if (engine_args_to_idx.contains("compression_method"))
379-
compression_method = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["compression_method"]], "compression_method");
379+
setCompressionMethod(checkAndGetLiteralArgument<String>(args[engine_args_to_idx["compression_method"]], "compression_method"));
380380

381381
if (engine_args_to_idx.contains("access_key_id"))
382382
auth_settings[S3AuthSetting::access_key_id] = checkAndGetLiteralArgument<String>(args[engine_args_to_idx["access_key_id"]], "access_key_id");
@@ -602,10 +602,10 @@ ASTPtr StorageS3Configuration::createArgsWithAccessData() const
602602
arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::secret_access_key].value));
603603
if (!auth_settings[S3AuthSetting::session_token].value.empty())
604604
arguments->children.push_back(std::make_shared<ASTLiteral>(auth_settings[S3AuthSetting::session_token].value));
605-
if (format != "auto")
606-
arguments->children.push_back(std::make_shared<ASTLiteral>(format));
607-
if (!compression_method.empty())
608-
arguments->children.push_back(std::make_shared<ASTLiteral>(compression_method));
605+
if (getFormat() != "auto")
606+
arguments->children.push_back(std::make_shared<ASTLiteral>(getFormat()));
607+
if (!getCompressionMethod().empty())
608+
arguments->children.push_back(std::make_shared<ASTLiteral>(getCompressionMethod()));
609609
}
610610

611611
return arguments;

src/Storages/ObjectStorage/StorageObjectStorage.cpp

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ StorageObjectStorage::StorageObjectStorage(
9999
, distributed_processing(distributed_processing_)
100100
, log(getLogger(fmt::format("Storage{}({})", configuration->getEngineName(), table_id_.getFullTableName())))
101101
{
102-
bool do_lazy_init = lazy_init && !columns_.empty() && !configuration->format.empty();
102+
bool do_lazy_init = lazy_init && !columns_.empty() && !configuration->getFormat().empty();
103103
bool failed_init = false;
104104
auto do_init = [&]()
105105
{
@@ -114,7 +114,7 @@ StorageObjectStorage::StorageObjectStorage(
114114
{
115115
// If we don't have format or schema yet, we can't ignore failed configuration update,
116116
// because relevant configuration is crucial for format and schema inference
117-
if (mode <= LoadingStrictnessLevel::CREATE || columns_.empty() || (configuration->format == "auto"))
117+
if (mode <= LoadingStrictnessLevel::CREATE || columns_.empty() || (configuration->getFormat() == "auto"))
118118
{
119119
throw;
120120
}
@@ -131,7 +131,7 @@ StorageObjectStorage::StorageObjectStorage(
131131

132132
std::string sample_path;
133133
ColumnsDescription columns{columns_};
134-
resolveSchemaAndFormat(columns, configuration->format, object_storage, configuration, format_settings, sample_path, context);
134+
resolveSchemaAndFormat(columns, object_storage, configuration, format_settings, sample_path, context);
135135
configuration->check(context);
136136

137137
StorageInMemoryMetadata metadata;
@@ -171,17 +171,17 @@ String StorageObjectStorage::getName() const
171171

172172
bool StorageObjectStorage::prefersLargeBlocks() const
173173
{
174-
return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration->format);
174+
return FormatFactory::instance().checkIfOutputFormatPrefersLargeBlocks(configuration->getFormat());
175175
}
176176

177177
bool StorageObjectStorage::parallelizeOutputAfterReading(ContextPtr context) const
178178
{
179-
return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->format, context);
179+
return FormatFactory::instance().checkParallelizeOutputAfterReading(configuration->getFormat(), context);
180180
}
181181

182182
bool StorageObjectStorage::supportsSubsetOfColumns(const ContextPtr & context) const
183183
{
184-
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->format, context, format_settings);
184+
return FormatFactory::instance().checkIfFormatSupportsSubsetOfColumns(configuration->getFormat(), context, format_settings);
185185
}
186186

187187
void StorageObjectStorage::Configuration::update(ObjectStoragePtr object_storage_ptr, ContextPtr context)
@@ -515,7 +515,7 @@ ColumnsDescription StorageObjectStorage::resolveSchemaFromData(
515515

516516
ObjectInfos read_keys;
517517
auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
518-
auto schema = readSchemaFromFormat(configuration->format, format_settings, *iterator, context);
518+
auto schema = readSchemaFromFormat(configuration->getFormat(), format_settings, *iterator, context);
519519
sample_path = iterator->getLastFilePath();
520520
return schema;
521521
}
@@ -536,7 +536,7 @@ std::string StorageObjectStorage::resolveFormatFromData(
536536

537537
std::pair<ColumnsDescription, std::string> StorageObjectStorage::resolveSchemaAndFormatFromData(
538538
const ObjectStoragePtr & object_storage,
539-
const ConfigurationPtr & configuration,
539+
ConfigurationPtr & configuration,
540540
const std::optional<FormatSettings> & format_settings,
541541
std::string & sample_path,
542542
const ContextPtr & context)
@@ -545,13 +545,13 @@ std::pair<ColumnsDescription, std::string> StorageObjectStorage::resolveSchemaAn
545545
auto iterator = createReadBufferIterator(object_storage, configuration, format_settings, read_keys, context);
546546
auto [columns, format] = detectFormatAndReadSchema(format_settings, *iterator, context);
547547
sample_path = iterator->getLastFilePath();
548-
configuration->format = format;
548+
configuration->setFormat(format);
549549
return std::pair(columns, format);
550550
}
551551

552552
void StorageObjectStorage::addInferredEngineArgsToCreateQuery(ASTs & args, const ContextPtr & context) const
553553
{
554-
configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->format, context, /*with_structure=*/false);
554+
configuration->addStructureAndFormatToArgsIfNeeded(args, "", configuration->getFormat(), context, /*with_structure=*/false);
555555
}
556556

557557
SchemaCache & StorageObjectStorage::getSchemaCache(const ContextPtr & context, const std::string & storage_type_name)

0 commit comments

Comments
 (0)