Skip to content

Commit 2209337

Browse files
apache#2965 Atomic ZNode creation on node registration
Reformatted changed file according to the "helix format"
1 parent 9266487 commit 2209337

File tree

5 files changed

+27
-25
lines changed

5 files changed

+27
-25
lines changed

helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -235,10 +235,8 @@ public static GenericHelixController getLeaderController(String clusterName) {
235235
if (clusterName != null) {
236236
ImmutableSet<GenericHelixController> controllers = _helixControllerFactory.get(clusterName);
237237
if (controllers != null) {
238-
return controllers.stream()
239-
.filter(controller -> controller._helixManager != null)
240-
.filter(controller -> controller._helixManager.isLeader())
241-
.findAny().orElse(null);
238+
return controllers.stream().filter(controller -> controller._helixManager != null)
239+
.filter(controller -> controller._helixManager.isLeader()).findAny().orElse(null);
242240
}
243241
}
244242
return null;

helix-core/src/main/java/org/apache/helix/controller/pipeline/AbstractBaseStage.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ public String getStageName() {
6565
return className;
6666
}
6767

68-
public static <T> Future<T> asyncExecute(ExecutorService service, Callable<T> task) {
68+
public static <T> Future asyncExecute(ExecutorService service, Callable<T> task) {
6969
if (service != null) {
7070
return service.submit(ExecutorTaskUtil.wrap(task));
7171
}

helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/PartialRebalanceRunner.java

+14-13
Original file line numberDiff line numberDiff line change
@@ -100,19 +100,20 @@ public void partialRebalance(ResourceControllerDataProvider clusterData, Map<Str
100100
return;
101101
}
102102

103-
_asyncPartialRebalanceResult = _bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
104-
try {
105-
doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
106-
currentStateOutput);
107-
} catch (HelixRebalanceException e) {
108-
if (_asyncPartialRebalanceEnabled) {
109-
_rebalanceFailureCount.increment(1L);
110-
}
111-
LOG.error("Failed to calculate best possible assignment!", e);
112-
return false;
113-
}
114-
return true;
115-
}));
103+
_asyncPartialRebalanceResult =
104+
_bestPossibleCalculateExecutor.submit(ExecutorTaskUtil.wrap(() -> {
105+
try {
106+
doPartialRebalance(clusterData, resourceMap, activeNodes, algorithm,
107+
currentStateOutput);
108+
} catch (HelixRebalanceException e) {
109+
if (_asyncPartialRebalanceEnabled) {
110+
_rebalanceFailureCount.increment(1L);
111+
}
112+
LOG.error("Failed to calculate best possible assignment!", e);
113+
return false;
114+
}
115+
return true;
116+
}));
116117
if (!_asyncPartialRebalanceEnabled) {
117118
try {
118119
if (!_asyncPartialRebalanceResult.get()) {

helix-core/src/main/java/org/apache/helix/manager/zk/CallbackEventExecutor.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -89,8 +89,8 @@ public void submitEventToExecutor(NotificationContext.Type eventType, Notificati
8989
logger.error("Failed to process callback. CallbackEventExecutor is already shut down.");
9090
}
9191
if (_futureCallBackProcessEvent == null || _futureCallBackProcessEvent.isDone()) {
92-
_futureCallBackProcessEvent =
93-
_threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
92+
_futureCallBackProcessEvent = _threadPoolExecutor.submit(
93+
ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
9494
} else {
9595
_callBackEventQueue.put(eventType, event);
9696
}
@@ -102,8 +102,8 @@ private void submitPendingHandleCallBackEventToManagerThreadPool(CallbackHandler
102102
if (_callBackEventQueue.size() != 0) {
103103
try {
104104
NotificationContext event = _callBackEventQueue.take();
105-
_futureCallBackProcessEvent =
106-
_threadPoolExecutor.submit(ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
105+
_futureCallBackProcessEvent = _threadPoolExecutor.submit(
106+
ExecutorTaskUtil.wrap(new CallbackProcessor(handler, event)));
107107
} catch (InterruptedException e) {
108108
logger
109109
.error("Error when submitting pending HandleCallBackEvent to manager thread pool", e);

helix-core/src/main/java/org/apache/helix/util/ExecutorTaskUtil.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,8 @@ public static <T> Callable<T> wrap(Callable<T> callable) {
4040
try {
4141
return callable.call();
4242
} catch (Throwable t) {
43-
LOG.error("Callable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
43+
LOG.error("Callable run on thread {} raised an exception and exited",
44+
Thread.currentThread().getName(), t);
4445
throw t;
4546
}
4647
};
@@ -50,15 +51,17 @@ public static <T> Callable<T> wrap(Callable<T> callable) {
5051
* Wrap a runnable so that any raised exception is logged
5152
* (can be interesting in case the callable is used as a completely asynchronous task
5253
* fed to an {@link java.util.concurrent.ExecutorService}), for which we are never
53-
* calling any of the {@link java.util.concurrent.Future#get()} or {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)}
54+
* calling any of the {@link java.util.concurrent.Future#get()} or
55+
* {@link java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit)}
5456
* methods.
5557
*/
5658
public static Runnable wrap(Runnable runnable) {
5759
return () -> {
5860
try {
5961
runnable.run();
6062
} catch (Throwable t) {
61-
LOG.error("Runnable run on thread {} raised an exception and exited", Thread.currentThread().getName(), t);
63+
LOG.error("Runnable run on thread {} raised an exception and exited",
64+
Thread.currentThread().getName(), t);
6265
throw t;
6366
}
6467
};

0 commit comments

Comments
 (0)