Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 47 additions & 67 deletions Sources/SKCore/TaskScheduler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@
import CAtomics
import Foundation
import LSPLogging
import SKSupport

/// See comment on ``TaskDescriptionProtocol/dependencies(to:taskPriority:)``
public enum TaskDependencyAction<TaskDescription: TaskDescriptionProtocol> {
case waitAndElevatePriorityOfDependency(TaskDescription)
case cancelAndRescheduleDependency(TaskDescription)
}

private let taskSchedulerSubsystem = "org.swift.sourcekit-lsp.task-scheduler"

public protocol TaskDescriptionProtocol: Identifiable, Sendable, CustomLogStringConvertible {
/// Execute the task.
///
Expand Down Expand Up @@ -123,10 +126,6 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
/// Every time `execute` gets called, a new task is placed in this continuation. See comment on `executionTask`.
private let executionTaskCreatedContinuation: AsyncStream<Task<ExecutionTaskFinishStatus, Never>>.Continuation

/// Placing a new value in this continuation will cause `resultTask` to query its priority and set
/// `QueuedTask.priority`.
private let updatePriorityContinuation: AsyncStream<Void>.Continuation

nonisolated(unsafe) private var _priority: AtomicUInt8

/// The latest known priority of the task.
Expand Down Expand Up @@ -187,16 +186,10 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
description: TaskDescription,
executionStateChangedCallback: (@Sendable (QueuedTask, TaskExecutionState) async -> Void)?
) async {
self._priority = .init(initialValue: priority?.rawValue ?? Task.currentPriority.rawValue)
self._priority = AtomicUInt8(initialValue: priority?.rawValue ?? Task.currentPriority.rawValue)
self.description = description
self.executionStateChangedCallback = executionStateChangedCallback

var updatePriorityContinuation: AsyncStream<Void>.Continuation!
let updatePriorityStream = AsyncStream {
updatePriorityContinuation = $0
}
self.updatePriorityContinuation = updatePriorityContinuation

var executionTaskCreatedContinuation: AsyncStream<Task<ExecutionTaskFinishStatus, Never>>.Continuation!
let executionTaskCreatedStream = AsyncStream {
executionTaskCreatedContinuation = $0
Expand All @@ -205,31 +198,24 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {

self.resultTask = Task.detached(priority: priority) {
await withTaskCancellationHandler {
await withTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
for await _ in updatePriorityStream {
self.priority = Task.currentPriority
await withTaskPriorityChangedHandler(initialPriority: self.priority) {
for await task in executionTaskCreatedStream {
switch await task.valuePropagatingCancellation {
case .cancelledToBeRescheduled:
// Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`.
break
case .terminated:
// The task finished. We are done with this `QueuedTask`
return
}
}
taskGroup.addTask {
for await task in executionTaskCreatedStream {
switch await task.valuePropagatingCancellation {
case .cancelledToBeRescheduled:
// Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`.
break
case .terminated:
// The task finished. We are done with this `QueuedTask`
return
}
}
}
// The first (update priority) task never finishes, so this waits for the second (wait for execution) task
// to terminate.
// Afterwards we also cancel the update priority task.
for await _ in taskGroup {
taskGroup.cancelAll()
return
} taskPriorityChanged: {
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
logger.debug(
"Updating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(Task.currentPriority.rawValue)"
)
}
self.priority = Task.currentPriority
}
} onCancel: {
self.resultTaskCancelled.value = true
Expand Down Expand Up @@ -282,20 +268,15 @@ public actor QueuedTask<TaskDescription: TaskDescriptionProtocol> {
self.executionTask = nil
}

/// Trigger `QueuedTask.priority` to be updated with the current priority of the underlying task.
///
/// This is an asynchronous operation that makes no guarantees when the updated priority will be available.
///
/// This is needed because tasks can't subscribe to priority updates (ie. there is no `withPriorityHandler` similar to
/// `withCancellationHandler`, https://github.com/apple/swift/issues/73367).
func triggerPriorityUpdate() {
updatePriorityContinuation.yield()
}

/// If the priority of this task is less than `targetPriority`, elevate the priority to `targetPriority` by spawning
/// a new task that depends on it. Otherwise a no-op.
nonisolated func elevatePriority(to targetPriority: TaskPriority) {
if priority < targetPriority {
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
logger.debug(
"Elevating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(targetPriority.rawValue)"
)
}
Task(priority: targetPriority) {
await self.resultTask.value
}
Expand Down Expand Up @@ -385,16 +366,6 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
return queuedTask
}

/// Trigger all queued tasks to update their priority.
///
/// Should be called occasionally to elevate tasks in the queue whose underlying `Swift.Task` had their priority
/// elevated because a higher-priority task started depending on them.
private func triggerPriorityUpdateOfQueuedTasks() async {
for task in pendingTasks {
await task.triggerPriorityUpdate()
}
}

/// Returns the maximum number of concurrent tasks that are allowed to execute at the given priority.
private func maxConcurrentTasks(at priority: TaskPriority) -> Int {
for (atPriority, maxConcurrentTasks) in maxConcurrentTasksByPriority {
Expand All @@ -417,9 +388,8 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
{
// We don't have any execution slots left. Thus, this poker has nothing to do and is done.
// When the next task finishes, it calls `poke` again.
// If the low priority task's priority gets elevated, that will be picked up when the next task in the
// `TaskScheduler` finishes, which causes `triggerPriorityUpdateOfQueuedTasks` to be called, which transfers
// the new elevated priority to `QueuedTask.priority` and which can then be picked up by the next `poke` call.
// If the low priority task's priority gets elevated that task's priority will get elevated and it will be
// picked up on the next `poke` call.
return
}
let dependencies = task.description.dependencies(to: currentlyExecutingTasks.map(\.description))
Expand All @@ -428,13 +398,17 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
case .cancelAndRescheduleDependency(let taskDescription):
guard let dependency = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id })
else {
logger.fault(
"Cannot find task to wait for \(taskDescription.forLogging) in list of currently executing tasks"
)
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
logger.fault(
"Cannot find task to wait for \(taskDescription.forLogging) in list of currently executing tasks"
)
}
return nil
}
if !taskDescription.isIdempotent {
logger.fault("Cannot reschedule task '\(taskDescription.forLogging)' since it is not idempotent")
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
logger.fault("Cannot reschedule task '\(taskDescription.forLogging)' since it is not idempotent")
}
return dependency
}
if dependency.priority > task.priority {
Expand All @@ -445,9 +419,11 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
case .waitAndElevatePriorityOfDependency(let taskDescription):
guard let dependency = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id })
else {
logger.fault(
"Cannot find task to wait for '\(taskDescription.forLogging)' in list of currently executing tasks"
)
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
logger.fault(
"Cannot find task to wait for '\(taskDescription.forLogging)' in list of currently executing tasks"
)
}
return nil
}
return dependency
Expand All @@ -465,9 +441,11 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
switch taskDependency {
case .cancelAndRescheduleDependency(let taskDescription):
guard let task = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id }) else {
logger.fault(
"Cannot find task to reschedule \(taskDescription.forLogging) in list of currently executing tasks"
)
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
logger.fault(
"Cannot find task to reschedule \(taskDescription.forLogging) in list of currently executing tasks"
)
}
return nil
}
return task
Expand All @@ -478,6 +456,9 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
if !rescheduleTasks.isEmpty {
Task.detached(priority: task.priority) {
for task in rescheduleTasks {
withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) {
logger.debug("Suspending \(task.description.forLogging)")
}
await task.cancelToBeRescheduled()
}
}
Expand Down Expand Up @@ -510,7 +491,6 @@ public actor TaskScheduler<TaskDescription: TaskDescriptionProtocol> {
case .terminated: break
case .cancelledToBeRescheduled: pendingTasks.append(task)
}
await self.triggerPriorityUpdateOfQueuedTasks()
self.poke()
}
}
Expand Down
1 change: 1 addition & 0 deletions Sources/SKSupport/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ add_library(SKSupport STATIC
Result.swift
Sequence+AsyncMap.swift
SwitchableProcessResultExitStatus.swift
Task+WithPriorityChangedHandler.swift
ThreadSafeBox.swift
WorkspaceType.swift
)
Expand Down
60 changes: 60 additions & 0 deletions Sources/SKSupport/Task+WithPriorityChangedHandler.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the Swift.org open source project
//
// Copyright (c) 2014 - 2024 Apple Inc. and the Swift project authors
// Licensed under Apache License v2.0 with Runtime Library Exception
//
// See https://swift.org/LICENSE.txt for license information
// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors
//
//===----------------------------------------------------------------------===//

