Skip to content
Merged
Show file tree
Hide file tree
Changes from 37 commits
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
37c8553
add cdc client read data
JNSimba Nov 24, 2025
cd4e1e9
add a part of stream load sink
JNSimba Nov 25, 2025
4b56052
fix write split chunk api
JNSimba Nov 26, 2025
03fa875
add be request cdc client
JNSimba Nov 28, 2025
aa85742
add fe rpc interface and test
JNSimba Nov 28, 2025
c12ca09
fix cdc client manager compile
JNSimba Nov 28, 2025
3f90ae6
Merge branch 'mysql-cdc' of https://github.com/JNSimba/doris into mys…
JNSimba Nov 28, 2025
56ac92c
add streaming job split chunks and streaming multi task
JNSimba Dec 2, 2025
38507ae
add fe create streamt multi ask
JNSimba Dec 4, 2025
3c2d358
fix be forward request
JNSimba Dec 5, 2025
124d2ae
add multi table sync and checkstyle
JNSimba Dec 5, 2025
06c8ae7
add build script
JNSimba Dec 6, 2025
8f8f676
update check style
JNSimba Dec 6, 2025
59a5cf2
fix be fork
JNSimba Dec 8, 2025
2b783cd
fix offset bug
JNSimba Dec 8, 2025
6b2443f
Merge branch 'mysql-cdc' of https://github.com/JNSimba/doris into mys…
JNSimba Dec 8, 2025
79a71ed
fix create table and offset consumer bug
JNSimba Dec 9, 2025
4f3a1e5
add case for mysql sync
JNSimba Dec 10, 2025
8d9b14b
Merge branch 'master' into mysql-cdc
JNSimba Dec 10, 2025
f63e984
code style
JNSimba Dec 10, 2025
f4e1a9e
fix multi table bug
JNSimba Dec 10, 2025
0a27f8a
fix case
JNSimba Dec 11, 2025
7748211
fix
JNSimba Dec 11, 2025
b26a7b2
fix
JNSimba Dec 11, 2025
91c3bd2
fix
JNSimba Dec 11, 2025
3f8813d
extend to entity fe-common
JNSimba Dec 11, 2025
cfff8f9
extend fe model to common
JNSimba Dec 11, 2025
948aaf8
Merge branch 'mysql-cdc' of https://github.com/JNSimba/doris into mys…
JNSimba Dec 11, 2025
2dd280b
rename cdc clientmgr
JNSimba Dec 15, 2025
84a87f3
add create alter case
JNSimba Dec 15, 2025
4c30029
fix restart case
JNSimba Dec 15, 2025
3576a29
add priv and dup case
JNSimba Dec 16, 2025
2007303
add exclude tbls
JNSimba Dec 16, 2025
96405ff
fix delete job meta tbl when drop job
JNSimba Dec 17, 2025
0aff3ec
Merge branch 'master' into mysql-cdc
JNSimba Dec 17, 2025
59ae5d7
fix params
JNSimba Dec 17, 2025
4713bbc
be format
JNSimba Dec 17, 2025
f9cc4ff
add all type case
JNSimba Dec 18, 2025
4bdb638
fix
JNSimba Dec 18, 2025
9210541
fix
JNSimba Dec 18, 2025
d0c9124
fix be ut
JNSimba Dec 18, 2025
f8103f7
fix get split more concurrency
JNSimba Dec 19, 2025
985a736
Merge branch 'mysql-cdc' of https://github.com/JNSimba/doris into mys…
JNSimba Dec 19, 2025
3b2ff9a
fix
JNSimba Dec 19, 2025
585a0cb
fix be coverage
JNSimba Dec 20, 2025
e3d2aca
fix ut
JNSimba Dec 20, 2025
ba9ea3a
fix ut
JNSimba Dec 20, 2025
185aa0f
fix ut
JNSimba Dec 20, 2025
32e4834
fix coverage
JNSimba Dec 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ DEFINE_Int32(brpc_port, "8060");

