diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 0ed851e5f80..7b961099e07 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -13,43 +13,35 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.fabric8.kubernetes.client.dsl.internal; -import static io.fabric8.kubernetes.client.utils.Utils.isNotNullOrEmpty; -import static java.net.HttpURLConnection.HTTP_GONE; +package io.fabric8.kubernetes.client.dsl.internal; import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.api.model.WatchEvent; -import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.dsl.base.BaseOperation; - import io.fabric8.kubernetes.client.dsl.base.OperationSupport; +import okhttp3.*; +import okhttp3.logging.HttpLoggingInterceptor; +import okio.BufferedSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; -import java.util.concurrent.Executors; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import okhttp3.HttpUrl; -import okhttp3.Interceptor; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.Response; -import okhttp3.logging.HttpLoggingInterceptor; -import okio.BufferedSource; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; + +import static io.fabric8.kubernetes.client.utils.Utils.isNotNullOrEmpty; +import static java.net.HttpURLConnection.HTTP_GONE; public class WatchHTTPManager> implements Watch { @@ -79,9 +71,9 @@ public Thread newThread(Runnable r) { }); public WatchHTTPManager(final OkHttpClient client, - final BaseOperation baseOperation, - final String version, final Watcher watcher, final int reconnectInterval, - final int reconnectLimit, long connectTimeout) + final BaseOperation baseOperation, + final String version, final Watcher watcher, final int reconnectInterval, + final int reconnectLimit, long connectTimeout) throws MalformedURLException { if (version == null) { @@ -97,7 +89,7 @@ public WatchHTTPManager(final OkHttpClient client, OkHttpClient clonedClient = client.newBuilder() .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) - .readTimeout(0,TimeUnit.MILLISECONDS) + .readTimeout(0, TimeUnit.MILLISECONDS) .cache(null) .build(); @@ -147,42 +139,51 @@ private final void runWatch() { .addHeader("Origin", requestUrl.getProtocol() + "://" + requestUrl.getHost() + ":" + requestUrl.getPort()) .build(); - Response response = null; - try { - response = clonedClient.newCall(request).execute(); - if(!response.isSuccessful()) { - throw OperationSupport.requestFailure(request, - OperationSupport.createStatus(response.code(), response.message())); + clonedClient.newCall(request).enqueue(new Callback() { + @Override + public void onFailure(Call call, IOException e) { + logger.info("Watch connection failed. reason: {}", e.getMessage()); + scheduleReconnect(); } - BufferedSource source = response.body().source(); - while (!source.exhausted()) { - String message = source.readUtf8LineStrict(); - onMessage(message); - } - } catch (Exception e) { - logger.info("Watch connection close received. reason: {}", e.getMessage()); - } finally { - if (forceClosed.get()) { - logger.warn("Ignoring onClose for already closed/closing connection"); - return; - } - if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) { - watcher.onClose(new KubernetesClientException("Connection unexpectedly closed")); - return; - } + @Override + public void onResponse(Call call, Response response) throws IOException { + if (!response.isSuccessful()) { + throw OperationSupport.requestFailure(request, + OperationSupport.createStatus(response.code(), response.message())); + } + try { + BufferedSource source = response.body().source(); + while (!source.exhausted()) { + String message = source.readUtf8LineStrict(); + onMessage(message); + } + } catch (Exception e) { + logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage()); + } - // if we get here, the source is exhausted, so, we have lost our "watch". - // we must reconnect. - if (response != null) { - response.body().close(); + // if we get here, the source is exhausted, so, we have lost our "watch". + // we must reconnect. + if (response != null) { + response.body().close(); + } + scheduleReconnect(); } - scheduleReconnect(); - } + }); } private void scheduleReconnect() { + if (forceClosed.get()) { + logger.warn("Ignoring error for already closed/closing connection"); + return; + } + + if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) { + watcher.onClose(new KubernetesClientException("Connection unexpectedly closed")); + return; + } + logger.debug("Submitting reconnect task to the executor"); // make sure that whichever thread calls this method, the tasks are // performed serially in the executor.