forked from ClickHouse/ClickHouse
-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathSelectStreamFactory.h
More file actions
133 lines (106 loc) · 3.69 KB
/
SelectStreamFactory.h
File metadata and controls
133 lines (106 loc) · 3.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#pragma once
#include <Analyzer/IQueryTreeNode.h>
#include <Client/ConnectionPool.h>
#include <Core/QueryProcessingStage.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/StorageID.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/StorageSnapshot.h>
namespace DB
{
struct Settings;
class Cluster;
class Throttler;
struct SelectQueryInfo;
class Pipe;
using Pipes = std::vector<Pipe>;
class QueryPlan;
using QueryPlanPtr = std::unique_ptr<QueryPlan>;
struct StorageID;
class PreparedSets;
using PreparedSetsPtr = std::shared_ptr<PreparedSets>;
class PlannerContext;
using PlannerContextPtr = std::shared_ptr<PlannerContext>;
namespace ClusterProxy
{
/// select query has database, table and table function names as AST pointers
/// Creates a copy of query, changes database, table and table function names.
ASTPtr rewriteSelectQuery(
ContextPtr context,
const ASTPtr & query,
const std::string & remote_database,
const std::string & remote_table,
ASTPtr table_function_ptr = nullptr,
ASTPtr additional_filter = nullptr);
using ColumnsDescriptionByShardNum = std::unordered_map<UInt32, ColumnsDescription>;
using AdditionalShardFilterGenerator = std::function<ASTPtr(uint64_t)>;
class SelectStreamFactory
{
public:
struct Shard
{
/// Query and header may be changed depending on shard.
ASTPtr query;
QueryTreeNodePtr query_tree;
PlannerContextPtr planner_context;
std::shared_ptr<QueryPlan> query_plan;
/// Used to check the table existence on remote node
StorageID main_table;
Block header;
bool has_missing_objects = false;
Cluster::ShardInfo shard_info;
/// If we connect to replicas lazily.
/// (When there is a local replica with big delay).
bool lazy = false;
AdditionalShardFilterGenerator shard_filter_generator{};
};
using Shards = std::vector<Shard>;
SelectStreamFactory(
const Block & header_,
const ColumnsDescriptionByShardNum & objects_by_shard_,
const StorageSnapshotPtr & storage_snapshot_,
QueryProcessingStage::Enum processed_stage_);
void createForShard(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator);
void createForShard(
const Cluster::ShardInfo & shard_info,
const QueryTreeNodePtr & query_tree,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator);
const Block header;
const ColumnsDescriptionByShardNum objects_by_shard;
const StorageSnapshotPtr storage_snapshot;
QueryProcessingStage::Enum processed_stage;
private:
void createForShardImpl(
const Cluster::ShardInfo & shard_info,
const ASTPtr & query_ast,
const QueryTreeNodePtr & query_tree,
const StorageID & main_table,
const ASTPtr & table_func_ptr,
ContextPtr context,
std::vector<QueryPlanPtr> & local_plans,
Shards & remote_shards,
UInt32 shard_count,
bool parallel_replicas_enabled,
AdditionalShardFilterGenerator shard_filter_generator,
bool has_missing_objects = false);
};
}
}