/// Runs `operation`. If the task's priority changes while the operation is running, calls `taskPriorityChanged`.
///
/// Since Swift Concurrency doesn't support direct observation of a task's priority, this polls the task's priority at
/// `pollingInterval`.
/// The function assumes that the original priority of the task is `initialPriority`. If the task priority changed
/// compared to `initialPriority`, the `taskPriorityChanged` will be called.
public func withTaskPriorityChangedHandler(
initialPriority: TaskPriority = Task.currentPriority,
pollingInterval: Duration = .seconds(0.1),
@_inheritActorContext operation: @escaping @Sendable () async -> Void,
taskPriorityChanged: @escaping @Sendable () -> Void
) async {
let lastPriority = ThreadSafeBox(initialValue: initialPriority)
await withTaskGroup(of: Void.self) { taskGroup in
taskGroup.addTask {
while true {
if Task.isCancelled {
return
}
let newPriority = Task.currentPriority
let didChange = lastPriority.withLock { lastPriority in
if newPriority != lastPriority {
lastPriority = newPriority
return true
}
return false
}
if didChange {
taskPriorityChanged()
}
do {
try await Task.sleep(for: pollingInterval)
} catch {
break
}
}
}
taskGroup.addTask {
await operation()
}
// The first task that watches the priority never finishes, so we are effectively await the `operation` task here
// and cancelling the priority observation task once the operation task is done.
// We do need to await the observation task as well so that priority escalation also affects the observation task.
for await _ in taskGroup {
taskGroup.cancelAll()
}
}
}
10 changes: 10 additions & 0 deletions Sources/SKTestSupport/SkipUnless.swift
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,16 @@ public actor SkipUnless {
public static func platformIsDarwin(_ message: String) throws {
try XCTSkipUnless(Platform.current == .darwin, message)
}

public static func platformSupportsTaskPriorityElevation() throws {
#if os(macOS)
guard #available(macOS 14.0, *) else {
// Priority elevation was implemented by https://github.com/apple/swift/pull/63019, which is available in the
// Swift 5.9 runtime included in macOS 14.0+
throw XCTSkip("Priority elevation of tasks is only supported on macOS 14 and above")
}
#endif
}
}

// MARK: - Parsing Swift compiler version
Expand Down
6 changes: 4 additions & 2 deletions Tests/SKCoreTests/TaskSchedulerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
//
//===----------------------------------------------------------------------===//

import LSPLogging
import SKCore
import SKTestSupport
import XCTest
Expand Down Expand Up @@ -54,8 +55,7 @@ final class TaskSchedulerTests: XCTestCase {
}

func testTasksWithElevatedPrioritiesGetExecutedFirst() async throws {
try XCTSkipIf(true, "rdar://128601797")

try SkipUnless.platformSupportsTaskPriorityElevation()
await runTaskScheduler(
scheduleTasks: { scheduler, taskExecutionRecorder in
for i in 0..<20 {
Expand Down Expand Up @@ -262,7 +262,9 @@ fileprivate final class ClosureTaskDescription: TaskDescriptionProtocol {
}

func execute() async {
logger.debug("Starting execution of \(self) with priority \(Task.currentPriority.rawValue)")
await closure()
logger.debug("Finished executing \(self) with priority \(Task.currentPriority.rawValue)")
}

func dependencies(
Expand Down