Skip to content

Commit 28fe6e4

Browse files
Lukasanormanmaurer
authored andcommitted
Support hopping event loops with EventLoopFuture (#177)
Motivation: A somewhat common requirement when working with chains of futures is needing to hop from one event loop to another. This is particularly common when relying on the fact that EventLoopFuture will synchronize with the event loop that created it: you often want to rely on that implicit locking, rather than use Dispatch or Lock to achieve the same effect. While doing this hop requires relatively little code, it's not necessarily totally apparent to new users how they would do it. Additionally, the most naive implementation incurs the overhead of allocations and reference counting in cases where it's not necessary (e.g. when you have only one event loop, or when both work items are being scheduled on the same event loop). For this reason, we should have a nice concise way for a user to request this behaviour and get a relatively performant implementation of the behaviour. Modifications: Added EventLoopFuture<T>.on(eventLoop:). Changed AcceptHandler to use the new method rather than its (slower) alternative. Result: Users will have an easier time working with EventLoopFutures.
1 parent 320561f commit 28fe6e4

File tree

4 files changed

+75
-11
lines changed

4 files changed

+75
-11
lines changed

Sources/NIO/Bootstrap.swift

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -199,11 +199,9 @@ public final class ServerBootstrap {
199199

200200
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
201201
let accepted = self.unwrapInboundIn(data)
202-
let hopEventLoopPromise: EventLoopPromise<()> = ctx.eventLoop.newPromise()
203-
self.childChannelOptions.applyAll(channel: accepted).cascade(promise: hopEventLoopPromise)
204202
let childChannelInit = self.childChannelInit ?? { (_: Channel) in ctx.eventLoop.newSucceededFuture(result: ()) }
205203

206-
hopEventLoopPromise.futureResult.then {
204+
self.childChannelOptions.applyAll(channel: accepted).hopTo(eventLoop: ctx.eventLoop).then {
207205
assert(ctx.eventLoop.inEventLoop)
208206
return childChannelInit(accepted)
209207
}.then { () -> EventLoopFuture<()> in

Sources/NIO/EventLoopFuture.swift

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -654,14 +654,7 @@ extension EventLoopFuture {
654654
return CallbackList()
655655
}
656656

657-
let hopOver: EventLoopFuture<U>
658-
if self.eventLoop === other.eventLoop {
659-
hopOver = other
660-
} else {
661-
let hopOverP = EventLoopPromise<U>(eventLoop: self.eventLoop, file: file, line: line)
662-
other.cascade(promise: hopOverP)
663-
hopOver = hopOverP.futureResult
664-
}
657+
let hopOver = other.hopTo(eventLoop: self.eventLoop)
665658
hopOver._whenComplete { () -> CallbackList in
666659
assert(self.eventLoop.inEventLoop)
667660
switch other.value! {
@@ -796,7 +789,29 @@ extension EventLoopFuture {
796789
p0.succeed(result: ())
797790
return body
798791
}
792+
}
799793

794+
extension EventLoopFuture {
795+
/// Returns an `EventLoopFuture` that fires when this future completes, but executes its callbacks on the
796+
/// target event loop instead of the original one.
797+
///
798+
/// It is common to want to "hop" event loops when you arrange some work: for example, you're closing one channel
799+
/// from another, and want to hop back when the close completes. This method lets you spell that requirement
800+
/// succinctly. It also contains an optimisation for the case when the loop you're hopping *from* is the same as
801+
/// the one you're hopping *to*, allowing you to avoid doing allocations in that case.
802+
///
803+
/// - parameters:
804+
/// - target: The `EventLoop` that the returned `EventLoopFuture` will run on.
805+
/// - returns: An `EventLoopFuture` whose callbacks run on `target` instead of the original loop.
806+
func hopTo(eventLoop target: EventLoop) -> EventLoopFuture<T> {
807+
if target === self.eventLoop {
808+
// We're already on that event loop, nothing to do here. Save an allocation.
809+
return self
810+
}
811+
let hoppingPromise: EventLoopPromise<T> = target.newPromise()
812+
self.cascade(promise: hoppingPromise)
813+
return hoppingPromise.futureResult
814+
}
800815
}
801816

802817
/// Execute the given function and synchronously complete the given `EventLoopPromise` (if not `nil`).

Tests/NIOTests/EventLoopFutureTest+XCTest.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ extension EventLoopFutureTest {
4141
("testEventLoopHoppingAndAll", testEventLoopHoppingAndAll),
4242
("testEventLoopHoppingAndAllWithFailures", testEventLoopHoppingAndAllWithFailures),
4343
("testFutureInVariousScenarios", testFutureInVariousScenarios),
44+
("testLoopHoppingHelperSuccess", testLoopHoppingHelperSuccess),
45+
("testLoopHoppingHelperFailure", testLoopHoppingHelperFailure),
46+
("testLoopHoppingHelperNoHopping", testLoopHoppingHelperNoHopping),
4447
]
4548
}
4649
}

Tests/NIOTests/EventLoopFutureTest.swift

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,4 +381,52 @@ class EventLoopFutureTest : XCTestCase {
381381

382382
XCTAssertNoThrow(try elg.syncShutdownGracefully())
383383
}
384+
385+
func testLoopHoppingHelperSuccess() throws {
386+
let group = MultiThreadedEventLoopGroup(numThreads: 2)
387+
let loop1 = group.next()
388+
let loop2 = group.next()
389+
XCTAssertFalse(loop1 === loop2)
390+
391+
let succeedingPromise: EventLoopPromise<Void> = loop1.newPromise()
392+
let succeedingFuture = succeedingPromise.futureResult.map {
393+
XCTAssertTrue(loop1.inEventLoop)
394+
}.hopTo(eventLoop: loop2).map {
395+
XCTAssertTrue(loop2.inEventLoop)
396+
}
397+
succeedingPromise.succeed(result: ())
398+
XCTAssertNoThrow(try succeedingFuture.wait())
399+
}
400+
401+
func testLoopHoppingHelperFailure() throws {
402+
let group = MultiThreadedEventLoopGroup(numThreads: 2)
403+
let loop1 = group.next()
404+
let loop2 = group.next()
405+
XCTAssertFalse(loop1 === loop2)
406+
407+
let failingPromise: EventLoopPromise<Void> = loop2.newPromise()
408+
let failingFuture = failingPromise.futureResult.thenIfErrorThrowing { error in
409+
XCTAssertEqual(error as? EventLoopFutureTestError, EventLoopFutureTestError.example)
410+
XCTAssertTrue(loop2.inEventLoop)
411+
throw error
412+
}.hopTo(eventLoop: loop1).mapIfError { error in
413+
XCTAssertEqual(error as? EventLoopFutureTestError, EventLoopFutureTestError.example)
414+
XCTAssertTrue(loop1.inEventLoop)
415+
}
416+
417+
failingPromise.fail(error: EventLoopFutureTestError.example)
418+
XCTAssertNoThrow(try failingFuture.wait())
419+
}
420+
421+
func testLoopHoppingHelperNoHopping() throws {
422+
let group = MultiThreadedEventLoopGroup(numThreads: 2)
423+
let loop1 = group.next()
424+
let loop2 = group.next()
425+
XCTAssertFalse(loop1 === loop2)
426+
427+
let noHoppingPromise: EventLoopPromise<Void> = loop1.newPromise()
428+
let noHoppingFuture = noHoppingPromise.futureResult.hopTo(eventLoop: loop1)
429+
XCTAssertTrue(noHoppingFuture === noHoppingPromise.futureResult)
430+
noHoppingPromise.succeed(result: ())
431+
}
384432
}

0 commit comments

Comments
 (0)