Skip to content

Commit

Permalink
fix fabric8io#3328: moving the watch type conversion logic up
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins authored and manusa committed Jul 20, 2021
1 parent 7044437 commit c241520
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Fix #3083: CertificateException due to PEM being decoded in CertUtils
* Fix #3295: Fix wrong kind getting registered in KubernetesDeserializer in SharedInformerFactory
* Fix #3318: Informer relist add/update should not always be sync events
* Fix #3328: Allow for generic watches of known types

#### Improvements
* Fix #3284: refined how builders are obtained / used by HasMetadataOperation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.Watcher.Action;
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
Expand All @@ -46,11 +48,6 @@

public abstract class AbstractWatchManager<T extends HasMetadata> implements Watch {

@FunctionalInterface
interface RequestBuilder {
Request build(final String resourceVersion);
}

private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class);

final Watcher<T> watcher;
Expand All @@ -62,21 +59,21 @@ interface RequestBuilder {
final AtomicInteger currentReconnectAttempt;
private ScheduledFuture<?> reconnectAttempt;

private final RequestBuilder requestBuilder;
private final BaseOperationRequestBuilder requestBuilder;
protected final OkHttpClient client;

private final AtomicBoolean reconnectPending = new AtomicBoolean(false);

AbstractWatchManager(
Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder, Supplier<OkHttpClient> clientSupplier
) {
Watcher<T> watcher, BaseOperation<T, ?, ?> baseOperation, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, Supplier<OkHttpClient> clientSupplier
) throws MalformedURLException {
this.watcher = watcher;
this.reconnectLimit = reconnectLimit;
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, maxIntervalExponent);
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
this.currentReconnectAttempt = new AtomicInteger(0);
this.forceClosed = new AtomicBoolean();
this.requestBuilder = requestBuilder;
this.requestBuilder = new BaseOperationRequestBuilder<>(baseOperation, listOptions);
this.client = clientSupplier.get();

runWatch();
Expand Down Expand Up @@ -172,8 +169,13 @@ boolean isForceClosed() {
return forceClosed.get();
}

void eventReceived(Watcher.Action action, T resource) {
watcher.eventReceived(action, resource);
void eventReceived(Watcher.Action action, HasMetadata resource) {
// the WatchEvent deserialization is not specifically typed
// modify the type here if needed
if (resource != null && !requestBuilder.getBaseOperation().getType().isAssignableFrom(resource.getClass())) {
resource = (HasMetadata) Serialization.jsonMapper().convertValue(resource, requestBuilder.getBaseOperation().getType());
}
watcher.eventReceived(action, (T)resource);
}

void updateResourceVersion(final String newResourceVersion) {
Expand Down Expand Up @@ -238,7 +240,7 @@ protected void onMessage(String message) {
List<HasMetadata> items = list.getItems();
if (items != null) {
for (HasMetadata item : items) {
eventReceived(action, (T) item);
eventReceived(action, item);
}
}
} else if (object instanceof Status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import okhttp3.HttpUrl;
import okhttp3.Request;

class BaseOperationRequestBuilder<T extends HasMetadata, L extends KubernetesResourceList<T>> implements AbstractWatchManager.RequestBuilder {
class BaseOperationRequestBuilder<T extends HasMetadata, L extends KubernetesResourceList<T>> {
private final URL requestUrl;
private final BaseOperation<T, L, ?> baseOperation;
private final ListOptions listOptions;
Expand All @@ -40,8 +40,11 @@ public BaseOperationRequestBuilder(BaseOperation<T, L, ?> baseOperation, ListOpt
this.requestUrl = baseOperation.getNamespacedUrl();
this.listOptions = listOptions;
}

@Override

public BaseOperation<T, L, ?> getBaseOperation() {
return baseOperation;
}

public Request build(final String resourceVersion) {
HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,7 @@ static void closeBody(Response response) {
}

public WatchConnectionManager(final OkHttpClient client, final BaseOperation<T, L, ?> baseOperation, final ListOptions listOptions, final Watcher<T> watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout, int maxIntervalExponent) throws MalformedURLException {
super(
watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent,
new BaseOperationRequestBuilder<>(baseOperation, listOptions), () -> client.newBuilder()
super(watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, () -> client.newBuilder()
.readTimeout(websocketTimeout, TimeUnit.MILLISECONDS)
.build());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ public WatchHTTPManager(final OkHttpClient client,
throws MalformedURLException {

super(
watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent,
new BaseOperationRequestBuilder<>(baseOperation, listOptions), () -> {
watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, () -> {
final OkHttpClient clonedClient = client.newBuilder()
.connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
.readTimeout(0, TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,6 @@ public void eventReceived(Action action, T resource) {
if (resource == null) {
throw new KubernetesClientException("Unrecognized resource");
}
// the WatchEvent deserialization is not specifically typed
// modify the type here if needed
if (!apiTypeClass.isAssignableFrom(resource.getClass())) {
resource = Serialization.jsonMapper().convertValue(resource, apiTypeClass);
}
if (log.isDebugEnabled()) {
log.debug("Event received {} {}# resourceVersion {}", action.name(), resource.getKind(), resource.getMetadata().getResourceVersion());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@
import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.Request;
import okhttp3.WebSocket;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import java.net.MalformedURLException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;

Expand All @@ -43,7 +46,7 @@ class AbstractWatchManagerTest {

@Test
@DisplayName("closeEvent, is idempotent, multiple calls only close watcher once")
void closeEventIsIdempotent() {
void closeEventIsIdempotent() throws MalformedURLException {
// Given
final WatcherAdapter<HasMetadata> watcher = new WatcherAdapter<>();
final WatchManager<HasMetadata> awm = withDefaultWatchManager(watcher);
Expand All @@ -57,7 +60,7 @@ void closeEventIsIdempotent() {

@Test
@DisplayName("closeEvent, with Exception, is idempotent, multiple calls only close watcher once")
void closeEventWithExceptionIsIdempotent() {
void closeEventWithExceptionIsIdempotent() throws MalformedURLException {
// Given
final WatcherAdapter<HasMetadata> watcher = new WatcherAdapter<>();
final WatchManager<HasMetadata> awm = withDefaultWatchManager(watcher);
Expand All @@ -82,7 +85,7 @@ void closeWebSocket() {

@Test
@DisplayName("nextReconnectInterval, returns exponential interval values up to the provided limit")
void nextReconnectInterval() {
void nextReconnectInterval() throws MalformedURLException {
// Given
final WatchManager<HasMetadata> awm = new WatchManager<>(
null, mock(ListOptions.class), 0, 10, 5);
Expand All @@ -98,7 +101,7 @@ void nextReconnectInterval() {

@Test
@DisplayName("cancelReconnect, with null attempt, should do nothing")
void cancelReconnectNullAttempt() {
void cancelReconnectNullAttempt() throws MalformedURLException {
// Given
final ScheduledFuture<?> sf = spy(ScheduledFuture.class);
final WatcherAdapter<HasMetadata> watcher = new WatcherAdapter<>();
Expand All @@ -111,7 +114,7 @@ void cancelReconnectNullAttempt() {

@Test
@DisplayName("cancelReconnect, with non-null attempt, should cancel")
void cancelReconnectNonNullAttempt() {
void cancelReconnectNonNullAttempt() throws MalformedURLException {
// Given
final ScheduledFuture<?> sf = mock(ScheduledFuture.class);
final MockedStatic<Utils> utils = mockStatic(Utils.class);
Expand All @@ -127,7 +130,7 @@ void cancelReconnectNonNullAttempt() {

@Test
@DisplayName("isClosed, after close invocation, should return true")
void isForceClosedWhenClosed() {
void isForceClosedWhenClosed() throws MalformedURLException {
// Given
final WatcherAdapter<HasMetadata> watcher = new WatcherAdapter<>();
final WatchManager<HasMetadata> awm = withDefaultWatchManager(watcher);
Expand All @@ -139,7 +142,7 @@ void isForceClosedWhenClosed() {

@Test
@DisplayName("close, after close invocation, should return true")
void closeWithNonNullRunnerShouldCancelRunner() {
void closeWithNonNullRunnerShouldCancelRunner() throws MalformedURLException {
// Given
final WatcherAdapter<HasMetadata> watcher = new WatcherAdapter<>();
final WatchManager<HasMetadata> awm = withDefaultWatchManager(watcher);
Expand All @@ -149,7 +152,7 @@ void closeWithNonNullRunnerShouldCancelRunner() {
assertThat(awm.closeCount.get()).isEqualTo(1);
}

private static <T extends HasMetadata> WatchManager<T> withDefaultWatchManager(Watcher<T> watcher) {
private static <T extends HasMetadata> WatchManager<T> withDefaultWatchManager(Watcher<T> watcher) throws MalformedURLException {
return new WatchManager<>(
watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 1, 0, 0);
}
Expand All @@ -175,8 +178,8 @@ private static final class WatchManager<T extends HasMetadata> extends AbstractW

private final AtomicInteger closeCount = new AtomicInteger(0);

public WatchManager(Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) {
super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, resourceVersion -> null, ()->null);
public WatchManager(Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) throws MalformedURLException {
super(watcher, Mockito.mock(BaseOperation.class), listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, () -> null);
}

@Override
Expand Down

0 comments on commit c241520

Please sign in to comment.