 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.replication.TableCFs;
 import org.apache.hadoop.hbase.io.WALLink;
 import org.apache.hadoop.hbase.procedure2.util.StringUtils;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueueData;
+import org.apache.hadoop.hbase.replication.ReplicationQueueId;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.zookeeper.ZKDump;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;
 
 /**
- * TODO: reimplement this tool
  * <p/>
  * Provides information about the existing states of replication, replication peers and queues.
  * Usage: hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues [args]
  * Arguments: --distributed Polls each RS to dump information about the queue --hdfs Reports HDFS
- * usage by the replication queues (note: can be overestimated).
+ * usage by the replication queues (note: can be overestimated). In the new version, the tool has
+ * been reimplemented to obtain replication queue information from the replication table.
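+ * <p/>
+ * For example, to poll every RegionServer and include the HDFS usage of the WALs:
+ * hbase org.apache.hadoop.hbase.replication.regionserver.DumpReplicationQueues --distributed --hdfs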
  */
 @InterfaceAudience.Private
 public class DumpReplicationQueues extends Configured implements Tool {
@@ -185,7 +194,7 @@ protected static void printUsage(final String className, final String message) {
     System.err.println("General Options:");
     System.err.println(" -h|--h|--help Show this help and exit.");
     System.err.println(" --distributed Poll each RS and print its own replication queue. "
-      + "Default only polls ZooKeeper ");
+      + "Default only polls the replication table.");
     System.err.println(" --hdfs Use HDFS to calculate usage of WALs by replication."
       + " It could be overestimated if replicating to multiple peers."
       + " --distributed flag is also needed.");
@@ -229,21 +238,45 @@ private int dumpReplicationQueues(DumpOptions opts) throws Exception {
         LOG.info("Found [--distributed], will poll each RegionServer.");
         Set<String> peerIds =
           peers.stream().map((peer) -> peer.getPeerId()).collect(Collectors.toSet());
-        System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
+        System.out.println(dumpQueues(zkw, connection, peerIds, opts.isHdfs()));
         System.out.println(dumpReplicationSummary());
       } else {
-        // use ZK instead
-        System.out.print("Dumping replication znodes via ZooKeeper: ");
-        System.out.println(ZKDump.getReplicationZnodesDump(zkw));
+        // use replication table instead
+        System.out.println("Dumping replication info via replication table.");
+        System.out.println(dumpReplicationViaTable(connection));
       }
       return (0);
     } catch (IOException e) {
       return (-1);
     } finally {
-      zkw.close();
+      connection.close();
     }
   }
 
+  public String dumpReplicationViaTable(Connection connection) throws ReplicationException {
+    StringBuilder sb = new StringBuilder();
+    ReplicationQueueStorage queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(connection, getConf());
+    List<ReplicationQueueData> replicationQueueDataList = queueStorage.listAllQueues();
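+    // List each queue's peer id and the region server that currently owns the queue.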
+    for (ReplicationQueueData replicationQueueData : replicationQueueDataList) {
+      sb.append(replicationQueueData.getId().getPeerId()).append("\n");
+      sb.append(replicationQueueData.getId().getServerName().getServerName()).append("\n");
+    }
+
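+    // For every queue, dump the last recorded offset of each WAL group.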
+    for (ReplicationQueueData replicationQueueData : replicationQueueDataList) {
+      for (ImmutableMap.Entry<String, ReplicationGroupOffset> entry : replicationQueueData
+        .getOffsets().entrySet()) {
+        sb.append("\n").append(entry.getKey()).append("/").append(entry.getValue().getWal())
+          .append(": ").append(entry.getValue().getOffset());
+      }
+    }
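+    // Finally, dump the bulk-loaded HFile references that are still pending replication.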
+    Set<String> allHFileRefs = queueStorage.getAllHFileRefs();
+    for (String hfileRef : allHFileRefs) {
+      sb.append("\n").append(hfileRef);
+    }
+    return sb.toString();
+  }
+
   public String dumpReplicationSummary() {
     StringBuilder sb = new StringBuilder();
     if (!deletedQueues.isEmpty()) {
@@ -255,7 +288,7 @@ public String dumpReplicationSummary() {
     }
     if (!deadRegionServers.isEmpty()) {
       sb.append("Found " + deadRegionServers.size() + " dead regionservers"
-        + ", restart one regionserver to transfer the queues of dead regionservers\n");
+        + ", restart one regionServer to transfer the queues of dead regionservers\n");
       for (String deadRs : deadRegionServers) {
         sb.append(" " + deadRs + "\n");
       }
@@ -294,80 +327,107 @@ public String dumpPeersState(List<ReplicationPeerDescription> peers) throws Exce
     return sb.toString();
   }
 
-  public String dumpQueues(ZKWatcher zkw, Set<String> peerIds, boolean hdfs) throws Exception {
-    ReplicationQueueStorage queueStorage;
+  public String dumpQueues(ZKWatcher zkw, Connection connection, Set<String> peerIds, boolean hdfs)
+    throws Exception {
     StringBuilder sb = new StringBuilder();
+    ReplicationQueueStorage queueStorage =
+      ReplicationStorageFactory.getReplicationQueueStorage(connection, getConf());
+    Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode)
+      .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
+
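+    // Queue owners that are no longer registered as live in ZooKeeper are reported as dead.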
+    List<ServerName> regionServers = queueStorage.listAllReplicators();
+    if (regionServers == null || regionServers.isEmpty()) {
+      return sb.toString();
+    }
+    for (ServerName regionServer : regionServers) {
+      List<ReplicationQueueId> queueIds = queueStorage.listAllQueueIds(regionServer);
 
-    // queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
-    // Set<ServerName> liveRegionServers = ZKUtil.listChildrenNoWatch(zkw,
-    // zkw.getZNodePaths().rsZNode)
-    // .stream().map(ServerName::parseServerName).collect(Collectors.toSet());
-    //
-    // Loops each peer on each RS and dumps the queues
-    // List<ServerName> regionservers = queueStorage.getListOfReplicators();
-    // if (regionservers == null || regionservers.isEmpty()) {
-    // return sb.toString();
-    // }
-    // for (ServerName regionserver : regionservers) {
-    // List<String> queueIds = queueStorage.getAllQueues(regionserver);
-    // if (!liveRegionServers.contains(regionserver)) {
-    // deadRegionServers.add(regionserver.getServerName());
-    // }
-    // for (String queueId : queueIds) {
-    // ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
-    // List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
-    // Collections.sort(wals);
-    // if (!peerIds.contains(queueInfo.getPeerId())) {
-    // deletedQueues.add(regionserver + "/" + queueId);
-    // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
-    // } else {
-    // sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
-    // }
-    // }
-    // }
+      if (!liveRegionServers.contains(regionServer)) {
+        deadRegionServers.add(regionServer.getServerName());
+      }
+      for (ReplicationQueueId queueId : queueIds) {
+        List<String> wals = null;
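+        // Recovered queues originate from a dead source server, so their WALs are looked up
+        // under that server's name. Note that getArchivedWALFiles reads the archived WAL
+        // (oldWALs) directory, so WALs still in a server's active WAL directory may not appear.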
+        if (queueId.isRecovered()) {
+          wals = AbstractFSWALProvider
+            .getArchivedWALFiles(connection.getConfiguration(), queueId.getSourceServerName().get(),
+              queueId.getSourceServerName().get().toString())
+            .stream().map(Path::toString).collect(Collectors.toList());
+        } else {
+          wals = AbstractFSWALProvider
+            .getArchivedWALFiles(connection.getConfiguration(), queueId.getServerName(),
+              queueId.getServerName().toString())
+            .stream().map(Path::toString).collect(Collectors.toList());
+        }
+        Collections.sort(wals);
+        if (!peerIds.contains(queueId.getPeerId())) {
+          deletedQueues.add(regionServer + "/" + queueId);
+          sb.append(formatQueue(regionServer, queueStorage, wals, queueId, true, hdfs));
+        } else {
+          sb.append(formatQueue(regionServer, queueStorage, wals, queueId, false, hdfs));
+        }
+      }
+    }
     return sb.toString();
   }
 
-  private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
-    ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
-    boolean hdfs) throws Exception {
+  private String formatQueue(ServerName regionServer, ReplicationQueueStorage queueStorage,
+    List<String> wals, ReplicationQueueId queueId, boolean isDeleted, boolean hdfs)
+    throws Exception {
     StringBuilder sb = new StringBuilder();
 
-    List<ServerName> deadServers;
-
-    sb.append("Dumping replication queue info for RegionServer: [" + regionserver + "]" + "\n");
+    sb.append("Dumping replication queue info for RegionServer: [" + regionServer + "]" + "\n");
     sb.append("    Queue znode: " + queueId + "\n");
-    sb.append("    PeerID: " + queueInfo.getPeerId() + "\n");
-    sb.append("    Recovered: " + queueInfo.isQueueRecovered() + "\n");
-    deadServers = queueInfo.getDeadRegionServers();
-    if (deadServers.isEmpty()) {
-      sb.append("    No dead RegionServers found in this queue." + "\n");
+    sb.append("    PeerID: " + queueId.getPeerId() + "\n");
+    sb.append("    Recovered: " + queueId.isRecovered() + "\n");
+    // In the new version, only the first dead RegionServer is recorded in the queueId.
+    if (queueId.getSourceServerName().isPresent()) {
+      sb.append("    Dead RegionServer: " + queueId.getSourceServerName().get() + "\n");
     } else {
-      sb.append("    Dead RegionServers: " + deadServers + "\n");
+      sb.append("    No dead RegionServer found in this queue." + "\n");
     }
     sb.append("    Was deleted: " + isDeleted + "\n");
     sb.append("    Number of WALs in replication queue: " + wals.size() + "\n");
-    peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
-
-    for (String wal : wals) {
-      // long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
-      // sb.append("    Replication position for " + wal + ": "
-      // + (position > 0 ? position : "0" + " (not started or nothing to replicate)") + "\n");
+    peersQueueSize.addAndGet(queueId.getPeerId(), wals.size());
+
+    Set<Map.Entry<String, ReplicationGroupOffset>> offsets =
+      queueStorage.getOffsets(queueId).entrySet();
+
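+    // For each WAL group, report the position of the WAL that the group's offset points to.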
+    for (Map.Entry<String, ReplicationGroupOffset> entry : offsets) {
+      String walGroup = entry.getKey();
+      for (String wal : wals) {
+        ReplicationGroupOffset offset = entry.getValue();
+        if (offset.getWal().equals(wal)) {
+          long position = offset.getOffset();
+          sb.append(
+            "    Replication position for " + (walGroup != null ? walGroup + "/" + wal : wal) + ": ");
+
+          // A position of -1 means the file has already been fully replicated; this logic
+          // differs from the previous version.
+          if (position == -1) {
+            sb.append("-1 (has been replicated)");
+          } else if (position == 0) {
+            sb.append("0 (not started or nothing to replicate)");
+          } else if (position > 0) {
+            sb.append(position);
+          }
+          sb.append("\n");
+        }
+      }
     }
 
     if (hdfs) {
       FileSystem fs = FileSystem.get(getConf());
       sb.append("    Total size of WALs on HDFS for this queue: "
-        + StringUtils.humanSize(getTotalWALSize(fs, wals, regionserver)) + "\n");
+        + StringUtils.humanSize(getTotalWALSize(fs, wals, regionServer)) + "\n");
     }
     return sb.toString();
   }
 
   /**
    * Returns the total size in bytes of a list of WALs.
    */
-  private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
-    throws IOException {
+  private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server) {
     long size = 0;
     FileStatus fileStatus;
 