9393 * <li>No need synchronized on {@link #sources}. {@link #sources} is a ConcurrentHashMap and there
9494 * is a Lock for peer id in {@link PeerProcedureHandlerImpl}. So there is no race for peer
9595 * operations.</li>
96- * <li>Need synchronized on {@link #walsById}. There are four methods which modify it,
97- * {@link #addPeer(String)}, {@link #removePeer(String)},
98- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and {@link #preLogRoll(Path)}.
99- * {@link #walsById} is a ConcurrentHashMap and there is a Lock for peer id in
100- * {@link PeerProcedureHandlerImpl}. So there is no race between {@link #addPeer(String)} and
101- * {@link #removePeer(String)}. {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)}
102- * is called by {@link ReplicationSourceInterface}. So no race with {@link #addPeer(String)}.
103- * {@link #removePeer(String)} will terminate the {@link ReplicationSourceInterface} firstly, then
104- * remove the wals from {@link #walsById}. So no race with {@link #removePeer(String)}. The only
105- * case need synchronized is {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
106- * {@link #preLogRoll(Path)}.</li>
107- * <li>No need synchronized on {@link #walsByIdRecoveredQueues}. There are three methods which
108- * modify it, {@link #removePeer(String)} ,
109- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} and
110- * {@link ReplicationSourceManager#claimQueue(ServerName, String)}.
111- * {@link #cleanOldLogs(String, boolean, ReplicationSourceInterface)} is called by
112- * {@link ReplicationSourceInterface}. {@link #removePeer(String)} will terminate the
113- * {@link ReplicationSourceInterface} firstly, then remove the wals from
114- * {@link #walsByIdRecoveredQueues}. And
115- * {@link ReplicationSourceManager#claimQueue(ServerName, String)} will add the wals to
116- * {@link #walsByIdRecoveredQueues} firstly, then start up a {@link ReplicationSourceInterface}. So
117- * there is no race here. For {@link ReplicationSourceManager#claimQueue(ServerName, String)} and
118- * {@link #removePeer(String)}, there is already synchronized on {@link #oldsources}. So no need
119- * synchronized on {@link #walsByIdRecoveredQueues}.</li>
12096 * <li>Need synchronized on {@link #latestPaths} to avoid the new open source miss new log.</li>
12197 * <li>Need synchronized on {@link #oldsources} to avoid adding recovered source for the
12298 * to-be-removed peer.</li>
@@ -144,15 +120,6 @@ public class ReplicationSourceManager {
144120 // All about stopping
145121 private final Server server ;
146122
147- // All logs we are currently tracking
148- // Index structure of the map is: queue_id->logPrefix/logGroup->logs
149- // For normal replication source, the peer id is same with the queue id
150- private final ConcurrentMap <String , Map <String , NavigableSet <String >>> walsById ;
151- // Logs for recovered sources we are currently tracking
152- // the map is: queue_id->logPrefix/logGroup->logs
153- // For recovered source, the queue id's format is peer_id-servername-*
154- private final ConcurrentMap <String , Map <String , NavigableSet <String >>> walsByIdRecoveredQueues ;
155-
156123 private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager ;
157124
158125 private final Configuration conf ;
@@ -212,8 +179,6 @@ public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
212179 this .queueStorage = queueStorage ;
213180 this .replicationPeers = replicationPeers ;
214181 this .server = server ;
215- this .walsById = new ConcurrentHashMap <>();
216- this .walsByIdRecoveredQueues = new ConcurrentHashMap <>();
217182 this .oldsources = new ArrayList <>();
218183 this .conf = conf ;
219184 this .fs = fs ;
@@ -322,7 +287,6 @@ public void removePeer(String peerId) {
322287 // Delete queue from storage and memory and queue id is same with peer id for normal
323288 // source
324289 deleteQueue (peerId );
325- this .walsById .remove (peerId );
326290 }
327291 ReplicationPeerConfig peerConfig = peer .getPeerConfig ();
328292 if (peerConfig .isSyncReplication ()) {
@@ -364,15 +328,10 @@ ReplicationSourceInterface addSource(String peerId) throws IOException {
364328 // synchronized on latestPaths to avoid missing the new log
365329 synchronized (this .latestPaths ) {
366330 this .sources .put (peerId , src );
367- Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
368- this .walsById .put (peerId , walsByGroup );
369331 // Add the latest wal to that source's queue
370332 if (!latestPaths .isEmpty ()) {
371333 for (Map .Entry <String , Path > walPrefixAndPath : latestPaths .entrySet ()) {
372334 Path walPath = walPrefixAndPath .getValue ();
373- NavigableSet <String > wals = new TreeSet <>();
374- wals .add (walPath .getName ());
375- walsByGroup .put (walPrefixAndPath .getKey (), wals );
376335 // Abort RS and throw exception to make add peer failed
377336 abortAndThrowIOExceptionWhenFail (
378337 () -> this .queueStorage .addWAL (server .getServerName (), peerId , walPath .getName ()));
@@ -426,7 +385,10 @@ public void drainSources(String peerId) throws IOException, ReplicationException
426385 // map from walsById since later we may fail to delete them from the replication queue
427386 // storage, and when we retry next time, we can not know the wal files that need to be deleted
428387 // from the replication queue storage.
429- walsById .get (peerId ).forEach ((k , v ) -> wals .put (k , new TreeSet <>(v )));
388+ this .queueStorage .getWALsInQueue (this .server .getServerName (), peerId ).forEach (wal -> {
389+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
390+ wals .computeIfAbsent (walPrefix , p -> new TreeSet <>()).add (wal );
391+ });
430392 }
431393 LOG .info ("Startup replication source for " + src .getPeerId ());
432394 src .startup ();
@@ -435,15 +397,6 @@ public void drainSources(String peerId) throws IOException, ReplicationException
435397 queueStorage .removeWAL (server .getServerName (), peerId , wal );
436398 }
437399 }
438- synchronized (walsById ) {
439- Map <String , NavigableSet <String >> oldWals = walsById .get (peerId );
440- wals .forEach ((k , v ) -> {
441- NavigableSet <String > walsByGroup = oldWals .get (k );
442- if (walsByGroup != null ) {
443- walsByGroup .removeAll (v );
444- }
445- });
446- }
447400 // synchronized on oldsources to avoid race with NodeFailoverWorker. Since NodeFailoverWorker is
448401 // a background task, we will delete the file from replication queue storage under the lock to
449402 // simplify the logic.
@@ -455,7 +408,6 @@ public void drainSources(String peerId) throws IOException, ReplicationException
455408 oldSource .terminate (terminateMessage );
456409 oldSource .getSourceMetrics ().clear ();
457410 queueStorage .removeQueue (server .getServerName (), queueId );
458- walsByIdRecoveredQueues .remove (queueId );
459411 iter .remove ();
460412 }
461413 }
@@ -468,7 +420,7 @@ public void drainSources(String peerId) throws IOException, ReplicationException
468420 * replication queue storage and only to enqueue all logs to the new replication source
469421 * @param peerId the id of the replication peer
470422 */
471- public void refreshSources (String peerId ) throws IOException {
423+ public void refreshSources (String peerId ) throws ReplicationException , IOException {
472424 String terminateMessage = "Peer " + peerId +
473425 " state or config changed. Will close the previous replication source and open a new one" ;
474426 ReplicationPeer peer = replicationPeers .getPeer (peerId );
@@ -481,9 +433,8 @@ public void refreshSources(String peerId) throws IOException {
481433 // Do not clear metrics
482434 toRemove .terminate (terminateMessage , null , false );
483435 }
484- for (NavigableSet <String > walsByGroup : walsById .get (peerId ).values ()) {
485- walsByGroup .forEach (wal -> src .enqueueLog (new Path (this .logDir , wal )));
486- }
436+ this .queueStorage .getWALsInQueue (this .server .getServerName (), peerId )
437+ .forEach (wal -> src .enqueueLog (new Path (this .logDir , wal )));
487438 }
488439 LOG .info ("Startup replication source for " + src .getPeerId ());
489440 src .startup ();
@@ -504,9 +455,8 @@ public void refreshSources(String peerId) throws IOException {
504455 for (String queueId : previousQueueIds ) {
505456 ReplicationSourceInterface recoveredReplicationSource = createSource (queueId , peer );
506457 this .oldsources .add (recoveredReplicationSource );
507- for (SortedSet <String > walsByGroup : walsByIdRecoveredQueues .get (queueId ).values ()) {
508- walsByGroup .forEach (wal -> recoveredReplicationSource .enqueueLog (new Path (wal )));
509- }
458+ this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId )
459+ .forEach (wal -> recoveredReplicationSource .enqueueLog (new Path (wal )));
510460 toStartup .add (recoveredReplicationSource );
511461 }
512462 }
@@ -526,7 +476,6 @@ private boolean removeRecoveredSource(ReplicationSourceInterface src) {
526476 LOG .info ("Done with the recovered queue {}" , src .getQueueId ());
527477 // Delete queue from storage and memory
528478 deleteQueue (src .getQueueId ());
529- this .walsByIdRecoveredQueues .remove (src .getQueueId ());
530479 return true ;
531480 }
532481
@@ -549,8 +498,6 @@ void removeSource(ReplicationSourceInterface src) {
549498 this .sources .remove (src .getPeerId ());
550499 // Delete queue from storage and memory
551500 deleteQueue (src .getQueueId ());
552- this .walsById .remove (src .getQueueId ());
553-
554501 }
555502
556503 /**
@@ -635,42 +582,19 @@ public void logPositionAndCleanOldLogs(ReplicationSourceInterface source,
635582 * @param inclusive whether we should also remove the given log file
636583 * @param source the replication source
637584 */
638- void cleanOldLogs (String log , boolean inclusive , ReplicationSourceInterface source ) {
639- String logPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (log );
640- if (source .isRecovered ()) {
641- NavigableSet <String > wals = walsByIdRecoveredQueues .get (source .getQueueId ()).get (logPrefix );
642- if (wals != null ) {
643- NavigableSet <String > walsToRemove = wals .headSet (log , inclusive );
644- if (walsToRemove .isEmpty ()) {
645- return ;
646- }
647- cleanOldLogs (walsToRemove , source );
648- walsToRemove .clear ();
649- }
650- } else {
651- NavigableSet <String > wals ;
652- NavigableSet <String > walsToRemove ;
653- // synchronized on walsById to avoid race with preLogRoll
654- synchronized (this .walsById ) {
655- wals = walsById .get (source .getQueueId ()).get (logPrefix );
656- if (wals == null ) {
657- return ;
658- }
659- walsToRemove = wals .headSet (log , inclusive );
660- if (walsToRemove .isEmpty ()) {
661- return ;
662- }
663- walsToRemove = new TreeSet <>(walsToRemove );
664- }
665- // cleanOldLogs may spend some time, especially for sync replication where we may want to
666- // remove remote wals as the remote cluster may have already been down, so we do it outside
667- // the lock to avoid block preLogRoll
668- cleanOldLogs (walsToRemove , source );
669- // now let's remove the files in the set
670- synchronized (this .walsById ) {
671- wals .removeAll (walsToRemove );
672- }
585+ void cleanOldLogs (String log , boolean inclusive ,
586+ ReplicationSourceInterface source ) {
587+ NavigableSet <String > walsToRemove ;
588+ synchronized (this .latestPaths ) {
589+ walsToRemove = getWalsToRemove (source .getQueueId (), log , inclusive );
590+ }
591+ if (walsToRemove .isEmpty ()) {
592+ return ;
673593 }
594+ // cleanOldLogs may spend some time, especially for sync replication where we may want to
595+ // remove remote wals as the remote cluster may have already been down, so we do it outside
596+ // the lock to avoid block preLogRoll
597+ cleanOldLogs (walsToRemove , source );
674598 }
675599
676600 private void removeRemoteWALs (String peerId , String remoteWALDir , Collection <String > wals )
@@ -750,37 +674,6 @@ public void preLogRoll(Path newLog) throws IOException {
750674 abortAndThrowIOExceptionWhenFail (
751675 () -> this .queueStorage .addWAL (server .getServerName (), source .getQueueId (), logName ));
752676 }
753-
754- // synchronized on walsById to avoid race with cleanOldLogs
755- synchronized (this .walsById ) {
756- // Update walsById map
757- for (Map .Entry <String , Map <String , NavigableSet <String >>> entry : this .walsById
758- .entrySet ()) {
759- String peerId = entry .getKey ();
760- Map <String , NavigableSet <String >> walsByPrefix = entry .getValue ();
761- boolean existingPrefix = false ;
762- for (Map .Entry <String , NavigableSet <String >> walsEntry : walsByPrefix .entrySet ()) {
763- SortedSet <String > wals = walsEntry .getValue ();
764- if (this .sources .isEmpty ()) {
765- // If there's no slaves, don't need to keep the old wals since
766- // we only consider the last one when a new slave comes in
767- wals .clear ();
768- }
769- if (logPrefix .equals (walsEntry .getKey ())) {
770- wals .add (logName );
771- existingPrefix = true ;
772- }
773- }
774- if (!existingPrefix ) {
775- // The new log belongs to a new group, add it into this peer
776- LOG .debug ("Start tracking logs for wal group {} for peer {}" , logPrefix , peerId );
777- NavigableSet <String > wals = new TreeSet <>();
778- wals .add (logName );
779- walsByPrefix .put (logPrefix , wals );
780- }
781- }
782- }
783-
784677 // Add to latestPaths
785678 latestPaths .put (logPrefix , newLog );
786679 }
@@ -887,18 +780,6 @@ void claimQueue(ServerName deadRS, String queue) {
887780 return ;
888781 }
889782 }
890- // track sources in walsByIdRecoveredQueues
891- Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
892- walsByIdRecoveredQueues .put (queueId , walsByGroup );
893- for (String wal : walsSet ) {
894- String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
895- NavigableSet <String > wals = walsByGroup .get (walPrefix );
896- if (wals == null ) {
897- wals = new TreeSet <>();
898- walsByGroup .put (walPrefix , wals );
899- }
900- wals .add (wal );
901- }
902783 oldsources .add (src );
903784 LOG .info ("Added source for recovered queue {}" , src .getQueueId ());
904785 for (String wal : walsSet ) {
@@ -926,15 +807,37 @@ public void join() {
926807 * Get a copy of the wals of the normal sources on this rs
927808 * @return a sorted set of wal names
928809 */
929- public Map <String , Map <String , NavigableSet <String >>> getWALs () {
810+ public Map <String , Map <String , NavigableSet <String >>> getWALs ()
811+ throws ReplicationException {
812+ Map <String , Map <String , NavigableSet <String >>> walsById = new HashMap <>();
813+ for (ReplicationSourceInterface source : sources .values ()) {
814+ String queueId = source .getQueueId ();
815+ Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
816+ walsById .put (queueId , walsByGroup );
817+ for (String wal : this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId )) {
818+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
819+ walsByGroup .computeIfAbsent (walPrefix , p -> new TreeSet <>()).add (wal );
820+ }
821+ }
930822 return Collections .unmodifiableMap (walsById );
931823 }
932824
933825 /**
934826 * Get a copy of the wals of the recovered sources on this rs
935827 * @return a sorted set of wal names
936828 */
937- Map <String , Map <String , NavigableSet <String >>> getWalsByIdRecoveredQueues () {
829+ Map <String , Map <String , NavigableSet <String >>> getWalsByIdRecoveredQueues ()
830+ throws ReplicationException {
831+ Map <String , Map <String , NavigableSet <String >>> walsByIdRecoveredQueues = new HashMap <>();
832+ for (ReplicationSourceInterface source : oldsources ) {
833+ String queueId = source .getQueueId ();
834+ Map <String , NavigableSet <String >> walsByGroup = new HashMap <>();
835+ walsByIdRecoveredQueues .put (queueId , walsByGroup );
836+ for (String wal : this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId )) {
837+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
838+ walsByGroup .computeIfAbsent (walPrefix , p -> new TreeSet <>()).add (wal );
839+ }
840+ }
938841 return Collections .unmodifiableMap (walsByIdRecoveredQueues );
939842 }
940843
@@ -1165,4 +1068,21 @@ private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo reg
11651068 ReplicationQueueStorage getQueueStorage () {
11661069 return queueStorage ;
11671070 }
1071+
1072+ private NavigableSet <String > getWalsToRemove (String queueId , String log , boolean inclusive ) {
1073+ NavigableSet <String > walsToRemove = new TreeSet <>();
1074+ String logPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (log );
1075+ try {
1076+ this .queueStorage .getWALsInQueue (this .server .getServerName (), queueId ).forEach (wal -> {
1077+ String walPrefix = AbstractFSWALProvider .getWALPrefixFromWALName (wal );
1078+ if (walPrefix .equals (logPrefix )) {
1079+ walsToRemove .add (wal );
1080+ }
1081+ });
1082+ } catch (ReplicationException e ) {
1083+ // Just log the exception here, as the recovered replication source will try to cleanup again.
1084+ LOG .warn ("Failed to read wals in queue {}" , queueId , e );
1085+ }
1086+ return walsToRemove .headSet (log , inclusive );
1087+ }
11681088}
0 commit comments