1818
1919package org .apache .hadoop .hbase .replication ;
2020
21+ import static org .apache .hadoop .hbase .HConstants .DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT ;
22+ import static org .apache .hadoop .hbase .HConstants .HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY ;
23+
2124import java .io .IOException ;
2225import java .util .ArrayList ;
2326import java .util .Collections ;
2831
2932import org .apache .hadoop .conf .Configuration ;
3033import org .apache .hadoop .fs .Path ;
34+ import org .apache .hadoop .hbase .Abortable ;
3135import org .apache .hadoop .hbase .HBaseConfiguration ;
36+ import org .apache .hadoop .hbase .ChoreService ;
3237import org .apache .hadoop .hbase .client .AsyncClusterConnection ;
3338import org .apache .hadoop .hbase .client .AsyncRegionServerAdmin ;
3439import org .apache .hadoop .hbase .client .AsyncReplicationServerAdmin ;
3540import org .apache .hadoop .hbase .client .ClusterConnectionFactory ;
3641import org .apache .hadoop .hbase .protobuf .ReplicationProtobufUtil ;
42+ import org .apache .hadoop .hbase .ScheduledChore ;
43+ import org .apache .hadoop .hbase .Server ;
44+ import org .apache .hadoop .hbase .ServerName ;
3745import org .apache .hadoop .hbase .security .User ;
46+ import org .apache .hadoop .hbase .security .UserProvider ;
47+ import org .apache .hadoop .hbase .util .FutureUtils ;
3848import org .apache .hadoop .hbase .wal .WAL ;
39- import org .apache .hadoop .hbase .zookeeper .ZKListener ;
40- import org .apache .yetus .audience .InterfaceAudience ;
41- import org .apache .hadoop .hbase .Abortable ;
42- import org .apache .hadoop .hbase .ServerName ;
4349import org .apache .hadoop .hbase .zookeeper .ZKClusterId ;
50+ import org .apache .hadoop .hbase .zookeeper .ZKListener ;
4451import org .apache .hadoop .hbase .zookeeper .ZKUtil ;
4552import org .apache .hadoop .hbase .zookeeper .ZKWatcher ;
53+ import org .apache .yetus .audience .InterfaceAudience ;
4654import org .apache .zookeeper .KeeperException ;
4755import org .apache .zookeeper .KeeperException .AuthFailedException ;
4856import org .apache .zookeeper .KeeperException .ConnectionLossException ;
5260
5361import org .apache .hbase .thirdparty .com .google .common .annotations .VisibleForTesting ;
5462import org .apache .hbase .thirdparty .com .google .common .collect .Maps ;
63+ import org .apache .hbase .thirdparty .com .google .protobuf .ServiceException ;
64+
65+ import org .apache .hadoop .hbase .shaded .protobuf .ProtobufUtil ;
66+ import org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProtos .ListReplicationSinkServersRequest ;
67+ import org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProtos .ListReplicationSinkServersResponse ;
68+ import org .apache .hadoop .hbase .shaded .protobuf .generated .MasterProtos .MasterService ;
5569
5670/**
5771 * A {@link BaseReplicationEndpoint} for replication endpoints whose
@@ -63,6 +77,13 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
6377
6478 private static final Logger LOG = LoggerFactory .getLogger (HBaseReplicationEndpoint .class );
6579
80+ public static final String FETCH_SERVERS_USE_ZK_CONF_KEY =
81+ "hbase.replication.fetch.servers.usezk" ;
82+
83+ public static final String FETCH_SERVERS_INTERVAL_CONF_KEY =
84+ "hbase.replication.fetch.servers.interval" ;
85+ public static final int DEFAULT_FETCH_SERVERS_INTERVAL = 10 * 60 * 1000 ; // 10 mins
86+
6687 private ZKWatcher zkw = null ;
6788
6889 protected Configuration conf ;
@@ -93,6 +114,11 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
93114
94115 private List <ServerName > sinkServers = new ArrayList <>(0 );
95116
117+ private AsyncClusterConnection peerConnection ;
118+ private boolean fetchServersUseZk = false ;
119+ private FetchServersChore fetchServersChore ;
120+ private int shortOperationTimeout ;
121+
96122 /*
97123 * Some implementations of HBaseInterClusterReplicationEndpoint may require instantiate different
98124 * Connection implementations, or initialize it in a different way, so defining createConnection
@@ -122,6 +148,19 @@ protected synchronized void disconnect() {
122148 if (zkw != null ) {
123149 zkw .close ();
124150 }
151+ if (fetchServersChore != null ) {
152+ ChoreService choreService = ctx .getServer ().getChoreService ();
153+ if (null != choreService ) {
154+ choreService .cancelChore (fetchServersChore );
155+ }
156+ }
157+ if (peerConnection != null ) {
158+ try {
159+ peerConnection .close ();
160+ } catch (IOException e ) {
161+ LOG .warn ("Attempt to close peerConnection failed." , e );
162+ }
163+ }
125164 }
126165
127166 /**
@@ -152,8 +191,27 @@ public void stop() {
152191 }
153192
154193 @ Override
155- protected void doStart () {
194+ protected synchronized void doStart () {
195+ this .shortOperationTimeout = ctx .getLocalConfiguration ().getInt (
196+ HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY , DEFAULT_HBASE_RPC_SHORTOPERATION_TIMEOUT );
156197 try {
198+ if (ctx .getLocalConfiguration ().getBoolean (FETCH_SERVERS_USE_ZK_CONF_KEY , false )) {
199+ fetchServersUseZk = true ;
200+ } else {
201+ try {
202+ if (ReplicationUtils .isPeerClusterSupportReplicationOffload (getPeerConnection ())) {
203+ fetchServersChore = new FetchServersChore (ctx .getServer (), this );
204+ ctx .getServer ().getChoreService ().scheduleChore (fetchServersChore );
205+ fetchServersUseZk = false ;
206+ } else {
207+ fetchServersUseZk = true ;
208+ }
209+ } catch (Throwable t ) {
210+ fetchServersUseZk = true ;
211+ LOG .warn ("Peer {} try to fetch servers by admin failed. Using zk impl." ,
212+ ctx .getPeerId (), t );
213+ }
214+ }
157215 reloadZkWatcher ();
158216 notifyStarted ();
159217 } catch (IOException e ) {
@@ -192,7 +250,9 @@ private synchronized void reloadZkWatcher() throws IOException {
192250 }
193251 zkw = new ZKWatcher (ctx .getConfiguration (),
194252 "connection to cluster: " + ctx .getPeerId (), this );
195- zkw .registerListener (new PeerRegionServerListener (this ));
253+ if (fetchServersUseZk ) {
254+ zkw .registerListener (new PeerRegionServerListener (this ));
255+ }
196256 }
197257
198258 @ Override
@@ -207,12 +267,47 @@ public boolean isAborted() {
207267 return false ;
208268 }
209269
270+ /**
271+ * Get the connection to peer cluster
272+ * @return connection to peer cluster
273+ * @throws IOException If anything goes wrong connecting
274+ */
275+ private synchronized AsyncClusterConnection getPeerConnection () throws IOException {
276+ if (peerConnection == null ) {
277+ Configuration conf = ctx .getConfiguration ();
278+ peerConnection = ClusterConnectionFactory .createAsyncClusterConnection (conf , null ,
279+ UserProvider .instantiate (conf ).getCurrent ());
280+ }
281+ return peerConnection ;
282+ }
283+
284+ /**
285+ * Get the list of all the servers that are responsible for replication sink
286+ * from the specified peer master
287+ * @return list of server addresses or an empty list if the slave is unavailable
288+ */
289+ protected List <ServerName > fetchSlavesAddresses () {
290+ try {
291+ AsyncClusterConnection peerConn = getPeerConnection ();
292+ ServerName master = FutureUtils .get (peerConn .getAdmin ().getMaster ());
293+ MasterService .BlockingInterface masterStub = MasterService .newBlockingStub (
294+ peerConn .getRpcClient ()
295+ .createBlockingRpcChannel (master , User .getCurrent (), shortOperationTimeout ));
296+ ListReplicationSinkServersResponse resp = masterStub
297+ .listReplicationSinkServers (null , ListReplicationSinkServersRequest .newBuilder ().build ());
298+ return ProtobufUtil .toServerNameList (resp .getServerNameList ());
299+ } catch (ServiceException | IOException e ) {
300+ LOG .error ("Peer {} fetches servers failed" , ctx .getPeerId (), e );
301+ }
302+ return Collections .emptyList ();
303+ }
304+
/**
 * Get the list of all the region servers from the specified peer by listing the children of
 * the peer cluster's region server znode in ZooKeeper.
 *
 * @return list of region server addresses or an empty list if the slave is unavailable
 */
