Skip to content

Commit 7dd40bd

Browse files
branch-4.1: [Feature](tvf) Support cdc stream tvf for mysql and pg #60116 (#61840)
Cherry-picked from #60116 Co-authored-by: wudi <wudi@selectdb.com>
1 parent 86a9d57 commit 7dd40bd

30 files changed

Lines changed: 1099 additions & 80 deletions

File tree

be/src/io/fs/http_file_reader.cpp

Lines changed: 120 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@
2222

2323
#include <algorithm>
2424

25+
#include "common/config.h"
2526
#include "common/logging.h"
27+
#include "gen_cpp/internal_service.pb.h"
28+
#include "runtime/cdc_client_mgr.h"
29+
#include "runtime/exec_env.h"
2630

2731
namespace doris::io {
2832

@@ -84,44 +88,100 @@ HttpFileReader::HttpFileReader(const OpenFileInfo& fileInfo, std::string url, in
8488
}
8589
}
8690

91+
// Parse chunk response configuration; chunk response implies no Range support
92+
auto chunk_iter = _extend_kv.find("http.enable.chunk.response");
93+
if (chunk_iter != _extend_kv.end()) {
94+
std::string value = chunk_iter->second;
95+
std::transform(value.begin(), value.end(), value.begin(), ::tolower);
96+
_enable_chunk_response = (value == "true" || value == "1");
97+
if (_enable_chunk_response) {
98+
_range_supported = false;
99+
}
100+
}
101+
87102
_read_buffer = std::make_unique<char[]>(READ_BUFFER_SIZE);
88103
}
89104

90105
HttpFileReader::~HttpFileReader() {
91106
static_cast<void>(close());
92107
}
93108

109+
// Optionally starts the CDC client and patches the reader URL with its port.
//
// Behaviour, in order:
//   1. Returns Status::OK() immediately unless _extend_kv contains the key
//      "enable_cdc_client" with a value equal to "true".
//   2. Returns Status::InternalError if ExecEnv::GetInstance() or its
//      cdc_client_mgr() is null.
//   3. Calls CdcClientMgr::start_cdc_client(); on failure the status is logged
//      at ERROR level and returned unchanged to the caller.
//   4. Replaces the first "CDC_CLIENT_PORT" occurrence in _url with
//      doris::config::cdc_client_port, then returns Status::OK().
//
// NOTE(review): open() invokes this before its _initialized guard, so it runs
// on every open() call; presumably start_cdc_client() tolerates repeated
// calls -- confirm against CdcClientMgr.
Status HttpFileReader::setup_cdc_client() {
110+
auto enable_cdc_iter = _extend_kv.find("enable_cdc_client");
111+
// The comparison is an exact string match: values such as "TRUE" or "1" do
// not enable the CDC client.
if (enable_cdc_iter == _extend_kv.end() || enable_cdc_iter->second != "true") {
112+
return Status::OK();
113+
}
114+
115+
// _url is logged before the placeholder substitution below, so this message
// may still contain the literal CDC_CLIENT_PORT text.
LOG(INFO) << "CDC client is enabled, starting CDC client for " << _url;
116+
ExecEnv* env = ExecEnv::GetInstance();
117+
if (env == nullptr || env->cdc_client_mgr() == nullptr) {
118+
return Status::InternalError("ExecEnv or CdcClientMgr is not initialized");
119+
}
120+
121+
// `result` is handed to start_cdc_client() as an output argument; this
// function does not read it afterwards.
PRequestCdcClientResult result;
122+
Status start_st = env->cdc_client_mgr()->start_cdc_client(&result);
123+
if (!start_st.ok()) {
124+
LOG(ERROR) << "Failed to start CDC client, status=" << start_st.to_string();
125+
return start_st;
126+
}
127+
128+
// Replace CDC_CLIENT_PORT placeholder with actual CDC client port
// (taken from doris::config::cdc_client_port). Only the first occurrence is
// replaced; a URL without the placeholder is left untouched.
129+
const std::string placeholder = "CDC_CLIENT_PORT";
130+
size_t pos = _url.find(placeholder);
131+
if (pos != std::string::npos) {
132+
_url.replace(pos, placeholder.size(), std::to_string(doris::config::cdc_client_port));
133+
}
134+
LOG(INFO) << "CDC client started successfully for " << _url;
135+
return Status::OK();
136+
}
137+
94138
Status HttpFileReader::open(const FileReaderOptions& opts) {
139+
// CDC client setup must run before the _initialized guard.
140+
// See setup_cdc_client() for lifecycle details.
141+
RETURN_IF_ERROR(setup_cdc_client());
142+
143+
// Skip metadata detection when file size was pre-supplied by the caller.
95144
if (_initialized) {
96145
return Status::OK();
97146
}
98147

99-
// Step 1: HEAD request to get file metadata
100-
RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
101-
_client->set_method(HttpMethod::HEAD);
102-
RETURN_IF_ERROR(_client->execute());
148+
// Step 1: HEAD request to get file metadata (skip for chunk response)
149+
if (_enable_chunk_response) {
150+
// Chunk streaming response: size is unknown until the stream completes.
151+
// _range_supported is already false (set in constructor).
152+
_size_known = false;
153+
// Reset _file_size from the SIZE_MAX default to 0 so that any caller of
154+
// size() (e.g. NewJsonReader::_read_one_message) does not attempt to
155+
// allocate SIZE_MAX bytes before the download completes.
156+
_file_size = 0;
157+
LOG(INFO) << "Chunk response mode enabled, skipping HEAD request for " << _url;
158+
} else {
159+
// Normal mode: execute HEAD request to get file metadata
160+
RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/true));
161+
_client->set_method(HttpMethod::HEAD);
162+
RETURN_IF_ERROR(_client->execute());
103163

