Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.dsl.internal;

import static io.fabric8.kubernetes.client.utils.Utils.isNotNullOrEmpty;
import static java.net.HttpURLConnection.HTTP_GONE;
package io.fabric8.kubernetes.client.dsl.internal;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.api.model.Status;
import io.fabric8.kubernetes.api.model.WatchEvent;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;

import io.fabric8.kubernetes.client.dsl.base.OperationSupport;
import okhttp3.*;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import okhttp3.HttpUrl;
import okhttp3.Interceptor;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.logging.HttpLoggingInterceptor;
import okio.BufferedSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.fabric8.kubernetes.client.utils.Utils.isNotNullOrEmpty;
import static java.net.HttpURLConnection.HTTP_GONE;

public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>> implements
Watch {
Expand Down Expand Up @@ -79,9 +71,9 @@ public Thread newThread(Runnable r) {
});

public WatchHTTPManager(final OkHttpClient client,
final BaseOperation<T, L, ?, ?> baseOperation,
final String version, final Watcher<T> watcher, final int reconnectInterval,
final int reconnectLimit, long connectTimeout)
final BaseOperation<T, L, ?, ?> baseOperation,
final String version, final Watcher<T> watcher, final int reconnectInterval,
final int reconnectLimit, long connectTimeout)
throws MalformedURLException {

if (version == null) {
Expand All @@ -97,7 +89,7 @@ public WatchHTTPManager(final OkHttpClient client,

OkHttpClient clonedClient = client.newBuilder()
.connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
.readTimeout(0,TimeUnit.MILLISECONDS)
.readTimeout(0, TimeUnit.MILLISECONDS)
.cache(null)
.build();

Expand Down Expand Up @@ -147,42 +139,51 @@ private final void runWatch() {
.addHeader("Origin", requestUrl.getProtocol() + "://" + requestUrl.getHost() + ":" + requestUrl.getPort())
.build();

Response response = null;
try {
response = clonedClient.newCall(request).execute();
if(!response.isSuccessful()) {
throw OperationSupport.requestFailure(request,
OperationSupport.createStatus(response.code(), response.message()));
clonedClient.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
logger.info("Watch connection failed. reason: {}", e.getMessage());
scheduleReconnect();
}

BufferedSource source = response.body().source();
while (!source.exhausted()) {
String message = source.readUtf8LineStrict();
onMessage(message);
}
} catch (Exception e) {
logger.info("Watch connection close received. reason: {}", e.getMessage());
} finally {
if (forceClosed.get()) {
logger.warn("Ignoring onClose for already closed/closing connection");
return;
}
if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
watcher.onClose(new KubernetesClientException("Connection unexpectedly closed"));
return;
}
@Override
public void onResponse(Call call, Response response) throws IOException {
if (!response.isSuccessful()) {
throw OperationSupport.requestFailure(request,
OperationSupport.createStatus(response.code(), response.message()));
}

try {
BufferedSource source = response.body().source();
while (!source.exhausted()) {
String message = source.readUtf8LineStrict();
onMessage(message);
}
} catch (Exception e) {
logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage());
}

// if we get here, the source is exhausted, so, we have lost our "watch".
// we must reconnect.
if (response != null) {
response.body().close();
// if we get here, the source is exhausted, so, we have lost our "watch".
// we must reconnect.
if (response != null) {
response.body().close();
}
scheduleReconnect();
}
scheduleReconnect();
}
});
}

private void scheduleReconnect() {
if (forceClosed.get()) {
logger.warn("Ignoring error for already closed/closing connection");
return;
}

if (currentReconnectAttempt.get() >= reconnectLimit && reconnectLimit >= 0) {
watcher.onClose(new KubernetesClientException("Connection unexpectedly closed"));
return;
}

logger.debug("Submitting reconnect task to the executor");
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor.
Expand Down