Skip to content

Commit

Permalink
Merge pull request #1045 from giannello/exec-ws-fixes
Browse files Browse the repository at this point in the history
Support `error` websocket channel
  • Loading branch information
iocanel authored May 23, 2018
2 parents cf61c6d + 454406b commit c78c1a4
Show file tree
Hide file tree
Showing 9 changed files with 133 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.fabric8.kubernetes.client.dsl;

/**
* @param <O> Where to write errorChannel to.
* @param <P> Where to read errorChannel from.
* @param <T> The return type.
*/
public interface ErrorChannelable<O, P, T> {

T writingErrorChannel(O in);

T readingErrorChannel(P in);

T redirectingErrorChannel();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public interface ExecWatch extends Closeable {

InputStream getError();

InputStream getErrorChannel();

/**
* Close the Watch.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Copyright (C) 2015 Red Hat, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.fabric8.kubernetes.client.dsl;

/**
* @param <X> The exec input.
* @param <O> Where to write err channel to.
* @param <P> Where to read err channel from.
* @param <T> The exec output.
*/
public interface TtyExecErrorChannelable<X, O, P, T> extends
TtyExecable<X, T>,
ErrorChannelable<O, P, TtyExecable<X, T>> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
* @param <T> The exec output.
*/
public interface TtyExecErrorable<X, O, P, T> extends
TtyExecable<X, T>,
Errorable<O, P, TtyExecable<X, T>> {
TtyExecErrorChannelable<X, O, P, T>,
Errorable<O, P, TtyExecErrorChannelable<X, O, P, T>> {

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,12 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc
private final InputStream in;
private final OutputStream out;
private final OutputStream err;
private final OutputStream errChannel;

private final PipedOutputStream input;
private final PipedInputStream output;
private final PipedInputStream error;
private final PipedInputStream errorChannel;

private final AtomicReference<WebSocket> webSocketRef = new AtomicReference<>();
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
Expand All @@ -92,16 +94,23 @@ public ExecWebSocketListener(InputStream in, OutputStream out, OutputStream err,
this(new Config(), in, out, err, inputPipe, outputPipe, errorPipe, listener);
}

@Deprecated
public ExecWebSocketListener(Config config, InputStream in, OutputStream out, OutputStream err, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, ExecListener listener) {
this(config, in, out, err, null, inputPipe, outputPipe, errorPipe, null, listener);
}

public ExecWebSocketListener(Config config, InputStream in, OutputStream out, OutputStream err, OutputStream errChannel, PipedOutputStream inputPipe, PipedInputStream outputPipe, PipedInputStream errorPipe, PipedInputStream errorChannelPipe, ExecListener listener) {
this.config = config;
this.listener = listener;
this.in = inputStreamOrPipe(in, inputPipe, toClose);
this.out = outputStreamOrPipe(out, outputPipe, toClose);
this.err = outputStreamOrPipe(err, errorPipe, toClose);
this.errChannel = outputStreamOrPipe(errChannel, errorChannelPipe, toClose);

this.input = inputPipe;
this.output = outputPipe;
this.error = errorPipe;
this.errorChannel = errorChannelPipe;
this.pumper = new NonBlockingInputStreamPumper(this.in, new Callback<byte[]>() {
@Override
public void call(byte[] data) {
Expand Down Expand Up @@ -182,6 +191,9 @@ public void onOpen(WebSocket webSocket, Response response) {
if (err instanceof PipedOutputStream && error != null) {
error.connect((PipedOutputStream) err);
}
if (errChannel instanceof PipedOutputStream && errorChannel != null) {
errorChannel.connect((PipedOutputStream) errChannel);
}

webSocketRef.set(webSocket);
executorService.submit(pumper);
Expand Down Expand Up @@ -242,8 +254,8 @@ public void onMessage(WebSocket webSocket, ByteString bytes) {
}
break;
case 3:
if (err != null) {
err.write(byteString.toByteArray());
if (errChannel != null) {
errChannel.write(byteString.toByteArray());
}
break;
default:
Expand Down Expand Up @@ -290,6 +302,10 @@ public InputStream getError() {
return error;
}

public InputStream getErrorChannel() {
return errorChannel;
}

private void send(byte[] bytes) throws IOException {
if (bytes.length > 0) {
WebSocket ws = webSocketRef.get();
Expand Down
Loading

0 comments on commit c78c1a4

Please sign in to comment.