104-
uint64_t content_length = 0;
105-
RETURN_IF_ERROR(_client->get_content_length(&content_length));
164+
uint64_t content_length = 0;
165+
RETURN_IF_ERROR(_client->get_content_length(&content_length));
106166

107-
_file_size = content_length;
108-
_size_known = true;
167+
_file_size = content_length;
168+
_size_known = true;
169+
}
109170

110-
// Step 2: Check if Range request is disabled by configuration
111-
if (!_enable_range_request) {
112-
// User explicitly disabled Range requests, use non-Range mode directly
171+
// Step 2: Check if Range request is disabled by configuration.
172+
// Chunk response mode always has _range_supported=false (set in constructor), so only
173+
// the non-chunk non-Range path needs the file size guard.
174+
if (_enable_chunk_response) {
175+
// Nothing to do: _range_supported already false, size check not applicable
176+
} else if (!_enable_range_request) {
113177
_range_supported = false;
114-
LOG(INFO) << "Range requests disabled by configuration for " << _url
115-
<< ", using non-Range mode. File size: " << _file_size << " bytes";
116-
117-
// Check if file size exceeds limit for non-Range mode
178+
LOG(INFO) << "Range requests disabled by configuration for " << _url;
118179
if (_file_size > _max_request_size_bytes) {
119180
return Status::InternalError(
120-
"Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} bytes, "
121-
"configured by http.max.request.size.bytes). URL: {}",
181+
"Non-Range mode: file size ({} bytes) exceeds maximum allowed size ({} "
182+
"bytes, configured by http.max.request.size.bytes). URL: {}",
122183
_file_size, _max_request_size_bytes, _url);
123184
}
124-
125185
LOG(INFO) << "Non-Range mode validated for " << _url << ", file size: " << _file_size
126186
<< " bytes, max allowed: " << _max_request_size_bytes << " bytes";
127187
} else {
@@ -224,9 +284,29 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
224284
VLOG(2) << "Issuing HTTP GET request: offset=" << offset << " req_len=" << req_len
225285
<< " with_range=" << _range_supported;
226286

227-
// Prepare and initialize the HTTP client for GET request
287+
// Prepare and initialize the HTTP client for request
228288
RETURN_IF_ERROR(prepare_client(/*set_fail_on_error=*/false));
229-
_client->set_method(HttpMethod::GET);
289+
290+
// Determine HTTP method from configuration (default: GET)
291+
HttpMethod method = HttpMethod::GET;
292+
auto method_iter = _extend_kv.find("http.method");
293+
if (method_iter != _extend_kv.end()) {
294+
method = to_http_method(method_iter->second.c_str());
295+
if (method == HttpMethod::UNKNOWN) {
296+
LOG(WARNING) << "Invalid http.method value: " << method_iter->second
297+
<< ", falling back to GET";
298+
method = HttpMethod::GET;
299+
}
300+
}
301+
_client->set_method(method);
302+
303+
// Set payload if configured (supports POST, PUT, DELETE, etc.)
304+
auto payload_iter = _extend_kv.find("http.payload");
305+
if (payload_iter != _extend_kv.end() && !payload_iter->second.empty()) {
306+
_client->set_payload(payload_iter->second);
307+
_client->set_content_type("application/json");
308+
VLOG(2) << "HTTP request with payload, size=" << payload_iter->second.size();
309+
}
230310

231311
_client->set_header("Expect", "");
232312
_client->set_header("Connection", "close");
@@ -270,6 +350,21 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
270350
long http_status = _client->get_http_status();
271351
VLOG(2) << "HTTP response: status=" << http_status << " received_bytes=" << buf.size();
272352

353+
// Check for HTTP error status codes (4xx, 5xx)
354+
if (http_status >= 400) {
355+
std::string error_body;
356+
if (buf.empty()) {
357+
error_body = "(empty response body)";
358+
} else {
359+
// Limit error message to 1024 bytes to avoid excessive logging
360+
size_t max_len = std::min(buf.size(), static_cast<size_t>(1024));
361+
error_body = buf.substr(0, max_len);
362+
}
363+
364+
return Status::InternalError("HTTP request failed with status {}: {}.", http_status,
365+
error_body);
366+
}
367+
273368
if (buf.empty()) {
274369
*bytes_read = buffer_offset;
275370
return Status::OK();
@@ -295,6 +390,11 @@ Status HttpFileReader::read_at_impl(size_t offset, Slice result, size_t* bytes_r
295390
// Cache the complete file content for subsequent reads
296391
_full_file_cache = std::move(buf);
297392
_full_file_cached = true;
393+
// Now that the full content is in hand, update _file_size to the actual
394+
// byte count. This replaces the 0 placeholder set in open() for chunk
395+
// response mode, so subsequent calls to size() return a correct value.
396+
_file_size = _full_file_cache.size();
397+
_size_known = true;
298398

299399
VLOG(2) << "Cached full file: " << _full_file_cache.size() << " bytes";
300400

be/src/io/fs/http_file_reader.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ class HttpFileReader final : public FileReader {
6262
// Returns OK on success with _range_supported set appropriately
6363
Status detect_range_support();
6464

65+
// Start the CDC client process and replace the CDC_CLIENT_PORT placeholder in _url.
66+
// Called at the start of every open(); a no-op unless enable_cdc_client is "true".
67+
Status setup_cdc_client();
68+
6569
std::unique_ptr<char[]> _read_buffer;
6670
static constexpr size_t READ_BUFFER_SIZE = 1 << 20; // 1MB
6771
// Default maximum file size for servers that don't support Range requests
@@ -89,6 +93,8 @@ class HttpFileReader final : public FileReader {
8993
// Full file cache for non-Range mode to avoid repeated downloads
9094
std::string _full_file_cache; // Cache complete file content
9195
bool _full_file_cached = false; // Whether full file has been cached
96+
97+
bool _enable_chunk_response = false; // Whether server returns chunk streaming response
9298
};
9399

94100
} // namespace doris::io

fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
public class DataSourceConfigKeys {
2121
public static final String JDBC_URL = "jdbc_url";
22+
public static final String TYPE = "type";
2223
public static final String DRIVER_URL = "driver_url";
2324
public static final String DRIVER_CLASS = "driver_class";
2425
public static final String USER = "user";

fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CompareOffsetRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public CompareOffsetRequest(Long jobId,
3838
String frontendAddress,
3939
Map<String, String> offsetFirst,
4040
Map<String, String> offsetSecond) {
41-
super(jobId, sourceType, sourceProperties, frontendAddress);
41+
super(jobId.toString(), sourceType, sourceProperties, frontendAddress);
4242
this.offsetFirst = offsetFirst;
4343
this.offsetSecond = offsetSecond;
4444
}

fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchRecordRequest.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@
2323
@Data
2424
@EqualsAndHashCode(callSuper = true)
2525
public class FetchRecordRequest extends JobBaseRecordRequest {
26+
private String taskId;
2627
}

fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/FetchTableSplitsRequest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ public class FetchTableSplitsRequest extends JobBaseConfig {
3535

3636
public FetchTableSplitsRequest(Long jobId, String name,
3737
Map<String, String> sourceProperties, String frontendAddress, String snapshotTable) {
38-
super(jobId, name, sourceProperties, frontendAddress);
38+
super(jobId.toString(), name, sourceProperties, frontendAddress);
3939
this.snapshotTable = snapshotTable;
4040
}
4141
}

fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
@AllArgsConstructor
2828
@NoArgsConstructor
2929
public class JobBaseConfig {
30-
private Long jobId;
30+
private String jobId;
3131
private String dataSource;
3232
private Map<String, String> config;
3333
private String frontendAddress;

fe/fe-core/src/main/java/org/apache/doris/catalog/BuiltinTableValuedFunctions.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.doris.nereids.trees.expressions.functions.table.Backends;
2121
import org.apache.doris.nereids.trees.expressions.functions.table.Catalogs;
22+
import org.apache.doris.nereids.trees.expressions.functions.table.CdcStream;
2223
import org.apache.doris.nereids.trees.expressions.functions.table.File;
2324
import org.apache.doris.nereids.trees.expressions.functions.table.Frontends;
2425
import org.apache.doris.nereids.trees.expressions.functions.table.FrontendsDisks;
@@ -75,7 +76,8 @@ public class BuiltinTableValuedFunctions implements FunctionHelper {
7576
tableValued(ParquetMeta.class, "parquet_meta"),
7677
tableValued(ParquetFileMetadata.class, "parquet_file_metadata"),
7778
tableValued(ParquetKvMetadata.class, "parquet_kv_metadata"),
78-
tableValued(ParquetBloomProbe.class, "parquet_bloom_probe")
79+
tableValued(ParquetBloomProbe.class, "parquet_bloom_probe"),
80+
tableValued(CdcStream.class, "cdc_stream")
7981
);
8082

8183
public static final BuiltinTableValuedFunctions INSTANCE = new BuiltinTableValuedFunctions();

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ private String getToken() throws JobException {
181181
private WriteRecordRequest buildRequestParams() throws JobException {
182182
JdbcOffset offset = (JdbcOffset) runningOffset;
183183
WriteRecordRequest request = new WriteRecordRequest();
184-
request.setJobId(getJobId());
184+
request.setJobId(String.valueOf(getJobId()));
185185
request.setConfig(sourceProperties);
186186

187187
request.setDataSource(dataSourceType.name());

fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,7 @@ public void updateOffset(Offset offset) {
198198
public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
199199
Backend backend = StreamingJobUtils.selectBackend();
200200
JobBaseConfig requestParams =
201-
new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties, getFrontendAddress());
201+
new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress());
202202
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
203203
.setApi("/api/fetchEndOffset")
204204
.setParams(new Gson().toJson(requestParams)).build();
@@ -570,7 +570,7 @@ public boolean hasReachedEnd() {
570570
private void initSourceReader() throws JobException {
571571
Backend backend = StreamingJobUtils.selectBackend();
572572
JobBaseConfig requestParams =
573-
new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties, getFrontendAddress());
573+
new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress());
574574
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
575575
.setApi("/api/initReader")
576576
.setParams(new Gson().toJson(requestParams)).build();
@@ -618,7 +618,7 @@ public void cleanMeta(Long jobId) throws JobException {
618618
StreamingJobUtils.deleteJobMeta(jobId);
619619
Backend backend = StreamingJobUtils.selectBackend();
620620
JobBaseConfig requestParams =
621-
new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties, getFrontendAddress());
621+
new JobBaseConfig(getJobId().toString(), sourceType.name(), sourceProperties, getFrontendAddress());
622622
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
623623
.setApi("/api/close")
624624
.setParams(new Gson().toJson(requestParams)).build();

0 commit comments

Comments
 (0)