Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 8 additions & 12 deletions Sources/NIOCore/AsyncChannel/AsyncChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -145,19 +145,17 @@ public final class NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Senda

@inlinable
@_spi(AsyncChannel)
public static func wrapAsyncChannelWithTransformations<ChannelReadResult: Sendable>(
public static func wrapAsyncChannelWithTransformations(
synchronouslyWrapping channel: Channel,
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isOutboundHalfClosureEnabled: Bool = false,
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<ChannelReadResult>,
postFireChannelReadTransformation: @Sendable @escaping (ChannelReadResult) -> EventLoopFuture<Inbound>
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<Inbound>
) throws -> NIOAsyncChannel<Inbound, Outbound> where Outbound == Never {
channel.eventLoop.preconditionInEventLoop()
let (inboundStream, outboundWriter): (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) = try channel._syncAddAsyncHandlersWithTransformations(
backpressureStrategy: backpressureStrategy,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled,
channelReadTransformation: channelReadTransformation,
postFireChannelReadTransformation: postFireChannelReadTransformation
channelReadTransformation: channelReadTransformation
)

outboundWriter.finish()
Expand Down Expand Up @@ -197,21 +195,19 @@ extension Channel {
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@inlinable
@_spi(AsyncChannel)
public func _syncAddAsyncHandlersWithTransformations<ChannelReadResult, PostFireChannelReadResult>(
public func _syncAddAsyncHandlersWithTransformations<ChannelReadResult>(
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
isOutboundHalfClosureEnabled: Bool,
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<ChannelReadResult>,
postFireChannelReadTransformation: @Sendable @escaping (ChannelReadResult) -> EventLoopFuture<PostFireChannelReadResult>
) throws -> (NIOAsyncChannelInboundStream<PostFireChannelReadResult>, NIOAsyncChannelOutboundWriter<Never>) {
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<ChannelReadResult>
) throws -> (NIOAsyncChannelInboundStream<ChannelReadResult>, NIOAsyncChannelOutboundWriter<Never>) {
self.eventLoop.assertInEventLoop()

let closeRatchet = CloseRatchet(isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled)
let inboundStream = try NIOAsyncChannelInboundStream<PostFireChannelReadResult>.makeTransformationHandler(
let inboundStream = try NIOAsyncChannelInboundStream<ChannelReadResult>.makeTransformationHandler(
channel: self,
backpressureStrategy: backpressureStrategy,
closeRatchet: closeRatchet,
channelReadTransformation: channelReadTransformation,
postFireChannelReadTransformation: postFireChannelReadTransformation
channelReadTransformation: channelReadTransformation
)
let writer = try NIOAsyncChannelOutboundWriter<Never>(
channel: self,
Expand Down
16 changes: 7 additions & 9 deletions Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
@usableFromInline let _producer: Producer

@inlinable
init<HandlerInbound: Sendable, ReadTransformationResult: Sendable>(
init<HandlerInbound: Sendable>(
channel: Channel,
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
closeRatchet: CloseRatchet,
handler: NIOAsyncChannelInboundStreamChannelHandler<HandlerInbound, ReadTransformationResult, Inbound>
handler: NIOAsyncChannelInboundStreamChannelHandler<HandlerInbound, Inbound>
) throws {
channel.eventLoop.preconditionInEventLoop()
let strategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark
Expand Down Expand Up @@ -58,7 +58,7 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
closeRatchet: CloseRatchet
) throws -> NIOAsyncChannelInboundStream {
let handler = NIOAsyncChannelInboundStreamChannelHandler<Inbound, Inbound, Inbound>.makeHandler(
let handler = NIOAsyncChannelInboundStreamChannelHandler<Inbound, Inbound>.makeHandler(
eventLoop: channel.eventLoop,
closeRatchet: closeRatchet
)
Expand All @@ -73,18 +73,16 @@ public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {

/// Creates a new ``NIOAsyncChannelInboundStream`` which has hooks for transformations.
@inlinable
static func makeTransformationHandler<ChannelReadResult>(
static func makeTransformationHandler(
channel: Channel,
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
closeRatchet: CloseRatchet,
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<ChannelReadResult>,
postFireChannelReadTransformation: @Sendable @escaping (ChannelReadResult) -> EventLoopFuture<Inbound>
channelReadTransformation: @Sendable @escaping (Channel) -> EventLoopFuture<Inbound>
) throws -> NIOAsyncChannelInboundStream {
let handler = NIOAsyncChannelInboundStreamChannelHandler<Channel, ChannelReadResult, Inbound>.makeHandlerWithTransformations(
let handler = NIOAsyncChannelInboundStreamChannelHandler<Channel, Inbound>.makeHandlerWithTransformations(
eventLoop: channel.eventLoop,
closeRatchet: closeRatchet,
channelReadTransformation: channelReadTransformation,
postFireChannelReadTransformation: postFireChannelReadTransformation
channelReadTransformation: channelReadTransformation
)

return try .init(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/// ``Channel`` into an asynchronous sequence that supports back-pressure.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@usableFromInline
internal final class NIOAsyncChannelInboundStreamChannelHandler<InboundIn: Sendable, ReadTransformationResult: Sendable, ProducerElement: Sendable>: ChannelDuplexHandler {
internal final class NIOAsyncChannelInboundStreamChannelHandler<InboundIn: Sendable, ProducerElement: Sendable>: ChannelDuplexHandler {
@usableFromInline
enum _ProducingState {
// Not .stopProducing
Expand Down Expand Up @@ -73,8 +73,7 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler<InboundIn: Senda
/// A synchronous transformation is applied to incoming reads. This is used when sync wrapping a channel.
case syncWrapping((InboundIn) -> ProducerElement)
case transformation(
channelReadTransformation: @Sendable (InboundIn) -> EventLoopFuture<ReadTransformationResult>,
postFireChannelReadTransformation: @Sendable (ReadTransformationResult) -> EventLoopFuture<ProducerElement>
channelReadTransformation: @Sendable (InboundIn) -> EventLoopFuture<ProducerElement>
)
}

Expand Down Expand Up @@ -111,15 +110,13 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler<InboundIn: Senda
static func makeHandlerWithTransformations(
eventLoop: EventLoop,
closeRatchet: CloseRatchet,
channelReadTransformation: @Sendable @escaping (InboundIn) -> EventLoopFuture<ReadTransformationResult>,
postFireChannelReadTransformation: @Sendable @escaping (ReadTransformationResult) -> EventLoopFuture<ProducerElement>
channelReadTransformation: @Sendable @escaping (InboundIn) -> EventLoopFuture<ProducerElement>
) -> NIOAsyncChannelInboundStreamChannelHandler where InboundIn == Channel {
return .init(
eventLoop: eventLoop,
closeRatchet: closeRatchet,
transformation: .transformation(
channelReadTransformation: channelReadTransformation,
postFireChannelReadTransformation: postFireChannelReadTransformation
channelReadTransformation: channelReadTransformation
)
)
}
Expand All @@ -145,22 +142,21 @@ internal final class NIOAsyncChannelInboundStreamChannelHandler<InboundIn: Senda
// We forward on reads here to enable better channel composition.
context.fireChannelRead(data)

case .transformation(let channelReadTransformation, let postFireChannelReadTransformation):
case .transformation(let channelReadTransformation):
// The unsafe transfers here are required because we need to use self in whenComplete
// We are making sure to be on our event loop so we can safely use self in whenComplete
let unsafeSelf = NIOLoopBound(self, eventLoop: context.eventLoop)
let unsafeContext = NIOLoopBound(context, eventLoop: context.eventLoop)
channelReadTransformation(unwrapped)
.hop(to: context.eventLoop)
.flatMap { result -> EventLoopFuture<ProducerElement> in
.map { result -> ProducerElement in
context.eventLoop.preconditionInEventLoop()
// We have to fire through the original data now. Since our channelReadTransformation
// is the channel initializer. Once that's done we need to fire the channel as a read
// so that it hits channelRead0 in the base socket channel.
context.fireChannelRead(data)

return postFireChannelReadTransformation(result)
return result
}
.hop(to: context.eventLoop)
.whenComplete { result in
unsafeSelf.value._transformationCompleted(context: unsafeContext.value, result: result)
}
Expand Down Expand Up @@ -322,7 +318,7 @@ struct NIOAsyncChannelInboundStreamChannelHandlerProducerDelegate: @unchecked Se
let _produceMore: () -> Void

@inlinable
init<InboundIn, ReadTransformationResult, ProducerElement>(handler: NIOAsyncChannelInboundStreamChannelHandler<InboundIn, ReadTransformationResult, ProducerElement>) {
init<InboundIn, ProducerElement>(handler: NIOAsyncChannelInboundStreamChannelHandler<InboundIn, ProducerElement>) {
self.eventLoop = handler.eventLoop
self._didTerminate = handler._didTerminate
self._produceMore = handler._produceMore
Expand Down
58 changes: 53 additions & 5 deletions Sources/NIOCore/ChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,53 @@ extension RemovableChannelHandler {

/// The result of protocol negotiation.
@_spi(AsyncChannel)
public enum NIOProtocolNegotiationResult<NegotiationResult: Sendable> {
/// Indicates that the protocol negotiation finished.
case finished(NegotiationResult)
/// Indicates that protocol negotiation has been deferred to the next handler.
case deferredResult(EventLoopFuture<NIOProtocolNegotiationResult<NegotiationResult>>)
public struct NIOProtocolNegotiationResult<NegotiationResult: Sendable> {
fileprivate enum Result {
/// Indicates that the protocol negotiation finished.
case finished(NegotiationResult)
/// Indicates that protocol negotiation has been deferred to the next handler.
case deferredResult(EventLoopFuture<NIOProtocolNegotiationResult<NegotiationResult>>)
}

private let result: Result

public init(result: NegotiationResult) {
self.result = .finished(result)
}

public init(deferredResult: EventLoopFuture<NIOProtocolNegotiationResult<NegotiationResult>>) {
self.result = .deferredResult(deferredResult)
}

/// Waits for the final protocol negotiation result and returns the value.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
public func waitForFinalResult() async throws -> NegotiationResult {
switch self.result {
case .finished(let negotiationResult):
return negotiationResult
case .deferredResult(let eventLoopFuture):
return try await eventLoopFuture.flatMap { $0.result.resolve(on: eventLoopFuture.eventLoop) }.get()
}
}
}

@_spi(AsyncChannel)
extension NIOProtocolNegotiationResult.Result {
fileprivate func resolve(on eventLoop: EventLoop) -> EventLoopFuture<NegotiationResult> {
Self.resolve(on: eventLoop, result: self)
}

fileprivate static func resolve(on eventLoop: EventLoop, result: Self) -> EventLoopFuture<NegotiationResult> {
switch result {
case .finished(let negotiationResult):
return eventLoop.makeSucceededFuture(negotiationResult)

case .deferredResult(let future):
return future.flatMap { result in
return resolve(on: eventLoop, result: result.result)
}
}
}
}

@_spi(AsyncChannel)
Expand All @@ -359,6 +401,12 @@ extension NIOProtocolNegotiationResult: Equatable where NegotiationResult: Equat
@_spi(AsyncChannel)
extension NIOProtocolNegotiationResult: Sendable where NegotiationResult: Sendable {}

@_spi(AsyncChannel)
extension NIOProtocolNegotiationResult.Result: Equatable where NegotiationResult: Equatable {}

@_spi(AsyncChannel)
extension NIOProtocolNegotiationResult.Result: Sendable where NegotiationResult: Sendable {}

/// A ``ProtocolNegotiationHandler`` is a ``ChannelHandler`` that is responsible for negotiating networking protocols.
///
/// Typically these handlers are at the tail of the pipeline and wait until the peer indicated what protocol should be used. Once, the protocol
Expand Down
Loading