From 8d5f0f213b3cf22699a847945a473d0af7192379 Mon Sep 17 00:00:00 2001 From: huiruan Date: Fri, 24 Jun 2022 13:14:53 +0800 Subject: [PATCH 1/4] HBASE-27157 Potential race condition in WorkerAssigner --- .../java/org/apache/hadoop/hbase/master/WorkerAssigner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java index b6df41acee23..546b58eee1e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java @@ -67,12 +67,12 @@ public synchronized void release(ServerName serverName) { currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); } - public void suspend(Procedure proc) { + public synchronized void suspend(Procedure proc) { event.suspend(); event.suspendIfNotReady(proc); } - public void wake(MasterProcedureScheduler scheduler) { + public synchronized void wake(MasterProcedureScheduler scheduler) { if (!event.isReady()) { event.wake(scheduler); } From d51702ecb8778685030b1f3c7ea74a5eb081312d Mon Sep 17 00:00:00 2001 From: huiruan Date: Sat, 25 Jun 2022 22:08:05 +0800 Subject: [PATCH 2/4] HBASE-27157 fold suspend and wake method --- .../hadoop/hbase/master/SplitWALManager.java | 6 ++---- .../hadoop/hbase/master/WorkerAssigner.java | 18 +++++++----------- .../procedure/SnapshotVerifyProcedure.java | 3 +-- .../master/procedure/SplitWALProcedure.java | 2 +- .../hbase/master/snapshot/SnapshotManager.java | 7 ++----- .../hbase/master/TestSplitWALManager.java | 5 ++--- 6 files changed, 15 insertions(+), 26 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java index 18dfc7d493bf..f5f11b8ced03 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java @@ -153,12 +153,11 @@ List createSplitWALProcedures(List splittingWALs, */ public ServerName acquireSplitWALWorker(Procedure procedure) throws ProcedureSuspendedException { - Optional worker = splitWorkerAssigner.acquire(); + Optional worker = splitWorkerAssigner.acquire(procedure); if (worker.isPresent()) { LOG.debug("Acquired split WAL worker={}", worker.get()); return worker.get(); } - splitWorkerAssigner.suspend(procedure); throw new ProcedureSuspendedException(); } @@ -168,10 +167,9 @@ public ServerName acquireSplitWALWorker(Procedure procedure) * @param worker worker which is about to release * @param scheduler scheduler which is to wake up the procedure event */ - public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) { + public void releaseSplitWALWorker(ServerName worker) { LOG.debug("Release split WAL worker={}", worker); splitWorkerAssigner.release(worker); - splitWorkerAssigner.wake(scheduler); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java index 546b58eee1e1..5f266b365193 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java @@ -51,7 +51,7 @@ public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent eve } } - public synchronized Optional acquire() { + public synchronized Optional acquire(Procedure proc) { List serverList = master.getServerManager().getOnlineServersList(); Collections.shuffle(serverList); Optional worker = serverList.stream() @@ -60,27 +60,23 @@ public synchronized Optional acquire() { .findAny(); worker.ifPresent(name -> currentWorkers.compute(name, (serverName, availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1)); + event.suspend(); + event.suspendIfNotReady(proc); return worker; } public synchronized void release(ServerName serverName) { currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1); - } - - public synchronized void suspend(Procedure proc) { - event.suspend(); - event.suspendIfNotReady(proc); - } - - public synchronized void wake(MasterProcedureScheduler scheduler) { if (!event.isReady()) { - event.wake(scheduler); + event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); } } @Override public void serverAdded(ServerName worker) { - this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + if (!event.isReady()) { + event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + } } public synchronized void addUsedWorker(ServerName worker) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java index 651822ff5b2a..8ec261d768c5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SnapshotVerifyProcedure.java @@ -109,8 +109,7 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) { setFailure("verify-snapshot", e); } finally { // release the worker - env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer, - env.getProcedureScheduler()); + env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java index 699834f9c1d7..98c2c0ec6930 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/SplitWALProcedure.java @@ -90,7 +90,7 @@ protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.Sp skipPersistence(); throw new ProcedureSuspendedException(); } - splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + splitWALManager.releaseSplitWALWorker(worker); if (!finished) { LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker); setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index 1ddcd2e3408e..b22d2aa6471d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -1419,20 +1419,17 @@ public boolean snapshotProcedureEnabled() { public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure) throws ProcedureSuspendedException { - Optional worker = verifyWorkerAssigner.acquire(); + Optional worker = verifyWorkerAssigner.acquire(procedure); if (worker.isPresent()) { LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get()); return worker.get(); } - verifyWorkerAssigner.suspend(procedure); throw new ProcedureSuspendedException(); } - public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker, - MasterProcedureScheduler scheduler) { + public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) { LOG.debug("{} Release verify snapshot worker={}", procedure, worker); verifyWorkerAssigner.release(worker); - verifyWorkerAssigner.wake(scheduler); } private void restoreWorkers() { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java index ea92f7922794..4665b9c16f8a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestSplitWALManager.java @@ -115,8 +115,7 @@ public void testAcquireAndRelease() throws Exception { Assert.assertNotNull(e); Assert.assertTrue(e instanceof ProcedureSuspendedException); - splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster() - .getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); + splitWALManager.releaseSplitWALWorker(server); Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3))); } @@ -348,7 +347,7 @@ protected Flow executeFromState(MasterProcedureEnv env, setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER); return Flow.HAS_MORE_STATE; case RELEASE_SPLIT_WORKER: - splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler()); + splitWALManager.releaseSplitWALWorker(worker); return Flow.NO_MORE_STATE; default: throw new UnsupportedOperationException("unhandled state=" + state); From 070ababc5d09b93be7a7ef832b1680cde880be6b Mon Sep 17 00:00:00 2001 From: huiruan Date: Tue, 28 Jun 2022 11:14:52 +0800 Subject: [PATCH 3/4] make serverAdded method synchronized also --- .../java/org/apache/hadoop/hbase/master/SplitWALManager.java | 2 -- .../java/org/apache/hadoop/hbase/master/WorkerAssigner.java | 3 +-- .../apache/hadoop/hbase/master/snapshot/SnapshotManager.java | 1 - 3 files changed, 1 insertion(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java index f5f11b8ced03..b2b870089504 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java @@ -35,7 +35,6 @@ import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.SplitWALProcedure; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; @@ -165,7 +164,6 @@ public ServerName acquireSplitWALWorker(Procedure procedure) * After the worker finished the split WAL task, it will release the worker, and wake up all the * suspend procedures in the ProcedureEvent * @param worker worker which is about to release - * @param scheduler scheduler which is to wake up the procedure event */ public void releaseSplitWALWorker(ServerName worker) { LOG.debug("Release split WAL worker={}", worker); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java index 5f266b365193..b07208eb8ea7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java @@ -23,7 +23,6 @@ import java.util.Map; import java.util.Optional; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; import org.apache.yetus.audience.InterfaceAudience; @@ -73,7 +72,7 @@ public synchronized void release(ServerName serverName) { } @Override - public void serverAdded(ServerName worker) { + public synchronized void serverAdded(ServerName worker) { if (!event.isReady()) { event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index b22d2aa6471d..ba975368cea4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.master.cleaner.HFileLinkCleaner; import org.apache.hadoop.hbase.master.procedure.CloneSnapshotProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.master.procedure.MasterProcedureScheduler; import org.apache.hadoop.hbase.master.procedure.MasterProcedureUtil; import org.apache.hadoop.hbase.master.procedure.RestoreSnapshotProcedure; import org.apache.hadoop.hbase.master.procedure.SnapshotProcedure; From fd5028c91b9c74ea359b62a957404045b38a76df Mon Sep 17 00:00:00 2001 From: huiruan Date: Sat, 2 Jul 2022 19:13:03 +0800 Subject: [PATCH 4/4] throw ProcedureSuspendedException if no worker is available --- .../hadoop/hbase/master/SplitWALManager.java | 9 +++------ .../hadoop/hbase/master/WorkerAssigner.java | 16 ++++++++++++---- .../hbase/master/snapshot/SnapshotManager.java | 9 +++------ 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java index b2b870089504..7497877e67c6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/SplitWALManager.java @@ -152,12 +152,9 @@ List createSplitWALProcedures(List splittingWALs, */ public ServerName acquireSplitWALWorker(Procedure procedure) throws ProcedureSuspendedException { - Optional worker = splitWorkerAssigner.acquire(procedure); - if (worker.isPresent()) { - LOG.debug("Acquired split WAL worker={}", worker.get()); - return worker.get(); - } - throw new ProcedureSuspendedException(); + ServerName worker = splitWorkerAssigner.acquire(procedure); + LOG.debug("Acquired split WAL worker={}", worker); + return worker; } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java index b07208eb8ea7..b1f2558045d1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/WorkerAssigner.java @@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.procedure2.Procedure; import org.apache.hadoop.hbase.procedure2.ProcedureEvent; +import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException; import org.apache.yetus.audience.InterfaceAudience; /** @@ -50,7 +51,7 @@ public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent eve } } - public synchronized Optional acquire(Procedure proc) { + public synchronized ServerName acquire(Procedure proc) throws ProcedureSuspendedException { List serverList = master.getServerManager().getOnlineServersList(); Collections.shuffle(serverList); Optional worker = serverList.stream() @@ -59,9 +60,16 @@ public synchronized Optional acquire(Procedure proc) { .findAny(); worker.ifPresent(name -> currentWorkers.compute(name, (serverName, availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1)); - event.suspend(); - event.suspendIfNotReady(proc); - return worker; + if (worker.isPresent()) { + ServerName sn = worker.get(); + currentWorkers.compute(sn, (serverName, + availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1); + return sn; + } else { + event.suspend(); + event.suspendIfNotReady(proc); + throw new ProcedureSuspendedException(); + } } public synchronized void release(ServerName serverName) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java index ba975368cea4..677baff1b6bf 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/snapshot/SnapshotManager.java @@ -1418,12 +1418,9 @@ public boolean snapshotProcedureEnabled() { public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure) throws ProcedureSuspendedException { - Optional worker = verifyWorkerAssigner.acquire(procedure); - if (worker.isPresent()) { - LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get()); - return worker.get(); - } - throw new ProcedureSuspendedException(); + ServerName worker = verifyWorkerAssigner.acquire(procedure); + LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker); + return worker; } public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) {