Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up out folder lock logic #3704

Merged
merged 8 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions integration/feature/output-directory/resources/build.mill
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,15 @@ object `package` extends RootModule with ScalaModule {
}

def blockWhileExists(path: os.Path) = Task.Command[String] {
if (!os.exists(path))
os.write(path, Array.emptyByteArray)
while (os.exists(path))
Thread.sleep(100L)
os.write(path, Array.emptyByteArray)

while (os.exists(path)) Thread.sleep(100L)
"Blocking command done"
}

def writeMarker(path: os.Path) = Task.Command[String] {
os.write(path, Array.emptyByteArray)

"Write marker done"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,91 +2,78 @@ package mill.integration

import mill.testkit.UtestIntegrationTestSuite
import utest._
import utest.asserts.{RetryInterval, RetryMax}

import java.io.ByteArrayOutputStream
import java.util.concurrent.{CountDownLatch, Executors}

import scala.concurrent.duration.Duration
import java.util.concurrent.Executors
import scala.concurrent.duration.{Duration, DurationInt}
import scala.concurrent.{Await, ExecutionContext, Future}

object OutputDirectoryLockTests extends UtestIntegrationTestSuite {

private val pool = Executors.newCachedThreadPool()
private val ec = ExecutionContext.fromExecutorService(pool)
private implicit val ec = ExecutionContext.fromExecutorService(pool)

override def utestAfterAll(): Unit = {
pool.shutdown()
}

implicit val retryMax: RetryMax = RetryMax(30000.millis)
implicit val retryInterval: RetryInterval = RetryInterval(50.millis)
def tests: Tests = Tests {
test("basic") - integrationTest { tester =>
import tester._
val signalFile = workspacePath / "do-wait"
System.err.println("Spawning blocking task")
val blocksFuture =
Future(eval(("show", "blockWhileExists", "--path", signalFile), check = true))(ec)
while (!os.exists(signalFile) && !blocksFuture.isCompleted)
Thread.sleep(100L)
if (os.exists(signalFile))
System.err.println("Blocking task is running")
else {
System.err.println("Failed to run blocking task")
Predef.assert(blocksFuture.isCompleted)
blocksFuture.value.get.get
// Kick off blocking task in background
Future {
eval(("show", "blockWhileExists", "--path", signalFile), check = true)
}

// Wait for blocking task to write signal file, to indicate it has begun
eventually { os.exists(signalFile) }

val testCommand: os.Shellable = ("show", "hello")
val testMessage = "Hello from hello task"

System.err.println("Evaluating task without lock")
// --no-build-lock allows commands to complete despite background blocker
val noLockRes = eval(("--no-build-lock", testCommand), check = true)
assert(noLockRes.out.contains(testMessage))

System.err.println("Evaluating task without waiting for lock (should fail)")
// --no-wait-for-build-lock causes commands fail due to background blocker
val noWaitRes = eval(("--no-wait-for-build-lock", testCommand))
assert(noWaitRes.err.contains("Cannot proceed, another Mill process is running tasks"))

System.err.println("Evaluating task waiting for the lock")

val lock = new CountDownLatch(1)
val stderr = new ByteArrayOutputStream
var success = false
assert(
noWaitRes
.err
.contains(
s"Another Mill process is running 'show blockWhileExists --path $signalFile', failing"
)
)

// By default, we wait until the background blocking task completes
val waitingLogFile = workspacePath / "waitingLogFile"
val waitingCompleteFile = workspacePath / "waitingCompleteFile"
val futureWaitingRes = Future {
eval(
testCommand,
stderr = os.ProcessOutput {
val expectedMessage =
"Another Mill process is running tasks, waiting for it to be done..."

(bytes, len) =>
stderr.write(bytes, 0, len)
val output = new String(stderr.toByteArray)
if (output.contains(expectedMessage))
lock.countDown()
},
("show", "writeMarker", "--path", waitingCompleteFile),
stderr = waitingLogFile,
check = true
)
}(ec)
try {
lock.await()
success = true
} finally {
if (!success) {
System.err.println("Waiting task output:")
System.err.write(stderr.toByteArray)
}
}

System.err.println("Task is waiting for the lock, unblocking it")
os.remove(signalFile)

System.err.println("Blocking task should exit")
val blockingRes = Await.result(blocksFuture, Duration.Inf)
assert(blockingRes.out.contains("Blocking command done"))
// Ensure we see the waiting message
eventually {
os.read(waitingLogFile)
.contains(
s"Another Mill process is running 'show blockWhileExists --path $signalFile', waiting for it to be done..."
)
}

System.err.println("Waiting task should be free to proceed")
// Even after task starts waiting on blocking task, it is not complete
assert(!futureWaitingRes.isCompleted)
assert(!os.exists(waitingCompleteFile))
// Terminate blocking task, make sure waiting task now completes
os.remove(signalFile)
val waitingRes = Await.result(futureWaitingRes, Duration.Inf)
assert(waitingRes.out.contains(testMessage))
assert(os.exists(waitingCompleteFile))
assert(waitingRes.out == "\"Write marker done\"")
}
}
}
14 changes: 0 additions & 14 deletions main/api/src/mill/api/Logger.scala
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mill.api

import java.io.{InputStream, PrintStream}
import mill.main.client.lock.{Lock, Locked}

/**
* The standard logging interface of the Mill build tool.
Expand Down Expand Up @@ -85,19 +84,6 @@ trait Logger extends AutoCloseable {
finally removePromptLine()
}

def waitForLock(lock: Lock, waitingAllowed: Boolean): Locked = {
val tryLocked = lock.tryLock()
if (tryLocked.isLocked())
tryLocked
else if (waitingAllowed) {
info("Another Mill process is running tasks, waiting for it to be done...")
lock.lock()
} else {
error("Cannot proceed, another Mill process is running tasks")
throw new Exception("Cannot acquire lock on Mill output directory")
}
}

def withOutStream(outStream: PrintStream): Logger = this
private[mill] def logPrefixKey: Seq[String] = Nil
}
5 changes: 5 additions & 0 deletions main/client/src/mill/main/client/OutFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,9 @@ public class OutFiles {
*/
final public static String millLock = "mill-lock";

/**
* Any active Mill command that is currently run, for debugging purposes
*/
final public static String millActiveCommand = "mill-active-command";

}
22 changes: 0 additions & 22 deletions main/client/src/mill/main/client/lock/DummyLock.java

This file was deleted.

11 changes: 0 additions & 11 deletions main/client/src/mill/main/client/lock/DummyTryLocked.java

This file was deleted.

5 changes: 0 additions & 5 deletions main/client/src/mill/main/client/lock/Lock.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,4 @@ public static Lock file(String path) throws Exception {
public static Lock memory() {
return new MemoryLock();
}

public static Lock dummy() {
return new DummyLock();
}

}
2 changes: 1 addition & 1 deletion runner/src/mill/runner/MillCliConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ case class MillCliConfig(
@arg(
hidden = true,
doc =
"""Do not wait for an exclusive lock on the Mill output directory to evaluate tasks / commands. Fail if waiting for a lock is needed."""
"""Do not wait for an exclusive lock on the Mill output directory to evaluate tasks / commands."""
)
noWaitForBuildLock: Flag = Flag()
)
Expand Down
122 changes: 81 additions & 41 deletions runner/src/mill/runner/MillMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import mill.bsp.{BspContext, BspServerResult}
import mill.main.BuildInfo
import mill.main.client.{OutFiles, ServerFiles}
import mill.main.client.lock.Lock
import mill.util.{PromptLogger, PrintLogger, Colors}
import mill.util.{Colors, PrintLogger, PromptLogger}

import java.lang.reflect.InvocationTargetException
import scala.util.control.NonFatal
Expand Down Expand Up @@ -212,9 +212,7 @@ object MillMain {
.getOrElse(config.leftoverArgs.value.toList)

val out = os.Path(OutFiles.out, WorkspaceRoot.workspaceRoot)
val outLock =
if (config.noBuildLock.value || bspContext.isDefined) Lock.dummy()
else Lock.file((out / OutFiles.millLock).toString)

var repeatForBsp = true
var loopRes: (Boolean, RunnerState) = (false, RunnerState.empty)
while (repeatForBsp) {
Expand All @@ -228,44 +226,46 @@ object MillMain {
evaluate = (prevState: Option[RunnerState]) => {
adjustJvmProperties(userSpecifiedProperties, initialSystemProperties)

val logger = getLogger(
streams,
config,
mainInteractive,
enableTicker =
config.ticker
.orElse(config.enableTicker)
.orElse(Option.when(config.disableTicker.value)(false)),
printLoggerState,
serverDir,
colored = colored,
colors = colors
)
Using.resources(
logger,
logger.waitForLock(
outLock,
waitingAllowed = !config.noWaitForBuildLock.value
withOutLock(
config.noBuildLock.value || bspContext.isDefined,
config.noWaitForBuildLock.value,
out,
targetsAndParams,
streams
) {
val logger = getLogger(
streams,
config,
mainInteractive,
enableTicker =
config.ticker
.orElse(config.enableTicker)
.orElse(Option.when(config.disableTicker.value)(false)),
printLoggerState,
serverDir,
colored = colored,
colors = colors
)
) { (_, _) =>
new MillBuildBootstrap(
projectRoot = WorkspaceRoot.workspaceRoot,
output = out,
home = config.home,
keepGoing = config.keepGoing.value,
imports = config.imports,
env = env,
threadCount = threadCount,
targetsAndParams = targetsAndParams,
prevRunnerState = prevState.getOrElse(stateCache),
logger = logger,
disableCallgraph = config.disableCallgraph.value,
needBuildSc = needBuildSc(config),
requestedMetaLevel = config.metaLevel,
config.allowPositional.value,
systemExit = systemExit,
streams0 = streams0
).evaluate()
Using.resource(logger) { _ =>
try new MillBuildBootstrap(
projectRoot = WorkspaceRoot.workspaceRoot,
output = out,
home = config.home,
keepGoing = config.keepGoing.value,
imports = config.imports,
env = env,
threadCount = threadCount,
targetsAndParams = targetsAndParams,
prevRunnerState = prevState.getOrElse(stateCache),
logger = logger,
disableCallgraph = config.disableCallgraph.value,
needBuildSc = needBuildSc(config),
requestedMetaLevel = config.metaLevel,
config.allowPositional.value,
systemExit = systemExit,
streams0 = streams0
).evaluate()
}
}
},
colors = colors
Expand Down Expand Up @@ -416,4 +416,44 @@ object MillMain {
for (k <- systemPropertiesToUnset) System.clearProperty(k)
for ((k, v) <- desiredProps) System.setProperty(k, v)
}

def withOutLock[T](
noBuildLock: Boolean,
noWaitForBuildLock: Boolean,
out: os.Path,
targetsAndParams: Seq[String],
streams: SystemStreams
)(t: => T): T = {
if (noBuildLock) t
else {
val outLock = Lock.file((out / OutFiles.millLock).toString)

def activeTaskString =
try {
os.read(out / OutFiles.millActiveCommand)
} catch {
case e => "<unknown>"
}

def activeTaskPrefix = s"Another Mill process is running '$activeTaskString',"
Using.resource {
val tryLocked = outLock.tryLock()
if (tryLocked.isLocked()) tryLocked
else if (noWaitForBuildLock) {
throw new Exception(s"$activeTaskPrefix failing")
} else {

streams.err.println(
s"$activeTaskPrefix waiting for it to be done..."
)
outLock.lock()
}
} { _ =>
os.write.over(out / OutFiles.millActiveCommand, targetsAndParams.mkString(" "))
try t
finally os.remove.all(out / OutFiles.millActiveCommand)
}
}
}

}
Loading