Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -2403,6 +2403,8 @@ public void move(final byte[] encodedRegionName, byte[] destServerName) throws I

TransitRegionStateProcedure proc =
this.assignmentManager.createMoveRegionProcedure(rp.getRegionInfo(), rp.getDestination());
CompletableFuture<String> completableFuture = new CompletableFuture<>();
proc.setCompletableFuture(completableFuture);
if (conf.getBoolean(WARMUP_BEFORE_MOVE, DEFAULT_WARMUP_BEFORE_MOVE)) {
// Warmup the region on the destination before initiating the move.
// A region server could reject the close request because it either does not
Expand All @@ -2412,11 +2414,10 @@ public void move(final byte[] encodedRegionName, byte[] destServerName) throws I
warmUpRegion(rp.getDestination(), hri);
}
LOG.info(getClientIdAuditPrefix() + " move " + rp + ", running balancer");
Future<byte[]> future = ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc);
ProcedureSyncWait.submitProcedure(this.procedureExecutor, proc);
try {
// Is this going to work? Will we throw exception on error?
// TODO: CompletableFuture rather than this stunted Future.
future.get();
completableFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new HBaseIOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,8 @@ public class TransitRegionStateProcedure

private CompletableFuture<Void> future;

private CompletableFuture<String> completableFuture;

public TransitRegionStateProcedure() {
}

Expand Down Expand Up @@ -747,4 +749,16 @@ public static TransitRegionStateProcedure move(MasterProcedureEnv env, RegionInf
return setOwner(env, new TransitRegionStateProcedure(env, region, targetServer,
targetServer == null, TransitionType.MOVE));
}

@Override
protected void completionCleanup(MasterProcedureEnv env) {
if (null != completableFuture) {
completableFuture.complete("done");
}
}

public void setCompletableFuture(CompletableFuture<String> completableFuture) {
this.completableFuture = completableFuture;
}

}