From 16549048159ddae1ca77375e10da2849526cfb76 Mon Sep 17 00:00:00 2001 From: Giuseppe Iannello Date: Sun, 25 Mar 2018 13:16:33 +0200 Subject: [PATCH 1/4] Support the `error` stream from the kubernetes websocket API --- .../kubernetes/client/dsl/ExecWatch.java | 11 +++ .../dsl/internal/ExecWebSocketListener.java | 95 ++++++++++++------- .../kubernetes/examples/ExecLoopExample.java | 4 +- .../kubernetes/examples/ExecPipesExample.java | 4 +- 4 files changed, 77 insertions(+), 37 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java index 9bf5d75ef9f..8185167f249 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java @@ -21,12 +21,23 @@ public interface ExecWatch extends Closeable { + @Deprecated OutputStream getInput(); + @Deprecated InputStream getOutput(); + @Deprecated InputStream getError(); + OutputStream getStdinPipe(); + + InputStream getStdoutPipe(); + + InputStream getStderrPipe(); + + InputStream getErrorPipe(); + /** * Close the Watch. */ diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index 19418c4f2e2..b2206cc6be2 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -63,13 +63,15 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc private static final Logger LOGGER = LoggerFactory.getLogger(ExecWebSocketListener.class); private final Config config; - private final InputStream in; - private final OutputStream out; - private final OutputStream err; + private final InputStream stdin; + private final OutputStream stdout; + private final OutputStream stderr; + private final OutputStream error; - private final PipedOutputStream input; - private final PipedInputStream output; - private final PipedInputStream error; + private final PipedOutputStream stdinPipe; + private final PipedInputStream stdoutPipe; + private final PipedInputStream stderrPipe; + private final PipedInputStream errorPipe; private final AtomicReference webSocketRef = new AtomicReference<>(); private final ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -88,21 +90,28 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc private final Set toClose = new LinkedHashSet<>(); @Deprecated - public ExecWebSocketListener(InputStream in, OutputStream out, OutputStream err, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, ExecListener listener) { - this(new Config(), in, out, err, inputPipe, outputPipe, errorPipe, listener); + public ExecWebSocketListener(InputStream stdin, OutputStream stdout, OutputStream stderr, PipedOutputStream stdinPipe, PipedInputStream stdoutPipe, PipedInputStream stderrPipe, ExecListener listener) { + this(new Config(), stdin, stdout, stderr, stdinPipe, stdoutPipe, stderrPipe, listener); } - public ExecWebSocketListener(Config config, InputStream in, OutputStream out, OutputStream err, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, ExecListener listener) { + @Deprecated + public ExecWebSocketListener(Config config, InputStream stdin, OutputStream stdout, OutputStream stderr, PipedOutputStream stdinPipe, PipedInputStream stdoutPipe, PipedInputStream stderrPipe, ExecListener listener) { + this(config, stdin, stdout, stderr, null, stdinPipe, stdoutPipe, stderrPipe, null, listener); + } + + public ExecWebSocketListener(Config config, InputStream stdin, OutputStream stdout, OutputStream stderr, OutputStream error, PipedOutputStream stdinPipe, PipedInputStream stdoutPipe, PipedInputStream stderrPipe, PipedInputStream errorPipe, ExecListener listener) { this.config = config; this.listener = listener; - this.in = inputStreamOrPipe(in, inputPipe, toClose); - this.out = outputStreamOrPipe(out, outputPipe, toClose); - this.err = outputStreamOrPipe(err, errorPipe, toClose); - - this.input = inputPipe; - this.output = outputPipe; - this.error = errorPipe; - this.pumper = new NonBlockingInputStreamPumper(this.in, new Callback() { + this.stdin = inputStreamOrPipe(stdin, stdinPipe, toClose); + this.stdout = outputStreamOrPipe(stdout, stdoutPipe, toClose); + this.stderr = outputStreamOrPipe(stderr, stderrPipe, toClose); + this.error = outputStreamOrPipe(error, errorPipe, toClose); + + this.stdinPipe = stdinPipe; + this.stdoutPipe = stdoutPipe; + this.stderrPipe = stderrPipe; + this.errorPipe = errorPipe; + this.pumper = new NonBlockingInputStreamPumper(this.stdin, new Callback() { @Override public void call(byte[] data) { try { @@ -173,14 +182,17 @@ public void waitUntilReady() { @Override public void onOpen(WebSocket webSocket, Response response) { try { - if (in instanceof PipedInputStream && input != null) { - input.connect((PipedInputStream) in); + if (stdin instanceof PipedInputStream && stdinPipe != null) { + stdinPipe.connect((PipedInputStream) stdin); + } + if (stdout instanceof PipedOutputStream && stdoutPipe != null) { + stdoutPipe.connect((PipedOutputStream) stdout); } - if (out instanceof PipedOutputStream && output != null) { - output.connect((PipedOutputStream) out); + if (stderr instanceof PipedOutputStream && stderrPipe != null) { + stderrPipe.connect((PipedOutputStream) stderr); } - if (err instanceof PipedOutputStream && error != null) { - error.connect((PipedOutputStream) err); + if (error instanceof PipedOutputStream && errorPipe != null) { + errorPipe.connect((PipedOutputStream) error); } webSocketRef.set(webSocket); @@ -232,18 +244,18 @@ public void onMessage(WebSocket webSocket, ByteString bytes) { if (byteString.size() > 0) { switch (streamID) { case 1: - if (out != null) { - out.write(byteString.toByteArray()); + if (stdout != null) { + stdout.write(byteString.toByteArray()); } break; case 2: - if (err != null) { - err.write(byteString.toByteArray()); + if (stderr != null) { + stderr.write(byteString.toByteArray()); } break; case 3: - if (err != null) { - err.write(byteString.toByteArray()); + if (error != null) { + error.write(byteString.toByteArray()); } break; default: @@ -278,16 +290,35 @@ public void onClosed(WebSocket webSocket, int code, String reason) { } } + @Deprecated public OutputStream getInput() { - return input; + return this.getStdinPipe(); } + @Deprecated public InputStream getOutput() { - return output; + return this.getStdoutPipe(); } + @Deprecated public InputStream getError() { - return error; + return this.getStderrPipe(); + } + + public OutputStream getStdinPipe() { + return stdinPipe; + } + + public InputStream getStdoutPipe() { + return stdoutPipe; + } + + public InputStream getStderrPipe() { + return stderrPipe; + } + + public InputStream getErrorPipe() { + return errorPipe; } private void send(byte[] bytes) throws IOException { diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecLoopExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecLoopExample.java index 92234d0d3d3..88eae61bb7f 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecLoopExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecLoopExample.java @@ -18,7 +18,6 @@ import io.fabric8.kubernetes.client.Callback; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; - import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.ExecListener; import io.fabric8.kubernetes.client.dsl.ExecWatch; import io.fabric8.kubernetes.client.utils.InputStreamPumper; @@ -28,7 +27,6 @@ import java.io.IOException; import java.util.concurrent.CountDownLatch; - import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; @@ -76,7 +74,7 @@ public void onClose(int code, String reason) { latch.countDown(); } }).exec("date"); - pump = new InputStreamPumper(watch.getOutput(), new SystemOutCallback()); + pump = new InputStreamPumper(watch.getStdoutPipe(), new SystemOutCallback()); executorService.submit(pump); Future outPumpFuture = executorService.submit(pump, "Done"); executorService.scheduleAtFixedRate(new FutureChecker("Pump " + (i + 1), outPumpFuture), 0, 2, TimeUnit.SECONDS); diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java index 3e6d9dfe826..3814c648a1b 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java @@ -50,11 +50,11 @@ public static void main(String[] args) throws InterruptedException, IOException .redirectingInput() .redirectingOutput() .exec(); - InputStreamPumper pump = new InputStreamPumper(watch.getOutput(), new SystemOutCallback())) + InputStreamPumper pump = new InputStreamPumper(watch.getStdoutPipe(), new SystemOutCallback())) { executorService.submit(pump); - watch.getInput().write("ls -al\n".getBytes()); + watch.getStdinPipe().write("ls -al\n".getBytes()); Thread.sleep(5 * 1000); } catch (Exception e) { throw KubernetesClientException.launderThrowable(e); From 94dbf0c4caa1783306434f5e7729eb92ce3c8e76 Mon Sep 17 00:00:00 2001 From: Carlos Sanchez Date: Mon, 26 Mar 2018 13:19:37 +0200 Subject: [PATCH 2/4] Add withErrorChannel methods to PodOperations --- .../client/dsl/ErrorChannelable.java | 31 +++++++++ .../client/dsl/TtyExecErrorChannelable.java | 27 ++++++++ .../client/dsl/TtyExecErrorable.java | 4 +- .../dsl/internal/PodOperationsImpl.java | 65 ++++++++++++------- .../client/handlers/PodHandler.java | 14 ++-- .../kubernetes/examples/ExecPipesExample.java | 2 + 6 files changed, 112 insertions(+), 31 deletions(-) create mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ErrorChannelable.java create mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorChannelable.java diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ErrorChannelable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ErrorChannelable.java new file mode 100644 index 00000000000..f9ac0e2be39 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ErrorChannelable.java @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.dsl; + +/** + * @param Where to write errorChannel to. + * @param

Where to read errorChannel from. + * @param The return type. + */ +public interface ErrorChannelable { + + T writingErrorChannel(O in); + + T readingErrorChannel(P in); + + T redirectingErrorChannel(); +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorChannelable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorChannelable.java new file mode 100644 index 00000000000..d3dbe1e4c81 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorChannelable.java @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.dsl; + +/** + * @param The exec input. + * @param Where to write err channel to. + * @param

Where to read err channel from. + * @param The exec output. + */ +public interface TtyExecErrorChannelable extends + TtyExecable, + ErrorChannelable> { +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java index dd8f4655759..71fe06935bd 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/TtyExecErrorable.java @@ -24,7 +24,7 @@ * @param The exec output. */ public interface TtyExecErrorable extends - TtyExecable, - Errorable> { + TtyExecErrorChannelable, + Errorable> { } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationsImpl.java index f1b8894f8ac..5888e9e9419 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationsImpl.java @@ -47,6 +47,7 @@ import io.fabric8.kubernetes.client.dsl.PrettyLoggable; import io.fabric8.kubernetes.client.dsl.TailPrettyLoggable; import io.fabric8.kubernetes.client.dsl.TimeTailPrettyLoggable; +import io.fabric8.kubernetes.client.dsl.TtyExecErrorChannelable; import io.fabric8.kubernetes.client.dsl.BytesLimitTerminateTimeTailPrettyLoggable; import io.fabric8.kubernetes.client.dsl.TtyExecErrorable; import io.fabric8.kubernetes.client.dsl.TtyExecOutputErrorable; @@ -65,10 +66,12 @@ public class PodOperationsImpl extends HasMetadataOperation labels, Map labelsNot, Map labelsIn, Map labelsNotIn, Map fields) { this(client, config, apiVersion, namespace, name, cascading, item, resourceVersion, reloadingFromServer, gracePeriodSeconds, labels, labelsNot, labelsIn, labelsNotIn, fields, - null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null); + null, null, null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null); } - public PodOperationsImpl(OkHttpClient client, Config config, String apiVersion, String namespace, String name, Boolean cascading, Pod item, String resourceVersion, Boolean reloadingFromServer, long gracePeriodSeconds, Map labels, Map labelsNot, Map labelsIn, Map labelsNotIn, Map fields, String containerId, InputStream in, PipedOutputStream inPipe, OutputStream out, PipedInputStream outPipe, OutputStream err, PipedInputStream errPipe, boolean withTTY, boolean withTerminatedStatus, boolean withTimestamps, String sinceTimestamp, Integer sinceSeconds, Integer withTailingLines, boolean withPrettyOutput, ExecListener execListener, Integer limitBytes) { + public PodOperationsImpl(OkHttpClient client, Config config, String apiVersion, String namespace, String name, Boolean cascading, Pod item, String resourceVersion, Boolean reloadingFromServer, long gracePeriodSeconds, Map labels, Map labelsNot, Map labelsIn, Map labelsNotIn, Map fields, String containerId, InputStream in, PipedOutputStream inPipe, OutputStream out, PipedInputStream outPipe, OutputStream err, PipedInputStream errPipe, OutputStream errChannel, PipedInputStream errChannelPipe, boolean withTTY, boolean withTerminatedStatus, boolean withTimestamps, String sinceTimestamp, Integer sinceSeconds, Integer withTailingLines, boolean withPrettyOutput, ExecListener execListener, Integer limitBytes) { super(client, config, null, apiVersion, "pods", namespace, name, cascading, item, resourceVersion, reloadingFromServer, gracePeriodSeconds, labels, labelsNot, labelsIn, labelsNotIn, fields); this.containerId = containerId; this.in = in; @@ -98,6 +101,8 @@ public PodOperationsImpl(OkHttpClient client, Config config, String apiVersion, this.outPipe = outPipe; this.err = err; this.errPipe = errPipe; + this.errChannel = errChannel; + this.errChannelPipe = errChannelPipe; this.withTTY = withTTY; this.withTerminatedStatus = withTerminatedStatus; this.withTimestamps = withTimestamps; @@ -166,7 +171,7 @@ public Reader getLogReader() { @Override public String getLog(Boolean isPretty) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, isPretty, execListener, limitBytes).getLog(); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, isPretty, execListener, limitBytes).getLog(); } @Override @@ -218,7 +223,7 @@ public LocalPortForward portForward(int port, int localPort) { @Override public ContainerResource inContainer(String containerId) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override @@ -258,7 +263,7 @@ public ExecWatch exec(String... command) { URL url = new URL(URLUtils.join(getResourceUrl().toString(), sb.toString())); Request.Builder r = new Request.Builder().url(url).get(); OkHttpClient clone = client.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build(); - final ExecWebSocketListener execWebSocketListener = new ExecWebSocketListener(in, out, err, inPipe, outPipe, errPipe, execListener); + final ExecWebSocketListener execWebSocketListener = new ExecWebSocketListener(new Config(), in, out, err, errChannel, inPipe, outPipe, errPipe, errChannelPipe, execListener); clone.newWebSocket(r.build(), execWebSocketListener); execWebSocketListener.waitUntilReady(); return execWebSocketListener; @@ -269,12 +274,12 @@ public ExecWatch exec(String... command) { @Override public TtyExecOutputErrorable readingInput(InputStream in) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override public TtyExecOutputErrorable writingInput(PipedOutputStream inPipe) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override @@ -284,12 +289,12 @@ public TtyExecOutputErrorable @Override public TtyExecErrorable writingOutput(OutputStream out) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override public TtyExecErrorable readingOutput(PipedInputStream outPipe) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override @@ -298,60 +303,76 @@ public TtyExecErrorable redir } @Override - public TtyExecable writingError(OutputStream err) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + public TtyExecErrorChannelable writingError(OutputStream err) { + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override - public TtyExecable readingError(PipedInputStream errPipe) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + public TtyExecErrorChannelable readingError(PipedInputStream errPipe) { + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override - public TtyExecable redirectingError() { + public TtyExecErrorChannelable redirectingError() { return readingError(new PipedInputStream()); } + @Override + public TtyExecable writingErrorChannel(OutputStream errChannel) { + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + } + + @Override + public TtyExecable readingErrorChannel(PipedInputStream errChannelPipe) { + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + } + + @Override + public TtyExecable redirectingErrorChannel() { + return readingErrorChannel(new PipedInputStream()); + } + + @Override public ExecListenable withTTY() { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, true, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, true, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override public Loggable withPrettyOutput() { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, true, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, true, execListener, limitBytes); } @Override public PrettyLoggable tailingLines(int withTailingLines) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override public TailPrettyLoggable sinceTime(String sinceTimestamp) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override public TailPrettyLoggable sinceSeconds(int sinceSeconds) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override public TimeTailPrettyLoggable terminated() { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, true, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, true, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override public Execable usingListener(ExecListener execListener) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } @Override public BytesLimitTerminateTimeTailPrettyLoggable limitBytes(int limitBytes) { - return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); + return new PodOperationsImpl(client, getConfig(), apiVersion, namespace, name, isCascading(), getItem(), getResourceVersion(), isReloadingFromServer(), getGracePeriodSeconds(), getLabels(), getLabelsNot(), getLabelsIn(), getLabelsNotIn(), getFields(), containerId, in, inPipe, out, outPipe, err, errPipe, errChannel, errChannelPipe, withTTY, withTerminatedStatus, withTimestamps, sinceTimestamp, sinceSeconds, withTailingLines, withPrettyOutput, execListener, limitBytes); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/handlers/PodHandler.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/handlers/PodHandler.java index 37a32761794..ad8467124c9 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/handlers/PodHandler.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/handlers/PodHandler.java @@ -40,17 +40,17 @@ public String getKind() { @Override public Pod create(OkHttpClient client, Config config, String namespace, Pod item) { - return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).create(); + return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).create(); } @Override public Pod replace(OkHttpClient client, Config config, String namespace, Pod item) { - return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, true, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).replace(item); + return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, true, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).replace(item); } @Override public Pod reload(OkHttpClient client, Config config, String namespace, Pod item) { - return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).fromServer().get(); + return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).fromServer().get(); } @Override @@ -60,21 +60,21 @@ public PodBuilder edit(Pod item) { @Override public Boolean delete(OkHttpClient client, Config config, String namespace, Pod item) { - return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).delete(item); + return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).delete(item); } @Override public Watch watch(OkHttpClient client, Config config, String namespace, Pod item, Watcher watcher) { - return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).watch(watcher); + return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).watch(watcher); } @Override public Watch watch(OkHttpClient client, Config config, String namespace, Pod item, String resourceVersion, Watcher watcher) { - return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).watch(resourceVersion, watcher); + return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).watch(resourceVersion, watcher); } @Override public Pod waitUntilReady(OkHttpClient client, Config config, String namespace, Pod item, long amount, TimeUnit timeUnit) throws InterruptedException { - return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).waitUntilReady(amount,timeUnit); + return new PodOperationsImpl(client, config, null, namespace, null, true, item, null, false, -1, new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), new TreeMap(), null, null, null, null, null, null, null, null, null, false, false, false, null, null, null, false, null, null).waitUntilReady(amount,timeUnit); } } diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java index 3814c648a1b..5da85794a91 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java @@ -49,6 +49,8 @@ public static void main(String[] args) throws InterruptedException, IOException ExecWatch watch = client.pods().withName(podName) .redirectingInput() .redirectingOutput() + .redirectingError() + .redirectingErrorChannel() .exec(); InputStreamPumper pump = new InputStreamPumper(watch.getStdoutPipe(), new SystemOutCallback())) { From 9896a43dbd6a14c15c7d666df073fe2507f0424b Mon Sep 17 00:00:00 2001 From: Carlos Sanchez Date: Tue, 27 Mar 2018 14:36:47 +0200 Subject: [PATCH 3/4] Use v4.channel.k8s.io protocol To get error messages as json --- .../kubernetes/client/dsl/internal/PodOperationsImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationsImpl.java index 5888e9e9419..26696e8e913 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/PodOperationsImpl.java @@ -261,7 +261,7 @@ public ExecWatch exec(String... command) { try { URL url = new URL(URLUtils.join(getResourceUrl().toString(), sb.toString())); - Request.Builder r = new Request.Builder().url(url).get(); + Request.Builder r = new Request.Builder().url(url).header("Sec-WebSocket-Protocol", "v4.channel.k8s.io").get(); OkHttpClient clone = client.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build(); final ExecWebSocketListener execWebSocketListener = new ExecWebSocketListener(new Config(), in, out, err, errChannel, inPipe, outPipe, errPipe, errChannelPipe, execListener); clone.newWebSocket(r.build(), execWebSocketListener); From 454406b9a6265eb6cbd2c6c7542d6e27c184381a Mon Sep 17 00:00:00 2001 From: Giuseppe Iannello Date: Wed, 9 May 2018 11:49:36 +0200 Subject: [PATCH 4/4] Less intrusive naming --- .../kubernetes/client/dsl/ExecWatch.java | 11 +-- .../dsl/internal/ExecWebSocketListener.java | 99 ++++++++----------- .../kubernetes/examples/ExecLoopExample.java | 2 +- .../kubernetes/examples/ExecPipesExample.java | 4 +- 4 files changed, 46 insertions(+), 70 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java index 8185167f249..877ca52ead4 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/ExecWatch.java @@ -21,22 +21,13 @@ public interface ExecWatch extends Closeable { - @Deprecated OutputStream getInput(); - @Deprecated InputStream getOutput(); - @Deprecated InputStream getError(); - OutputStream getStdinPipe(); - - InputStream getStdoutPipe(); - - InputStream getStderrPipe(); - - InputStream getErrorPipe(); + InputStream getErrorChannel(); /** * Close the Watch. diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index b2206cc6be2..1a1dad7a7cd 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -63,15 +63,15 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc private static final Logger LOGGER = LoggerFactory.getLogger(ExecWebSocketListener.class); private final Config config; - private final InputStream stdin; - private final OutputStream stdout; - private final OutputStream stderr; - private final OutputStream error; + private final InputStream in; + private final OutputStream out; + private final OutputStream err; + private final OutputStream errChannel; - private final PipedOutputStream stdinPipe; - private final PipedInputStream stdoutPipe; - private final PipedInputStream stderrPipe; - private final PipedInputStream errorPipe; + private final PipedOutputStream input; + private final PipedInputStream output; + private final PipedInputStream error; + private final PipedInputStream errorChannel; private final AtomicReference webSocketRef = new AtomicReference<>(); private final ExecutorService executorService = Executors.newSingleThreadExecutor(); @@ -90,28 +90,28 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc private final Set toClose = new LinkedHashSet<>(); @Deprecated - public ExecWebSocketListener(InputStream stdin, OutputStream stdout, OutputStream stderr, PipedOutputStream stdinPipe, PipedInputStream stdoutPipe, PipedInputStream stderrPipe, ExecListener listener) { - this(new Config(), stdin, stdout, stderr, stdinPipe, stdoutPipe, stderrPipe, listener); + public ExecWebSocketListener(InputStream in, OutputStream out, OutputStream err, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, ExecListener listener) { + this(new Config(), in, out, err, inputPipe, outputPipe, errorPipe, listener); } @Deprecated - public ExecWebSocketListener(Config config, InputStream stdin, OutputStream stdout, OutputStream stderr, PipedOutputStream stdinPipe, PipedInputStream stdoutPipe, PipedInputStream stderrPipe, ExecListener listener) { - this(config, stdin, stdout, stderr, null, stdinPipe, stdoutPipe, stderrPipe, null, listener); + public ExecWebSocketListener(Config config, InputStream in, OutputStream out, OutputStream err, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, ExecListener listener) { + this(config, in, out, err, null, inputPipe, outputPipe, errorPipe, null, listener); } - public ExecWebSocketListener(Config config, InputStream stdin, OutputStream stdout, OutputStream stderr, OutputStream error, PipedOutputStream stdinPipe, PipedInputStream stdoutPipe, PipedInputStream stderrPipe, PipedInputStream errorPipe, ExecListener listener) { + public ExecWebSocketListener(Config config, InputStream in, OutputStream out, OutputStream err, OutputStream errChannel, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, PipedInputStream errorChannelPipe, ExecListener listener) { this.config = config; this.listener = listener; - this.stdin = inputStreamOrPipe(stdin, stdinPipe, toClose); - this.stdout = outputStreamOrPipe(stdout, stdoutPipe, toClose); - this.stderr = outputStreamOrPipe(stderr, stderrPipe, toClose); - this.error = outputStreamOrPipe(error, errorPipe, toClose); - - this.stdinPipe = stdinPipe; - this.stdoutPipe = stdoutPipe; - this.stderrPipe = stderrPipe; - this.errorPipe = errorPipe; - this.pumper = new NonBlockingInputStreamPumper(this.stdin, new Callback() { + this.in = inputStreamOrPipe(in, inputPipe, toClose); + this.out = outputStreamOrPipe(out, outputPipe, toClose); + this.err = outputStreamOrPipe(err, errorPipe, toClose); + this.errChannel = outputStreamOrPipe(errChannel, errorChannelPipe, toClose); + + this.input = inputPipe; + this.output = outputPipe; + this.error = errorPipe; + this.errorChannel = errorChannelPipe; + this.pumper = new NonBlockingInputStreamPumper(this.in, new Callback() { @Override public void call(byte[] data) { try { @@ -182,17 +182,17 @@ public void waitUntilReady() { @Override public void onOpen(WebSocket webSocket, Response response) { try { - if (stdin instanceof PipedInputStream && stdinPipe != null) { - stdinPipe.connect((PipedInputStream) stdin); + if (in instanceof PipedInputStream && input != null) { + input.connect((PipedInputStream) in); } - if (stdout instanceof PipedOutputStream && stdoutPipe != null) { - stdoutPipe.connect((PipedOutputStream) stdout); + if (out instanceof PipedOutputStream && output != null) { + output.connect((PipedOutputStream) out); } - if (stderr instanceof PipedOutputStream && stderrPipe != null) { - stderrPipe.connect((PipedOutputStream) stderr); + if (err instanceof PipedOutputStream && error != null) { + error.connect((PipedOutputStream) err); } - if (error instanceof PipedOutputStream && errorPipe != null) { - errorPipe.connect((PipedOutputStream) error); + if (errChannel instanceof PipedOutputStream && errorChannel != null) { + errorChannel.connect((PipedOutputStream) errChannel); } webSocketRef.set(webSocket); @@ -244,18 +244,18 @@ public void onMessage(WebSocket webSocket, ByteString bytes) { if (byteString.size() > 0) { switch (streamID) { case 1: - if (stdout != null) { - stdout.write(byteString.toByteArray()); + if (out != null) { + out.write(byteString.toByteArray()); } break; case 2: - if (stderr != null) { - stderr.write(byteString.toByteArray()); + if (err != null) { + err.write(byteString.toByteArray()); } break; case 3: - if (error != null) { - error.write(byteString.toByteArray()); + if (errChannel != null) { + errChannel.write(byteString.toByteArray()); } break; default: @@ -290,35 +290,20 @@ public void onClosed(WebSocket webSocket, int code, String reason) { } } - @Deprecated public OutputStream getInput() { - return this.getStdinPipe(); + return input; } - @Deprecated public InputStream getOutput() { - return this.getStdoutPipe(); + return output; } - @Deprecated public InputStream getError() { - return this.getStderrPipe(); - } - - public OutputStream getStdinPipe() { - return stdinPipe; - } - - public InputStream getStdoutPipe() { - return stdoutPipe; - } - - public InputStream getStderrPipe() { - return stderrPipe; + return error; } - public InputStream getErrorPipe() { - return errorPipe; + public InputStream getErrorChannel() { + return errorChannel; } private void send(byte[] bytes) throws IOException { diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecLoopExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecLoopExample.java index 88eae61bb7f..a86c4f04384 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecLoopExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecLoopExample.java @@ -74,7 +74,7 @@ public void onClose(int code, String reason) { latch.countDown(); } }).exec("date"); - pump = new InputStreamPumper(watch.getStdoutPipe(), new SystemOutCallback()); + pump = new InputStreamPumper(watch.getOutput(), new SystemOutCallback()); executorService.submit(pump); Future outPumpFuture = executorService.submit(pump, "Done"); executorService.scheduleAtFixedRate(new FutureChecker("Pump " + (i + 1), outPumpFuture), 0, 2, TimeUnit.SECONDS); diff --git a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java index 5da85794a91..63bbd9592c4 100644 --- a/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java +++ b/kubernetes-examples/src/main/java/io/fabric8/kubernetes/examples/ExecPipesExample.java @@ -52,11 +52,11 @@ public static void main(String[] args) throws InterruptedException, IOException .redirectingError() .redirectingErrorChannel() .exec(); - InputStreamPumper pump = new InputStreamPumper(watch.getStdoutPipe(), new SystemOutCallback())) + InputStreamPumper pump = new InputStreamPumper(watch.getOutput(), new SystemOutCallback())) { executorService.submit(pump); - watch.getStdinPipe().write("ls -al\n".getBytes()); + watch.getInput().write("ls -al\n".getBytes()); Thread.sleep(5 * 1000); } catch (Exception e) { throw KubernetesClientException.launderThrowable(e);