Skip to content

Commit 717a11c

Browse files
author
Andrei Nadyktov
committed
IGNITE-22530 Add already existing caches to CDC after new regex filters are set
1 parent d81bd13 commit 717a11c

6 files changed

Lines changed: 187 additions & 54 deletions

File tree

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/AbstractIgniteCdcStreamer.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.nio.file.Path;
2121
import java.util.Iterator;
22+
import java.util.List;
2223
import java.util.Set;
2324
import java.util.stream.Collectors;
2425

@@ -119,7 +120,7 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
119120
}
120121

121122
/** {@inheritDoc} */
122-
@Override public void start(MetricRegistry reg, Path cdcDir) {
123+
@Override public void start(MetricRegistry reg, Path cdcDir, List<String> cacheNames) {
123124
A.notEmpty(caches, "caches");
124125

125126
regexManager = new CdcRegexManager(cdcDir, log);
@@ -131,6 +132,8 @@ public abstract class AbstractIgniteCdcStreamer implements CdcConsumerEx {
131132

132133
regexManager.compileRegexp(includeTemplate, excludeTemplate);
133134

135+
regexManager.match(cacheNames);
136+
134137
regexManager.getSavedCaches().stream()
135138
.map(CU::cacheId)
136139
.forEach(cachesIds::add);

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/CdcRegexManager.java

Lines changed: 36 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.nio.file.Files;
2222
import java.nio.file.Path;
23+
import java.util.Collections;
2324
import java.util.List;
2425
import java.util.Optional;
2526
import java.util.regex.Pattern;
@@ -73,13 +74,23 @@ public CdcRegexManager(Path cdcDir, IgniteLogger log) {
7374
* @return True if cache name matches user's regexp patterns.
7475
*/
7576
public boolean match(String cacheName) {
76-
return matchAndSave(cacheName);
77+
return matchAndSave(Collections.singletonList(cacheName));
7778
}
7879

7980
/**
80-
* Get actual list of names of caches added by regex templates from cache list file.
81-
* Caches that added to replication through regex templates during the work of CDC application,
82-
* are saved to file so they can be restored after application restart.
81+
* Finds and processes match between a set of cache names and user's regexp patterns.
82+
*
83+
* @param cacheNames Cache names.
84+
* @return True if cache name matches user's regexp patterns.
85+
*/
86+
public boolean match(List<String> cacheNames) {
87+
return matchAndSave(cacheNames);
88+
}
89+
90+
/**
91+
* Get actual list of names of caches added by regex templates from cache list file.<br/>
92+
* All new caches started during the work of CDC application are saved to file so they can be added to CDC later if
93+
* appropriate regex filter is set.
8394
*
8495
* @return Caches names list.
8596
*/
@@ -95,31 +106,34 @@ public List<String> getSavedCaches() {
95106
}
96107

97108
/**
98-
* Finds match between cache name and user's regex templates.
99-
* If match is found, saves cache name to file.
109+
* Finds match between caches names and user's regex templates and saves cache name to a file.
100110
*
101-
* @param cacheName Cache name.
111+
* @param cacheNames Cache names.
102112
* @return True if cache name matches user's regexp patterns.
103113
*/
104-
private boolean matchAndSave(String cacheName) {
105-
if (matchesFilters(cacheName)) {
106-
try {
107-
List<String> caches = loadCaches();
114+
private boolean matchAndSave(List<String> cacheNames) {
115+
try {
116+
List<String> caches = loadCaches();
108117

109-
caches.add(cacheName);
118+
caches.addAll(cacheNames);
110119

111-
save(caches);
112-
}
113-
catch (IOException e) {
114-
throw new IgniteException(e);
115-
}
120+
save(caches);
121+
}
122+
catch (IOException e) {
123+
throw new IgniteException(e);
124+
}
116125

117-
if (log.isInfoEnabled())
118-
log.info("Cache has been added to replication [cacheName=" + cacheName + "]");
126+
List<String> matchingCaches = cacheNames.stream()
127+
.filter(this::matchesFilters)
128+
.collect(Collectors.toList());
119129

120-
return true;
121-
}
122-
return false;
130+
if (matchingCaches.isEmpty())
131+
return false;
132+
133+
if (log.isInfoEnabled())
134+
log.info("Cache(s) has been added to replication [cacheNames=" + matchingCaches + "]");
135+
136+
return true;
123137
}
124138

125139
/**

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/IgniteToIgniteCdcStreamer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.cdc;
1919

2020
import java.nio.file.Path;
21+
import java.util.List;
2122

2223
import org.apache.ignite.IgniteException;
2324
import org.apache.ignite.Ignition;
@@ -61,8 +62,8 @@ public class IgniteToIgniteCdcStreamer extends AbstractIgniteCdcStreamer impleme
6162
private volatile boolean alive = true;
6263

6364
/** {@inheritDoc} */
64-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
65-
super.start(mreg, cdcDir);
65+
@Override public void start(MetricRegistry mreg, Path cdcDir, List<String> cacheNames) {
66+
super.start(mreg, cdcDir, cacheNames);
6667

6768
if (log.isInfoEnabled())
6869
log.info("Ignite To Ignite Streamer [cacheIds=" + cachesIds + ']');

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/kafka/IgniteToKafkaCdcStreamer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ private <T> void sendOneBatch(
336336
}
337337

338338
/** {@inheritDoc} */
339-
@Override public void start(MetricRegistry reg, Path cdcDir) {
339+
@Override public void start(MetricRegistry reg, Path cdcDir, List<String> cacheNames) {
340340
A.notNull(kafkaProps, "Kafka properties");
341341
A.notNull(evtTopic, "Kafka topic");
342342
A.notNull(metadataTopic, "Kafka metadata topic");
@@ -355,6 +355,8 @@ private <T> void sendOneBatch(
355355

356356
regexManager.compileRegexp(includeTemplate, excludeTemplate);
357357

358+
regexManager.match(cacheNames);
359+
358360
regexManager.getSavedCaches().stream()
359361
.map(CU::cacheId)
360362
.forEach(cachesIds::add);

modules/cdc-ext/src/main/java/org/apache/ignite/cdc/thin/IgniteToIgniteClientCdcStreamer.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.ignite.cdc.thin;
1919

2020
import java.nio.file.Path;
21+
import java.util.List;
2122

2223
import org.apache.ignite.Ignition;
2324
import org.apache.ignite.cdc.AbstractIgniteCdcStreamer;
@@ -68,8 +69,8 @@ public class IgniteToIgniteClientCdcStreamer extends AbstractIgniteCdcStreamer {
6869
private long aliveCheckTimeout = DFLT_ALIVE_CHECK_TIMEOUT;
6970

7071
/** {@inheritDoc} */
71-
@Override public void start(MetricRegistry mreg, Path cdcDir) {
72-
super.start(mreg, cdcDir);
72+
@Override public void start(MetricRegistry mreg, Path cdcDir, List<String> cacheNames) {
73+
super.start(mreg, cdcDir, cacheNames);
7374

7475
if (log.isInfoEnabled())
7576
log.info("Ignite To Ignite Client Streamer [cacheIds=" + cachesIds + ']');

0 commit comments

Comments
 (0)