Skip to content

Commit

Permalink
PlatformFileOps: read/write files asynchronously
Browse files Browse the repository at this point in the history
Previously, we simply wrapped a Future around a blocking file I/O both
on JVM and Native; while we still keep this approach on Native, let's
implement actual asynchronous file I/O on JVM, just like on JS.

Also, modify runners to use this async writing and a different thread
pool for that, so that it does not clash with formatting tasks.
  • Loading branch information
kitbellew committed Feb 16, 2025
1 parent 27a5191 commit 7a3419a
Show file tree
Hide file tree
Showing 12 changed files with 198 additions and 53 deletions.
3 changes: 0 additions & 3 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ inThisBuild {
crossScalaVersions := List(scala213, scala212),
resolvers ++= Resolver.sonatypeOssRepos("releases"),
resolvers ++= Resolver.sonatypeOssRepos("snapshots"),
resolvers +=
"Sonatype Releases"
.at("https://oss.sonatype.org/content/repositories/releases"),
testFrameworks += new TestFramework("munit.Framework"),
// causes native image issues
dependencyOverrides += "org.jline" % "jline" % "3.29.0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.scalafmt.dynamic.ScalafmtDynamicError
import org.scalafmt.interfaces.Scalafmt
import org.scalafmt.interfaces.ScalafmtSession
import org.scalafmt.sysops.PlatformFileOps
import org.scalafmt.sysops.PlatformRunOps

import java.nio.file.Path

Expand Down Expand Up @@ -52,10 +53,11 @@ object ScalafmtDynamicRunner extends ScalafmtRunner {
inputMethod: InputMethod,
session: ScalafmtSession,
options: CliOptions,
): Future[ExitCode] = inputMethod.readInput(options).map { input =>
val formatResult = session.format(inputMethod.path, input)
inputMethod.write(formatResult, input, options)
}
): Future[ExitCode] = inputMethod.readInput(options)
.map(code => code -> session.format(inputMethod.path, code))
.flatMap { case (code, formattedCode) =>
inputMethod.write(formattedCode, code, options)
}(PlatformRunOps.ioExecutionContext)

