Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion IntegrationTests/tests_04_performance/Thresholds/5.10.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"encode_1000_ws_frames_new_buffer_with_space": 3050,
"encode_1000_ws_frames_new_buffer_with_space_with_mask": 5050,
"execute_hop_10000_tasks": 0,
"future_assume_isolated_lots_of_callbacks": 92050,
"future_assume_isolated_lots_of_callbacks": 74050,
"future_erase_result": 4050,
"future_lots_of_callbacks": 74050,
"get_100000_headers_canonical_form": 700050,
Expand Down
2 changes: 1 addition & 1 deletion IntegrationTests/tests_04_performance/Thresholds/5.9.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"encode_1000_ws_frames_new_buffer_with_space": 3050,
"encode_1000_ws_frames_new_buffer_with_space_with_mask": 5050,
"execute_hop_10000_tasks": 0,
"future_assume_isolated_lots_of_callbacks": 91050,
"future_assume_isolated_lots_of_callbacks": 74050,
"future_erase_result": 4050,
"future_lots_of_callbacks": 74050,
"get_100000_headers_canonical_form": 700050,
Expand Down
2 changes: 1 addition & 1 deletion IntegrationTests/tests_04_performance/Thresholds/6.0.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"encode_1000_ws_frames_new_buffer_with_space": 3050,
"encode_1000_ws_frames_new_buffer_with_space_with_mask": 5050,
"execute_hop_10000_tasks": 0,
"future_assume_isolated_lots_of_callbacks": 92050,
"future_assume_isolated_lots_of_callbacks": 74050,
"future_erase_result": 4050,
"future_lots_of_callbacks": 74050,
"get_100000_headers_canonical_form": 700050,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"encode_1000_ws_frames_new_buffer_with_space": 3050,
"encode_1000_ws_frames_new_buffer_with_space_with_mask": 5050,
"execute_hop_10000_tasks": 0,
"future_assume_isolated_lots_of_callbacks": 92050,
"future_assume_isolated_lots_of_callbacks": 74050,
"future_erase_result": 4050,
"future_lots_of_callbacks": 74050,
"get_100000_headers_canonical_form": 700050,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
"encode_1000_ws_frames_new_buffer_with_space": 3050,
"encode_1000_ws_frames_new_buffer_with_space_with_mask": 5050,
"execute_hop_10000_tasks": 0,
"future_assume_isolated_lots_of_callbacks": 92050,
"future_assume_isolated_lots_of_callbacks": 74050,
"future_erase_result": 4050,
"future_lots_of_callbacks": 74050,
"get_100000_headers_canonical_form": 500050,
Expand Down
167 changes: 126 additions & 41 deletions Sources/NIOCore/EventLoopFuture+AssumeIsolated.swift
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,25 @@ extension EventLoopFuture {
public func flatMap<NewValue: Sendable>(
_ callback: @escaping (Value) -> EventLoopFuture<NewValue>
) -> EventLoopFuture<NewValue>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.flatMap {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<NewValue>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let t):
let futureU = callback(t)
if futureU.eventLoop.inEventLoop {
return futureU._addCallback {
next._setValue(value: futureU._value!)
}
} else {
futureU.cascade(to: next)
return CallbackList()
}
case .failure(let error):
return next._setValue(value: .failure(error))
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// When the current `EventLoopFuture<Value>` is fulfilled, run the provided callback, which
Expand All @@ -238,10 +253,22 @@ extension EventLoopFuture {
public func flatMapThrowing<NewValue>(
_ callback: @escaping (Value) throws -> NewValue
) -> EventLoopFuture<NewValue>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.flatMapThrowing {
try unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<NewValue>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let t):
do {
let r = try callback(t)
return next._setValue(value: .success(r))
} catch {
return next._setValue(value: .failure(error))
}
case .failure(let e):
return next._setValue(value: .failure(e))
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// When the current `EventLoopFuture<Value>` is in an error state, run the provided callback, which
Expand All @@ -263,10 +290,22 @@ extension EventLoopFuture {
public func flatMapErrorThrowing(
_ callback: @escaping (Error) throws -> Value
) -> EventLoopFuture<Value>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.flatMapErrorThrowing {
try unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<Value>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let t):
return next._setValue(value: .success(t))
case .failure(let e):
do {
let r = try callback(e)
return next._setValue(value: .success(r))
} catch {
return next._setValue(value: .failure(error))
}
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// When the current `EventLoopFuture<Value>` is fulfilled, run the provided callback, which
Expand Down Expand Up @@ -300,10 +339,17 @@ extension EventLoopFuture {
public func map<NewValue>(
_ callback: @escaping (Value) -> (NewValue)
) -> EventLoopFuture<NewValue>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.map {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
if NewValue.self == Value.self && NewValue.self == Void.self {
self.whenSuccess(callback as! (Value) -> Void)
return self as! EventLoopFuture<NewValue>.Isolated
} else {
let next = EventLoopPromise<NewValue>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
next._setValue(value: base._value!.map(callback))
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}
}

/// When the current `EventLoopFuture<Value>` is in an error state, run the provided callback, which
Expand All @@ -325,10 +371,25 @@ extension EventLoopFuture {
public func flatMapError(
_ callback: @escaping (Error) -> EventLoopFuture<Value>
) -> EventLoopFuture<Value>.Isolated where Value: Sendable {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.flatMapError {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<Value>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let t):
return next._setValue(value: .success(t))
case .failure(let e):
let t = callback(e)
if t.eventLoop.inEventLoop {
return t._addCallback {
next._setValue(value: t._value!)
}
} else {
t.cascade(to: next)
return CallbackList()
}
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// When the current `EventLoopFuture<Value>` is fulfilled, run the provided callback, which
Expand All @@ -349,10 +410,22 @@ extension EventLoopFuture {
public func flatMapResult<NewValue, SomeError: Error>(
_ body: @escaping (Value) -> Result<NewValue, SomeError>
) -> EventLoopFuture<NewValue>.Isolated {
let unsafeTransfer = UnsafeTransfer(body)
return self._wrapped.flatMapResult {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<NewValue>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let value):
switch body(value) {
case .success(let newValue):
return next._setValue(value: .success(newValue))
case .failure(let error):
return next._setValue(value: .failure(error))
}
case .failure(let e):
return next._setValue(value: .failure(e))
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// When the current `EventLoopFuture<Value>` is in an error state, run the provided callback, which
Expand All @@ -372,10 +445,17 @@ extension EventLoopFuture {
public func recover(
_ callback: @escaping (Error) -> Value
) -> EventLoopFuture<Value>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.recover {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
let next = EventLoopPromise<Value>.makeUnleakablePromise(eventLoop: self._wrapped.eventLoop)
let base = self._wrapped
base._whenCompleteIsolated {
switch base._value! {
case .success(let t):
return next._setValue(value: .success(t))
case .failure(let e):
return next._setValue(value: .success(callback(e)))
}
}
return next.futureResult.assumeIsolatedUnsafeUnchecked()
}

/// Adds an observer callback to this `EventLoopFuture` that is called when the
Expand All @@ -391,9 +471,12 @@ extension EventLoopFuture {
@inlinable
@available(*, noasync)
public func whenSuccess(_ callback: @escaping (Value) -> Void) {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.whenSuccess {
unsafeTransfer.wrappedValue($0)
let base = self._wrapped
base._whenCompleteIsolated {
if case .success(let t) = base._value! {
callback(t)
}
return CallbackList()
}
}

Expand All @@ -410,9 +493,12 @@ extension EventLoopFuture {
@inlinable
@available(*, noasync)
public func whenFailure(_ callback: @escaping (Error) -> Void) {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.whenFailure {
unsafeTransfer.wrappedValue($0)
let base = self._wrapped
base._whenCompleteIsolated {
if case .failure(let e) = base._value! {
callback(e)
}
return CallbackList()
}
}

Expand All @@ -426,9 +512,10 @@ extension EventLoopFuture {
public func whenComplete(
_ callback: @escaping (Result<Value, Error>) -> Void
) {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.whenComplete {
unsafeTransfer.wrappedValue($0)
let base = self._wrapped
base._whenCompleteIsolated {
callback(base._value!)
return CallbackList()
}
}

Expand All @@ -443,10 +530,8 @@ extension EventLoopFuture {
public func always(
_ callback: @escaping (Result<Value, Error>) -> Void
) -> EventLoopFuture<Value>.Isolated {
let unsafeTransfer = UnsafeTransfer(callback)
return self._wrapped.always {
unsafeTransfer.wrappedValue($0)
}.assumeIsolatedUnsafeUnchecked()
self.whenComplete { result in callback(result) }
return self
}

/// Unwrap an `EventLoopFuture` where its type parameter is an `Optional`.
Expand Down
20 changes: 15 additions & 5 deletions Sources/NIOCore/EventLoopFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ import Dispatch
/// In particular, note that _run() here continues to obtain and execute lists of callbacks until it completes.
/// This eliminates recursion when processing `flatMap()` chains.
@usableFromInline
internal struct CallbackList: Sendable {
internal struct CallbackList {
@usableFromInline
internal typealias Element = @Sendable () -> CallbackList
internal typealias Element = () -> CallbackList
@usableFromInline
internal var firstCallback: Optional<Element>
@usableFromInline
Expand Down Expand Up @@ -115,6 +115,9 @@ internal struct CallbackList: Sendable {
}
}

@available(*, unavailable)
extension CallbackList: Sendable {}

/// Internal error for operations that return results that were not replaced
@usableFromInline
internal struct OperationPlaceholderError: Error {
Expand Down Expand Up @@ -779,7 +782,7 @@ extension EventLoopFuture {

/// Add a callback. If there's already a value, invoke it and return the resulting list of new callback functions.
@inlinable
internal func _addCallback(_ callback: @escaping @Sendable () -> CallbackList) -> CallbackList {
internal func _addCallback(_ callback: @escaping () -> CallbackList) -> CallbackList {
self.eventLoop.assertInEventLoop()
if self._value == nil {
self._callbacks.append(callback)
Expand All @@ -800,14 +803,21 @@ extension EventLoopFuture {
@inlinable
internal func _internalWhenComplete(_ callback: @escaping @Sendable () -> CallbackList) {
if self.eventLoop.inEventLoop {
self._addCallback(callback)._run()
self._whenCompleteIsolated(callback)
} else {
self.eventLoop.execute {
self._addCallback(callback)._run()
self._whenCompleteIsolated(callback)
}
}
}

/// Add a callback. If there's already a value, run as much of the chain as we can.
@inlinable
internal func _whenCompleteIsolated(_ callback: @escaping () -> CallbackList) {
self.eventLoop.assertInEventLoop()
self._addCallback(callback)._run()
}

/// Adds an observer callback to this `EventLoopFuture` that is called when the
/// `EventLoopFuture` has a success result.
///
Expand Down
Loading