215- protected List <ServerName > fetchSlavesAddresses () {
310+ protected List <ServerName > fetchSlavesAddressesByZK () {
216311 List <String > children = null ;
217312 try {
218313 children = ZKUtil .listChildrenAndWatchForNewChildren (zkw , zkw .getZNodePaths ().rsZNode );
@@ -233,7 +328,12 @@ protected List<ServerName> fetchSlavesAddresses() {
233328 }
234329
235330 protected synchronized void chooseSinks () {
236- List <ServerName > slaveAddresses = fetchSlavesAddresses ();
331+ List <ServerName > slaveAddresses = Collections .emptyList ();
332+ if (fetchServersUseZk ) {
333+ slaveAddresses = fetchSlavesAddressesByZK ();
334+ } else {
335+ slaveAddresses = fetchSlavesAddresses ();
336+ }
237337 if (slaveAddresses .isEmpty ()) {
238338 LOG .warn ("No sinks available at peer. Will not be able to replicate" );
239339 }
@@ -264,6 +364,14 @@ protected synchronized SinkPeer getReplicationSink() throws IOException {
264364 return createSinkPeer (serverName );
265365 }
266366
367+ private SinkPeer createSinkPeer (ServerName serverName ) throws IOException {
368+ if (ReplicationUtils .isPeerClusterSupportReplicationOffload (conn )) {
369+ return new ReplicationServerSinkPeer (serverName , conn .getReplicationServerAdmin (serverName ));
370+ } else {
371+ return new RegionServerSinkPeer (serverName , conn .getRegionServerAdmin (serverName ));
372+ }
373+ }
374+
267375 /**
268376 * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
269377 * failed). If a single SinkPeer is reported as bad more than
@@ -373,11 +481,23 @@ public void replicateWALEntry(WAL.Entry[] entries, String replicationClusterId,
373481 }
374482 }
375483
376- private SinkPeer createSinkPeer (ServerName serverName ) throws IOException {
377- if (ReplicationUtils .isPeerClusterSupportReplicationOffload (conn )) {
378- return new ReplicationServerSinkPeer (serverName , conn .getReplicationServerAdmin (serverName ));
379- } else {
380- return new RegionServerSinkPeer (serverName , conn .getRegionServerAdmin (serverName ));
484+ /**
485+ * Chore that will fetch the list of servers from peer master.
486+ */
487+ public static class FetchServersChore extends ScheduledChore {
488+
489+ private HBaseReplicationEndpoint endpoint ;
490+
491+ public FetchServersChore (Server server , HBaseReplicationEndpoint endpoint ) {
492+ super ("Peer-" + endpoint .ctx .getPeerId () + "-FetchServersChore" , server ,
493+ server .getConfiguration ()
494+ .getInt (FETCH_SERVERS_INTERVAL_CONF_KEY , DEFAULT_FETCH_SERVERS_INTERVAL ));
495+ this .endpoint = endpoint ;
496+ }
497+
498+ @ Override
499+ protected void chore () {
500+ endpoint .chooseSinks ();
381501 }
382502 }
383503}
0 commit comments