DEFINE_Int32(arrow_flight_sql_port, "8050");

DEFINE_Int32(cdc_client_port, "9096");

// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
Expand Down Expand Up @@ -629,6 +631,9 @@ DEFINE_mBool(enable_stream_load_commit_txn_on_be, "false");
// The buffer size to store stream table function schema info
DEFINE_Int64(stream_tvf_buffer_size, "1048576"); // 1MB

// Timeout, in milliseconds, for HTTP requests that BE sends to the CDC client
DEFINE_mInt32(request_cdc_client_timeout_ms, "60000");

// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
// You may need to lower the speed when the sink receiver bes are too busy.
DEFINE_mInt32(olap_table_sink_send_interval_microseconds, "1000");
Expand Down
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ DECLARE_Int32(brpc_port);
// Default -1, do not start arrow flight sql server.
DECLARE_Int32(arrow_flight_sql_port);

// Port the CDC client process listens on; BE reaches the client (which scans OLTP CDC data) through it
DECLARE_Int32(cdc_client_port);

// If the external client cannot directly access priority_networks, set public_host to be accessible
// to external client.
// There are usually two usage scenarios:
Expand Down Expand Up @@ -667,6 +670,9 @@ DECLARE_mBool(enable_stream_load_commit_txn_on_be);
// The buffer size to store stream table function schema info
DECLARE_Int64(stream_tvf_buffer_size);

// Timeout, in milliseconds, for HTTP requests that BE sends to the CDC client
DECLARE_mInt32(request_cdc_client_timeout_ms);

// OlapTableSink sender's send interval, should be less than the real response time of a tablet writer rpc.
// You may need to lower the speed when the sink receiver bes are too busy.
DECLARE_mInt32(olap_table_sink_send_interval_microseconds);
Expand Down
304 changes: 304 additions & 0 deletions be/src/runtime/cdc_client_mgr.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,304 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/cdc_client_mgr.h"

#include <brpc/closure_guard.h>
#include <fmt/core.h>
#include <gen_cpp/internal_service.pb.h>
#include <google/protobuf/stubs/callback.h>
#include <signal.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <unistd.h>
#ifndef __APPLE__
#include <sys/prctl.h>
#endif

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <mutex>
#include <string>
#include <thread>

#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "http/http_client.h"

