Skip to content

Commit fefd2c0

Browse files
authored
Merge pull request #797 from Altinity/feature/antalya-25.3/rendezvous_hashing
25.3 Antalya port of #709, #760 - Rendezvous hashing
2 parents 43718e5 + 0ece70e commit fefd2c0

14 files changed

Lines changed: 301 additions & 31 deletions

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -738,8 +738,12 @@ void RemoteQueryExecutor::processReadTaskRequest()
738738
if (!extension || !extension->task_iterator)
739739
throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized");
740740

741+
if (!extension->replica_info)
742+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Replica info is not initialized");
743+
741744
ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived);
742-
auto response = (*extension->task_iterator)();
745+
746+
auto response = (*extension->task_iterator)(extension->replica_info->number_of_current_replica);
743747
connections->sendReadTaskResponse(response);
744748
}
745749

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class RemoteQueryExecutorReadContext;
2828
class ParallelReplicasReadingCoordinator;
2929

3030
/// This is the same type as StorageS3Source::IteratorWrapper
31-
using TaskIterator = std::function<String()>;
31+
using TaskIterator = std::function<String(size_t)>;
3232

3333
/// This class allows one to launch queries on remote replicas of one shard and get results
3434
class RemoteQueryExecutor

src/Storages/IStorageCluster.cpp

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ void ReadFromCluster::createExtension(const ActionsDAG::Node * predicate)
118118
if (extension)
119119
return;
120120

121-
extension = storage->getTaskIteratorExtension(predicate, context);
121+
extension = storage->getTaskIteratorExtension(predicate, context, cluster);
122122
}
123123

124124
/// The code executes on initiator
@@ -206,8 +206,6 @@ SinkToStoragePtr IStorageCluster::write(
206206

207207
void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
208208
{
209-
createExtension(nullptr);
210-
211209
const Scalars & scalars = context->hasQueryContext() ? context->getQueryContext()->getScalars() : Scalars{};
212210
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
213211

@@ -220,6 +218,10 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
220218
if (current_settings[Setting::max_parallel_replicas] > 1)
221219
max_replicas_to_use = std::min(max_replicas_to_use, current_settings[Setting::max_parallel_replicas].value);
222220

221+
size_t replica_index = 0;
222+
223+
createExtension(nullptr);
224+
223225
for (const auto & shard_info : cluster->getShardsInfo())
224226
{
225227
if (pipes.size() >= max_replicas_to_use)
@@ -237,6 +239,8 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
237239
if (try_results.empty())
238240
continue;
239241

242+
IConnections::ReplicaInfo replica_info{ .number_of_current_replica = replica_index++ };
243+
240244
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
241245
std::vector<IConnectionPool::Entry>{try_results.front()},
242246
query_to_send->formatWithSecretsOneLine(),
@@ -246,7 +250,7 @@ void ReadFromCluster::initializePipeline(QueryPipelineBuilder & pipeline, const
246250
scalars,
247251
Tables(),
248252
processed_stage,
249-
extension);
253+
RemoteQueryExecutor::Extension{.task_iterator = extension->task_iterator, .replica_info = std::move(replica_info)});
250254

251255
remote_query_executor->setLogger(log);
252256
pipes.emplace_back(std::make_shared<RemoteSource>(

src/Storages/IStorageCluster.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,10 @@ class IStorageCluster : public IStorage
4040
ClusterPtr getCluster(ContextPtr context) const { return getClusterImpl(context, cluster_name); }
4141

4242
/// Query is needed for pruning by virtual columns (_file, _path)
43-
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const = 0;
43+
virtual RemoteQueryExecutor::Extension getTaskIteratorExtension(
44+
const ActionsDAG::Node * predicate,
45+
const ContextPtr & context,
46+
ClusterPtr cluster) const = 0;
4447

4548
QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override;
4649

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <Storages/ObjectStorage/Utils.h>
2020
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
2121
#include <Storages/extractTableFunctionFromSelectQuery.h>
22+
#include <Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h>
2223

2324
namespace DB
2425
{
@@ -357,24 +358,34 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
357358
}
358359

359360
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
360-
const ActionsDAG::Node * predicate, const ContextPtr & local_context) const
361+
const ActionsDAG::Node * predicate,
362+
const ContextPtr & local_context,
363+
ClusterPtr cluster) const
361364
{
362365
auto iterator = StorageObjectStorageSource::createFileIterator(
363366
configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false,
364367
local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback(), /*ignore_archive_globs=*/true);
365368

366-
auto callback = std::make_shared<std::function<String()>>([iterator]() mutable -> String
369+
std::vector<std::string> ids_of_hosts;
370+
for (const auto & shard : cluster->getShardsInfo())
367371
{
368-
auto object_info = iterator->next(0);
369-
if (!object_info)
370-
return "";
372+
if (shard.per_replica_pools.empty())
373+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {} with empty shard {}", cluster->getName(), shard.shard_num);
374+
for (const auto & replica : shard.per_replica_pools)
375+
{
376+
if (!replica)
377+
throw Exception(ErrorCodes::LOGICAL_ERROR, "Cluster {}, shard {} with empty node", cluster->getName(), shard.shard_num);
378+
ids_of_hosts.push_back(replica->getAddress());
379+
}
380+
}
381+
382+
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
371383

372-
auto archive_object_info = std::dynamic_pointer_cast<StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive>(object_info);
373-
if (archive_object_info)
374-
return archive_object_info->getPathToArchive();
384+
auto callback = std::make_shared<TaskIterator>(
385+
[task_distributor](size_t number_of_current_replica) mutable -> String {
386+
return task_distributor->getNextTask(number_of_current_replica).value_or("");
387+
});
375388

376-
return object_info->getPath();
377-
});
378389
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
379390
}
380391

src/Storages/ObjectStorage/StorageObjectStorageCluster.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ class StorageObjectStorageCluster : public IStorageCluster
3030
std::string getName() const override;
3131

3232
RemoteQueryExecutor::Extension getTaskIteratorExtension(
33-
const ActionsDAG::Node * predicate, const ContextPtr & context) const override;
33+
const ActionsDAG::Node * predicate,
34+
const ContextPtr & context,
35+
ClusterPtr cluster) const override;
3436

