Skip to content

Commit 20e0261

Browse files
JNSimbaYour Name
authored andcommitted
[Fix](streamingjob) modify select backend policy for streaming job (#59705)
### What problem does this PR solve? Issue Number: close #xxx Related PR: #58898
1 parent 0221a73 commit 20e0261

14 files changed

Lines changed: 30 additions & 24 deletions

fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void run() throws JobException {
112112
}
113113

114114
private void sendWriteRequest() throws JobException {
115-
Backend backend = StreamingJobUtils.selectBackend(jobId);
115+
Backend backend = StreamingJobUtils.selectBackend();
116116
WriteRecordRequest params = buildRequestParams();
117117
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
118118
.setApi("/api/writeRecords")

fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ public void updateOffset(Offset offset) {
183183

184184
@Override
185185
public void fetchRemoteMeta(Map<String, String> properties) throws Exception {
186-
Backend backend = StreamingJobUtils.selectBackend(jobId);
186+
Backend backend = StreamingJobUtils.selectBackend();
187187
JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
188188
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
189189
.setApi("/api/fetchEndOffset")
@@ -258,7 +258,7 @@ public boolean hasMoreDataToConsume() {
258258

259259
private boolean compareOffset(Map<String, String> offsetFirst, Map<String, String> offsetSecond)
260260
throws JobException {
261-
Backend backend = StreamingJobUtils.selectBackend(jobId);
261+
Backend backend = StreamingJobUtils.selectBackend();
262262
CompareOffsetRequest requestParams =
263263
new CompareOffsetRequest(getJobId(), sourceType.name(), sourceProperties, offsetFirst, offsetSecond);
264264
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
@@ -444,7 +444,7 @@ private void saveChunkMeta(Map<String, List<SnapshotSplit>> tableSplits) throws
444444
}
445445

446446
private List<SnapshotSplit> requestTableSplits(String table) throws JobException {
447-
Backend backend = StreamingJobUtils.selectBackend(jobId);
447+
Backend backend = StreamingJobUtils.selectBackend();
448448
FetchTableSplitsRequest requestParams =
449449
new FetchTableSplitsRequest(getJobId(), sourceType.name(), sourceProperties, table);
450450
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
@@ -493,7 +493,7 @@ private boolean checkNeedSplitChunks(Map<String, String> sourceProperties) {
493493
public void cleanMeta(Long jobId) throws JobException {
494494
// clean meta table
495495
StreamingJobUtils.deleteJobMeta(jobId);
496-
Backend backend = StreamingJobUtils.selectBackend(jobId);
496+
Backend backend = StreamingJobUtils.selectBackend();
497497
JobBaseConfig requestParams = new JobBaseConfig(getJobId(), sourceType.name(), sourceProperties);
498498
InternalService.PRequestCdcClientRequest request = InternalService.PRequestCdcClientRequest.newBuilder()
499499
.setApi("/api/close")

fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ public class StreamingJobUtils {
9797

9898
private static final ObjectMapper objectMapper = new ObjectMapper();
9999

100+
private static int lastSelectedBackendIndex = 0;
101+
100102
public static void createMetaTableIfNotExist() throws Exception {
101103
Optional<Database> optionalDatabase =
102104
Env.getCurrentEnv().getInternalCatalog()
@@ -213,27 +215,31 @@ private static JdbcClient getJdbcClient(DataSourceType sourceType, Map<String, S
213215
return JdbcClient.createJdbcClient(config);
214216
}
215217

216-
public static Backend selectBackend(Long jobId) throws JobException {
218+
public static Backend selectBackend() throws JobException {
217219
Backend backend = null;
218220
BeSelectionPolicy policy = null;
219221

220-
policy = new BeSelectionPolicy.Builder()
221-
.setEnableRoundRobin(true)
222-
.needLoadAvailable().build();
222+
policy = new BeSelectionPolicy.Builder().setEnableRoundRobin(true).needLoadAvailable().build();
223+
policy.nextRoundRobinIndex = getLastSelectedBackendIndexAndUpdate();
224+
223225
List<Long> backendIds;
224-
backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, -1);
226+
backendIds = Env.getCurrentSystemInfo().selectBackendIdsByPolicy(policy, 1);
225227
if (backendIds.isEmpty()) {
226228
throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
227229
}
228-
// jobid % backendSize
229-
long index = backendIds.get(jobId.intValue() % backendIds.size());
230-
backend = Env.getCurrentSystemInfo().getBackend(index);
230+
backend = Env.getCurrentSystemInfo().getBackend(backendIds.get(0));
231231
if (backend == null) {
232232
throw new JobException(SystemInfoService.NO_BACKEND_LOAD_AVAILABLE_MSG + ", policy: " + policy);
233233
}
234234
return backend;
235235
}
236236

237+
private static synchronized int getLastSelectedBackendIndexAndUpdate() {
238+
int index = lastSelectedBackendIndex;
239+
lastSelectedBackendIndex = (index >= Integer.MAX_VALUE - 1) ? 0 : index + 1;
240+
return index;
241+
}
242+
237243
public static List<CreateTableCommand> generateCreateTableCmds(String targetDb, DataSourceType sourceType,
238244
Map<String, String> properties, Map<String, String> targetProperties)
239245
throws JobException {

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
2020

2121
import static java.util.concurrent.TimeUnit.SECONDS
2222

23-
suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_docker_mysql") {
23+
suite("test_streaming_mysql_job", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
2424
def jobName = "test_streaming_mysql_job_name"
2525
def currentDb = (sql "select database()")[0][0]
2626
def table1 = "user_info_normal1"

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_all_type.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
2020

2121
import static java.util.concurrent.TimeUnit.SECONDS
2222

23-
suite("test_streaming_mysql_job_all_type", "p0,external,mysql,external_docker,external_docker_mysql") {
23+
suite("test_streaming_mysql_job_all_type", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
2424
def jobName = "test_streaming_mysql_job_all_type_name"
2525
def currentDb = (sql "select database()")[0][0]
2626
def table1 = "streaming_all_types_nullable_with_pk"

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
2020

2121
import static java.util.concurrent.TimeUnit.SECONDS
2222

23-
suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docker,external_docker_mysql") {
23+
suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
2424
def jobName = "test_streaming_mysql_job_create_alter"
2525
def currentDb = (sql "select database()")[0][0]
2626
def table1 = "create_alter_user_info"

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_dup.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
suite("test_streaming_mysql_job_dup", "p0,external,mysql,external_docker,external_docker_mysql") {
18+
suite("test_streaming_mysql_job_dup", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
1919
def jobName = "test_streaming_mysql_job_name"
2020
def currentDb = (sql "select database()")[0][0]
2121
def table1 = "test_streaming_mysql_job_dup"

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_exclude.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
2020

2121
import static java.util.concurrent.TimeUnit.SECONDS
2222

23-
suite("test_streaming_mysql_job_exclude", "p0,external,mysql,external_docker,external_docker_mysql") {
23+
suite("test_streaming_mysql_job_exclude", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
2424
def jobName = "test_streaming_mysql_job_exclude_name"
2525
def currentDb = (sql "select database()")[0][0]
2626
def table1 = "user_info_exclude1"

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import org.awaitility.Awaitility
1919

2020
import static java.util.concurrent.TimeUnit.SECONDS
2121

22-
suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,external_docker_mysql") {
22+
suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
2323
def tableName = "test_streaming_mysql_job_priv_tbl"
2424
def jobName = "test_streaming_mysql_job_priv_name"
2525

regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import org.awaitility.Awaitility
2020

2121
import static java.util.concurrent.TimeUnit.SECONDS
2222

23-
suite("test_streaming_mysql_job_restart_fe", "docker,mysql,external_docker,external_docker_mysql") {
23+
suite("test_streaming_mysql_job_restart_fe", "docker,mysql,external_docker,external_docker_mysql,nondatalake") {
2424
def jobName = "test_streaming_mysql_job_restart_fe"
2525
def options = new ClusterOptions()
2626
options.setFeNum(1)

0 commit comments

Comments
 (0)