Skip to content

Commit b2896fe

Browse files
authored
io: use Mutex instead of spinlock (#6403)
1 parent 3d0d0fd commit b2896fe

File tree

1 file changed

+19
-53
lines changed

1 file changed

+19
-53
lines changed

tokio/src/io/split.rs

+19-53
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@
66
77
use crate::io::{AsyncRead, AsyncWrite, ReadBuf};
88

9-
use std::cell::UnsafeCell;
109
use std::fmt;
1110
use std::io;
1211
use std::pin::Pin;
13-
use std::sync::atomic::AtomicBool;
14-
use std::sync::atomic::Ordering::{Acquire, Release};
1512
use std::sync::Arc;
13+
use std::sync::Mutex;
1614
use std::task::{Context, Poll};
1715

1816
cfg_io_util! {
@@ -38,8 +36,7 @@ cfg_io_util! {
3836
let is_write_vectored = stream.is_write_vectored();
3937

4038
let inner = Arc::new(Inner {
41-
locked: AtomicBool::new(false),
42-
stream: UnsafeCell::new(stream),
39+
stream: Mutex::new(stream),
4340
is_write_vectored,
4441
});
4542

@@ -54,13 +51,19 @@ cfg_io_util! {
5451
}
5552

5653
struct Inner<T> {
57-
locked: AtomicBool,
58-
stream: UnsafeCell<T>,
54+
stream: Mutex<T>,
5955
is_write_vectored: bool,
6056
}
6157

62-
struct Guard<'a, T> {
63-
inner: &'a Inner<T>,
58+
impl<T> Inner<T> {
59+
fn with_lock<R>(&self, f: impl FnOnce(Pin<&mut T>) -> R) -> R {
60+
let mut guard = self.stream.lock().unwrap();
61+
62+
// safety: we do not move the stream.
63+
let stream = unsafe { Pin::new_unchecked(&mut *guard) };
64+
65+
f(stream)
66+
}
6467
}
6568

6669
impl<T> ReadHalf<T> {
@@ -90,7 +93,7 @@ impl<T> ReadHalf<T> {
9093
.ok()
9194
.expect("`Arc::try_unwrap` failed");
9295

93-
inner.stream.into_inner()
96+
inner.stream.into_inner().unwrap()
9497
} else {
9598
panic!("Unrelated `split::Write` passed to `split::Read::unsplit`.")
9699
}
@@ -111,8 +114,7 @@ impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
111114
cx: &mut Context<'_>,
112115
buf: &mut ReadBuf<'_>,
113116
) -> Poll<io::Result<()>> {
114-
let mut inner = ready!(self.inner.poll_lock(cx));
115-
inner.stream_pin().poll_read(cx, buf)
117+
self.inner.with_lock(|stream| stream.poll_read(cx, buf))
116118
}
117119
}
118120

@@ -122,67 +124,31 @@ impl<T: AsyncWrite> AsyncWrite for WriteHalf<T> {
122124
cx: &mut Context<'_>,
123125
buf: &[u8],
124126
) -> Poll<Result<usize, io::Error>> {
125-
let mut inner = ready!(self.inner.poll_lock(cx));
126-
inner.stream_pin().poll_write(cx, buf)
127+
self.inner.with_lock(|stream| stream.poll_write(cx, buf))
127128
}
128129

129130
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
130-
let mut inner = ready!(self.inner.poll_lock(cx));
131-
inner.stream_pin().poll_flush(cx)
131+
self.inner.with_lock(|stream| stream.poll_flush(cx))
132132
}
133133

134134
fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
135-
let mut inner = ready!(self.inner.poll_lock(cx));
136-
inner.stream_pin().poll_shutdown(cx)
135+
self.inner.with_lock(|stream| stream.poll_shutdown(cx))
137136
}
138137

139138
fn poll_write_vectored(
140139
self: Pin<&mut Self>,
141140
cx: &mut Context<'_>,
142141
bufs: &[io::IoSlice<'_>],
143142
) -> Poll<Result<usize, io::Error>> {
144-
let mut inner = ready!(self.inner.poll_lock(cx));
145-
inner.stream_pin().poll_write_vectored(cx, bufs)
143+
self.inner
144+
.with_lock(|stream| stream.poll_write_vectored(cx, bufs))
146145
}
147146

148147
fn is_write_vectored(&self) -> bool {
149148
self.inner.is_write_vectored
150149
}
151150
}
152151

153-
impl<T> Inner<T> {
154-
fn poll_lock(&self, cx: &mut Context<'_>) -> Poll<Guard<'_, T>> {
155-
if self
156-
.locked
157-
.compare_exchange(false, true, Acquire, Acquire)
158-
.is_ok()
159-
{
160-
Poll::Ready(Guard { inner: self })
161-
} else {
162-
// Spin... but investigate a better strategy
163-
164-
std::thread::yield_now();
165-
cx.waker().wake_by_ref();
166-
167-
Poll::Pending
168-
}
169-
}
170-
}
171-
172-
impl<T> Guard<'_, T> {
173-
fn stream_pin(&mut self) -> Pin<&mut T> {
174-
// safety: the stream is pinned in `Arc` and the `Guard` ensures mutual
175-
// exclusion.
176-
unsafe { Pin::new_unchecked(&mut *self.inner.stream.get()) }
177-
}
178-
}
179-
180-
impl<T> Drop for Guard<'_, T> {
181-
fn drop(&mut self) {
182-
self.inner.locked.store(false, Release);
183-
}
184-
}
185-
186152
unsafe impl<T: Send> Send for ReadHalf<T> {}
187153
unsafe impl<T: Send> Send for WriteHalf<T> {}
188154
unsafe impl<T: Sync> Sync for ReadHalf<T> {}

0 commit comments

Comments
 (0)