Skip to content

Commit 07ddfb6

Browse files
Ruan HuiApache9
andcommitted
HBASE-27157 Potential race condition in WorkerAssigner (#4577)
Close #7299 Co-authored-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Lijin Bin <binlijin@apache.org> (cherry picked from commit 0d1ff8a)
1 parent c275c59 commit 07ddfb6

6 files changed

Lines changed: 100 additions & 108 deletions

File tree

hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Arrays;
2727
import java.util.Collections;
2828
import java.util.List;
29-
import java.util.Optional;
3029
import java.util.stream.Collectors;
3130
import org.apache.hadoop.conf.Configuration;
3231
import org.apache.hadoop.fs.FileStatus;
@@ -35,7 +34,6 @@
3534
import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
3635
import org.apache.hadoop.hbase.HConstants;
3736
import org.apache.hadoop.hbase.ServerName;
38-
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
3937
import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure;
4038
import org.apache.hadoop.hbase.procedure2.Procedure;
4139
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
@@ -153,25 +151,19 @@ List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs,
153151
*/
154152
public ServerName acquireSplitWALWorker(Procedure<?> procedure)
155153
throws ProcedureSuspendedException {
156-
Optional<ServerName> worker = splitWorkerAssigner.acquire();
157-
if (worker.isPresent()) {
158-
LOG.debug("Acquired split WAL worker={}", worker.get());
159-
return worker.get();
160-
}
161-
splitWorkerAssigner.suspend(procedure);
162-
throw new ProcedureSuspendedException();
154+
ServerName worker = splitWorkerAssigner.acquire(procedure);
155+
LOG.debug("Acquired split WAL worker={}", worker);
156+
return worker;
163157
}
164158

165159
/**
166160
* After the worker finished the split WAL task, it will release the worker, and wake up all the
167161
* suspend procedures in the ProcedureEvent
168-
* @param worker worker which is about to release
169-
* @param scheduler scheduler which is to wake up the procedure event
162+
* @param worker worker which is about to release
170163
*/
171-
public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
164+
public void releaseSplitWALWorker(ServerName worker) {
172165
LOG.debug("Release split WAL worker={}", worker);
173166
splitWorkerAssigner.release(worker);
174-
splitWorkerAssigner.wake(scheduler);
175167
}
176168

177169
/**

hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@
2323
import java.util.Map;
2424
import java.util.Optional;
2525
import org.apache.hadoop.hbase.ServerName;
26-
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
2726
import org.apache.hadoop.hbase.procedure2.Procedure;
2827
import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
28+
import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
2929
import org.apache.yetus.audience.InterfaceAudience;
3030

3131
/**
@@ -51,36 +51,37 @@ public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent<?> eve
5151
}
5252
}
5353

54-
public synchronized Optional<ServerName> acquire() {
54+
public synchronized ServerName acquire(Procedure<?> proc) throws ProcedureSuspendedException {
5555
List<ServerName> serverList = master.getServerManager().getOnlineServersList();
5656
Collections.shuffle(serverList);
5757
Optional<ServerName> worker = serverList.stream()
5858
.filter(
5959
serverName -> !currentWorkers.containsKey(serverName) || currentWorkers.get(serverName) > 0)
6060
.findAny();
61-
worker.ifPresent(name -> currentWorkers.compute(name, (serverName,
62-
availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1));
63-
return worker;
61+
if (worker.isPresent()) {
62+
ServerName sn = worker.get();
63+
currentWorkers.compute(sn, (serverName,
64+
availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1);
65+
return sn;
66+
} else {
67+
event.suspend();
68+
event.suspendIfNotReady(proc);
69+
throw new ProcedureSuspendedException();
70+
}
6471
}
6572

6673
public synchronized void release(ServerName serverName) {
6774
currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
68-
}
69-
70-
public void suspend(Procedure<?> proc) {
71-
event.suspend();
72-
event.suspendIfNotReady(proc);
73-
}
74-
75-
public void wake(MasterProcedureScheduler scheduler) {
7675
if (!event.isReady()) {
77-
event.wake(scheduler);
76+
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
7877
}
7978
}
8079

8180
@Override
82-
public void serverAdded(ServerName worker) {
83-
this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
81+
public synchronized void serverAdded(ServerName worker) {
82+
if (!event.isReady()) {
83+
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
84+
}
8485
}
8586

8687
public synchronized void addUsedWorker(ServerName worker) {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,7 @@ protected synchronized boolean complete(MasterProcedureEnv env, Throwable error)
109109
setFailure("verify-snapshot", e);
110110
} finally {
111111
// release the worker
112-
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer,
113-
env.getProcedureScheduler());
112+
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer);
114113
}
115114
return isProcedureCompleted;
116115
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.Sp
9090
skipPersistence();
9191
throw new ProcedureSuspendedException();
9292
}
93-
splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
93+
splitWALManager.releaseSplitWALWorker(worker);
9494
if (!finished) {
9595
LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker);
9696
setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);

hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.util.Iterator;
2727
import java.util.List;
2828
import java.util.Map;
29-
import java.util.Optional;
3029
import java.util.Set;
3130
import java.util.concurrent.ConcurrentHashMap;
3231
import java.util.concurrent.Executors;
@@ -66,7 +65,6 @@
6665
import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner;
6766
import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure;
6867
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
69-
import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler;
7068
import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil;
7169
import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure;
7270
import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure;
@@ -1474,20 +1472,14 @@ public boolean snapshotProcedureEnabled() {
14741472

14751473
public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
14761474
throws ProcedureSuspendedException {
1477-
Optional<ServerName> worker = verifyWorkerAssigner.acquire();
1478-
if (worker.isPresent()) {
1479-
LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get());
1480-
return worker.get();
1481-
}
1482-
verifyWorkerAssigner.suspend(procedure);
1483-
throw new ProcedureSuspendedException();
1475+
ServerName worker = verifyWorkerAssigner.acquire(procedure);
1476+
LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker);
1477+
return worker;
14841478
}
14851479

1486-
public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker,
1487-
MasterProcedureScheduler scheduler) {
1480+
public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) {
14881481
LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
14891482
verifyWorkerAssigner.release(worker);
1490-
verifyWorkerAssigner.wake(scheduler);
14911483
}
14921484

14931485
private void restoreWorkers() {

0 commit comments

Comments
 (0)