Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -153,12 +153,11 @@ List<Procedure> createSplitWALProcedures(List<FileStatus> splittingWALs,
*/
public ServerName acquireSplitWALWorker(Procedure<?> procedure)
throws ProcedureSuspendedException {
Optional<ServerName> worker = splitWorkerAssigner.acquire();
Optional<ServerName> worker = splitWorkerAssigner.acquire(procedure);
if (worker.isPresent()) {
LOG.debug("Acquired split WAL worker={}", worker.get());
return worker.get();
}
splitWorkerAssigner.suspend(procedure);
throw new ProcedureSuspendedException();
}

Expand All @@ -168,10 +167,9 @@ public ServerName acquireSplitWALWorker(Procedure<?> procedure)
* @param worker worker which is about to release
* @param scheduler scheduler which is to wake up the procedure event
*/
public void releaseSplitWALWorker(ServerName worker, MasterProcedureScheduler scheduler) {
public void releaseSplitWALWorker(ServerName worker) {
LOG.debug("Release split WAL worker={}", worker);
splitWorkerAssigner.release(worker);
splitWorkerAssigner.wake(scheduler);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public WorkerAssigner(MasterServices master, int maxTasks, ProcedureEvent<?> eve
}
}

public synchronized Optional<ServerName> acquire() {
public synchronized Optional<ServerName> acquire(Procedure<?> proc) {
List<ServerName> serverList = master.getServerManager().getOnlineServersList();
Collections.shuffle(serverList);
Optional<ServerName> worker = serverList.stream()
Expand All @@ -60,27 +60,23 @@ public synchronized Optional<ServerName> acquire() {
.findAny();
worker.ifPresent(name -> currentWorkers.compute(name, (serverName,
availableWorker) -> availableWorker == null ? maxTasks - 1 : availableWorker - 1));
event.suspend();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should only suspend the procedure when there is no worker available? And I think here we could make the return value as ServerName instead of Optional, and throw ProcedureSuspendedException directly if worker is not available.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Let me try. Thanks Duo.

event.suspendIfNotReady(proc);
return worker;
}

public synchronized void release(ServerName serverName) {
currentWorkers.compute(serverName, (k, v) -> v == null ? null : v + 1);
}

public void suspend(Procedure<?> proc) {
event.suspend();
event.suspendIfNotReady(proc);
}

public void wake(MasterProcedureScheduler scheduler) {
if (!event.isReady()) {
event.wake(scheduler);
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
}
}

@Override
public void serverAdded(ServerName worker) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to also add synchronized here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, miss here. will fix it. Thanks for reminding.

this.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
if (!event.isReady()) {
event.wake(master.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
}
}

public synchronized void addUsedWorker(ServerName worker) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,7 @@ protected synchronized void complete(MasterProcedureEnv env, Throwable error) {
setFailure("verify-snapshot", e);
} finally {
// release the worker
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer,
env.getProcedureScheduler());
env.getMasterServices().getSnapshotManager().releaseSnapshotVerifyWorker(this, targetServer);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ protected Flow executeFromState(MasterProcedureEnv env, MasterProcedureProtos.Sp
skipPersistence();
throw new ProcedureSuspendedException();
}
splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
splitWALManager.releaseSplitWALWorker(worker);
if (!finished) {
LOG.warn("Failed to split wal {} by server {}, retry...", walPath, worker);
setNextState(MasterProcedureProtos.SplitWALState.ACQUIRE_SPLIT_WAL_WORKER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1419,20 +1419,17 @@ public boolean snapshotProcedureEnabled() {

public ServerName acquireSnapshotVerifyWorker(SnapshotVerifyProcedure procedure)
throws ProcedureSuspendedException {
Optional<ServerName> worker = verifyWorkerAssigner.acquire();
Optional<ServerName> worker = verifyWorkerAssigner.acquire(procedure);
if (worker.isPresent()) {
LOG.debug("{} Acquired verify snapshot worker={}", procedure, worker.get());
return worker.get();
}
verifyWorkerAssigner.suspend(procedure);
throw new ProcedureSuspendedException();
}

public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker,
MasterProcedureScheduler scheduler) {
public void releaseSnapshotVerifyWorker(SnapshotVerifyProcedure procedure, ServerName worker) {
LOG.debug("{} Release verify snapshot worker={}", procedure, worker);
verifyWorkerAssigner.release(worker);
verifyWorkerAssigner.wake(scheduler);
}

private void restoreWorkers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ public void testAcquireAndRelease() throws Exception {
Assert.assertNotNull(e);
Assert.assertTrue(e instanceof ProcedureSuspendedException);

splitWALManager.releaseSplitWALWorker(server, TEST_UTIL.getHBaseCluster().getMaster()
.getMasterProcedureExecutor().getEnvironment().getProcedureScheduler());
splitWALManager.releaseSplitWALWorker(server);
Assert.assertNotNull(splitWALManager.acquireSplitWALWorker(testProcedures.get(3)));
}

Expand Down Expand Up @@ -348,7 +347,7 @@ protected Flow executeFromState(MasterProcedureEnv env,
setNextState(MasterProcedureProtos.SplitWALState.RELEASE_SPLIT_WORKER);
return Flow.HAS_MORE_STATE;
case RELEASE_SPLIT_WORKER:
splitWALManager.releaseSplitWALWorker(worker, env.getProcedureScheduler());
splitWALManager.releaseSplitWALWorker(worker);
return Flow.NO_MORE_STATE;
default:
throw new UnsupportedOperationException("unhandled state=" + state);
Expand Down