Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Sources/NIOPosix/Bootstrap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2600,6 +2600,7 @@ extension NIOPipeBootstrap {
let pipeChannelOutput: SelectablePipeHandle?
let hasNoInputPipe: Bool
let hasNoOutputPipe: Bool
let bootstrapChannelInitializer = self.channelInitializer
do {
if let input = input {
try self.validateFileDescriptorIsNotAFile(input)
Expand Down Expand Up @@ -2632,6 +2633,13 @@ extension NIOPipeBootstrap {
func setupChannel() -> EventLoopFuture<ChannelInitializerResult> {
eventLoop.assertInEventLoop()
return channelOptions.applyAllChannelOptions(to: channel).flatMap {
if let bootstrapChannelInitializer {
bootstrapChannelInitializer(channel)
} else {
channel.eventLoop.makeSucceededVoidFuture()
}
}
.flatMap {
_ -> EventLoopFuture<ChannelInitializerResult> in
channelInitializer(channel)
}.flatMap { result in
Expand Down
216 changes: 216 additions & 0 deletions Tests/NIOPosixTests/AsyncChannelBootstrapTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1033,6 +1033,222 @@ final class AsyncChannelBootstrapTests: XCTestCase {
}
}

func testPipeBootstrap_callsChannelInitializer() async throws {
let eventLoopGroup = self.group!
let (pipe1ReadFD, pipe1WriteFD, pipe2ReadFD, pipe2WriteFD) = self.makePipeFileDescriptors()
let channel: NIOAsyncChannel<ByteBuffer, ByteBuffer>
let toChannel: NIOAsyncChannel<Never, ByteBuffer>
let fromChannel: NIOAsyncChannel<ByteBuffer, Never>
let didCallChannelInitializer = NIOLockedValueBox(0)

do {
channel = try await NIOPipeBootstrap(group: eventLoopGroup)
.channelInitializer { channel in
didCallChannelInitializer.withLockedValue { $0 += 1 }
return channel.eventLoop.makeSucceededVoidFuture()
}
.takingOwnershipOfDescriptors(
input: pipe1ReadFD,
output: pipe2WriteFD
) { channel in
channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel(wrappingChannelSynchronously: channel)
}
}
} catch {
for fileDescriptor in [pipe1ReadFD, pipe1WriteFD, pipe2ReadFD, pipe2WriteFD] {
try SystemCalls.close(descriptor: fileDescriptor)
}
throw error
}

do {
toChannel = try await NIOPipeBootstrap(group: eventLoopGroup)
.channelInitializer { channel in
didCallChannelInitializer.withLockedValue { $0 += 1 }
return channel.eventLoop.makeSucceededVoidFuture()
}
.takingOwnershipOfDescriptor(
output: pipe1WriteFD
) { channel in
channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel(wrappingChannelSynchronously: channel)
}
}
} catch {
for fileDescriptor in [pipe1WriteFD, pipe2ReadFD] {
try SystemCalls.close(descriptor: fileDescriptor)
}
throw error
}

do {
fromChannel = try await NIOPipeBootstrap(group: eventLoopGroup)
.channelInitializer { channel in
didCallChannelInitializer.withLockedValue { $0 += 1 }
return channel.eventLoop.makeSucceededVoidFuture()
}
.takingOwnershipOfDescriptor(
input: pipe2ReadFD
) { channel in
channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel(wrappingChannelSynchronously: channel)
}
}
} catch {
for fileDescriptor in [pipe2ReadFD] {
try SystemCalls.close(descriptor: fileDescriptor)
}
throw error
}

try await channel.executeThenClose { channelInbound, channelOutbound in
try await fromChannel.executeThenClose { fromChannelInbound, _ in
try await toChannel.executeThenClose { _, toChannelOutbound in
var inboundIterator = channelInbound.makeAsyncIterator()
var fromChannelInboundIterator = fromChannelInbound.makeAsyncIterator()

try await toChannelOutbound.write(.init(string: "Request"))
try await XCTAsyncAssertEqual(try await inboundIterator.next(), ByteBuffer(string: "Request"))

let response = ByteBuffer(string: "Response")
try await channelOutbound.write(response)
try await XCTAsyncAssertEqual(try await fromChannelInboundIterator.next(), response)
}
}
}

XCTAssertEqual(didCallChannelInitializer.withLockedValue { $0 }, 3)
}

func testPipeBootstrap_whenInputNil_callsChannelInitializer() async throws {
let eventLoopGroup = self.group!
let (pipe1ReadFD, pipe1WriteFD) = self.makePipeFileDescriptors()
let channel: NIOAsyncChannel<ByteBuffer, ByteBuffer>
let fromChannel: NIOAsyncChannel<ByteBuffer, Never>
let didCallChannelInitializer = NIOLockedValueBox(0)

do {
channel = try await NIOPipeBootstrap(group: eventLoopGroup)
.channelInitializer { channel in
didCallChannelInitializer.withLockedValue { $0 += 1 }
return channel.eventLoop.makeSucceededVoidFuture()
}
.takingOwnershipOfDescriptor(
output: pipe1WriteFD
) { channel in
channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel(wrappingChannelSynchronously: channel)
}
}
} catch {
for fileDescriptor in [pipe1ReadFD, pipe1WriteFD] {
try SystemCalls.close(descriptor: fileDescriptor)
}
throw error
}

do {
fromChannel = try await NIOPipeBootstrap(group: eventLoopGroup)
.channelInitializer { channel in
didCallChannelInitializer.withLockedValue { $0 += 1 }
return channel.eventLoop.makeSucceededVoidFuture()
}
.takingOwnershipOfDescriptor(
input: pipe1ReadFD
) { channel in
channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel(wrappingChannelSynchronously: channel)
}
}
} catch {
for fileDescriptor in [pipe1WriteFD] {
try SystemCalls.close(descriptor: fileDescriptor)
}
throw error
}

try await channel.executeThenClose { channelInbound, channelOutbound in
try await fromChannel.executeThenClose { fromChannelInbound, _ in
var inboundIterator = channelInbound.makeAsyncIterator()
var fromChannelInboundIterator = fromChannelInbound.makeAsyncIterator()

try await XCTAsyncAssertEqual(try await inboundIterator.next(), nil)

let response = ByteBuffer(string: "Response")
try await channelOutbound.write(response)
try await XCTAsyncAssertEqual(try await fromChannelInboundIterator.next(), response)
}
}

XCTAssertEqual(didCallChannelInitializer.withLockedValue { $0 }, 2)
}