3537
String getPathSample(StorageInMemoryMetadata metadata, ContextPtr context);
3638

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
#include "StorageObjectStorageStableTaskDistributor.h"
2+
#include <Common/SipHash.h>
3+
#include <consistent_hashing.h>
4+
#include <optional>
5+
6+
namespace DB
7+
{
8+
9+
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
10+
std::shared_ptr<IObjectIterator> iterator_,
11+
std::vector<std::string> ids_of_nodes_)
12+
: iterator(std::move(iterator_))
13+
, connection_to_files(ids_of_nodes_.size())
14+
, ids_of_nodes(ids_of_nodes_)
15+
, iterator_exhausted(false)
16+
{
17+
}
18+
19+
std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
20+
{
21+
LOG_TRACE(
22+
log,
23+
"Received a new connection from replica {} looking for a file",
24+
number_of_current_replica
25+
);
26+
27+
// 1. Check pre-queued files first
28+
if (auto file = getPreQueuedFile(number_of_current_replica))
29+
return file;
30+
31+
// 2. Try to find a matching file from the iterator
32+
if (auto file = getMatchingFileFromIterator(number_of_current_replica))
33+
return file;
34+
35+
// 3. Process unprocessed files if iterator is exhausted
36+
return getAnyUnprocessedFile(number_of_current_replica);
37+
}
38+
39+
size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
40+
{
41+
size_t nodes_count = ids_of_nodes.size();
42+
43+
/// Trivial case
44+
if (nodes_count < 2)
45+
return 0;
46+
47+
/// Rendezvous hashing
48+
size_t best_id = 0;
49+
UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path);
50+
for (size_t id = 1; id < nodes_count; ++id)
51+
{
52+
UInt64 weight = sipHash64(ids_of_nodes[id] + file_path);
53+
if (weight > best_weight)
54+
{
55+
best_weight = weight;
56+
best_id = id;
57+
}
58+
}
59+
return best_id;
60+
}
61+
62+
std::optional<String> StorageObjectStorageStableTaskDistributor::getPreQueuedFile(size_t number_of_current_replica)
63+
{
64+
std::lock_guard lock(mutex);
65+
66+
if (connection_to_files.size() <= number_of_current_replica)
67+
throw Exception(
68+
ErrorCodes::LOGICAL_ERROR,
69+
"Replica number {} is out of range. Expected range: [0, {})",
70+
number_of_current_replica,
71+
connection_to_files.size()
72+
);
73+
74+
auto & files = connection_to_files[number_of_current_replica];
75+
76+
while (!files.empty())
77+
{
78+
String next_file = files.back();
79+
files.pop_back();
80+
81+
auto it = unprocessed_files.find(next_file);
82+
if (it == unprocessed_files.end())
83+
continue;
84+
85+
unprocessed_files.erase(it);
86+
87+
LOG_TRACE(
88+
log,
89+
"Assigning pre-queued file {} to replica {}",
90+
next_file,
91+
number_of_current_replica
92+
);
93+
94+
return next_file;
95+
}
96+
97+
return std::nullopt;
98+
}
99+
100+
std::optional<String> StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(size_t number_of_current_replica)
101+
{
102+
{
103+
std::lock_guard lock(mutex);
104+
if (iterator_exhausted)
105+
return std::nullopt;
106+
}
107+
108+
while (true)
109+
{
110+
ObjectInfoPtr object_info;
111+
112+
{
113+
std::lock_guard lock(mutex);
114+
object_info = iterator->next(0);
115+
116+
if (!object_info)
117+
{
118+
iterator_exhausted = true;
119+
break;
120+
}
121+
}
122+
123+
String file_path;
124+
125+
auto archive_object_info = std::dynamic_pointer_cast<StorageObjectStorageSource::ArchiveIterator::ObjectInfoInArchive>(object_info);
126+
if (archive_object_info)
127+
{
128+
file_path = archive_object_info->getPathToArchive();
129+
}
130+
else
131+
{
132+
file_path = object_info->getPath();
133+
}
134+
135+
size_t file_replica_idx = getReplicaForFile(file_path);
136+
if (file_replica_idx == number_of_current_replica)
137+
{
138+
LOG_TRACE(
139+
log,
140+
"Found file {} for replica {}",
141+
file_path,
142+
number_of_current_replica
143+
);
144+
145+
return file_path;
146+
}
147+
148+
// Queue file for its assigned replica
149+
{
150+
std::lock_guard lock(mutex);
151+
unprocessed_files.insert(file_path);
152+
connection_to_files[file_replica_idx].push_back(file_path);
153+
}
154+
}
155+
156+
return std::nullopt;
157+
}
158+
159+
std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(size_t number_of_current_replica)
160+
{
161+
std::lock_guard lock(mutex);
162+
163+
if (!unprocessed_files.empty())
164+
{
165+
auto it = unprocessed_files.begin();
166+
String next_file = *it;
167+
unprocessed_files.erase(it);
168+
169+
LOG_TRACE(
170+
log,
171+
"Iterator exhausted. Assigning unprocessed file {} to replica {}",
172+
next_file,
173+
number_of_current_replica
174+
);
175+
176+
return next_file;
177+
}
178+
179+
return std::nullopt;
180+
}
181+
182+
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#pragma once
2+
3+
#include <Client/Connection.h>
4+
#include <Common/Logger.h>
5+
#include <Interpreters/Cluster.h>
6+
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
7+
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
8+
#include <unordered_set>
9+
#include <vector>
10+
#include <mutex>
11+
#include <memory>
12+
13+
namespace DB
14+
{
15+
16+
class StorageObjectStorageStableTaskDistributor
17+
{
18+
public:
19+
StorageObjectStorageStableTaskDistributor(
20+
std::shared_ptr<IObjectIterator> iterator_,
21+
std::vector<std::string> ids_of_nodes_);
22+
23+
std::optional<String> getNextTask(size_t number_of_current_replica);
24+
25+
private:
26+
size_t getReplicaForFile(const String & file_path);
27+
std::optional<String> getPreQueuedFile(size_t number_of_current_replica);
28+
std::optional<String> getMatchingFileFromIterator(size_t number_of_current_replica);
29+
std::optional<String> getAnyUnprocessedFile(size_t number_of_current_replica);
30+
31+
std::shared_ptr<IObjectIterator> iterator;
32+
33+
std::vector<std::vector<String>> connection_to_files;
34+
std::unordered_set<String> unprocessed_files;
35+
36+
std::vector<std::string> ids_of_nodes;
37+
38+
std::mutex mutex;
39+
bool iterator_exhausted = false;
40+
41+
LoggerPtr log = getLogger("StorageClusterTaskDistributor");
42+
};
43+
44+
}

0 commit comments

Comments
 (0)