diff --git a/Sources/SKCore/TaskScheduler.swift b/Sources/SKCore/TaskScheduler.swift index fe3637d5a..66ca207fb 100644 --- a/Sources/SKCore/TaskScheduler.swift +++ b/Sources/SKCore/TaskScheduler.swift @@ -13,6 +13,7 @@ import CAtomics import Foundation import LSPLogging +import SKSupport /// See comment on ``TaskDescriptionProtocol/dependencies(to:taskPriority:)`` public enum TaskDependencyAction { @@ -20,6 +21,8 @@ public enum TaskDependencyAction { case cancelAndRescheduleDependency(TaskDescription) } +private let taskSchedulerSubsystem = "org.swift.sourcekit-lsp.task-scheduler" + public protocol TaskDescriptionProtocol: Identifiable, Sendable, CustomLogStringConvertible { /// Execute the task. /// @@ -123,10 +126,6 @@ public actor QueuedTask { /// Every time `execute` gets called, a new task is placed in this continuation. See comment on `executionTask`. private let executionTaskCreatedContinuation: AsyncStream>.Continuation - /// Placing a new value in this continuation will cause `resultTask` to query its priority and set - /// `QueuedTask.priority`. - private let updatePriorityContinuation: AsyncStream.Continuation - nonisolated(unsafe) private var _priority: AtomicUInt8 /// The latest known priority of the task. @@ -187,16 +186,10 @@ public actor QueuedTask { description: TaskDescription, executionStateChangedCallback: (@Sendable (QueuedTask, TaskExecutionState) async -> Void)? ) async { - self._priority = .init(initialValue: priority?.rawValue ?? Task.currentPriority.rawValue) + self._priority = AtomicUInt8(initialValue: priority?.rawValue ?? Task.currentPriority.rawValue) self.description = description self.executionStateChangedCallback = executionStateChangedCallback - var updatePriorityContinuation: AsyncStream.Continuation! - let updatePriorityStream = AsyncStream { - updatePriorityContinuation = $0 - } - self.updatePriorityContinuation = updatePriorityContinuation - var executionTaskCreatedContinuation: AsyncStream>.Continuation! let executionTaskCreatedStream = AsyncStream { executionTaskCreatedContinuation = $0 @@ -205,31 +198,24 @@ public actor QueuedTask { self.resultTask = Task.detached(priority: priority) { await withTaskCancellationHandler { - await withTaskGroup(of: Void.self) { taskGroup in - taskGroup.addTask { - for await _ in updatePriorityStream { - self.priority = Task.currentPriority + await withTaskPriorityChangedHandler(initialPriority: self.priority) { + for await task in executionTaskCreatedStream { + switch await task.valuePropagatingCancellation { + case .cancelledToBeRescheduled: + // Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`. + break + case .terminated: + // The task finished. We are done with this `QueuedTask` + return } } - taskGroup.addTask { - for await task in executionTaskCreatedStream { - switch await task.valuePropagatingCancellation { - case .cancelledToBeRescheduled: - // Break the switch and wait for a new `executionTask` to be placed into `executionTaskCreatedStream`. - break - case .terminated: - // The task finished. We are done with this `QueuedTask` - return - } - } - } - // The first (update priority) task never finishes, so this waits for the second (wait for execution) task - // to terminate. - // Afterwards we also cancel the update priority task. - for await _ in taskGroup { - taskGroup.cancelAll() - return + } taskPriorityChanged: { + withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) { + logger.debug( + "Updating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(Task.currentPriority.rawValue)" + ) } + self.priority = Task.currentPriority } } onCancel: { self.resultTaskCancelled.value = true @@ -282,20 +268,15 @@ public actor QueuedTask { self.executionTask = nil } - /// Trigger `QueuedTask.priority` to be updated with the current priority of the underlying task. - /// - /// This is an asynchronous operation that makes no guarantees when the updated priority will be available. - /// - /// This is needed because tasks can't subscribe to priority updates (ie. there is no `withPriorityHandler` similar to - /// `withCancellationHandler`, https://github.com/apple/swift/issues/73367). - func triggerPriorityUpdate() { - updatePriorityContinuation.yield() - } - /// If the priority of this task is less than `targetPriority`, elevate the priority to `targetPriority` by spawning /// a new task that depends on it. Otherwise a no-op. nonisolated func elevatePriority(to targetPriority: TaskPriority) { if priority < targetPriority { + withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) { + logger.debug( + "Elevating priority of \(self.description.forLogging) from \(self.priority.rawValue) to \(targetPriority.rawValue)" + ) + } Task(priority: targetPriority) { await self.resultTask.value } @@ -385,16 +366,6 @@ public actor TaskScheduler { return queuedTask } - /// Trigger all queued tasks to update their priority. - /// - /// Should be called occasionally to elevate tasks in the queue whose underlying `Swift.Task` had their priority - /// elevated because a higher-priority task started depending on them. - private func triggerPriorityUpdateOfQueuedTasks() async { - for task in pendingTasks { - await task.triggerPriorityUpdate() - } - } - /// Returns the maximum number of concurrent tasks that are allowed to execute at the given priority. private func maxConcurrentTasks(at priority: TaskPriority) -> Int { for (atPriority, maxConcurrentTasks) in maxConcurrentTasksByPriority { @@ -417,9 +388,8 @@ public actor TaskScheduler { { // We don't have any execution slots left. Thus, this poker has nothing to do and is done. // When the next task finishes, it calls `poke` again. - // If the low priority task's priority gets elevated, that will be picked up when the next task in the - // `TaskScheduler` finishes, which causes `triggerPriorityUpdateOfQueuedTasks` to be called, which transfers - // the new elevated priority to `QueuedTask.priority` and which can then be picked up by the next `poke` call. + // If the low priority task's priority gets elevated that task's priority will get elevated and it will be + // picked up on the next `poke` call. return } let dependencies = task.description.dependencies(to: currentlyExecutingTasks.map(\.description)) @@ -428,13 +398,17 @@ public actor TaskScheduler { case .cancelAndRescheduleDependency(let taskDescription): guard let dependency = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id }) else { - logger.fault( - "Cannot find task to wait for \(taskDescription.forLogging) in list of currently executing tasks" - ) + withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) { + logger.fault( + "Cannot find task to wait for \(taskDescription.forLogging) in list of currently executing tasks" + ) + } return nil } if !taskDescription.isIdempotent { - logger.fault("Cannot reschedule task '\(taskDescription.forLogging)' since it is not idempotent") + withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) { + logger.fault("Cannot reschedule task '\(taskDescription.forLogging)' since it is not idempotent") + } return dependency } if dependency.priority > task.priority { @@ -445,9 +419,11 @@ public actor TaskScheduler { case .waitAndElevatePriorityOfDependency(let taskDescription): guard let dependency = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id }) else { - logger.fault( - "Cannot find task to wait for '\(taskDescription.forLogging)' in list of currently executing tasks" - ) + withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) { + logger.fault( + "Cannot find task to wait for '\(taskDescription.forLogging)' in list of currently executing tasks" + ) + } return nil } return dependency @@ -465,9 +441,11 @@ public actor TaskScheduler { switch taskDependency { case .cancelAndRescheduleDependency(let taskDescription): guard let task = self.currentlyExecutingTasks.first(where: { $0.description.id == taskDescription.id }) else { - logger.fault( - "Cannot find task to reschedule \(taskDescription.forLogging) in list of currently executing tasks" - ) + withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) { + logger.fault( + "Cannot find task to reschedule \(taskDescription.forLogging) in list of currently executing tasks" + ) + } return nil } return task @@ -478,6 +456,9 @@ public actor TaskScheduler { if !rescheduleTasks.isEmpty { Task.detached(priority: task.priority) { for task in rescheduleTasks { + withLoggingSubsystemAndScope(subsystem: taskSchedulerSubsystem, scope: nil) { + logger.debug("Suspending \(task.description.forLogging)") + } await task.cancelToBeRescheduled() } } @@ -510,7 +491,6 @@ public actor TaskScheduler { case .terminated: break case .cancelledToBeRescheduled: pendingTasks.append(task) } - await self.triggerPriorityUpdateOfQueuedTasks() self.poke() } } diff --git a/Sources/SKSupport/CMakeLists.txt b/Sources/SKSupport/CMakeLists.txt index 0171ac617..000ca70f8 100644 --- a/Sources/SKSupport/CMakeLists.txt +++ b/Sources/SKSupport/CMakeLists.txt @@ -17,6 +17,7 @@ add_library(SKSupport STATIC Result.swift Sequence+AsyncMap.swift SwitchableProcessResultExitStatus.swift + Task+WithPriorityChangedHandler.swift ThreadSafeBox.swift WorkspaceType.swift ) diff --git a/Sources/SKSupport/Task+WithPriorityChangedHandler.swift b/Sources/SKSupport/Task+WithPriorityChangedHandler.swift new file mode 100644 index 000000000..58367d1e0 --- /dev/null +++ b/Sources/SKSupport/Task+WithPriorityChangedHandler.swift @@ -0,0 +1,60 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the Swift.org open source project +// +// Copyright (c) 2014 - 2024 Apple Inc. and the Swift project authors +// Licensed under Apache License v2.0 with Runtime Library Exception +// +// See https://swift.org/LICENSE.txt for license information +// See https://swift.org/CONTRIBUTORS.txt for the list of Swift project authors +// +//===----------------------------------------------------------------------===// + +/// Runs `operation`. If the task's priority changes while the operation is running, calls `taskPriorityChanged`. +/// +/// Since Swift Concurrency doesn't support direct observation of a task's priority, this polls the task's priority at +/// `pollingInterval`. +/// The function assumes that the original priority of the task is `initialPriority`. If the task priority changed +/// compared to `initialPriority`, the `taskPriorityChanged` will be called. +public func withTaskPriorityChangedHandler( + initialPriority: TaskPriority = Task.currentPriority, + pollingInterval: Duration = .seconds(0.1), + @_inheritActorContext operation: @escaping @Sendable () async -> Void, + taskPriorityChanged: @escaping @Sendable () -> Void +) async { + let lastPriority = ThreadSafeBox(initialValue: initialPriority) + await withTaskGroup(of: Void.self) { taskGroup in + taskGroup.addTask { + while true { + if Task.isCancelled { + return + } + let newPriority = Task.currentPriority + let didChange = lastPriority.withLock { lastPriority in + if newPriority != lastPriority { + lastPriority = newPriority + return true + } + return false + } + if didChange { + taskPriorityChanged() + } + do { + try await Task.sleep(for: pollingInterval) + } catch { + break + } + } + } + taskGroup.addTask { + await operation() + } + // The first task that watches the priority never finishes, so we are effectively await the `operation` task here + // and cancelling the priority observation task once the operation task is done. + // We do need to await the observation task as well so that priority escalation also affects the observation task. + for await _ in taskGroup { + taskGroup.cancelAll() + } + } +} diff --git a/Sources/SKTestSupport/SkipUnless.swift b/Sources/SKTestSupport/SkipUnless.swift index 803b8a507..f1ab51777 100644 --- a/Sources/SKTestSupport/SkipUnless.swift +++ b/Sources/SKTestSupport/SkipUnless.swift @@ -272,6 +272,16 @@ public actor SkipUnless { public static func platformIsDarwin(_ message: String) throws { try XCTSkipUnless(Platform.current == .darwin, message) } + + public static func platformSupportsTaskPriorityElevation() throws { + #if os(macOS) + guard #available(macOS 14.0, *) else { + // Priority elevation was implemented by https://github.com/apple/swift/pull/63019, which is available in the + // Swift 5.9 runtime included in macOS 14.0+ + throw XCTSkip("Priority elevation of tasks is only supported on macOS 14 and above") + } + #endif + } } // MARK: - Parsing Swift compiler version diff --git a/Tests/SKCoreTests/TaskSchedulerTests.swift b/Tests/SKCoreTests/TaskSchedulerTests.swift index 105c32048..d4e62c0f4 100644 --- a/Tests/SKCoreTests/TaskSchedulerTests.swift +++ b/Tests/SKCoreTests/TaskSchedulerTests.swift @@ -10,6 +10,7 @@ // //===----------------------------------------------------------------------===// +import LSPLogging import SKCore import SKTestSupport import XCTest @@ -54,8 +55,7 @@ final class TaskSchedulerTests: XCTestCase { } func testTasksWithElevatedPrioritiesGetExecutedFirst() async throws { - try XCTSkipIf(true, "rdar://128601797") - + try SkipUnless.platformSupportsTaskPriorityElevation() await runTaskScheduler( scheduleTasks: { scheduler, taskExecutionRecorder in for i in 0..<20 { @@ -262,7 +262,9 @@ fileprivate final class ClosureTaskDescription: TaskDescriptionProtocol { } func execute() async { + logger.debug("Starting execution of \(self) with priority \(Task.currentPriority.rawValue)") await closure() + logger.debug("Finished executing \(self) with priority \(Task.currentPriority.rawValue)") } func dependencies(