namespace doris {

namespace {
// SIGCHLD handler: reaps every child that has already exited so that none of them
// lingers as a zombie process.
//
// The handler can interrupt arbitrary code, and waitpid() overwrites errno (for
// example with ECHILD once there is nothing left to reap). errno is therefore saved
// on entry and restored on exit so the interrupted code never sees a value it did
// not produce itself.
//
// @param sig_no  signal number delivered by the kernel; not used.
void handle_sigchld([[maybe_unused]] int sig_no) {
    const int saved_errno = errno;
    int status = 0;
    // WNOHANG: never block inside the handler; stop as soon as no exited child remains.
    while (waitpid(-1, &status, WNOHANG) > 0) {
    }
    errno = saved_errno;
}

// Probes the CDC client's health endpoint (http://127.0.0.1:<cdc_client_port>/actuator/health).
//
// @param retry_times      forwarded unchanged to HttpClient::execute_with_retry.
// @param sleep_time       forwarded unchanged to HttpClient::execute_with_retry.
// @param health_response  receives the raw response body of the health request.
// @return Status::OK() when the request succeeds and the body contains "UP";
//         otherwise an InternalError describing the URL and the cause.
Status check_cdc_client_health(int retry_times, int sleep_time, std::string& health_response) {
    const std::string cdc_health_url =
            "http://127.0.0.1:" + std::to_string(doris::config::cdc_client_port) +
            "/actuator/health";

    auto health_request = [&cdc_health_url, &health_response](HttpClient* client) {
        RETURN_IF_ERROR(client->init(cdc_health_url));
        // Each individual probe is capped at 5 seconds.
        client->set_timeout_ms(5000);
        RETURN_IF_ERROR(client->execute(&health_response));
        return Status::OK();
    };

    Status status = HttpClient::execute_with_retry(retry_times, sleep_time, health_request);
    if (!status.ok()) {
        // Keep the underlying failure (connection refused, timeout, ...) in the message;
        // without it the caller cannot tell why the probe failed.
        return Status::InternalError(
                fmt::format("CDC client health check failed: url={}, error={}", cdc_health_url,
                            status.to_string()));
    }

    // NOTE(review): this is a plain substring match, so any "UP" anywhere in the body
    // counts as healthy - confirm this is strict enough for the endpoint's payload.
    if (health_response.find("UP") == std::string::npos) {
        return Status::InternalError(fmt::format("CDC client unhealthy: url={}, response={}",
                                                 cdc_health_url, health_response));
    }

    return Status::OK();
}

} // anonymous namespace

// Defaulted constructor: nothing is started here; the CDC client process is only
// launched by start_cdc_client().
CdcClientMgr::CdcClientMgr() = default;

// Destructor: stops the CDC client child process (if one is tracked) via stop()
// before the manager goes away.
CdcClientMgr::~CdcClientMgr() {
    // stop() may sleep while it waits for the child to exit.
    stop();
    LOG(INFO) << "CdcClientMgr is destroyed";
}

// Terminates the tracked CDC client child process, if there is one.
//
// Sequence: SIGTERM first, then poll once per second for up to 10 seconds; if the
// process still answers kill(pid, 0) after that, SIGKILL it and wait for it.
// The tracked pid is cleared in every path that found a pid > 0.
void CdcClientMgr::stop() {
    const pid_t child = _child_pid.load();
    if (child > 0) {
        // kill(pid, 0) delivers no signal; it only probes whether the pid can be signalled.
        const bool alive_at_entry = (kill(child, 0) == 0);
        if (alive_at_entry) {
            LOG(INFO) << "Stopping CDC client process, pid=" << child;

            // Ask politely first and give the process up to 10 seconds to exit.
            const bool term_sent = (kill(child, SIGTERM) == 0);
            int seconds_waited = 0;
            while (term_sent && seconds_waited < 10) {
                std::this_thread::sleep_for(std::chrono::seconds(1));
                ++seconds_waited;
                if (kill(child, 0) != 0) {
                    // The pid no longer exists: graceful shutdown succeeded.
                    _child_pid.store(0);
                    LOG(INFO) << "CDC client process stopped gracefully";
                    return;
                }
            }

            // Still there (or SIGTERM could not be sent): escalate to SIGKILL.
            if (kill(child, 0) == 0) {
                LOG(WARNING) << "Force killing CDC client process, pid=" << child;
                kill(child, SIGKILL);
                int exit_status = 0;
                waitpid(child, &exit_status, 0);
            }
        }
        // Whether it was killed here or was already gone, forget the pid.
        _child_pid.store(0);
    }

    LOG(INFO) << "CdcClientMgr is stopped";
}

// Makes sure a healthy CDC client (a Java process running cdc-client.jar) is available,
// forking and exec'ing one when necessary. Calls are serialized by _start_mutex.
//
// Steps:
//   1. If a child pid is tracked, is still signallable and passes the health check,
//      return OK; otherwise forget the pid.
//   2. Resolve DORIS_HOME / LOG_DIR, check that the jar exists.
//   3. Probe the health endpoint once more (a client may be running that this manager
//      does not track) and return OK if it answers.
//   4. Resolve JAVA_HOME, install the SIGCHLD handler, fork + exec java.
//   5. In the parent, record the pid and wait for the health endpoint to come up.
//
// @param result  optional RPC result; on failure the error status is also written to
//                result->mutable_status(). May be null.
// @return Status::OK() when a healthy client is available, InternalError otherwise.
Status CdcClientMgr::start_cdc_client(PRequestCdcClientResult* result) {
    std::lock_guard<std::mutex> lock(_start_mutex);

    // Mirrors a failure into `result` (when provided) and hands it back for returning.
    auto fail = [result](Status error) {
        if (result) {
            error.to_protobuf(result->mutable_status());
        }
        return error;
    };

    pid_t existing_pid = _child_pid.load();
    if (existing_pid > 0) {
        // kill(pid, 0) only probes for existence; no signal is delivered.
        if (kill(existing_pid, 0) == 0) {
            std::string check_response;
            auto check_st = check_cdc_client_health(1, 0, check_response);
            if (check_st.ok()) {
                LOG(INFO) << "cdc client already started, pid=" << existing_pid;
                return Status::OK();
            }
            LOG(WARNING) << "CDC client process exists but unhealthy, pid=" << existing_pid;
        }
        // Either the process is gone or it is unhealthy: stop tracking this pid.
        // NOTE(review): an unhealthy but still running process is not terminated here.
        _child_pid.store(0);
    }

    const char* doris_home = getenv("DORIS_HOME");
    if (!doris_home) {
        return fail(Status::InternalError("DORIS_HOME environment variable is not set"));
    }

    const char* log_dir = getenv("LOG_DIR");
    if (!log_dir) {
        return fail(Status::InternalError("LOG_DIR environment variable is not set"));
    }

    const std::string cdc_jar_path = std::string(doris_home) + "/lib/cdc_client/cdc-client.jar";
    const std::string cdc_jar_port =
            "--server.port=" + std::to_string(doris::config::cdc_client_port);
    const std::string backend_http_port =
            "--backend.http.port=" + std::to_string(config::webserver_port);
    const std::string java_opts = "-Dlog.path=" + std::string(log_dir);

    // The jar must exist before anything is forked.
    struct stat jar_stat;
    if (stat(cdc_jar_path.c_str(), &jar_stat) != 0) {
        return fail(Status::InternalError("Can not find cdc-client.jar."));
    }

    // A client that this manager does not track may already be listening on the port.
    std::string check_response;
    auto check_st = check_cdc_client_health(1, 0, check_response);
    if (check_st.ok()) {
        LOG(INFO) << "cdc client already started.";
        return Status::OK();
    }
    LOG(INFO) << "cdc client not started, to start.";

    const char* java_home = getenv("JAVA_HOME");
    if (!java_home) {
        return fail(Status::InternalError("Can not find java home."));
    }
    const std::string path(java_home);
    // Everything the child needs is built before fork(): the child of a multi-threaded
    // process should restrict itself to async-signal-safe calls until exec.
    const std::string java_bin = path + "/bin/java";
    static constexpr char kChildReadyMsg[] = "Cdc client child process ready to start.\n";
    static constexpr char kChildErrorMsg[] = "Cdc client child process error.\n";

    // Reap exited children so they do not become zombies.
    // The struct is zero-initialized and sa_mask explicitly emptied: passing an
    // uninitialized sa_mask to sigaction() would block an arbitrary set of signals while
    // the handler runs. SA_RESTART keeps unrelated blocking system calls in this process
    // from failing with EINTR whenever a child exits.
    struct sigaction act {};
    sigemptyset(&act.sa_mask);
    act.sa_flags = SA_RESTART;
    act.sa_handler = handle_sigchld;
    if (sigaction(SIGCHLD, &act, nullptr) != 0) {
        LOG(WARNING) << "Failed to install SIGCHLD handler, errno=" << errno;
    }
    LOG(INFO) << "Start to fork cdc client process with " << path;

    const pid_t pid = ::fork();
    if (pid < 0) {
        return fail(Status::InternalError("Fork cdc client failed."));
    }

    if (pid == 0) {
        // Child process.
        // When the parent is killed, the child must exit as well.
        // NOTE(review): per prctl(2) the "parent" is the thread that called fork();
        // confirm the forking thread outlives the CDC client.
#ifndef __APPLE__
        prctl(PR_SET_PDEATHSIG, SIGKILL);
#endif
        // Raw write() instead of glog/iostream: those may take locks that another
        // thread of the parent held at fork() time, which would deadlock the child.
        [[maybe_unused]] ssize_t written =
                ::write(STDOUT_FILENO, kChildReadyMsg, sizeof(kChildReadyMsg) - 1);
        // java -Dlog.path=xx -jar cdc-client.jar --server.port=9096 --backend.http.port=8040
        execlp(java_bin.c_str(), "java", java_opts.c_str(), "-jar", cdc_jar_path.c_str(),
               cdc_jar_port.c_str(), backend_http_port.c_str(), static_cast<char*>(nullptr));
        // Only reached when exec failed.
        written = ::write(STDERR_FILENO, kChildErrorMsg, sizeof(kChildErrorMsg) - 1);
        // _exit(): do not run the parent's atexit handlers / static destructors, and do
        // not flush stdio buffers inherited from the parent.
        _exit(1);
    }

    // Parent process: remember the pid and wait for the client to come up.
    _child_pid.store(pid);

    // Waiting for cdc to start, failed after more than 30 seconds
    // (retry/sleep arguments 5 and 6 are forwarded to HttpClient::execute_with_retry).
    std::string health_response;
    Status status = check_cdc_client_health(5, 6, health_response);
    if (!status.ok()) {
        // Startup failed: stop tracking the pid.
        // NOTE(review): the forked process itself is not terminated on this path.
        _child_pid.store(0);
        return fail(Status::InternalError("Start cdc client failed."));
    }

    LOG(INFO) << "Start cdc client success, pid=" << pid << ", status=" << status.to_string()
              << ", response=" << health_response;
    return Status::OK();
}

void CdcClientMgr::request_cdc_client_impl(const PRequestCdcClientRequest* request,
PRequestCdcClientResult* result,
google::protobuf::Closure* done) {
VLOG_RPC << "request to cdc client, api " << request->api();
brpc::ClosureGuard closure_guard(done);

// Start CDC client if not started
Status start_st = start_cdc_client(result);
if (!start_st.ok()) {
LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string();
start_st.to_protobuf(result->mutable_status());
return;
}

std::string cdc_response;
Status st = send_request_to_cdc_client(request->api(), request->params(), &cdc_response);
result->set_response(cdc_response);
st.to_protobuf(result->mutable_status());
}

// POSTs a JSON request to the local CDC client at http://127.0.0.1:<cdc_client_port><api>.
//
// @param api          path appended to the host:port part of the URL.
// @param params_body  request payload; omitted from the request when empty.
// @param response     receives the response body.
// @return the status reported by HttpClient::execute_with_retry (called with 3 and 1
//         as its retry/sleep arguments).
Status CdcClientMgr::send_request_to_cdc_client(const std::string& api,
                                                const std::string& params_body,
                                                std::string* response) {
    const std::string request_url =
            fmt::format("http://127.0.0.1:{}{}", doris::config::cdc_client_port, api);
    const bool has_payload = !params_body.empty();

    // One attempt; execute_with_retry invokes this callback for each try.
    auto post_once = [&request_url, &params_body, has_payload,
                      response](HttpClient* http) -> Status {
        RETURN_IF_ERROR(http->init(request_url));
        // The timeout is a mutable config, so it is read on every attempt.
        http->set_timeout_ms(doris::config::request_cdc_client_timeout_ms);
        if (has_payload) {
            http->set_payload(params_body);
        }
        http->set_content_type("application/json");
        http->set_method(POST);
        return http->execute(response);
    };

    return HttpClient::execute_with_retry(3, 1, post_once);
}

} // namespace doris
Loading
Loading