From c24152096f1a0438be1cd4978bea23a8dc185334 Mon Sep 17 00:00:00 2001 From: shawkins Date: Mon, 19 Jul 2021 14:42:38 -0400 Subject: [PATCH] fix #3328: moving the watch type conversion logic up --- CHANGELOG.md | 1 + .../dsl/internal/AbstractWatchManager.java | 26 ++++++++++--------- .../internal/BaseOperationRequestBuilder.java | 9 ++++--- .../dsl/internal/WatchConnectionManager.java | 4 +-- .../client/dsl/internal/WatchHTTPManager.java | 3 +-- .../client/informers/cache/Reflector.java | 5 ---- .../internal/AbstractWatchManagerTest.java | 23 +++++++++------- 7 files changed, 36 insertions(+), 35 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 28040fb81d0..5ac7d81e7c2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ * Fix #3083: CertificateException due to PEM being decoded in CertUtils * Fix #3295: Fix wrong kind getting registered in KubernetesDeserializer in SharedInformerFactory * Fix #3318: Informer relist add/update should not always be sync events +* Fix #3328: Allow for generic watches of known types #### Improvements * Fix #3284: refined how builders are obtained / used by HasMetadataOperation diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index ac633163ab0..6177b621305 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -27,6 +27,7 @@ import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.Watcher.Action; +import io.fabric8.kubernetes.client.dsl.base.BaseOperation; import io.fabric8.kubernetes.client.utils.Serialization; import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.OkHttpClient; @@ -34,6 +35,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.net.MalformedURLException; import java.util.List; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; @@ -46,11 +48,6 @@ public abstract class AbstractWatchManager implements Watch { - @FunctionalInterface - interface RequestBuilder { - Request build(final String resourceVersion); - } - private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class); final Watcher watcher; @@ -62,21 +59,21 @@ interface RequestBuilder { final AtomicInteger currentReconnectAttempt; private ScheduledFuture reconnectAttempt; - private final RequestBuilder requestBuilder; + private final BaseOperationRequestBuilder requestBuilder; protected final OkHttpClient client; private final AtomicBoolean reconnectPending = new AtomicBoolean(false); AbstractWatchManager( - Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder, Supplier clientSupplier - ) { + Watcher watcher, BaseOperation baseOperation, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, Supplier clientSupplier + ) throws MalformedURLException { this.watcher = watcher; this.reconnectLimit = reconnectLimit; this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, maxIntervalExponent); this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); this.currentReconnectAttempt = new AtomicInteger(0); this.forceClosed = new AtomicBoolean(); - this.requestBuilder = requestBuilder; + this.requestBuilder = new BaseOperationRequestBuilder<>(baseOperation, listOptions); this.client = clientSupplier.get(); runWatch(); @@ -172,8 +169,13 @@ boolean isForceClosed() { return forceClosed.get(); } - void eventReceived(Watcher.Action action, T resource) { - watcher.eventReceived(action, resource); + void eventReceived(Watcher.Action action, HasMetadata resource) { + // the WatchEvent deserialization is not specifically typed + // modify the type here if needed + if (resource != null && !requestBuilder.getBaseOperation().getType().isAssignableFrom(resource.getClass())) { + resource = (HasMetadata) Serialization.jsonMapper().convertValue(resource, requestBuilder.getBaseOperation().getType()); + } + watcher.eventReceived(action, (T)resource); } void updateResourceVersion(final String newResourceVersion) { @@ -238,7 +240,7 @@ protected void onMessage(String message) { List items = list.getItems(); if (items != null) { for (HasMetadata item : items) { - eventReceived(action, (T) item); + eventReceived(action, item); } } } else if (object instanceof Status) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java index e1147e99b40..7aa34cb5d14 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java @@ -30,7 +30,7 @@ import okhttp3.HttpUrl; import okhttp3.Request; -class BaseOperationRequestBuilder> implements AbstractWatchManager.RequestBuilder { +class BaseOperationRequestBuilder> { private final URL requestUrl; private final BaseOperation baseOperation; private final ListOptions listOptions; @@ -40,8 +40,11 @@ public BaseOperationRequestBuilder(BaseOperation baseOperation, ListOpt this.requestUrl = baseOperation.getNamespacedUrl(); this.listOptions = listOptions; } - - @Override + + public BaseOperation getBaseOperation() { + return baseOperation; + } + public Request build(final String resourceVersion) { HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index b263ce0fdb7..b4c62f6d929 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -60,9 +60,7 @@ static void closeBody(Response response) { } public WatchConnectionManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout, int maxIntervalExponent) throws MalformedURLException { - super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, - new BaseOperationRequestBuilder<>(baseOperation, listOptions), () -> client.newBuilder() + super(watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, () -> client.newBuilder() .readTimeout(websocketTimeout, TimeUnit.MILLISECONDS) .build()); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 35a92dc494f..87ccc2e8577 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -56,8 +56,7 @@ public WatchHTTPManager(final OkHttpClient client, throws MalformedURLException { super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, - new BaseOperationRequestBuilder<>(baseOperation, listOptions), () -> { + watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, () -> { final OkHttpClient clonedClient = client.newBuilder() .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) .readTimeout(0, TimeUnit.MILLISECONDS) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java index 44363abb5ab..b8d37d1826b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java @@ -128,11 +128,6 @@ public void eventReceived(Action action, T resource) { if (resource == null) { throw new KubernetesClientException("Unrecognized resource"); } - // the WatchEvent deserialization is not specifically typed - // modify the type here if needed - if (!apiTypeClass.isAssignableFrom(resource.getClass())) { - resource = Serialization.jsonMapper().convertValue(resource, apiTypeClass); - } if (log.isDebugEnabled()) { log.debug("Event received {} {}# resourceVersion {}", action.name(), resource.getKind(), resource.getMetadata().getResourceVersion()); } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java index 3e623025808..d8908009d60 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java @@ -19,13 +19,16 @@ import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.dsl.base.BaseOperation; import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.Request; import okhttp3.WebSocket; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; +import org.mockito.Mockito; +import java.net.MalformedURLException; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; @@ -43,7 +46,7 @@ class AbstractWatchManagerTest { @Test @DisplayName("closeEvent, is idempotent, multiple calls only close watcher once") - void closeEventIsIdempotent() { + void closeEventIsIdempotent() throws MalformedURLException { // Given final WatcherAdapter watcher = new WatcherAdapter<>(); final WatchManager awm = withDefaultWatchManager(watcher); @@ -57,7 +60,7 @@ void closeEventIsIdempotent() { @Test @DisplayName("closeEvent, with Exception, is idempotent, multiple calls only close watcher once") - void closeEventWithExceptionIsIdempotent() { + void closeEventWithExceptionIsIdempotent() throws MalformedURLException { // Given final WatcherAdapter watcher = new WatcherAdapter<>(); final WatchManager awm = withDefaultWatchManager(watcher); @@ -82,7 +85,7 @@ void closeWebSocket() { @Test @DisplayName("nextReconnectInterval, returns exponential interval values up to the provided limit") - void nextReconnectInterval() { + void nextReconnectInterval() throws MalformedURLException { // Given final WatchManager awm = new WatchManager<>( null, mock(ListOptions.class), 0, 10, 5); @@ -98,7 +101,7 @@ void nextReconnectInterval() { @Test @DisplayName("cancelReconnect, with null attempt, should do nothing") - void cancelReconnectNullAttempt() { + void cancelReconnectNullAttempt() throws MalformedURLException { // Given final ScheduledFuture sf = spy(ScheduledFuture.class); final WatcherAdapter watcher = new WatcherAdapter<>(); @@ -111,7 +114,7 @@ void cancelReconnectNullAttempt() { @Test @DisplayName("cancelReconnect, with non-null attempt, should cancel") - void cancelReconnectNonNullAttempt() { + void cancelReconnectNonNullAttempt() throws MalformedURLException { // Given final ScheduledFuture sf = mock(ScheduledFuture.class); final MockedStatic utils = mockStatic(Utils.class); @@ -127,7 +130,7 @@ void cancelReconnectNonNullAttempt() { @Test @DisplayName("isClosed, after close invocation, should return true") - void isForceClosedWhenClosed() { + void isForceClosedWhenClosed() throws MalformedURLException { // Given final WatcherAdapter watcher = new WatcherAdapter<>(); final WatchManager awm = withDefaultWatchManager(watcher); @@ -139,7 +142,7 @@ void isForceClosedWhenClosed() { @Test @DisplayName("close, after close invocation, should return true") - void closeWithNonNullRunnerShouldCancelRunner() { + void closeWithNonNullRunnerShouldCancelRunner() throws MalformedURLException { // Given final WatcherAdapter watcher = new WatcherAdapter<>(); final WatchManager awm = withDefaultWatchManager(watcher); @@ -149,7 +152,7 @@ void closeWithNonNullRunnerShouldCancelRunner() { assertThat(awm.closeCount.get()).isEqualTo(1); } - private static WatchManager withDefaultWatchManager(Watcher watcher) { + private static WatchManager withDefaultWatchManager(Watcher watcher) throws MalformedURLException { return new WatchManager<>( watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 1, 0, 0); } @@ -175,8 +178,8 @@ private static final class WatchManager extends AbstractW private final AtomicInteger closeCount = new AtomicInteger(0); - public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) { - super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, resourceVersion -> null, ()->null); + public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) throws MalformedURLException { + super(watcher, Mockito.mock(BaseOperation.class), listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, () -> null); } @Override