Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions src/Core/Settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7103,9 +7103,6 @@ Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distri
)", EXPERIMENTAL) \
DECLARE(Bool, allow_experimental_iceberg_read_optimization, true, R"(
Allow Iceberg read optimization based on Iceberg metadata.
)", EXPERIMENTAL) \
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
Allow retries in cluster request, when one node goes offline
)", EXPERIMENTAL) \
DECLARE(Bool, object_storage_remote_initiator, false, R"(
Execute request to object storage as remote on one of object_storage_cluster nodes.
Expand Down Expand Up @@ -7227,6 +7224,7 @@ Sets the evaluation time to be used with promql dialect. 'auto' means the curren
MAKE_OBSOLETE(M, Bool, allow_experimental_shared_set_join, true) \
MAKE_OBSOLETE(M, UInt64, min_external_sort_block_bytes, 100_MiB) \
MAKE_OBSOLETE(M, UInt64, distributed_cache_read_alignment, 0) \
MAKE_OBSOLETE(M, Bool, allow_retries_in_cluster_requests, false) \
/** The section above is for obsolete settings. Do not add anything there. */
#endif /// __CLION_IDE__

Expand Down
25 changes: 2 additions & 23 deletions src/QueryPipeline/RemoteQueryExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ namespace Setting
extern const SettingsBool use_hedged_requests;
extern const SettingsBool push_external_roles_in_interserver_queries;
extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms;
extern const SettingsBool allow_retries_in_cluster_requests;
}

namespace ErrorCodes
Expand Down Expand Up @@ -83,7 +82,6 @@ RemoteQueryExecutor::RemoteQueryExecutor(
, extension(extension_)
, priority_func(priority_func_)
, read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
, allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
{
if (stage == QueryProcessingStage::QueryPlan && !query_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage");
Expand Down Expand Up @@ -468,8 +466,7 @@ int RemoteQueryExecutor::sendQueryAsync()
read_context = std::make_unique<ReadContext>(
*this,
/*suspend_when_query_sent*/ true,
read_packet_type_separately,
allow_retries_in_cluster_requests);
read_packet_type_separately);

/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
/// because we can still be in process of sending scalars or external tables.
Expand Down Expand Up @@ -542,8 +539,7 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
read_context = std::make_unique<ReadContext>(
*this,
/*suspend_when_query_sent*/ false,
read_packet_type_separately,
allow_retries_in_cluster_requests);
read_packet_type_separately);
recreate_read_context = false;
}

Expand Down Expand Up @@ -734,18 +730,6 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
break;

case Protocol::Server::ConnectionLost:
if (allow_retries_in_cluster_requests)
{
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
{
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
{
finished = true;
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
return ReadResult(Block{});
}
}
}
packet.exception->rethrow();
break;

Expand Down Expand Up @@ -1016,11 +1000,6 @@ void RemoteQueryExecutor::setProfileInfoCallback(ProfileInfoCallback callback)
profile_info_callback = std::move(callback);
}

bool RemoteQueryExecutor::skipUnavailableShards() const
{
return context->getSettingsRef()[Setting::skip_unavailable_shards];
}

bool RemoteQueryExecutor::needToSkipUnavailableShard() const
{
return context->getSettingsRef()[Setting::skip_unavailable_shards] && (0 == connections->size());
Expand Down
4 changes: 0 additions & 4 deletions src/QueryPipeline/RemoteQueryExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,8 +233,6 @@ class RemoteQueryExecutor

IConnections & getConnections() { return *connections; }

bool skipUnavailableShards() const;

bool needToSkipUnavailableShard() const;

bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; }
Expand Down Expand Up @@ -339,8 +337,6 @@ class RemoteQueryExecutor

const bool read_packet_type_separately = false;

const bool allow_retries_in_cluster_requests = false;

std::unordered_set<size_t> replica_has_processed_data;

/// Send all scalars to remote servers
Expand Down
52 changes: 11 additions & 41 deletions src/QueryPipeline/RemoteQueryExecutorReadContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@ namespace ErrorCodes
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
RemoteQueryExecutor & executor_,
bool suspend_when_query_sent_,
bool read_packet_type_separately_,
bool allow_retries_in_cluster_requests_)
bool read_packet_type_separately_)
: AsyncTaskExecutor(std::make_unique<Task>(*this))
, executor(executor_)
, suspend_when_query_sent(suspend_when_query_sent_)
, read_packet_type_separately(read_packet_type_separately_)
, allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
{
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
Expand Down Expand Up @@ -59,49 +57,21 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
if (read_context.executor.needToSkipUnavailableShard())
return;

try
while (true)
{
while (true)
{
try
{
read_context.has_read_packet_part = PacketPart::None;

if (read_context.read_packet_type_separately)
{
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Type;
suspend_callback();
}
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Body;
if (read_context.packet.type == Protocol::Server::Data)
read_context.has_data_packets = true;
}
catch (const Exception & e)
{
/// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
/// If initiator did not process any data packets before, this fact can be ignored.
/// Unprocessed tasks will be executed on other nodes.
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
&& !read_context.has_data_packets.load() && read_context.executor.skipUnavailableShards())
{
read_context.has_read_packet_part = PacketPart::None;
}
else
throw;
}
read_context.has_read_packet_part = PacketPart::None;

if (read_context.read_packet_type_separately)
{
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Type;
suspend_callback();
Comment on lines +85 to +92
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge ConnectionLost packet never marked as readable

When handling an early EOF from a shard (ATTEMPT_TO_READ_AFTER_EOF with skipUnavailableShards()), the new branch builds a ConnectionLost packet but leaves has_read_packet_part at PacketPart::None. RemoteQueryExecutor::readAsync() later unconditionally calls getPacket() for the completed read and chasserts hasReadPacket() (see RemoteQueryExecutor.cpp around lines 590-595 and RemoteQueryExecutorReadContext.cpp lines 205-208). In this shutdown-before-data scenario, the code will now hit that assert instead of cleanly rescheduling/ignoring the shard. The packet should be marked as a body (or handled without getPacket()) before suspending, mirroring other packet paths.

Useful? React with 👍 / 👎.

}
}
catch (const Exception &)
{
if (!read_context.allow_retries_in_cluster_requests)
throw;
read_context.packet.type = Protocol::Server::ConnectionLost;
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
read_context.has_read_packet_part = PacketPart::Body;
if (read_context.packet.type == Protocol::Server::Data)
read_context.has_data_packets = true;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge skip_unavailable_shards no longer masks early socket EOF

When a shard connection drops before any data is received (e.g., node crash) and skip_unavailable_shards is enabled, receivePacketUnlocked now throws ATTEMPT_TO_READ_AFTER_EOF and the exception bubbles out because Task::run no longer catches it. The prior logic swallowed this specific EOF when no data packets had arrived so the shard could be skipped; after this change the query fails instead of continuing on remaining shards, breaking the advertised skip behavior.

Useful? React with 👍 / 👎.


suspend_callback();
}
}
Expand Down
4 changes: 1 addition & 3 deletions src/QueryPipeline/RemoteQueryExecutorReadContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
explicit RemoteQueryExecutorReadContext(
RemoteQueryExecutor & executor_,
bool suspend_when_query_sent_,
bool read_packet_type_separately_,
bool allow_retries_in_cluster_requests_);
bool read_packet_type_separately_);

~RemoteQueryExecutorReadContext() override;

Expand Down Expand Up @@ -112,7 +111,6 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
bool suspend_when_query_sent = false;
bool is_query_sent = false;
const bool read_packet_type_separately = false;
const bool allow_retries_in_cluster_requests = false;
};

}
Expand Down
Loading