Skip to content

Commit a4b287e

Browse files
committed
HBASE-23330: Fix delegation token fetch with MasterRegistry (apache#1084)
Signed-off-by: Andrew Purtell <apurtell@apache.org> (cherry picked from commit fcb2012)
1 parent 25af2b5 commit a4b287e

65 files changed

Lines changed: 3899 additions & 2278 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.CompletableFuture;
3333
import java.util.concurrent.ConcurrentHashMap;
3434
import java.util.concurrent.ConcurrentMap;
35+
import java.util.concurrent.ExecutionException;
3536
import java.util.concurrent.ExecutorService;
3637
import java.util.concurrent.TimeUnit;
3738
import java.util.concurrent.atomic.AtomicReference;
@@ -268,6 +269,15 @@ CompletableFuture<MasterService.Interface> getMasterStub() {
268269
}, stub -> true, "master stub");
269270
}
270271

272+
String getClusterId() {
273+
try {
274+
return registry.getClusterId().get();
275+
} catch (InterruptedException | ExecutionException e) {
276+
LOG.error("Error fetching cluster ID: ", e);
277+
}
278+
return null;
279+
}
280+
271281
void clearMasterStubCache(MasterService.Interface stub) {
272282
masterStub.compareAndSet(stub, null);
273283
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ default Table getTable(TableName tableName, ExecutorService pool) throws IOExcep
193193
*/
194194
TableBuilder getTableBuilder(TableName tableName, ExecutorService pool);
195195

196+
/**
197+
* @return the cluster ID unique to this HBase cluster.
198+
*/
199+
String getClusterId();
200+
196201
/**
197202
* Retrieve an Hbck implementation to fix an HBase cluster.
198203
* The returned Hbck is not guaranteed to be thread-safe. A new instance should be created by

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -378,6 +378,16 @@ public Table build() {
378378
};
379379
}
380380

381+
@Override
382+
public String getClusterId() {
383+
try {
384+
return registry.getClusterId().get();
385+
} catch (Exception e) {
386+
LOG.error("Error fetching cluster ID: ", e);
387+
}
388+
return null;
389+
}
390+
381391
@Override
382392
public BufferedMutator getBufferedMutator(BufferedMutatorParams params) {
383393
if (params.getTableName() == null) {

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultiTableInputFormatBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,5 +240,10 @@ public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
240240
@Override
241241
public void clearRegionLocationCache() {
242242
}
243+
244+
@Override
245+
public String getClusterId() {
246+
return null;
247+
}
243248
}
244249
}

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableInputFormatBase.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -290,5 +290,10 @@ public TableBuilder getTableBuilder(TableName tableName, ExecutorService pool) {
290290
@Override
291291
public void clearRegionLocationCache() {
292292
}
293+
294+
@Override
295+
public String getClusterId() {
296+
return null;
297+
}
293298
}
294299
}

hbase-server/src/main/java/org/apache/hadoop/hbase/SharedConnection.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,4 +105,9 @@ public Hbck getHbck() throws IOException {
105105
public Hbck getHbck(ServerName masterServer) throws IOException {
106106
return conn.getHbck(masterServer);
107107
}
108+
109+
@Override
110+
public String getClusterId() {
111+
return conn.getClusterId();
112+
}
108113
}

hbase-server/src/main/java/org/apache/hadoop/hbase/security/token/TokenUtil.java

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,14 @@
2525
import org.apache.hadoop.hbase.client.ConnectionFactory;
2626
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
2727
import org.apache.hadoop.hbase.security.User;
28-
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
29-
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
3028
import org.apache.hadoop.io.Text;
3129
import org.apache.hadoop.mapred.JobConf;
3230
import org.apache.hadoop.mapreduce.Job;
3331
import org.apache.hadoop.security.token.Token;
3432
import org.apache.yetus.audience.InterfaceAudience;
35-
import org.apache.zookeeper.KeeperException;
3633
import org.slf4j.Logger;
3734
import org.slf4j.LoggerFactory;
3835

39-
4036
/**
4137
* Utility methods for obtaining authentication tokens.
4238
*/
@@ -206,7 +202,7 @@ public static void obtainTokenForJob(final Connection conn, final JobConf job, U
206202
public static void addTokenForJob(final Connection conn, final JobConf job, User user)
207203
throws IOException, InterruptedException {
208204

209-
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
205+
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn, user);
210206
if (token == null) {
211207
token = ClientTokenUtil.obtainToken(conn, user);
212208
}
@@ -225,7 +221,7 @@ public static void addTokenForJob(final Connection conn, final JobConf job, User
225221
*/
226222
public static void addTokenForJob(final Connection conn, User user, Job job)
227223
throws IOException, InterruptedException {
228-
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
224+
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn, user);
229225
if (token == null) {
230226
token = ClientTokenUtil.obtainToken(conn, user);
231227
}
@@ -244,7 +240,7 @@ public static void addTokenForJob(final Connection conn, User user, Job job)
244240
*/
245241
public static boolean addTokenIfMissing(Connection conn, User user)
246242
throws IOException, InterruptedException {
247-
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn.getConfiguration(), user);
243+
Token<AuthenticationTokenIdentifier> token = getAuthToken(conn, user);
248244
if (token == null) {
249245
token = ClientTokenUtil.obtainToken(conn, user);
250246
user.getUGI().addToken(token.getService(), token);
@@ -257,19 +253,12 @@ public static boolean addTokenIfMissing(Connection conn, User user)
257253
* Get the authentication token of the user for the cluster specified in the configuration
258254
* @return null if the user does not have the token, otherwise the auth token for the cluster.
259255
*/
260-
private static Token<AuthenticationTokenIdentifier> getAuthToken(Configuration conf, User user)
261-
throws IOException, InterruptedException {
262-
ZKWatcher zkw = new ZKWatcher(conf, "TokenUtil-getAuthToken", null);
263-
try {
264-
String clusterId = ZKClusterId.readClusterIdZNode(zkw);
265-
if (clusterId == null) {
266-
throw new IOException("Failed to get cluster ID");
267-
}
268-
return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
269-
} catch (KeeperException e) {
270-
throw new IOException(e);
271-
} finally {
272-
zkw.close();
256+
private static Token<AuthenticationTokenIdentifier> getAuthToken(Connection conn, User user)
257+
throws IOException {
258+
final String clusterId = conn.getClusterId();
259+
if (clusterId == null) {
260+
throw new IOException("Failed to get cluster ID");
273261
}
262+
return new AuthenticationTokenSelector().selectToken(new Text(clusterId), user.getTokens());
274263
}
275264
}

hbase-server/src/main/java/org/apache/hadoop/hbase/util/ConnectionCache.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,19 @@ public boolean updateConnectionAccessTime() {
202202
return false;
203203
}
204204

205+
/**
206+
* @return Cluster ID for the HBase cluster or null if there is an err making the connection.
207+
*/
208+
public String getClusterId() {
209+
try {
210+
ConnectionInfo connInfo = getCurrentConnection();
211+
return connInfo.connection.getClusterId();
212+
} catch (IOException e) {
213+
LOG.error("Error getting connection: ", e);
214+
}
215+
return null;
216+
}
217+
205218
class ConnectionInfo {
206219
final Connection connection;
207220
final String userName;

hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -547,6 +547,11 @@ public RegionLocator getRegionLocator() throws IOException {
547547
};
548548
}
549549

550+
@Override
551+
public String getClusterId() {
552+
return null;
553+
}
554+
550555
@Override
551556
public void clearRegionLocationCache() {
552557
}

hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1276,6 +1276,11 @@ public TThriftServerType getThriftServerType() {
12761276
return TThriftServerType.ONE;
12771277
}
12781278

1279+
@Override
1280+
public String getClusterId() throws TException {
1281+
return connectionCache.getClusterId();
1282+
}
1283+
12791284
private static IOError getIOError(Throwable throwable) {
12801285
IOError error = new IOErrorWithCause(throwable);
12811286
error.setMessage(Throwables.getStackTraceAsString(throwable));

0 commit comments

Comments
 (0)