diff --git a/tokio/src/runtime/io/driver.rs b/tokio/src/runtime/io/driver.rs index bece3560b72..0f7b1e57acb 100644 --- a/tokio/src/runtime/io/driver.rs +++ b/tokio/src/runtime/io/driver.rs @@ -220,8 +220,17 @@ impl Handle { let scheduled_io = self.registrations.allocate(&mut self.synced.lock())?; let token = scheduled_io.token(); - // TODO: if this returns an err, the `ScheduledIo` leaks... - self.registry.register(source, token, interest.to_mio())?; + // we should remove the `scheduled_io` from the `registrations` set if registering + // the `source` with the OS fails. Otherwise it will leak the `scheduled_io`. + if let Err(e) = self.registry.register(source, token, interest.to_mio()) { + // safety: `scheduled_io` is part of the `registrations` set. + unsafe { + self.registrations + .remove(&mut self.synced.lock(), &scheduled_io) + }; + + return Err(e); + } // TODO: move this logic to `RegistrationSet` and use a `CountedLinkedList` self.metrics.incr_fd_count(); diff --git a/tokio/src/runtime/io/registration_set.rs b/tokio/src/runtime/io/registration_set.rs index 028eb2ecdbe..1a8bd09c310 100644 --- a/tokio/src/runtime/io/registration_set.rs +++ b/tokio/src/runtime/io/registration_set.rs @@ -102,13 +102,21 @@ impl RegistrationSet { } pub(super) fn release(&self, synced: &mut Synced) { - for io in synced.pending_release.drain(..) { + let pending = std::mem::take(&mut synced.pending_release); + + for io in pending { // safety: the registration is part of our list - let _ = unsafe { synced.registrations.remove(io.as_ref().into()) }; + unsafe { self.remove(synced, io.as_ref()) } } self.num_pending_release.store(0, Release); } + + // This function is marked as unsafe, because the caller must make sure that + // `io` is part of the registration set. + pub(super) unsafe fn remove(&self, synced: &mut Synced, io: &ScheduledIo) { + let _ = synced.registrations.remove(io.into()); + } } // Safety: `Arc` pins the inner data