private def getFileMatcher(paths: Seq[Path]): Path => Boolean = {
val dirBuilder = Seq.newBuilder[Path]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ case class CliOptions(
quiet: Boolean = false,
stdIn: Boolean = false,
noStdErr: Boolean = false,
error: Boolean = false,
private val error: Boolean = false,
check: Boolean = false,
) {
val writeMode: WriteMode = writeModeOpt.getOrElse(WriteMode.Override)
Expand Down Expand Up @@ -205,4 +205,7 @@ case class CliOptions(
*/
private[cli] def getVersionOpt: Option[String] = getHoconValueOpt(_.version)

private[cli] def exitCodeOnChange =
if (error) ExitCode.TestError else ExitCode.Ok

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.scalafmt.sysops.AbsoluteFile
import org.scalafmt.sysops.FileOps
import org.scalafmt.sysops.PlatformCompat
import org.scalafmt.sysops.PlatformFileOps
import org.scalafmt.sysops.PlatformRunOps

import java.io.InputStream
import java.nio.file.Path
Expand All @@ -20,28 +21,30 @@ sealed abstract class InputMethod {

protected def print(text: String, options: CliOptions): Unit
protected def list(options: CliOptions): Unit
protected def overwrite(text: String, options: CliOptions): Unit
protected def overwrite(text: String, options: CliOptions): Future[Unit]

final def write(
formatted: String,
original: String,
options: CliOptions,
): ExitCode = {
val codeChanged = formatted != original
if (options.writeMode == WriteMode.Stdout) print(formatted, options)
else if (codeChanged) options.writeMode match {
): Future[ExitCode] = {
def codeChanged = formatted != original
def exitCode = if (codeChanged) options.exitCodeOnChange else ExitCode.Ok
options.writeMode match {
case WriteMode.Stdout => print(formatted, options); exitCode.future
case _ if !codeChanged => ExitCode.Ok.future
case WriteMode.List => list(options); options.exitCodeOnChange.future
case WriteMode.Override => overwrite(formatted, options)
.map(_ => options.exitCodeOnChange)(PlatformRunOps.ioExecutionContext)
case WriteMode.Test =>
val pathStr = path.toString
val diff = InputMethod.unifiedDiff(pathStr, original, formatted)
val msg =
if (diff.nonEmpty) diff
else s"--- +$pathStr\n => modified line endings only"
throw MisformattedFile(path, msg)
case WriteMode.Override => overwrite(formatted, options)
case WriteMode.List => list(options)
case _ =>
Future.failed(MisformattedFile(path, msg))
case _ => options.exitCodeOnChange.future
}
if (options.error && codeChanged) ExitCode.TestError else ExitCode.Ok
}

}
Expand All @@ -61,8 +64,10 @@ object InputMethod {
override protected def print(text: String, options: CliOptions): Unit =
options.common.out.print(text)

override protected def overwrite(text: String, options: CliOptions): Unit =
print(text, options)
override protected def overwrite(
text: String,
options: CliOptions,
): Future[Unit] = Future.successful(print(text, options))

override protected def list(options: CliOptions): Unit = options.common.out
.println(filename)
Expand All @@ -76,8 +81,11 @@ object InputMethod {
override protected def print(text: String, options: CliOptions): Unit =
options.common.out.print(text)

override protected def overwrite(text: String, options: CliOptions): Unit =
file.writeFile(text)(options.encoding)
override protected def overwrite(
text: String,
options: CliOptions,
): Future[Unit] = PlatformFileOps
.writeFileAsync(file.path, text)(options.encoding)

override protected def list(options: CliOptions): Unit = options.common.out
.println(PlatformCompat.relativize(options.cwd, file))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.scalafmt.Versions
import org.scalafmt.config.ProjectFiles
import org.scalafmt.config.ScalafmtConfig
import org.scalafmt.config.ScalafmtConfigException
import org.scalafmt.sysops.PlatformRunOps

import scala.meta.parsers.ParseException
import scala.meta.tokenizers.TokenizeException
Expand Down Expand Up @@ -62,24 +63,32 @@ object ScalafmtCoreRunner extends ScalafmtRunner {
inputMethod: InputMethod,
options: CliOptions,
scalafmtConfig: ScalafmtConfig,
): Future[ExitCode] = inputMethod.readInput(options).map { input =>
val filename = inputMethod.path.toString
): Future[ExitCode] = {
val path = inputMethod.path.toString
@tailrec
def handleError(e: Throwable): ExitCode = e match {
case Error.WithCode(e, _) => handleError(e)
case _: ParseException | _: TokenizeException =>
options.common.err.println(e.toString)
ExitCode.ParseError
case e =>
new FailedToFormat(filename, e).printStackTrace(options.common.err)
new FailedToFormat(path, e).printStackTrace(options.common.err)
ExitCode.UnexpectedError
}
Scalafmt.formatCode(input, scalafmtConfig, options.range, filename)
.formatted match {
case Formatted.Success(x) => inputMethod.write(x, input, options)
case x: Formatted.Failure =>
if (scalafmtConfig.runner.ignoreWarnings) ExitCode.Ok // do nothing
else handleError(x.e)
}
inputMethod.readInput(options).map { code =>
val res = Scalafmt.formatCode(code, scalafmtConfig, options.range, path)
res.formatted match {
case x: Formatted.Success => Right(code -> x.formattedCode)
case x: Formatted.Failure => Left(
if (res.config.runner.ignoreWarnings) ExitCode.Ok // do nothing
else handleError(x.e),
)
}
}.flatMap {
case Right((code, formattedCode)) => inputMethod
.write(formattedCode, code, options)
case Left(exitCode) => exitCode.future
}(PlatformRunOps.ioExecutionContext)
}

}
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
package org.scalafmt.sysops

//import org.scalafmt.CompatCollections.JavaConverters._

import scala.meta.internal.io._

import java.nio.file._

import scala.concurrent.Future
import scala.concurrent.Promise
import scala.io.Codec
import scala.scalajs.js
import scala.util.Try
Expand Down Expand Up @@ -77,19 +74,18 @@ object PlatformFileOps {
def readFile(path: Path)(implicit codec: Codec): String = JSFs
.readFileSync(path.toString, codec.name)

def readFileAsync(file: Path)(implicit codec: Codec): Future[String] = {
val promise = Promise[String]()
def cb(err: js.Error, res: String): Unit =
if (err == null) promise.trySuccess(res)
else promise.tryFailure(new RuntimeException(err.message))
JSFs.readFile(file.toString, codec.name, cb)
promise.future
}
def readFileAsync(file: Path)(implicit codec: Codec): Future[String] =
JSFsPromises.readFile(file.toString, codec.name).toFuture

def readStdinAsync: Future[String] = JSIO.readStdinAsync

def writeFile(path: Path, content: String)(implicit codec: Codec): Unit = JSFs
.writeFileSync(path.toString, content, codec.name)

def writeFileAsync(file: Path, data: String)(implicit
codec: Codec,
): Future[Unit] = JSFsPromises.writeFile(file.toString, data, codec.name)
.toFuture

def cwd() = js.Dynamic.global.process.cwd().asInstanceOf[String]
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ private[scalafmt] object PlatformRunOps {
implicit def executionContext: ExecutionContext =
scala.scalajs.concurrent.JSExecutionContext.Implicits.queue

def ioExecutionContext: ExecutionContext = executionContext

def getSingleThreadExecutionContext: ExecutionContext = executionContext // same one

def runArgv(cmd: Seq[String], cwd: Option[Path]): Try[String] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ object PlatformFileOps {
def readFile(file: Path)(implicit codec: Codec): String =
new String(Files.readAllBytes(file), codec.charSet)

def readFileAsync(file: Path)(implicit codec: Codec): Future[String] = Future
.successful(readFile(file))
def readFileAsync(file: Path)(implicit codec: Codec): Future[String] =
GranularPlatformAsyncOps.readFileAsync(file)

def readStdinAsync: Future[String] = Future
.successful(FileOps.readInputStream(System.in))
Expand All @@ -65,5 +65,9 @@ object PlatformFileOps {
Files.write(path, bytes)
}

def writeFileAsync(path: Path, content: String)(implicit
codec: Codec,
): Future[Unit] = GranularPlatformAsyncOps.writeFileAsync(path, content)

def cwd() = System.getProperty("user.dir")
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ private[scalafmt] object PlatformRunOps {

implicit def executionContext: ExecutionContext = ExecutionContext.global

def ioExecutionContext: ExecutionContext =
GranularPlatformAsyncOps.ioExecutionContext

def getSingleThreadExecutionContext: ExecutionContext = ExecutionContext
.fromExecutor(Executors.newSingleThreadExecutor())

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package org.scalafmt.sysops

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.nio.channels.AsynchronousFileChannel
import java.nio.channels.CompletionHandler
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.util.concurrent.Executors

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.io.Codec
import scala.util.Try

private[sysops] object GranularPlatformAsyncOps {

implicit val ioExecutionContext: ExecutionContext = ExecutionContext
.fromExecutor(Executors.newCachedThreadPool())

def readFileAsync(path: Path)(implicit codec: Codec): Future[String] = {
val promise = Promise[String]()

val buf = new Array[Byte](1024)
val bbuf = ByteBuffer.wrap(buf)
val os = new ByteArrayOutputStream()

Try {
val channel = AsynchronousFileChannel.open(path, StandardOpenOption.READ)
val handler = new CompletionHandler[Integer, AnyRef] {
override def completed(result: Integer, unused: AnyRef): Unit = {
val count = result.intValue()
if (count < 0) {
promise.trySuccess(os.toString(codec.charSet.name()))
channel.close()
} else {
if (count > 0) {
os.write(buf, 0, count)
bbuf.clear()
}
channel.read(bbuf, os.size(), null, this)
}
}
override def failed(exc: Throwable, unused: AnyRef): Unit = {
promise.tryFailure(exc)
channel.close()
}
}

channel.read(bbuf, 0, null, handler)
}.failed.foreach(promise.tryFailure)

promise.future
}

def writeFileAsync(path: Path, content: String)(implicit
codec: Codec,
): Future[Unit] = {
val promise = Promise[Unit]()
val buf = ByteBuffer.wrap(content.getBytes(codec.charSet))

Try {
val channel = AsynchronousFileChannel.open(
path,
StandardOpenOption.CREATE,
StandardOpenOption.WRITE,
StandardOpenOption.TRUNCATE_EXISTING,
)

val handler = new CompletionHandler[Integer, AnyRef] {
override def completed(result: Integer, attachment: AnyRef): Unit =
if (buf.hasRemaining) channel.write(buf, buf.position(), null, this)
else {
promise.trySuccess(())
channel.close()
}

override def failed(exc: Throwable, attachment: AnyRef): Unit = {
promise.tryFailure(exc)
channel.close()
}
}

channel.write(buf, 0L, null, handler)
}.failed.foreach(promise.tryFailure)

promise.future
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package org.scalafmt.sysops

import java.nio.file.Files
import java.nio.file.Path

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.io.Codec

private[sysops] object GranularPlatformAsyncOps {

import PlatformRunOps.executionContext

def ioExecutionContext: ExecutionContext = executionContext

def readFileAsync(path: Path)(implicit codec: Codec): Future[String] =
Future(PlatformFileOps.readFile(path))

def writeFileAsync(path: Path, content: String)(implicit
codec: Codec,
): Future[Unit] = Future {
val bytes = content.getBytes(codec.charSet)
Files.write(path, bytes)
}

}
Loading

0 comments on commit 7a3419a

Please sign in to comment.