-
Notifications
You must be signed in to change notification settings - Fork 10.6k
[Concurrency] Prevent task re-enqueues in continuation APIs #84944
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
47c5e8b
15a67b0
4a479b7
1c27ba9
3f6beb3
93b3fa1
9d71804
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -889,8 +889,7 @@ internal func _resumeUnsafeThrowingContinuationWithError<T>( | |
| @available(SwiftStdlib 5.1, *) | ||
| @_alwaysEmitIntoClient | ||
| @unsafe | ||
| public func withUnsafeContinuation<T>( | ||
| isolation: isolated (any Actor)? = #isolation, | ||
| public nonisolated(nonsending) func withUnsafeContinuation<T>( | ||
| _ fn: (UnsafeContinuation<T, Never>) -> Void | ||
| ) async -> sending T { | ||
| return await Builtin.withUnsafeContinuation { | ||
|
|
@@ -926,8 +925,7 @@ public func withUnsafeContinuation<T>( | |
| @available(SwiftStdlib 5.1, *) | ||
| @_alwaysEmitIntoClient | ||
| @unsafe | ||
| public func withUnsafeThrowingContinuation<T>( | ||
| isolation: isolated (any Actor)? = #isolation, | ||
| public nonisolated(nonsending) func withUnsafeThrowingContinuation<T>( | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I hope we can get away with this "technically" source break... Let's see what the source compat suite says.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @ktoso in practice why is the behavior different after this change? since these functions already had explicit isolated parameters that should have inherited the actor context, why would moving to implicits one cause things to function differently? |
||
| _ fn: (UnsafeContinuation<T, Error>) -> Void | ||
| ) async throws -> sending T { | ||
| return try await Builtin.withUnsafeThrowingContinuation { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,160 @@ | ||
| // RUN: %target-run-simple-swift( -target %target-swift-5.1-abi-triple %import-libdispatch) | %FileCheck %s | ||
| // RUN: %target-run-simple-swift( -O -target %target-swift-5.1-abi-triple %import-libdispatch) | %FileCheck %s | ||
| // REQUIRES: concurrency | ||
| // REQUIRES: executable_test | ||
|
|
||
| // REQUIRES: concurrency_runtime | ||
| // UNSUPPORTED: back_deployment_runtime | ||
| // UNSUPPORTED: freestanding | ||
| // REQUIRES: libdispatch | ||
| // REQUIRES: synchronization | ||
|
|
||
| import Synchronization | ||
|
|
||
|
|
||
| if #available(SwiftStdlib 6.0, *) { | ||
| print("=== foo() async") | ||
| print("---------------------------------------") | ||
| await foo() | ||
| } | ||
|
|
||
| // CHECK: === foo() async | ||
| // CHECK-NEXT: --------------------------------------- | ||
| // We hop to the task executor: | ||
| // CHECK-NEXT: [executor][task-executor] Enqueue (1) | ||
|
|
||
| // CHECK-NEXT: foo - withTaskExecutorPreference | ||
|
|
||
| // CHECK: foo - withTaskExecutorPreference - withCheckedContinuation | ||
| // CHECK-NEXT: foo - withTaskExecutorPreference - withCheckedContinuation done | ||
|
|
||
| // CHECK: foo - withTaskExecutorPreference - withUnsafeContinuation | ||
| // CHECK-NEXT: foo - withTaskExecutorPreference - withUnsafeContinuation done | ||
|
|
||
| // CHECK: foo - withTaskExecutorPreference - withCheckedThrowingContinuation | ||
| // CHECK-NEXT: foo - withTaskExecutorPreference - withCheckedThrowingContinuation done | ||
|
|
||
| // CHECK: foo - withTaskExecutorPreference - withUnsafeThrowingContinuation | ||
| // CHECK-NEXT: foo - withTaskExecutorPreference - withUnsafeThrowingContinuation done | ||
|
|
||
| // By checking that this is the second enqueue here, | ||
| // we check that there was no stray enqueues between with... invocations: | ||
| // CHECK-NEXT: [executor][task-executor] Enqueue (2) | ||
|
|
||
| // CHECK-NEXT: foo - withTaskExecutorPreference done | ||
|
|
||
| // CHECK-NEXT: == Make: actor Foo | ||
| // CHECK-NEXT: --------------------------------------- | ||
| // CHECK-NEXT: [executor][actor-executor] Enqueue (1) | ||
| // CHECK-NEXT: actor.foo | ||
|
|
||
| // CHECK: actor.foo - withCheckedContinuation | ||
| // CHECK-NEXT: actor.foo - withCheckedContinuation done | ||
|
|
||
| // CHECK: actor.foo - withUnsafeContinuation | ||
| // CHECK-NEXT: actor.foo - withUnsafeContinuation done | ||
|
|
||
| // CHECK: actor.foo - withCheckedThrowingContinuation | ||
| // CHECK-NEXT: actor.foo - withCheckedThrowingContinuation done | ||
|
|
||
| // CHECK: actor.foo - withUnsafeThrowingContinuation | ||
| // CHECK-NEXT: actor.foo - withUnsafeThrowingContinuation done | ||
| // CHECK-NEXT: actor.foo done | ||
|
|
||
| // No more enqueues are expected afterwards | ||
| // CHECK-NOT: [executor] | ||
|
|
||
| @available(SwiftStdlib 6.0, *) | ||
| @concurrent | ||
| func foo() async { | ||
| await withTaskExecutorPreference(EnqueueCountExecutor(name: "task-executor")) { | ||
| print("foo - withTaskExecutorPreference") | ||
| await withCheckedContinuation { cont in | ||
| print("foo - withTaskExecutorPreference - withCheckedContinuation") | ||
| cont.resume() | ||
| } | ||
| print("foo - withTaskExecutorPreference - withCheckedContinuation done") | ||
|
|
||
| await withUnsafeContinuation { cont in | ||
| print("foo - withTaskExecutorPreference - withUnsafeContinuation") | ||
| cont.resume() | ||
| } | ||
| print("foo - withTaskExecutorPreference - withUnsafeContinuation done") | ||
|
|
||
| try! await withCheckedThrowingContinuation { cont in | ||
| print("foo - withTaskExecutorPreference - withCheckedThrowingContinuation") | ||
| cont.resume() | ||
| } | ||
| print("foo - withTaskExecutorPreference - withCheckedThrowingContinuation done") | ||
|
|
||
| try! await withUnsafeThrowingContinuation { cont in | ||
| print("foo - withTaskExecutorPreference - withUnsafeThrowingContinuation") | ||
| cont.resume() | ||
| } | ||
| print("foo - withTaskExecutorPreference - withUnsafeThrowingContinuation done") | ||
| } | ||
| print("foo - withTaskExecutorPreference done") | ||
|
|
||
| print("== Make: actor Foo") | ||
| print("---------------------------------------") | ||
| await Foo().foo() | ||
| } | ||
|
|
||
| @available(SwiftStdlib 6.0, *) | ||
| actor Foo { | ||
| let exec = EnqueueCountExecutor(name: "actor-executor") | ||
|
|
||
| nonisolated var unownedExecutor: UnownedSerialExecutor { | ||
| self.exec.asUnownedSerialExecutor() | ||
| } | ||
|
|
||
| func foo() async { | ||
| print("actor.foo") | ||
|
|
||
| await withCheckedContinuation { cont in | ||
| print("actor.foo - withCheckedContinuation") | ||
| cont.resume() | ||
| } | ||
| print("actor.foo - withCheckedContinuation done") | ||
|
|
||
| await withUnsafeContinuation { cont in | ||
| print("actor.foo - withUnsafeContinuation") | ||
| cont.resume() | ||
| } | ||
| print("actor.foo - withUnsafeContinuation done") | ||
|
|
||
| try! await withCheckedThrowingContinuation { cont in | ||
| print("actor.foo - withCheckedThrowingContinuation") | ||
| cont.resume() | ||
| } | ||
| print("actor.foo - withCheckedThrowingContinuation done") | ||
|
|
||
| try! await withUnsafeThrowingContinuation { cont in | ||
| print("actor.foo - withUnsafeThrowingContinuation") | ||
| cont.resume() | ||
| } | ||
| print("actor.foo - withUnsafeThrowingContinuation done") | ||
|
|
||
| print("actor.foo done") | ||
| } | ||
| } | ||
|
|
||
| @available(SwiftStdlib 6.0, *) | ||
| final class EnqueueCountExecutor: TaskExecutor, SerialExecutor { | ||
| let enqueueCount: Atomic<Int> | ||
|
|
||
| let name: String | ||
|
|
||
| init(name: String) { | ||
| self.enqueueCount = .init(0) | ||
| self.name = name | ||
| } | ||
|
|
||
| public func enqueue(_ job: consuming ExecutorJob) { | ||
| let newEnqueueValue = self.enqueueCount.add(1, ordering: .relaxed).newValue | ||
| print("[executor][\(self.name)] Enqueue (\(newEnqueueValue))") | ||
| job.runSynchronously(on: self.asUnownedSerialExecutor()) | ||
| } | ||
| } | ||
|
|
||
| print("done") // CHECK: done |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
'the implicit' what? (here and below)