func testPipeBootstrap_whenOutputNil_callsChannelInitializer() async throws {
let eventLoopGroup = self.group!
let (pipe1ReadFD, pipe1WriteFD) = self.makePipeFileDescriptors()
let channel: NIOAsyncChannel<ByteBuffer, ByteBuffer>
let toChannel: NIOAsyncChannel<Never, ByteBuffer>
let didCallChannelInitializer = NIOLockedValueBox(0)

do {
channel = try await NIOPipeBootstrap(group: eventLoopGroup)
.channelInitializer { channel in
didCallChannelInitializer.withLockedValue { $0 += 1 }
return channel.eventLoop.makeSucceededVoidFuture()
}
.takingOwnershipOfDescriptor(
input: pipe1ReadFD
) { channel in
channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel(wrappingChannelSynchronously: channel)
}
}
} catch {
for fileDescriptor in [pipe1ReadFD, pipe1WriteFD] {
try SystemCalls.close(descriptor: fileDescriptor)
}

throw error
}

do {
toChannel = try await NIOPipeBootstrap(group: eventLoopGroup)
.channelInitializer { channel in
didCallChannelInitializer.withLockedValue { $0 += 1 }
return channel.eventLoop.makeSucceededVoidFuture()
}
.takingOwnershipOfDescriptor(
output: pipe1WriteFD
) { channel in
channel.eventLoop.makeCompletedFuture {
try NIOAsyncChannel(wrappingChannelSynchronously: channel)
}
}
} catch {
for fileDescriptor in [pipe1WriteFD] {
try SystemCalls.close(descriptor: fileDescriptor)
}
throw error
}

try await channel.executeThenClose { channelInbound, channelOutbound in
try await toChannel.executeThenClose { _, toChannelOutbound in
var inboundIterator = channelInbound.makeAsyncIterator()

try await toChannelOutbound.write(.init(string: "Request"))
try await XCTAsyncAssertEqual(try await inboundIterator.next(), ByteBuffer(string: "Request"))

let response = ByteBuffer(string: "Response")
await XCTAsyncAssertThrowsError(try await channelOutbound.write(response)) { error in
XCTAssertEqual(error as? NIOAsyncWriterError, .alreadyFinished())
}
}
}

XCTAssertEqual(didCallChannelInitializer.withLockedValue { $0 }, 2)
}

// MARK: RawSocket bootstrap

func testRawSocketBootstrap() async throws {
Expand Down