Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automatically pump os.proc streams when SystemStreams are redirected #3275

Merged
merged 17 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion build.sc
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ object Deps {
val junitInterface = ivy"com.github.sbt:junit-interface:0.13.3"
val lambdaTest = ivy"de.tototec:de.tobiasroeser.lambdatest:0.8.0"
val log4j2Core = ivy"org.apache.logging.log4j:log4j-core:2.23.1"
val osLib = ivy"com.lihaoyi::os-lib:0.10.2"
val osLib = ivy"com.lihaoyi::os-lib:0.10.2-11-6f1c69"
val pprint = ivy"com.lihaoyi::pprint:0.9.0"
val mainargs = ivy"com.lihaoyi::mainargs:0.7.0"
val millModuledefsVersion = "0.10.9"
Expand Down Expand Up @@ -618,6 +618,7 @@ object main extends MillStableScalaModule with BuildInfo {
)

object api extends MillStableScalaModule with BuildInfo {
def moduleDeps = Seq(client)
def buildInfoPackageName = "mill.api"
def buildInfoMembers = Seq(
BuildInfo.Value("millVersion", millVersion(), "Mill version."),
Expand Down
23 changes: 21 additions & 2 deletions main/api/src/mill/api/SystemStreams.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mill.api

import java.io.{InputStream, PrintStream}
import java.io.{InputStream, OutputStream, PrintStream}
import mill.main.client.InputPumper

/**
* Represents a set of streams that look similar to those provided by the
Expand Down Expand Up @@ -48,6 +49,18 @@ object SystemStreams {

def originalErr: PrintStream = original.err

private class PumpedProcessInput extends os.ProcessInput {
def redirectFrom = ProcessBuilder.Redirect.PIPE
def processInput(processIn: => os.SubProcess.InputStream) = Some(
new InputPumper(() => System.in, () => processIn, true, () => true)
)
}

private class PumpedProcessOutput(dest: OutputStream) extends os.ProcessOutput {
def redirectTo = ProcessBuilder.Redirect.PIPE
def processOutput(processOut: => os.SubProcess.OutputStream) =
Some(new InputPumper(() => processOut, () => dest, false, () => true))
}
def withStreams[T](systemStreams: SystemStreams)(t: => T): T = {
val in = System.in
val out = System.out
Expand All @@ -59,7 +72,13 @@ object SystemStreams {
Console.withIn(systemStreams.in) {
Console.withOut(systemStreams.out) {
Console.withErr(systemStreams.err) {
t
os.Inherit.in.withValue(new PumpedProcessInput) {
os.Inherit.out.withValue(new PumpedProcessOutput(System.out)) {
os.Inherit.err.withValue(new PumpedProcessOutput(System.err)) {
t
}
}
}
}
}
}
Expand Down
21 changes: 13 additions & 8 deletions main/client/src/mill/main/client/InputPumper.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,34 @@

import java.io.InputStream;
import java.io.OutputStream;
import java.util.function.Supplier;

public class InputPumper implements Runnable{
private InputStream src;
private OutputStream dest;
private Supplier<InputStream> src0;
private Supplier<OutputStream> dest0;

private Boolean checkAvailable;
private java.util.function.BooleanSupplier runningCheck;
public InputPumper(InputStream src,
OutputStream dest,
public InputPumper(Supplier<InputStream> src,
Supplier<OutputStream> dest,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making these lazily-initialized is necessary to avoid infinite recursion when initializing the pumper <-> stream <-> subprocess data structure, which is circular

Boolean checkAvailable){
this(src, dest, checkAvailable, () -> true);
}
public InputPumper(InputStream src,
OutputStream dest,
public InputPumper(Supplier<InputStream> src,
Supplier<OutputStream> dest,
Boolean checkAvailable,
java.util.function.BooleanSupplier runningCheck){
this.src = src;
this.dest = dest;
this.src0 = src;
this.dest0 = dest;
this.checkAvailable = checkAvailable;
this.runningCheck = runningCheck;
}

boolean running = true;
public void run() {
InputStream src = src0.get();
OutputStream dest = dest0.get();

byte[] buffer = new byte[1024];
try{
while(running){
Expand Down
2 changes: 1 addition & 1 deletion main/client/src/mill/main/client/MillClientMain.java
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ public static int run(
InputStream outErr = ioSocket.getInputStream();
OutputStream in = ioSocket.getOutputStream();
ProxyStreamPumper outPump = new ProxyStreamPumper(outErr, stdout, stderr);
InputPumper inPump = new InputPumper(stdin, in, true);
InputPumper inPump = new InputPumper(() -> stdin, () -> in, true);
Thread outThread = new Thread(outPump, "outPump");
outThread.setDaemon(true);
Thread inThread = new Thread(inPump, "inPump");
Expand Down
47 changes: 7 additions & 40 deletions main/util/src/mill/util/Jvm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,46 +223,13 @@ object Jvm extends CoursierSupport {
workingDir: os.Path,
backgroundOutputs: Option[Tuple2[ProcessOutput, ProcessOutput]] = None
): SubProcess = {
// If System.in is fake, then we pump output manually rather than relying
// on `os.Inherit`. That is because `os.Inherit` does not follow changes
// to System.in/System.out/System.err, so the subprocess's streams get sent
// to the parent process's origin outputs even if we want to direct them
// elsewhere

if (!SystemStreams.isOriginal()) {
val process = os.proc(commandArgs).spawn(
cwd = workingDir,
env = envArgs,
stdin = if (backgroundOutputs.isEmpty) os.Pipe else "",
stdout = backgroundOutputs.map(_._1).getOrElse(os.Pipe),
stderr = backgroundOutputs.map(_._2).getOrElse(os.Pipe)
)

val sources = Seq(
(process.stdout, System.out, "spawnSubprocess.stdout", false, () => true),
(process.stderr, System.err, "spawnSubprocess.stderr", false, () => true),
(System.in, process.stdin, "spawnSubprocess.stdin", true, () => process.isAlive())
)

for ((std, dest, name, checkAvailable, runningCheck) <- sources) {
val t = new Thread(
new InputPumper(std, dest, checkAvailable, () => runningCheck()),
name
)
t.setDaemon(true)
t.start()
}

process
} else {
os.proc(commandArgs).spawn(
cwd = workingDir,
env = envArgs,
stdin = if (backgroundOutputs.isEmpty) os.Inherit else "",
stdout = backgroundOutputs.map(_._1).getOrElse(os.Inherit),
stderr = backgroundOutputs.map(_._2).getOrElse(os.Inherit)
)
}
os.proc(commandArgs).spawn(
cwd = workingDir,
env = envArgs,
stdin = if (backgroundOutputs.isEmpty) os.Inherit else "",
stdout = backgroundOutputs.map(_._1).getOrElse(os.Inherit),
stderr = backgroundOutputs.map(_._2).getOrElse(os.Inherit)
)
}

def runLocal(
Expand Down
2 changes: 1 addition & 1 deletion runner/src/mill/runner/MillServerMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class Server[T](
val pipedInput = new PipedInputStream()
val pipedOutput = new PipedOutputStream()
pipedOutput.connect(pipedInput)
val pumper = new InputPumper(in, pipedOutput, false)
val pumper = new InputPumper(() => in, () => pipedOutput, false)
val pumperThread = new Thread(pumper, "proxyInputStreamThroughPumper")
pumperThread.setDaemon(true)
pumperThread.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,12 @@ class ScalaJSWorkerImpl extends ScalaJSWorkerApi {

for ((std, dest, name, checkAvailable, runningCheck) <- sources) {
val t = new Thread(
new mill.main.client.InputPumper(std, dest, checkAvailable, () => runningCheck()),
new mill.main.client.InputPumper(
() => std,
() => dest,
checkAvailable,
() => runningCheck()
),
name
)
t.setDaemon(true)
Expand Down
Loading