Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@
# include <azure/core/diagnostics/logger.hpp>
#endif

#if USE_PARQUET
# include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#endif


/// A minimal file used when the server is run without installation
constexpr unsigned char resource_embedded_xml[] =
Expand Down Expand Up @@ -415,6 +419,7 @@ namespace ServerSetting
extern const ServerSettingsUInt64 keeper_server_socket_send_timeout_sec;
extern const ServerSettingsString hdfs_libhdfs3_conf;
extern const ServerSettingsString config_file;
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
}

namespace ErrorCodes
Expand Down Expand Up @@ -2739,6 +2744,10 @@ try

auto replicas_reconnector = ReplicasReconnector::init(global_context);

#if USE_PARQUET
ParquetFileMetaDataCache::instance()->setMaxSizeInBytes(server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size]);
#endif

/// Set current database name before loading tables and databases because
/// system logs may copy global context.
std::string default_database = server_settings[ServerSetting::default_database];
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ enum class AccessType : uint8_t
M(SYSTEM_DROP_SCHEMA_CACHE, "SYSTEM CLEAR SCHEMA CACHE, SYSTEM DROP SCHEMA CACHE, DROP SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_FORMAT_SCHEMA_CACHE, "SYSTEM CLEAR FORMAT SCHEMA CACHE, SYSTEM DROP FORMAT SCHEMA CACHE, DROP FORMAT SCHEMA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_S3_CLIENT_CACHE, "SYSTEM CLEAR S3 CLIENT CACHE, SYSTEM DROP S3 CLIENT, DROP S3 CLIENT CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_PARQUET_METADATA_CACHE, "SYSTEM DROP PARQUET METADATA CACHE", GLOBAL, SYSTEM_DROP_CACHE) \
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
Expand Down
3 changes: 2 additions & 1 deletion src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1298,7 +1298,8 @@ The server successfully detected this situation and will download merged part fr
M(RuntimeFilterRowsChecked, "Number of rows checked by JOIN Runtime Filters", ValueType::Number) \
M(RuntimeFilterRowsPassed, "Number of rows that passed (not filtered out by) JOIN Runtime Filters", ValueType::Number) \
M(RuntimeFilterRowsSkipped, "Number of rows in blocks that were skipped by JOIN Runtime Filters", ValueType::Number) \

M(ParquetMetaDataCacheHits, "Number of times a parquet file metadata read was served from the metadata cache.", ValueType::Number) \
M(ParquetMetaDataCacheMisses, "Number of times a parquet file metadata read missed the metadata cache and had to read from the file.", ValueType::Number) \

#ifdef APPLY_FOR_EXTERNAL_EVENTS
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)
Expand Down
3 changes: 1 addition & 2 deletions src/Core/FormatFactorySettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -1527,8 +1527,7 @@ Allow to write information about geo columns in parquet metadata and encode colu
DECLARE(Bool, into_outfile_create_parent_directories, false, R"(
Automatically create parent directories when using INTO OUTFILE if they do not already exist.
)", 0) \


DECLARE(Bool, input_format_parquet_use_metadata_cache, true, R"(Enable parquet file metadata caching)", 0) \
// End of FORMAT_FACTORY_SETTINGS

#define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \
Expand Down
5 changes: 3 additions & 2 deletions src/Core/ServerSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1469,7 +1469,8 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
```xml
<skip_check_for_incorrect_settings>1</skip_check_for_incorrect_settings>
```
)", 0)
)", 0) \
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0)

