Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 21 additions & 32 deletions Sources/Basics/Concurrency/AsyncProcess.swift
Original file line number Diff line number Diff line change
Expand Up @@ -257,29 +257,26 @@ package final class AsyncProcess {
#endif

/// Typealias for stdout/stderr output closure.
package typealias OutputClosure = ([UInt8]) -> Void
package typealias OutputClosure = @Sendable ([UInt8]) -> Void

/// Typealias for logging handling closure
package typealias LoggingHandler = (String) -> Void
/// Typealias for logging handling closure.
package typealias LoggingHandler = @Sendable (String) -> Void

private static var _loggingHandler: LoggingHandler?
private static let loggingHandlerLock = NSLock()
/// Global logging handler storage.
private static let _loggingHandler = ThreadSafeBox<LoggingHandler?>()

/// Global logging handler. Use with care! preferably use instance level instead of setting one globally.
@available(
*,
deprecated,
message: "use instance level `loggingHandler` passed via `init` instead of setting one globally."
)
package static var loggingHandler: LoggingHandler? {
get {
Self.loggingHandlerLock.withLock {
self._loggingHandler
}
} set {
Self.loggingHandlerLock.withLock {
self._loggingHandler = newValue
}
return _loggingHandler.get() ?? nil
}
@available(
*,
deprecated,
message: "use instance level `loggingHandler` passed via `init` instead of setting one globally."
)
set {
_loggingHandler.put(newValue)
}
}

Expand Down Expand Up @@ -330,8 +327,7 @@ package final class AsyncProcess {
///
/// Key: Executable name or path.
/// Value: Path to the executable, if found.
private static var validatedExecutablesMap = [String: AbsolutePath?]()
private static let validatedExecutablesMapLock = NSLock()
private static let validatedExecutablesMap = ThreadSafeKeyValueStore<String, AbsolutePath?>()

/// Create a new process instance.
///
Expand Down Expand Up @@ -451,14 +447,7 @@ package final class AsyncProcess {
}
// This should cover the most common cases, i.e. when the cache is most helpful.
if workingDirectory == localFileSystem.currentWorkingDirectory {
return AsyncProcess.validatedExecutablesMapLock.withLock {
if let value = AsyncProcess.validatedExecutablesMap[program] {
return value
}
let value = lookup()
AsyncProcess.validatedExecutablesMap[program] = value
return value
}
return AsyncProcess.validatedExecutablesMap.memoize(program, body: lookup)
} else {
return lookup()
}
Expand Down Expand Up @@ -812,17 +801,17 @@ package final class AsyncProcess {
package func waitUntilExit() throws -> AsyncProcessResult {
let group = DispatchGroup()
group.enter()
var processResult: Result<AsyncProcessResult, Swift.Error>?
let resultBox = ThreadSafeBox<Result<AsyncProcessResult, Swift.Error>>()
self.waitUntilExit { result in
processResult = result
resultBox.put(result)
group.leave()
}
group.wait()
return try processResult.unsafelyUnwrapped.get()
return try resultBox.get().unsafelyUnwrapped.get()
}

/// Executes the process I/O state machine, calling completion block when finished.
private func waitUntilExit(_ completion: @escaping (Result<AsyncProcessResult, Swift.Error>) -> Void) {
private func waitUntilExit(_ completion: @Sendable @escaping (Result<AsyncProcessResult, Swift.Error>) -> Void) {
self.stateLock.lock()
switch self.state {
case .idle:
Expand Down Expand Up @@ -1099,7 +1088,7 @@ extension AsyncProcess {
environment: Environment = .current,
loggingHandler: LoggingHandler? = .none,
queue: DispatchQueue? = nil,
completion: @escaping (Result<AsyncProcessResult, Swift.Error>) -> Void
completion: @Sendable @escaping (Result<AsyncProcessResult, Swift.Error>) -> Void
) {
let completionQueue = queue ?? Self.sharedCompletionQueue

Expand Down
13 changes: 6 additions & 7 deletions Sources/Commands/SwiftTestCommand.swift
Original file line number Diff line number Diff line change
Expand Up @@ -980,7 +980,7 @@ final class TestRunner {

/// Executes and returns execution status. Prints test output on standard streams if requested
/// - Returns: Result of spawning and running the test process, and the output stream result
func test(outputHandler: @escaping (String) -> Void) -> Result {
func test(outputHandler: @escaping @Sendable (String) -> Void) -> Result {
var results = [Result]()
for path in self.bundlePaths {
let testSuccess = self.test(at: path, outputHandler: outputHandler)
Expand Down Expand Up @@ -1027,11 +1027,11 @@ final class TestRunner {
return args
}

private func test(at path: AbsolutePath, outputHandler: @escaping (String) -> Void) -> Result {
private func test(at path: AbsolutePath, outputHandler: @escaping @Sendable (String) -> Void) -> Result {
let testObservabilityScope = self.observabilityScope.makeChildScope(description: "running test at \(path)")

do {
let outputHandler = { (bytes: [UInt8]) in
let outputHandler: @Sendable ([UInt8]) -> Void = { (bytes: [UInt8]) in
if let output = String(bytes: bytes, encoding: .utf8) {
outputHandler(output)
}
Expand Down Expand Up @@ -1214,17 +1214,16 @@ final class ParallelTestRunner {
observabilityScope: self.observabilityScope,
library: .xctest // swift-testing does not use ParallelTestRunner
)
var output = ""
let outputLock = NSLock()
let output = ThreadSafeBox<String>("")
let start = DispatchTime.now()
let result = testRunner.test(outputHandler: { _output in outputLock.withLock{ output += _output }})
let result = testRunner.test(outputHandler: { _output in output.append(_output) })
let duration = start.distance(to: .now())
if result == .failure {
self.ranSuccessfully = false
}
self.finishedTests.enqueue(TestResult(
unitTest: test,
output: output,
output: output.get() ?? "",
success: result != .failure,
duration: duration
))
Expand Down
47 changes: 29 additions & 18 deletions Tests/BasicsTests/AsyncProcessTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ final class AsyncProcessTests: XCTestCase {
let args = ["whoami"]
let answer = NSUserName()
#endif
var popenResult: Result<AsyncProcessResult, Error>?
let popenResult = ThreadSafeBox<Result<AsyncProcessResult, Error>>()
let group = DispatchGroup()
group.enter()
AsyncProcess.popen(arguments: args) { result in
popenResult = result
popenResult.put(result)
group.leave()
}
group.wait()
switch popenResult {
switch popenResult.get() {
case .success(let processResult):
let output = try processResult.utf8Output()
XCTAssertTrue(output.hasPrefix(answer))
Expand Down Expand Up @@ -242,9 +242,11 @@ final class AsyncProcessTests: XCTestCase {
}

func testStdin() throws {
var stdout = [UInt8]()
let stdout = ThreadSafeBox<[UInt8]>([])
let process = AsyncProcess(scriptName: "in-to-out\(ProcessInfo.batSuffix)", outputRedirection: .stream(stdout: { stdoutBytes in
stdout += stdoutBytes
stdout.mutate {
$0?.append(contentsOf: stdoutBytes)
}
}, stderr: { _ in }))
let stdinStream = try process.launch()

Expand All @@ -255,7 +257,7 @@ final class AsyncProcessTests: XCTestCase {

try process.waitUntilExit()

XCTAssertEqual(String(decoding: stdout, as: UTF8.self), "hello\(ProcessInfo.EOL)")
XCTAssertEqual(String(decoding: stdout.get(default: []), as: UTF8.self), "hello\(ProcessInfo.EOL)")
}

func testStdoutStdErr() throws {
Expand Down Expand Up @@ -352,28 +354,37 @@ final class AsyncProcessTests: XCTestCase {
}

func testStdoutStdErrStreaming() throws {
var stdout = [UInt8]()
var stderr = [UInt8]()
let stdout = ThreadSafeBox<[UInt8]>([])
let stderr = ThreadSafeBox<[UInt8]>([])
let process = AsyncProcess(scriptName: "long-stdout-stderr\(ProcessInfo.batSuffix)", outputRedirection: .stream(stdout: { stdoutBytes in
stdout += stdoutBytes
stdout.mutate {
$0?.append(contentsOf: stdoutBytes)
}
}, stderr: { stderrBytes in
stderr += stderrBytes
stderr.mutate {
$0?.append(contentsOf: stderrBytes)
}
}))
try process.launch()
try process.waitUntilExit()

let count = 16 * 1024
XCTAssertEqual(String(bytes: stdout, encoding: .utf8), String(repeating: "1", count: count))
XCTAssertEqual(String(bytes: stderr, encoding: .utf8), String(repeating: "2", count: count))
XCTAssertEqual(String(bytes: stdout.get(default: []), encoding: .utf8), String(repeating: "1", count: count))
XCTAssertEqual(String(bytes: stderr.get(default: []), encoding: .utf8), String(repeating: "2", count: count))
}

func testStdoutStdErrStreamingRedirected() throws {
var stdout = [UInt8]()
var stderr = [UInt8]()
let stdout = ThreadSafeBox<[UInt8]>([])
let stderr = ThreadSafeBox<[UInt8]>([])

let process = AsyncProcess(scriptName: "long-stdout-stderr\(ProcessInfo.batSuffix)", outputRedirection: .stream(stdout: { stdoutBytes in
stdout += stdoutBytes
stdout.mutate {
$0?.append(contentsOf: stdoutBytes)
}
}, stderr: { stderrBytes in
stderr += stderrBytes
stderr.mutate {
$0?.append(contentsOf: stderrBytes)
}
}, redirectStderr: true))
try process.launch()
try process.waitUntilExit()
Expand All @@ -386,8 +397,8 @@ final class AsyncProcessTests: XCTestCase {
let expectedStdout = String(repeating: "12", count: count)
let expectedStderr = ""
#endif
XCTAssertEqual(String(bytes: stdout, encoding: .utf8), expectedStdout)
XCTAssertEqual(String(bytes: stderr, encoding: .utf8), expectedStderr)
XCTAssertEqual(String(bytes: stdout.get(default: []), encoding: .utf8), expectedStdout)
XCTAssertEqual(String(bytes: stderr.get(default: []), encoding: .utf8), expectedStderr)
}

func testWorkingDirectory() throws {
Expand Down
2 changes: 1 addition & 1 deletion Tests/BasicsTests/CancellatorTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ class ProcessStartedSemaphore {
self.term = term
}

func handleOutput(_ bytes: [UInt8]) {
@Sendable func handleOutput(_ bytes: [UInt8]) {
self.lock.withLock {
guard !self.trapped else {
return
Expand Down
2 changes: 1 addition & 1 deletion Tests/CommandsTests/RunCommandTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ struct RunCommandTests {
self.sync = sync
}

func handle(bytes: [UInt8]) {
@Sendable func handle(bytes: [UInt8]) {
guard let output = String(bytes: bytes, encoding: .utf8) else {
return
}
Expand Down