Skip to content

Commit edd59d5

Browse files
committed
Added tests
1 parent 00b78c7 commit edd59d5

File tree

4 files changed

+158
-10
lines changed

4 files changed

+158
-10
lines changed

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.fabric8.kubernetes.api.model.HasMetadata;
1919
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
2020
import io.fabric8.kubernetes.client.utils.Utils;
21+
import java.net.ProtocolException;
2122
import okhttp3.HttpUrl;
2223
import okhttp3.OkHttpClient;
2324
import okhttp3.Request;
@@ -651,9 +652,15 @@ public Watch watch(String resourceVersion, final Watcher<T> watcher) throws Kube
651652
} catch (MalformedURLException e) {
652653
throw KubernetesClientException.launderThrowable(e);
653654
} catch (KubernetesClientException ke) {
654-
WatchHTTPManager watch = null;
655+
if (ke.getCode() != 200) {
656+
throw ke;
657+
}
658+
659+
// If the HTTP return code is 200, we retry the watch again using a persistent hanging
660+
// HTTP GET. This is meant to handle cases like kubectl local proxy which does not support
661+
// websockets. Issue: https://github.com/kubernetes/kubernetes/issues/25126
655662
try {
656-
watch = new WatchHTTPManager(
663+
return new WatchHTTPManager(
657664
client,
658665
this,
659666
resourceVersion,
@@ -665,7 +672,6 @@ public Watch watch(String resourceVersion, final Watcher<T> watcher) throws Kube
665672
} catch (MalformedURLException e) {
666673
throw KubernetesClientException.launderThrowable(e);
667674
}
668-
return watch;
669675
}
670676
}
671677

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/OperationSupport.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ public static Status createStatus(int statusCode, String message) {
302302
return status;
303303
}
304304

305-
KubernetesClientException requestFailure(Request request, Status status) {
305+
public static KubernetesClientException requestFailure(Request request, Status status) {
306306
StringBuilder sb = new StringBuilder();
307307
sb.append("Failure executing: ").append(request.method())
308308
.append(" at: ").append(request.url().toString()).append(".");
@@ -318,7 +318,7 @@ KubernetesClientException requestFailure(Request request, Status status) {
318318
return new KubernetesClientException(sb.toString(), status.getCode(), status);
319319
}
320320

321-
KubernetesClientException requestException(Request request, Exception e) {
321+
public static KubernetesClientException requestException(Request request, Exception e) {
322322
StringBuilder sb = new StringBuilder();
323323
sb.append("Error executing: ").append(request.method())
324324
.append(" at: ").append(request.url().toString())

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/**
2-
* Copyright (C) 2017 Google, Inc.
2+
* Copyright (C) 2015 Red Hat, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -13,7 +13,6 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
1716
package io.fabric8.kubernetes.client.dsl.internal;
1817

1918
import static io.fabric8.kubernetes.client.utils.Utils.isNotNullOrEmpty;
@@ -24,11 +23,13 @@
2423
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
2524
import io.fabric8.kubernetes.api.model.Status;
2625
import io.fabric8.kubernetes.api.model.WatchEvent;
26+
import io.fabric8.kubernetes.client.KubernetesClient;
2727
import io.fabric8.kubernetes.client.KubernetesClientException;
2828
import io.fabric8.kubernetes.client.Watch;
2929
import io.fabric8.kubernetes.client.Watcher;
3030
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
3131

32+
import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
3233
import java.io.IOException;
3334
import java.net.MalformedURLException;
3435
import java.net.URL;
@@ -111,7 +112,7 @@ public WatchHTTPManager(final OkHttpClient client,
111112

112113
this.clonedClient = clonedClient;
113114
requestUrl = baseOperation.getNamespacedUrl();
114-
scheduleReconnect();
115+
runWatch();
115116
}
116117

117118
private final void runWatch() {
@@ -149,12 +150,17 @@ private final void runWatch() {
149150
Response response = null;
150151
try {
151152
response = clonedClient.newCall(request).execute();
153+
if(!response.isSuccessful()) {
154+
throw OperationSupport.requestFailure(request,
155+
OperationSupport.createStatus(response.code(), response.message()));
156+
}
157+
152158
BufferedSource source = response.body().source();
153159
while (!source.exhausted()) {
154160
String message = source.readUtf8LineStrict();
155161
onMessage(message);
156162
}
157-
} catch (IOException e) {
163+
} catch (Exception e) {
158164
logger.info("Watch connection close received. reason: {}", e.getMessage());
159165
} finally {
160166
if (forceClosed.get()) {
@@ -166,6 +172,7 @@ private final void runWatch() {
166172
return;
167173
}
168174

175+
169176
// if we get here, the source is exhausted, so, we have lost our "watch".
170177
// we must reconnect.
171178
if (response != null) {
@@ -228,7 +235,6 @@ public void onMessage(String messageSource) throws IOException {
228235
watcher.eventReceived(action, obj);
229236
} else if (event.getObject() instanceof Status) {
230237
Status status = (Status) event.getObject();
231-
232238
// The resource version no longer exists - this has to be handled by the caller.
233239
if (status.getCode() == HTTP_GONE) {
234240
// exception
Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,136 @@
1+
/**
2+
* Copyright (C) 2015 Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.fabric8.kubernetes.client;
18+
19+
import static org.junit.Assert.assertTrue;
20+
21+
import io.fabric8.kubernetes.api.model.Pod;
22+
import io.fabric8.kubernetes.api.model.PodBuilder;
23+
import io.fabric8.kubernetes.api.model.Status;
24+
import io.fabric8.kubernetes.api.model.StatusBuilder;
25+
import io.fabric8.kubernetes.api.model.WatchEvent;
26+
import io.fabric8.kubernetes.api.model.WatchEventBuilder;
27+
import io.fabric8.kubernetes.server.mock.KubernetesServer;
28+
import java.net.HttpURLConnection;
29+
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
31+
import junit.framework.AssertionFailedError;
32+
import org.junit.Rule;
33+
import org.junit.Test;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
public class WatchOverHTTP {
38+
static final Pod pod1 = new PodBuilder().withNewMetadata().withNamespace("test").withName("pod1")
39+
.withResourceVersion("1").endMetadata().build();
40+
static final Status outdatedStatus = new StatusBuilder().withCode(HttpURLConnection.HTTP_GONE)
41+
.withMessage(
42+
"401: The event in requested index is outdated and cleared (the requested history has been cleared [3/1]) [2]")
43+
.build();
44+
static final WatchEvent outdatedEvent = new WatchEventBuilder().withStatusObject(outdatedStatus).build();
45+
final String path = "/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&watch=true";
46+
@Rule
47+
public KubernetesServer server = new KubernetesServer(false);
48+
Logger logger = LoggerFactory.getLogger(WatchTest.class);
49+
50+
@Test
51+
public void testDeleted() throws InterruptedException {
52+
logger.info("testDeleted");
53+
KubernetesClient client = server.getClient().inNamespace("test");
54+
55+
server.expect()
56+
.withPath(path)
57+
.andReturn(200, "Failed WebSocket Connection").once();
58+
server.expect().withPath(path).andReturnChunked(200,
59+
new WatchEvent(pod1, "DELETED"), "\n",
60+
new WatchEvent(pod1, "ADDED"), "\n").once();
61+
62+
final CountDownLatch addLatch = new CountDownLatch(1);
63+
final CountDownLatch deleteLatch = new CountDownLatch(1);
64+
try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher<Pod>() {
65+
@Override
66+
public void eventReceived(Action action, Pod resource) {
67+
switch (action) {
68+
case DELETED:
69+
deleteLatch.countDown();
70+
break;
71+
case ADDED:
72+
addLatch.countDown();
73+
break;
74+
default:
75+
throw new AssertionFailedError();
76+
}
77+
}
78+
79+
@Override
80+
public void onClose(KubernetesClientException cause) {}
81+
})) /* autoclose */ {
82+
assertTrue(addLatch.await(10, TimeUnit.SECONDS));
83+
assertTrue(deleteLatch.await(10, TimeUnit.SECONDS));
84+
}
85+
}
86+
87+
@Test
88+
public void testOutdated() throws InterruptedException {
89+
logger.info("testOutdated");
90+
KubernetesClient client = server.getClient().inNamespace("test");
91+
92+
server.expect()
93+
.withPath(path)
94+
.andReturn(200, "Failed WebSocket Connection").once();
95+
server.expect().withPath(path).andReturnChunked(200, outdatedEvent, "\n").once();
96+
try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher<Pod>() {
97+
@Override
98+
public void eventReceived(Action action, Pod resource) {
99+
throw new AssertionFailedError();
100+
}
101+
102+
@Override
103+
public void onClose(KubernetesClientException cause) {
104+
throw new AssertionFailedError();
105+
}
106+
})){};
107+
}
108+
109+
@Test
110+
public void testHttpErrorReconnect() throws InterruptedException {
111+
logger.info("testHttpErrorReconnect");
112+
KubernetesClient client = server.getClient().inNamespace("test");
113+
114+
server.expect()
115+
.withPath(path)
116+
.andReturn(200, "Failed WebSocket Connection").once();
117+
server.expect().withPath(path).andReturnChunked(503, new StatusBuilder().withCode(503).build()).times(6);
118+
server.expect().withPath(path).andReturnChunked(200, outdatedEvent, "\n").once();
119+
120+
final CountDownLatch closeLatch = new CountDownLatch(1);
121+
try (Watch watch = client.pods().withName("pod1").withResourceVersion("1").watch(new Watcher<Pod>() {
122+
@Override
123+
public void eventReceived(Action action, Pod resource) {
124+
throw new AssertionFailedError();
125+
}
126+
127+
@Override
128+
public void onClose(KubernetesClientException cause) {
129+
logger.debug("onClose", cause);
130+
closeLatch.countDown();
131+
}
132+
})) /* autoclose */ {
133+
assertTrue(closeLatch.await(3, TimeUnit.MINUTES));
134+
}
135+
}
136+
}

0 commit comments

Comments
 (0)