/// Settings with a path are server settings with at least one layer of nesting that have a fixed structure (no lists, lists, enumerations, repetitions, ...).
#define LIST_OF_SERVER_SETTINGS_WITH_PATH(DECLARE, ALIAS) \
Expand Down Expand Up @@ -1542,7 +1543,7 @@ The policy on how to perform a scheduling of CPU slots specified by `concurrent_
DECLARE(UInt64, keeper_server_socket_receive_timeout_sec, DBMS_DEFAULT_RECEIVE_TIMEOUT_SEC, R"(Keeper socket receive timeout.)", 0, "keeper_server.socket_receive_timeout_sec") \
DECLARE(UInt64, keeper_server_socket_send_timeout_sec, DBMS_DEFAULT_SEND_TIMEOUT_SEC, R"(Keeper socket send timeout.)", 0, "keeper_server.socket_send_timeout_sec") \
DECLARE(String, hdfs_libhdfs3_conf, "", R"(Points libhdfs3 to the right location for its config.)", 0, "hdfs.libhdfs3_conf") \
DECLARE(String, config_file, "config.xml", R"(Points to the server config file.)", 0, "config-file")
DECLARE(String, config_file, "config.xml", R"(Points to the server config file.)", 0, "config-file")

// clang-format on

Expand Down
6 changes: 6 additions & 0 deletions src/Core/SettingsChangesHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", true, false, "It becomes obsolete."},
{"database_datalake_require_metadata_access", true, true, "New setting."},
{"automatic_parallel_replicas_min_bytes_per_replica", 0, 1_MiB, "Better default value derived from testing results"},
{"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"},
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# Baselines generated with v25.12.1 (pre-release)

});
addSettingsChanges(settings_changes_history, "25.12",
{
Expand Down Expand Up @@ -434,6 +435,11 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
{"parallel_hash_join_threshold", 0, 0, "New setting"},
/// Release closed. Please use 25.4
});
addSettingsChanges(settings_changes_history, "24.12.2.20000",
{
// Altinity Antalya modifications atop of 24.12
{"input_format_parquet_use_metadata_cache", true, true, "New setting, turned ON by default"}, // https://github.com/Altinity/ClickHouse/pull/586
});
addSettingsChanges(settings_changes_history, "25.2",
{
/// Release closed. Please use 25.3
Expand Down
15 changes: 15 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@
#include <Formats/ProtobufSchemas.h>
#endif

#if USE_PARQUET
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#endif

#if USE_AWS_S3
#include <IO/S3/Client.h>
#endif
Expand Down Expand Up @@ -453,6 +457,16 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->clearQueryResultCache(query.query_result_cache_tag);
break;
}
case Type::DROP_PARQUET_METADATA_CACHE:
{
#if USE_PARQUET
getContext()->checkAccess(AccessType::SYSTEM_DROP_PARQUET_METADATA_CACHE);
ParquetFileMetaDataCache::instance()->clear();
break;
#else
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "The server was compiled without the support for Parquet");
#endif
}
case Type::CLEAR_COMPILED_EXPRESSION_CACHE:
#if USE_EMBEDDED_COMPILER
getContext()->checkAccess(AccessType::SYSTEM_DROP_COMPILED_EXPRESSION_CACHE);
Expand Down Expand Up @@ -1997,6 +2011,7 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
case Type::CLEAR_PAGE_CACHE:
case Type::CLEAR_SCHEMA_CACHE:
case Type::CLEAR_FORMAT_SCHEMA_CACHE:
case Type::DROP_PARQUET_METADATA_CACHE:
case Type::CLEAR_S3_CLIENT_CACHE:
{
required_access.emplace_back(AccessType::SYSTEM_DROP_CACHE);
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,7 @@ void ASTSystemQuery::formatImpl(WriteBuffer & ostr, const FormatSettings & setti
case Type::CLEAR_TEXT_INDEX_CACHES:
case Type::CLEAR_COMPILED_EXPRESSION_CACHE:
case Type::CLEAR_S3_CLIENT_CACHE:
case Type::DROP_PARQUET_METADATA_CACHE:
case Type::CLEAR_ICEBERG_METADATA_CACHE:
case Type::RESET_COVERAGE:
case Type::RESTART_REPLICAS:
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
CLEAR_QUERY_CONDITION_CACHE,
CLEAR_QUERY_CACHE,
CLEAR_COMPILED_EXPRESSION_CACHE,
DROP_PARQUET_METADATA_CACHE,
CLEAR_ICEBERG_METADATA_CACHE,
CLEAR_FILESYSTEM_CACHE,
CLEAR_DISTRIBUTED_CACHE,
Expand Down
4 changes: 4 additions & 0 deletions src/Processors/Formats/IInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <base/types.h>
#include <Core/BlockMissingValues.h>
#include <Processors/ISource.h>
#include <Core/Settings.h>


namespace DB
Expand Down Expand Up @@ -128,6 +129,9 @@ class IInputFormat : public ISource

void needOnlyCount() { need_only_count = true; }

/// Set additional info/key/id related to underlying storage of the ReadBuffer
virtual void setStorageRelatedUniqueKey(const Settings & /*settings*/, const String & /*key*/) {}

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

Expand Down
76 changes: 75 additions & 1 deletion src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

#if USE_PARQUET

#include <Core/Settings.h>
#include <Core/ServerSettings.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Common/setThreadName.h>
Expand All @@ -34,6 +37,7 @@
#include <Common/FieldAccurateComparison.h>
#include <Processors/Formats/Impl/Parquet/parquetBloomFilterHash.h>
#include <Interpreters/Context.h>
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>
#include <Interpreters/convertFieldToType.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Processors/Formats/Impl/ParquetV3BlockInputFormat.h>
Expand All @@ -45,6 +49,8 @@ namespace ProfileEvents
extern const Event ParquetFetchWaitTimeMicroseconds;
extern const Event ParquetReadRowGroups;
extern const Event ParquetPrunedRowGroups;
extern const Event ParquetMetaDataCacheHits;
extern const Event ParquetMetaDataCacheMisses;
}

namespace CurrentMetrics
Expand All @@ -61,6 +67,16 @@ namespace CurrentMetrics
namespace DB
{

namespace Setting
{
extern const SettingsBool input_format_parquet_use_metadata_cache;
}

namespace ServerSetting
{
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
}

namespace ErrorCodes
{
extern const int INCORRECT_DATA;
Expand Down Expand Up @@ -545,6 +561,49 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return hyperrectangle;
}

/// Reads the parquet footer metadata directly from the underlying file,
/// lazily creating the arrow file adapter first. Serves both as the
/// uncached path and as the loader callback for the metadata cache
/// (see getFileMetaData()).
std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
{
    createArrowFileIfNotCreated();
    return parquet::ReadMetaData(arrow_file);
}

/// Returns the parquet footer metadata, consulting the process-wide
/// metadata cache when it is enabled and a cache key has been provided.
std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
{
    /// The cache key is only assigned for remote files (the storage layer calls
    /// setStorageRelatedUniqueKey). A user may still set
    /// `input_format_parquet_use_metadata_cache = 1` for a local file operation,
    /// in which case the key stays empty — so both conditions must hold before
    /// the cache can be used.
    const bool cache_usable = metadata_cache.use_cache && !metadata_cache.key.empty();
    if (!cache_usable)
        return readMetadataFromFile();

    auto load_from_file = [this] { return readMetadataFromFile(); };
    auto [parquet_file_metadata, loaded]
        = ParquetFileMetaDataCache::instance()->getOrSet(metadata_cache.key, load_from_file);

    /// `loaded == true` means the loader ran, i.e. the entry was not in the cache.
    ProfileEvents::increment(
        loaded ? ProfileEvents::ParquetMetaDataCacheMisses : ProfileEvents::ParquetMetaDataCacheHits);

    return parquet_file_metadata;
}

/// Lazily constructs the arrow file adapter over the input read buffer;
/// subsequent calls are no-ops.
void ParquetBlockInputFormat::createArrowFileIfNotCreated()
{
    if (!arrow_file)
    {
        /// Build the arrow file adapter on first use.
        /// TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
        /// we'll need to read (which we know in advance). Use max_download_threads for that.
        arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
    }
}

std::unordered_set<std::size_t> getBloomFilterFilteringColumnKeys(const KeyCondition::RPN & rpn)
{
std::unordered_set<std::size_t> column_keys;
Expand Down Expand Up @@ -691,7 +750,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
if (is_stopped)
return;

metadata = parquet::ReadMetaData(arrow_file);
metadata = getFileMetaData();
if (buckets_to_read)
{
std::unordered_set<size_t> set_to_read(buckets_to_read->row_group_ids.begin(), buckets_to_read->row_group_ids.end());
Expand Down Expand Up @@ -807,6 +866,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
}
}

bool has_row_groups_to_read = false;

auto skip_row_group_based_on_filters = [&](int row_group)
{
if (!format_settings.parquet.filter_push_down && !format_settings.parquet.bloom_filter_push_down)
Expand Down Expand Up @@ -865,7 +926,20 @@ void ParquetBlockInputFormat::initializeIfNeeded()
row_group_batches.back().total_bytes_compressed += row_group_size;
auto rows = adaptive_chunk_size(row_group);
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;

has_row_groups_to_read = true;
}

if (has_row_groups_to_read)
{
createArrowFileIfNotCreated();
}
}

/// Records the storage-provided unique key for this file and whether the
/// session has metadata caching enabled; both are read later by
/// getFileMetaData() to decide if/where to cache the parquet footer.
void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const Settings & settings, const String & key_)
{
    metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
    metadata_cache.key = key_;
}

void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
Expand Down
15 changes: 15 additions & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class ParquetBlockInputFormat : public IInputFormat
size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }

void setBucketsToRead(const FileBucketInfoPtr & buckets_to_read_) override;
void setStorageRelatedUniqueKey(const Settings & settings, const String & key_) override;

private:
Chunk read() override;
Expand All @@ -113,6 +114,13 @@ class ParquetBlockInputFormat : public IInputFormat

void threadFunction(size_t row_group_batch_idx);

void createArrowFileIfNotCreated();
std::shared_ptr<parquet::FileMetaData> readMetadataFromFile();

std::shared_ptr<parquet::FileMetaData> getFileMetaData();

inline bool supportPrefetch() const;

// Data layout in the file:
//
// row group 0
Expand Down Expand Up @@ -361,6 +369,13 @@ class ParquetBlockInputFormat : public IInputFormat
bool is_initialized = false;
std::optional<std::unordered_map<String, String>> parquet_names_to_clickhouse;
std::optional<std::unordered_map<String, String>> clickhouse_names_to_parquet;
struct Cache
{
String key;
bool use_cache = false;
};

Cache metadata_cache;
};

class ArrowParquetSchemaReader : public ISchemaReader
Expand Down
20 changes: 20 additions & 0 deletions src/Processors/Formats/Impl/ParquetFileMetaDataCache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#include <Processors/Formats/Impl/ParquetFileMetaDataCache.h>

#if USE_PARQUET

namespace DB
{

/// Constructs the cache with CurrentMetrics::end() for both metric slots
/// (i.e. no dedicated CurrentMetrics counters) and an initial max size of 0.
/// NOTE(review): the real size limit is presumably applied at server startup
/// via setMaxSizeInBytes(input_format_parquet_metadata_cache_max_size) —
/// confirm what CacheBase does with a max size of 0 before that call runs.
ParquetFileMetaDataCache::ParquetFileMetaDataCache()
    : CacheBase<String, parquet::FileMetaData>(CurrentMetrics::end(), CurrentMetrics::end(), 0)
{}

/// Process-wide singleton accessor. The function-local static is initialized
/// in a thread-safe manner (guaranteed since C++11); the returned pointer is
/// never null.
ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance()
{
    static ParquetFileMetaDataCache instance;
    return &instance;
}

}

#endif
30 changes: 30 additions & 0 deletions src/Processors/Formats/Impl/ParquetFileMetaDataCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#pragma once

#include "config.h"

#if USE_PARQUET

namespace parquet
{

class FileMetaData;

}

#include <Common/CacheBase.h>

namespace DB
{

/// Process-wide cache (built on CacheBase) of parquet file footer metadata,
/// keyed by a storage-unique string supplied through
/// IInputFormat::setStorageRelatedUniqueKey. Avoids re-reading and re-parsing
/// the footer for repeatedly accessed files.
class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
{
public:
    /// Returns the singleton instance (never null).
    static ParquetFileMetaDataCache * instance();

private:
    /// Private: obtain the cache via instance() only.
    ParquetFileMetaDataCache();
};

}

#endif
Loading
Loading