Skip to content

Commit

Permalink
fix fabric8io#3285: adding waiting for list contexts
Browse files Browse the repository at this point in the history
also updating existing logic to use the wait methods
  • Loading branch information
shawkins authored and manusa committed Jul 21, 2021
1 parent c241520 commit 8ed9692
Show file tree
Hide file tree
Showing 26 changed files with 362 additions and 438 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Fix #3328: Allow for generic watches of known types

#### Improvements
* Fix #3285: adding waiting for list contexts `Informable.informOnCondition`
* Fix #3284: refined how builders are obtained / used by HasMetadataOperation
* Fix #3314: Add more detailed information about what is generated by the CRD generator

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@

import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.Predicate;

public interface Informable<T> {

Expand All @@ -34,6 +37,21 @@ public interface Informable<T> {
*/
Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexers);

/**
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
* and provides a store of all the current resources.
* <p>This returned informer will not support resync.
* <p>This call will be blocking for the initial list and watch.
* <p>You are expected to call stop to terminate the underlying Watch.
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handler operations for more than one handler.
*
* @return a running {@link SharedIndexInformer}
*/
default SharedIndexInformer<T> inform() {
return inform(null, 0);
}

/**
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
* and provides a store of all the current resources.
Expand All @@ -56,12 +74,21 @@ default SharedIndexInformer<T> inform(ResourceEventHandler<T> handler) {
* <p>This call will be blocking for the initial list and watch.
* <p>You are expected to call stop to terminate the underlying Watch.
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
* so consider non-blocking handleroperations for more than one handler.
* so consider non-blocking handler operations for more than one handler.
*
* @param handler to notify
* @param resync the resync period or 0 for no resync
* @return a running {@link SharedIndexInformer}
*/
SharedIndexInformer<T> inform(ResourceEventHandler<T> handler, long resync);

/**
* Return a {@link Future} when the list at this context satisfies the given {@link Predicate}.
* The predicate will be tested against the state of the underlying informer store on every event.
* The returned future should be cancelled by the caller if not waiting for completion to close the underlying informer
* @param condition the {@link Predicate} to test
* @return a {@link CompletableFuture} of the list of items after the condition is met
*/
CompletableFuture<List<T>> informOnCondition(Predicate<List<T>> condition);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import io.fabric8.kubernetes.api.model.ObjectReference;
import io.fabric8.kubernetes.client.dsl.WritableOperation;
import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.builder.TypedVisitor;
import io.fabric8.kubernetes.api.builder.Visitor;
Expand Down Expand Up @@ -78,7 +76,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -94,7 +92,6 @@ public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceLi
MixedOperation<T, L, R>,
Resource<T> {

private static final Logger LOG = LoggerFactory.getLogger(BaseOperation.class);
private static final String READ_ONLY_UPDATE_EXCEPTION_MESSAGE = "Cannot update read-only resources";
private static final String READ_ONLY_EDIT_EXCEPTION_MESSAGE = "Cannot edit read-only resources";

Expand Down Expand Up @@ -964,56 +961,70 @@ public T waitUntilReady(long amount, TimeUnit timeUnit) {

@Override
public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUnit) {
CompletableFuture<T> future = new CompletableFuture<>();
// tests the condition, trapping any exceptions
Consumer<T> tester = obj -> {
CompletableFuture<T> futureCondition = informOnCondition(l -> {
if (l.isEmpty()) {
return condition.test(null);
}
return condition.test(l.get(0));
}).thenApply(l -> l.isEmpty() ? null : l.get(0));

if (!Utils.waitUntilReady(futureCondition, amount, timeUnit)) {
futureCondition.cancel(true);
T i = getItem();
if (i != null) {
throw new KubernetesClientTimeoutException(i, amount, timeUnit);
}
throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit);
}
return futureCondition.getNow(null);
}

@Override
public CompletableFuture<List<T>> informOnCondition(Predicate<List<T>> condition) {
CompletableFuture<List<T>> future = new CompletableFuture<>();
AtomicReference<Runnable> tester = new AtomicReference<>();

// create an informer that supplies the tester with events and empty list handling
SharedIndexInformer<T> informer = this.createInformer(0);

// prevent unnecessary watches and handle closure
future.whenComplete((r, t) -> informer.stop());

// use the cache to evaluate the list predicate, trapping any exceptions
Runnable test = () -> {
try {
if (condition.test(obj)) {
future.complete(obj);
// could skip if lastResourceVersion has not changed
List<T> list = informer.getStore().list();
if (condition.test(list)) {
future.complete(list);
}
} catch (Exception e) {
future.completeExceptionally(e);
}
};
// start an informer that supplies the tester with events and empty list handling
try (SharedIndexInformer<T> informer = this.createInformer(0, l -> {
if (l.getItems().isEmpty()) {
tester.accept(null);
}
}, new ResourceEventHandler<T>() {

tester.set(test);

informer.addEventHandler(new ResourceEventHandler<T>() {
@Override
public void onAdd(T obj) {
tester.accept(obj);
test.run();
}

@Override
public void onUpdate(T oldObj, T newObj) {
tester.accept(newObj);
public void onDelete(T obj, boolean deletedFinalStateUnknown) {
test.run();
}

@Override
public void onDelete(T obj, boolean deletedFinalStateUnknown) {
tester.accept(null);
public void onUpdate(T oldObj, T newObj) {
test.run();
}
})) {
// prevent unnecessary watches
future.whenComplete((r,t) -> informer.stop());
informer.run();
return future.get(amount, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e.getCause());
} catch (ExecutionException e) {
throw KubernetesClientException.launderThrowable(e.getCause());
} catch (TimeoutException e) {
T i = getItem();
if (i != null) {
throw new KubernetesClientTimeoutException(i, amount, timeUnit);
@Override
public void onNothing() {
test.run();
}
throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit);
}
}
});
informer.run();
return future;
}

public void setType(Class<T> type) {
this.type = type;
Expand Down Expand Up @@ -1041,14 +1052,17 @@ public Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexer

@Override
public SharedIndexInformer<T> inform(ResourceEventHandler<T> handler, long resync) {
DefaultSharedIndexInformer<T, L> result = createInformer(resync, null, handler);
DefaultSharedIndexInformer<T, L> result = createInformer(resync);
if (handler != null) {
result.addEventHandler(handler);
}
// synchronous start list/watch must succeed in the calling thread
// initial add events will be processed in the calling thread as well
result.run();
return result;
}

private DefaultSharedIndexInformer<T, L> createInformer(long resync, Consumer<L> onList, ResourceEventHandler<T> handler) {
private DefaultSharedIndexInformer<T, L> createInformer(long resync) {
T i = getItem();
String name = (Utils.isNotNullOrEmpty(getName()) || i != null) ? checkName(i) : null;

Expand All @@ -1060,11 +1074,7 @@ public L list(ListOptions params, String namespace, OperationContext context) {
if (name != null) {
params.setFieldSelector("metadata.name="+name);
}
L result = BaseOperation.this.list(params);
if (onList != null) {
onList.accept(result);
}
return result;
return BaseOperation.this.list(params);
}

@Override
Expand All @@ -1075,9 +1085,6 @@ public Watch watch(ListOptions params, String namespace, OperationContext contex
if (indexers != null) {
informer.addIndexers(indexers);
}
if (handler != null) {
informer.addEventHandler(handler);
}
return informer;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public List<HasMetadata> waitUntilReady(final long amount, final TimeUnit timeUn
public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,
long amount,
TimeUnit timeUnit) {
List<HasMetadata> items = acceptVisitors(asHasMetadata(item, true), visitors);
ArrayList<HasMetadata> items = acceptVisitors(asHasMetadata(item, true), visitors);
if (items.isEmpty()) {
return Collections.emptyList();
}
Expand All @@ -109,10 +109,8 @@ public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,
final List<HasMetadata> results = new ArrayList<>();
final List<HasMetadata> itemsWithConditionNotMatched = new ArrayList<>();

// Iterate over the items because we don't know what kind of List it is.
// But the futures use an ArrayList, so accessing by index is efficient.
int i = 0;
for (final HasMetadata meta : items) {
for (int i = 0; i < items.size(); i++) {
final HasMetadata meta = items.get(i);
try {
CompletableFuture<HasMetadata> future = futures.get(i);
// just get each result as the timeout is enforced below
Expand All @@ -124,7 +122,6 @@ public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e);
}
++i;
}

if (!itemsWithConditionNotMatched.isEmpty()) {
Expand Down Expand Up @@ -287,8 +284,8 @@ public List<HasMetadata> get() {
}
}

private static List<HasMetadata> acceptVisitors(List<HasMetadata> list, List<Visitor> visitors) {
List<HasMetadata> result = new ArrayList<>();
private static ArrayList<HasMetadata> acceptVisitors(List<HasMetadata> list, List<Visitor> visitors) {
ArrayList<HasMetadata> result = new ArrayList<>();
for (HasMetadata item : list) {
ResourceHandler<HasMetadata, ?> h = handlerOf(item);
VisitableBuilder<HasMetadata, ?> builder = h.edit(item);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,12 @@
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
import io.fabric8.kubernetes.client.dsl.internal.RollingOperationContext;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.PodOperationUtil;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.OkHttpClient;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentList;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,11 +45,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
Expand Down Expand Up @@ -261,54 +256,37 @@ public ImageEditReplacePatchable<Deployment> withTimeout(long timeout, TimeUnit
* Lets wait until there are enough Ready pods of the given Deployment
*/
private void waitUntilDeploymentIsScaled(final int count) {
final CompletableFuture<Void> scaledFuture = new CompletableFuture<>();
final AtomicReference<Integer> replicasRef = new AtomicReference<>(0);

final String name = checkName(getItem());
final String namespace = checkNamespace(getItem());

final Runnable deploymentPoller = () -> {
try {
Deployment deployment = get();
//If the deployment is gone, we shouldn't wait.
try {
waitUntilCondition(deployment -> {
if (deployment == null) {
if (count == 0) {
scaledFuture.complete(null);
return;
} else {
scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available."));
return;
return true;
}
throw new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.");
}

replicasRef.set(deployment.getStatus().getReplicas());
int currentReplicas = deployment.getStatus().getReplicas() != null ? deployment.getStatus().getReplicas() : 0;
long generation = deployment.getMetadata().getGeneration() != null ? deployment.getMetadata().getGeneration() : 0;
long observedGeneration = deployment.getStatus() != null && deployment.getStatus().getObservedGeneration() != null ? deployment.getStatus().getObservedGeneration() : -1;
if (observedGeneration >= generation && Objects.equals(deployment.getSpec().getReplicas(), currentReplicas)) {
scaledFuture.complete(null);
} else {
LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {} seconds so waiting...",
deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace);
}
} catch (Throwable t) {
LOG.error("Error while waiting for Deployment to be scaled.", t);
}
};

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
try {
if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) {
LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.",
return true;
}
LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {} seconds so waiting...",
deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace);
return false;

}, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS);
LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.",
replicasRef.get(), count, name, namespace);
} else {
LOG.error("{}/{} pod(s) ready for Deployment: {} in namespace: {} after waiting for {} seconds so giving up",
} catch (KubernetesClientTimeoutException e) {
LOG.error("{}/{} pod(s) ready for Deployment: {} in namespace: {} after waiting for {} seconds so giving up",
replicasRef.get(), count, name, namespace, TimeUnit.MILLISECONDS.toSeconds(getConfig().getScaleTimeout()));
}
} finally {
poller.cancel(true);
executor.shutdown();
}
}

Expand Down
Loading

0 comments on commit 8ed9692

Please sign in to comment.