Skip to content

Commit

Permalink
[MSHARED-1072] fix blocking in StreamFeeder (#113)
Browse files Browse the repository at this point in the history
If input stream has no more available data
StreamFeeder was block forever
  • Loading branch information
slawekjaranowski authored Apr 21, 2023
1 parent 63c92f8 commit b60e8a2
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,13 @@
package org.apache.maven.shared.utils.cli;

/**
* Report a timeout for executing process.
*
* @author Olivier Lamy
*
*/
public class CommandLineTimeOutException extends CommandLineException {

/**
*
*/
private static final long serialVersionUID = 7322428741683224481L;

/**
Expand All @@ -36,4 +35,11 @@ public class CommandLineTimeOutException extends CommandLineException {
public CommandLineTimeOutException(String message, Throwable cause) {
super(message, cause);
}

/**
* @param message The message of the exception.
*/
public CommandLineTimeOutException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;

import org.apache.maven.shared.utils.Os;
import org.apache.maven.shared.utils.StringUtils;
Expand All @@ -45,7 +46,7 @@ public abstract class CommandLineUtils {
*/
public static class StringStreamConsumer implements StreamConsumer {

private final StringBuffer string = new StringBuffer();
private final StringBuilder string = new StringBuilder();

private static final String LS = System.getProperty("line.separator", "\n");

Expand All @@ -65,16 +66,6 @@ public String getOutput() {
}
}

/**
* Number of milliseconds per second.
*/
private static final long MILLIS_PER_SECOND = 1000L;

/**
* Number of nanoseconds per second.
*/
private static final long NANOS_PER_SECOND = 1000000000L;

/**
* @param cl The command line {@link Commandline}
* @param systemOut {@link StreamConsumer}
Expand Down Expand Up @@ -277,26 +268,13 @@ public Integer call() throws CommandLineException {
errorPumper.setName("StreamPumper-systemErr");
errorPumper.start();

int returnValue;
if (timeoutInSeconds <= 0) {
returnValue = p.waitFor();
} else {
final long now = System.nanoTime();
final long timeout = now + NANOS_PER_SECOND * timeoutInSeconds;
while (isAlive(p) && (System.nanoTime() < timeout)) {
// The timeout is specified in seconds. Therefore we must not sleep longer than one second
// but we should sleep as long as possible to reduce the number of iterations performed.
Thread.sleep(MILLIS_PER_SECOND - 1L);
}

if (isAlive(p)) {
throw new InterruptedException(
String.format("Process timed out after %d seconds.", timeoutInSeconds));
}

returnValue = p.exitValue();
if (timeoutInSeconds > 0 && !p.waitFor(timeoutInSeconds, TimeUnit.SECONDS)) {
throw new CommandLineTimeOutException(
String.format("Process timed out after %d seconds.", timeoutInSeconds));
}

int returnValue = p.waitFor();

// TODO Find out if waitUntilDone needs to be called using a try-finally construct. The method may
// throw an
// InterruptedException so that calls to waitUntilDone may be skipped.
Expand Down Expand Up @@ -325,12 +303,8 @@ public Integer call() throws CommandLineException {
outputPumper.waitUntilDone();
errorPumper.waitUntilDone();

if (inputFeeder != null) {
inputFeeder.close();

if (inputFeeder.getException() != null) {
throw new CommandLineException("Failure processing stdin.", inputFeeder.getException());
}
if (inputFeeder != null && inputFeeder.getException() != null) {
throw new CommandLineException("Failure processing stdin.", inputFeeder.getException());
}

if (outputPumper.getException() != null) {
Expand All @@ -343,13 +317,10 @@ public Integer call() throws CommandLineException {

return returnValue;
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new CommandLineTimeOutException(
"Error while executing external command, process killed.", ex);

} finally {
if (inputFeeder != null) {
inputFeeder.disable();
}
if (outputPumper != null) {
outputPumper.disable();
}
Expand All @@ -363,14 +334,7 @@ public Integer call() throws CommandLineException {
}
} finally {
ShutdownHookUtils.removeShutdownHook(processHook);

try {
processHook.run();
} finally {
if (inputFeeder != null) {
inputFeeder.close();
}
}
processHook.run();
}
}
}
Expand Down Expand Up @@ -405,19 +369,6 @@ public static Properties getSystemEnvVars(boolean caseSensitive) {
return ensureCaseSensitivity(envs, caseSensitive);
}

private static boolean isAlive(Process p) {
if (p == null) {
return false;
}

try {
p.exitValue();
return false;
} catch (IllegalThreadStateException e) {
return true;
}
}

/**
* @param toProcess The command line to translate.
* @return The array of translated parts.
Expand All @@ -436,7 +387,7 @@ public static String[] translateCommandline(String toProcess) throws CommandLine
boolean inEscape = false;
int state = normal;
final StringTokenizer tok = new StringTokenizer(toProcess, "\"\' \\", true);
List<String> tokens = new ArrayList<String>();
List<String> tokens = new ArrayList<>();
StringBuilder current = new StringBuilder();

while (tok.hasMoreTokens()) {
Expand Down
89 changes: 36 additions & 53 deletions src/main/java/org/apache/maven/shared/utils/cli/StreamFeeder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,72 +21,62 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Objects;

/**
* Read from an InputStream and write the output to an OutputStream.
*
* @author <a href="mailto:[email protected]">Trygve Laugst&oslash;l</a>
*/
class StreamFeeder extends AbstractStreamHandler {
class StreamFeeder extends Thread {

private final AtomicReference<InputStream> input;
private final InputStream input;

private final AtomicReference<OutputStream> output;
private final OutputStream output;

private volatile Throwable exception;
private Throwable exception;
private boolean done;

private final Object lock = new Object();

/**
* Create a new StreamFeeder
*
* @param input Stream to read from
* @param input Stream to read from
* @param output Stream to write to
*/
StreamFeeder(InputStream input, OutputStream output) {
super();
this.input = new AtomicReference<InputStream>(input);
this.output = new AtomicReference<OutputStream>(output);
this.input = Objects.requireNonNull(input);
this.output = Objects.requireNonNull(output);
this.done = false;
}

@Override
@SuppressWarnings("checkstyle:innerassignment")
public void run() {
try {
feed();
} catch (Throwable e) {
// Catch everything so the streams will be closed and flagged as done.
if (this.exception != null) {
this.exception = e;
for (int data; !isInterrupted() && (data = input.read()) != -1; ) {
output.write(data);
}
output.flush();
} catch (IOException e) {
exception = e;
} finally {
close();

synchronized (this) {
notifyAll();
}
}
}

public void close() {
setDone();
final InputStream is = input.getAndSet(null);
if (is != null) {
try {
is.close();
} catch (IOException ex) {
if (this.exception != null) {
this.exception = ex;
}
}
synchronized (lock) {
done = true;
lock.notifyAll();
}
}

final OutputStream os = output.getAndSet(null);
if (os != null) {
try {
os.close();
} catch (IOException ex) {
if (this.exception != null) {
this.exception = ex;
}
private void close() {
try {
output.close();
} catch (IOException e) {
if (exception == null) {
exception = e;
}
}
}
Expand All @@ -98,23 +88,16 @@ public Throwable getException() {
return this.exception;
}

@SuppressWarnings("checkstyle:innerassignment")
private void feed() throws IOException {
InputStream is = input.get();
OutputStream os = output.get();
boolean flush = false;

if (is != null && os != null) {
for (int data; !isDone() && (data = is.read()) != -1; ) {
if (!isDisabled()) {
os.write(data);
flush = true;
public void waitUntilDone() {
this.interrupt();
synchronized (lock) {
while (!done) {
try {
lock.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

if (flush) {
os.flush();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.maven.shared.utils.cli;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;

import org.junit.Test;

import static org.junit.Assert.assertEquals;

public class StreamFeederTest {
static class BlockingInputStream extends ByteArrayInputStream {
public BlockingInputStream(byte[] buf) {
super(buf);
}

@Override
public synchronized int read() {
int data = super.read();
if (data >= 0) {
return data;
}

// end test data ... block
try {
wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return -1;
}
}

@Test
public void waitUntilFeederDone() throws InterruptedException {

String TEST_DATA = "TestData";

BlockingInputStream inputStream = new BlockingInputStream(TEST_DATA.getBytes());
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();

StreamFeeder streamFeeder = new StreamFeeder(inputStream, outputStream);

streamFeeder.start();

// wait until all data from steam will be read
while (outputStream.size() < TEST_DATA.length()) {
Thread.sleep(10);
}

streamFeeder.waitUntilDone(); // wait until process finish

assertEquals(TEST_DATA, outputStream.toString());
}
}

0 comments on commit b60e8a2

Please sign in to comment.