From 95a2e3726b39f1aa03060a8d43aa6587dcd83ba5 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Thu, 25 Jan 2024 01:15:35 +0800 Subject: [PATCH 1/4] WIP --- .github/workflows/ci.yml | 177 +----------- Cargo.toml | 45 ++- README-zh_CN.md | 50 ---- README.md | 37 ++- benches/foo.rs | 1 - ci/miri.sh | 11 - ci/sanitizer.sh | 17 -- examples/foo.rs | 1 - src/future.rs | 495 +++++++++++++++++++++++++++++++++ src/future/peek.rs | 34 +++ src/future/peek_exact.rs | 48 ++++ src/future/peek_to_end.rs | 53 ++++ src/future/peek_to_string.rs | 62 +++++ src/future/peek_vectored.rs | 34 +++ src/lib.rs | 461 ++++++++++++++++++++++++++++++- src/tokio.rs | 511 +++++++++++++++++++++++++++++++++++ src/tokio/peek.rs | 57 ++++ src/tokio/peek_buf.rs | 75 +++++ src/tokio/peek_exact.rs | 73 +++++ src/tokio/peek_to_end.rs | 66 +++++ src/tokio/peek_to_string.rs | 75 +++++ tests/foo.rs | 1 - 22 files changed, 2073 insertions(+), 311 deletions(-) delete mode 100644 benches/foo.rs delete mode 100755 ci/miri.sh delete mode 100755 ci/sanitizer.sh delete mode 100644 examples/foo.rs create mode 100644 src/future.rs create mode 100644 src/future/peek.rs create mode 100644 src/future/peek_exact.rs create mode 100644 src/future/peek_to_end.rs create mode 100644 src/future/peek_to_string.rs create mode 100644 src/future/peek_vectored.rs create mode 100644 src/tokio.rs create mode 100644 src/tokio/peek.rs create mode 100644 src/tokio/peek_buf.rs create mode 100644 src/tokio/peek_exact.rs create mode 100644 src/tokio/peek_to_end.rs create mode 100644 src/tokio/peek_to_string.rs delete mode 100644 tests/foo.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c6ba154..8db5034 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -63,59 +63,7 @@ jobs: - name: Install cargo-hack run: cargo install cargo-hack - name: Apply clippy lints - run: cargo hack clippy --each-feature - - # Run tests on some extra platforms - cross: - name: cross - strategy: - matrix: - target: - - aarch64-unknown-linux-gnu - - aarch64-linux-android - - aarch64-unknown-linux-musl - - i686-linux-android - - x86_64-linux-android - - i686-pc-windows-gnu - - x86_64-pc-windows-gnu - - i686-unknown-linux-gnu - - powerpc64-unknown-linux-gnu - # - mips64-unknown-linux-gnuabi64 - - riscv64gc-unknown-linux-gnu - - wasm32-unknown-unknown - - wasm32-unknown-emscripten - - wasm32-wasi - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - name: Cache cargo build and registry - uses: actions/cache@v3 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-cross-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ${{ runner.os }}-cross- - - name: Install Rust - run: rustup update stable && rustup default stable - - name: cross build --target ${{ matrix.target }} - run: | - cargo install cross - cross build --target ${{ matrix.target }} - if: matrix.target != 'wasm32-unknown-unknown' - # # WASM support - # - name: cargo build --target ${{ matrix.target }} - # run: | - # rustup target add ${{ matrix.target }} - # cargo build --features js --target ${{ matrix.target }} - # if: matrix.target == 'wasm32-unknown-unknown' - # - name: cargo build --target ${{ matrix.target }} - # run: | - # rustup target add ${{ matrix.target }} - # cargo +nightly build --no-default-features --features alloc --target ${{ matrix.target }} -Z build-std=core,alloc - # if: matrix.target == 'mips64-unknown-linux-gnuabi64' + run: cargo hack clippy --each-feature build: name: build @@ -184,125 +132,6 @@ jobs: key: ${{ runner.os }}-coverage-dotcargo - name: Run test run: cargo hack test --feature-powerset - - sanitizer: - name: sanitizer - strategy: - matrix: - os: - - ubuntu-latest - - macos-latest - - windows-latest - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v3 - - name: Cache cargo build and registry - uses: actions/cache@v3 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-sanitizer-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ${{ runner.os }}-sanitizer- - - name: Install Rust - run: rustup update $nightly && rustup default $nightly - - name: Install rust-src - run: rustup component add rust-src - - name: Install cargo-hack - run: cargo install cargo-hack - - name: ASAN / LSAN / TSAN - run: ci/sanitizer.sh - - miri: - name: miri - strategy: - matrix: - os: - - ubuntu-latest - - macos-latest - - windows-latest - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v3 - - name: Cache cargo build and registry - uses: actions/cache@v3 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-miri-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ${{ runner.os }}-miri- - - name: Install cargo-hack - run: cargo install cargo-hack - - name: Miri - run: ci/miri.sh - loom: - name: loom - strategy: - matrix: - os: - - ubuntu-latest - - macos-latest - - windows-latest - runs-on: ${{ matrix.os }} - steps: - - uses: actions/checkout@v3 - - name: Cache cargo build and registry - uses: actions/cache@v3 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-loom-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ${{ runner.os }}-loom- - - name: Install Rust - run: rustup update $nightly && rustup default $nightly - - name: Install cargo-hack - run: cargo install cargo-hack - - name: Loom tests - run: RUSTFLAGS="--cfg loom -Dwarnings" cargo hack test --test loom - - # valgrind - valgrind: - name: valgrind - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v3 - - name: Cache cargo build and registry - uses: actions/cache@v3 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ubuntu-latest-valgrind-${{ hashFiles('**/Cargo.lock') }} - restore-keys: | - ubuntu-latest-valgrind- - - - name: Install Rust ${{ env.stable }} - uses: actions-rs/toolchain@v1 - with: - toolchain: ${{ env.stable }} - override: true - - name: Install Valgrind - run: | - sudo apt-get update -y - sudo apt-get install -y valgrind - # Compile tests - # - name: cargo build foo - # run: cargo build --bin foo - # working-directory: integration - - # Run with valgrind - # - name: Run valgrind foo - # run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/foo - # working-directory: integration docs: name: docs @@ -336,11 +165,7 @@ jobs: - rustfmt - clippy - build - - cross - test - - sanitizer - - miri - - loom - docs steps: - uses: actions/checkout@v3 diff --git a/Cargo.toml b/Cargo.toml index 583f5a6..140e882 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,37 +1,34 @@ [package] -name = "template-rs" -version = "0.1.6" +name = "peekable" +version = "0.0.0" edition = "2021" -repository = "https://github.com/al8n/template-rs" -homepage = "https://github.com/al8n/template-rs" -documentation = "https://docs.rs/template-rs" -description = "A template for creating Rust open-source repo on GitHub" +repository = "https://github.com/al8n/peekable" +homepage = "https://github.com/al8n/peekable" +documentation = "https://docs.rs/peekable" +description = "Peakable reader and async reader" license = "MIT/Apache-2.0" -rust-version = "1.73" - -[[bench]] -path = "benches/foo.rs" -name = "foo" -harness = false +rust-version = "1.56" [features] default = [] +future = ["futures-util", "pin-project-lite"] +tokio = ["dep:tokio", "pin-project-lite", "bytes"] [dependencies] +smallvec = { version = "1", optional = true } -[dev-dependencies] -criterion = "0.5" -tempfile = "3" +# futures-io = { version = "0.3", optional = true } +futures-util = { version = "=0.3.29", optional = true, features = ["io"] } +pin-project-lite = { version = "0.2", optional = true } + +tokio = { version = "1", default-features = false, optional = true, features = ["io-util"] } +bytes = { version = "1", optional = true } -[profile.bench] -opt-level = 3 -debug = false -codegen-units = 1 -lto = 'thin' -incremental = false -debug-assertions = false -overflow-checks = false -rpath = false + +[dev-dependencies] +futures = { version = "0.3", features = ["executor"] } +tokio = { version = "1", features = ["full"] } +tokio-util = { version = "0.7", features = ["compat"] } [package.metadata.docs.rs] all-features = true diff --git a/README-zh_CN.md b/README-zh_CN.md index 21d7466..e69de29 100644 --- a/README-zh_CN.md +++ b/README-zh_CN.md @@ -1,50 +0,0 @@ -
-

template-rs

-
-
- -开源Rust代码库GitHub模版 - -[github][Github-url] -[Build][CI-url] -[codecov][codecov-url] - -[docs.rs][doc-url] -[crates.io][crates-url] -[rustc][rustc-url] - -[license-apache][license-apache-url] -[license-mit][license-mit-url] - -[English][en-url] | 简体中文 - -
- -## Installation -```toml -[dependencies] -template_rs = "0.1" -``` - -## Features -- [x] 更快的创建GitHub开源Rust代码库 - -#### License - -`Template-rs` is under the terms of both the MIT license and the -Apache License (Version 2.0). - -See [LICENSE-APACHE](LICENSE-APACHE), [LICENSE-MIT](LICENSE-MIT) for details. - -Copyright (c) 2021 Al Liu. - -[Github-url]: https://github.com/al8n/template-rs/ -[CI-url]: https://github.com/al8n/template/actions/workflows/template.yml -[doc-url]: https://docs.rs/template-rs -[crates-url]: https://crates.io/crates/template-rs -[codecov-url]: https://app.codecov.io/gh/al8n/template-rs/ -[license-url]: https://opensource.org/licenses/Apache-2.0 -[rustc-url]: https://github.com/rust-lang/rust/blob/master/RELEASES.md -[license-apache-url]: https://opensource.org/licenses/Apache-2.0 -[license-mit-url]: https://opensource.org/licenses/MIT -[en-url]: https://github.com/al8n/template-rs/tree/main/README.md diff --git a/README.md b/README.md index 218fb4f..9ab4e15 100644 --- a/README.md +++ b/README.md @@ -1,17 +1,17 @@
-

template-rs

+

Peekable

-A template for creating Rust open-source GitHub repo. +WIP: Peakable reader and async reader -[github][Github-url] -[Build][CI-url] -[codecov][codecov-url] +[github][Github-url] +[Build][CI-url] +[codecov][codecov-url] -[docs.rs][doc-url] -[crates.io][crates-url] -[crates.io][crates-url] +[docs.rs][doc-url] +[crates.io][crates-url] +[crates.io][crates-url] license @@ -22,24 +22,21 @@ English | [简体中文][zh-cn-url] ## Installation ```toml [dependencies] -template_rs = "0.1" +peekable = "0.0.0" ``` -## Features -- [x] Create a Rust open-source repo fast - #### License -`Template-rs` is under the terms of both the MIT license and the +`peekable` is under the terms of both the MIT license and the Apache License (Version 2.0). See [LICENSE-APACHE](LICENSE-APACHE), [LICENSE-MIT](LICENSE-MIT) for details. -Copyright (c) 2021 Al Liu. +Copyright (c) 2024 Al Liu. -[Github-url]: https://github.com/al8n/template-rs/ -[CI-url]: https://github.com/al8n/template-rs/actions/workflows/ci.yml -[doc-url]: https://docs.rs/template-rs -[crates-url]: https://crates.io/crates/template-rs -[codecov-url]: https://app.codecov.io/gh/al8n/template-rs/ -[zh-cn-url]: https://github.com/al8n/template-rs/tree/main/README-zh_CN.md +[Github-url]: https://github.com/al8n/peekable/ +[CI-url]: https://github.com/al8n/peekable/actions/workflows/ci.yml +[doc-url]: https://docs.rs/peekable +[crates-url]: https://crates.io/crates/peekable +[codecov-url]: https://app.codecov.io/gh/al8n/peekable/ +[zh-cn-url]: https://github.com/al8n/peekable/tree/main/README-zh_CN.md diff --git a/benches/foo.rs b/benches/foo.rs deleted file mode 100644 index f328e4d..0000000 --- a/benches/foo.rs +++ /dev/null @@ -1 +0,0 @@ -fn main() {} diff --git a/ci/miri.sh b/ci/miri.sh deleted file mode 100755 index 7ea1a2c..0000000 --- a/ci/miri.sh +++ /dev/null @@ -1,11 +0,0 @@ -#!/bin/bash -set -e - -rustup toolchain install nightly --component miri -rustup override set nightly -cargo miri setup - -export MIRIFLAGS="-Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-symbolic-alignment-check" - -cargo hack miri test --each-feature - diff --git a/ci/sanitizer.sh b/ci/sanitizer.sh deleted file mode 100755 index a21beb3..0000000 --- a/ci/sanitizer.sh +++ /dev/null @@ -1,17 +0,0 @@ -#!/bin/bash - -set -ex - -export ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0" - -# Run address sanitizer with cargo-hack -RUSTFLAGS="-Z sanitizer=address" \ -cargo hack test --lib --each-feature - -# Run leak sanitizer with cargo-hack -RUSTFLAGS="-Z sanitizer=leak" \ -cargo hack test --lib --each-feature - -# Run thread sanitizer with cargo-hack -RUSTFLAGS="-Z sanitizer=thread" \ -cargo hack -Zbuild-std test --lib --each-feature diff --git a/examples/foo.rs b/examples/foo.rs deleted file mode 100644 index f328e4d..0000000 --- a/examples/foo.rs +++ /dev/null @@ -1 +0,0 @@ -fn main() {} diff --git a/src/future.rs b/src/future.rs new file mode 100644 index 0000000..19d025a --- /dev/null +++ b/src/future.rs @@ -0,0 +1,495 @@ +use std::{ + cmp, + future::Future, + io::Result, + ops::DerefMut, + pin::Pin, + task::{Context, Poll}, +}; + +use futures_util::{io::IoSliceMut, AsyncRead}; + +use super::*; + +mod peek; +pub use peek::Peek; +mod peek_exact; +pub use peek_exact::PeekExact; +mod peek_to_string; +pub use peek_to_string::PeekToString; +mod peek_to_end; +pub use peek_to_end::PeekToEnd; +mod peek_vectored; +pub use peek_vectored::PeekVectored; + +/// Peek bytes asynchronously. +/// +/// This trait is analogous to the `peekable::Peek` trait, but integrates +/// with the asynchronous task system. In particular, the `poll_read` +/// method, unlike `Peek::peek`, will automatically queue the current task +/// for wakeup and return if data is not yet available, rather than blocking +/// the calling thread. +pub trait AsyncPeek: AsyncRead { + /// Attempt to peek from the `AsyncPeek` into `buf`. + /// + /// On success, returns `Poll::Ready(Ok(num_bytes_peek))`. + /// + /// If no data is available for reading, the method returns + /// `Poll::Pending` and arranges for the current task (via + /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes + /// readable or is closed. + /// + /// # Implementation + /// + /// This function may not return errors of kind `WouldBlock` or + /// `Interrupted`. Implementations must convert `WouldBlock` into + /// `Poll::Pending` and either internally retry or convert + /// `Interrupted` into another error kind. + fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll>; + + /// Attempt to read from the `AsyncPeek` into `bufs` using vectored + /// IO operations. + /// + /// This method is similar to `poll_peek`, but allows data to be read + /// into multiple buffers using a single operation. + /// + /// On success, returns `Poll::Ready(Ok(num_bytes_read))`. + /// + /// If no data is available for reading, the method returns + /// `Poll::Pending` and arranges for the current task (via + /// `cx.waker().wake_by_ref()`) to receive a notification when the object becomes + /// readable or is closed. + /// By default, this method delegates to using `poll_read` on the first + /// nonempty buffer in `bufs`, or an empty one if none exists. Objects which + /// support vectored IO should override this method. + /// + /// # Implementation + /// + /// This function may not return errors of kind `WouldBlock` or + /// `Interrupted`. Implementations must convert `WouldBlock` into + /// `Poll::Pending` and either internally retry or convert + /// `Interrupted` into another error kind. + fn poll_peek_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + for b in bufs { + if !b.is_empty() { + return self.poll_peek(cx, b); + } + } + + self.poll_peek(cx, &mut []) + } +} + +macro_rules! deref_async_peek { + () => { + fn poll_peek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut [u8], + ) -> Poll> { + Pin::new(&mut **self).poll_peek(cx, buf) + } + + fn poll_peek_vectored( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + Pin::new(&mut **self).poll_peek_vectored(cx, bufs) + } + }; +} + +impl AsyncPeek for Box { + deref_async_peek!(); +} + +impl AsyncPeek for &mut T { + deref_async_peek!(); +} + +impl

AsyncPeek for Pin

+where + P: DerefMut + Unpin, + P::Target: AsyncPeek, +{ + fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + self.get_mut().as_mut().poll_peek(cx, buf) + } + + fn poll_peek_vectored( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + bufs: &mut [IoSliceMut<'_>], + ) -> Poll> { + self.get_mut().as_mut().poll_peek_vectored(cx, bufs) + } +} + +pin_project_lite::pin_project! { + /// A wrapper around an [`AsyncRead`] types to make them support [`AsyncPeek`] methods. + #[derive(Debug)] + pub struct AsyncPeekable { + #[pin] + reader: R, + buffer: Buffer, + } +} + +impl From for AsyncPeekable { + fn from(reader: R) -> Self { + Self { + reader, + buffer: Buffer::new(), + } + } +} + +impl AsyncRead for AsyncPeekable { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + mut buf: &mut [u8], + ) -> Poll> { + let want_read = buf.len(); + + // check if the peek buffer has data + let buffer_len = self.buffer.len(); + + let this = self.project(); + if buffer_len > 0 { + return match want_read.cmp(&buffer_len) { + cmp::Ordering::Less => { + buf.copy_from_slice(&this.buffer[..want_read]); + this.buffer.drain(..want_read); + return Poll::Ready(Ok(want_read)); + } + cmp::Ordering::Equal => { + buf.copy_from_slice(this.buffer); + this.buffer.clear(); + return Poll::Ready(Ok(want_read)); + } + cmp::Ordering::Greater => { + buf[..buffer_len].copy_from_slice(this.buffer); + buf = &mut buf[buffer_len..]; + match this.reader.poll_read(cx, buf) { + Poll::Ready(Ok(bytes)) => { + this.buffer.clear(); + Poll::Ready(Ok(bytes + buffer_len)) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => { + this.buffer.clear(); + Poll::Ready(Ok(buffer_len)) + } + } + } + }; + } + + this.reader.poll_read(cx, buf) + } +} + +impl AsyncPeek for AsyncPeekable { + fn poll_peek(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + let want_peek = buf.len(); + + // check if the peek buffer has data + let buffer_len = self.buffer.len(); + + if buffer_len > 0 { + return match want_peek.cmp(&buffer_len) { + cmp::Ordering::Less => { + buf.copy_from_slice(&self.buffer[..want_peek]); + Poll::Ready(Ok(want_peek)) + } + cmp::Ordering::Equal => { + buf.copy_from_slice(&self.buffer); + Poll::Ready(Ok(want_peek)) + } + cmp::Ordering::Greater => { + let this = self.project(); + this.buffer.resize(want_peek, 0); + match this.reader.poll_read(cx, &mut this.buffer[buffer_len..]) { + Poll::Ready(Ok(n)) => { + this.buffer.truncate(n + buffer_len); + buf[..buffer_len + n].copy_from_slice(this.buffer); + Poll::Ready(Ok(buffer_len + n)) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => { + buf[..buffer_len].copy_from_slice(this.buffer); + Poll::Ready(Ok(buffer_len)) + } + } + } + }; + } + + let this = self.project(); + match this.reader.poll_read(cx, buf) { + Poll::Ready(Ok(bytes)) => { + this.buffer.extend_from_slice(&buf[..bytes]); + Poll::Ready(Ok(bytes)) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, + } + } +} + +impl AsyncPeekable { + /// Creates a new `AsyncPeekable` which will wrap the given reader. + #[inline] + pub fn new(reader: R) -> Self { + Self { + reader, + buffer: Buffer::new(), + } + } + + /// Creates a new peekable wrapper around the given reader with the specified + /// capacity for the peek buffer. + #[inline] + pub fn with_capacity(reader: R, capacity: usize) -> Self { + Self { + reader, + buffer: Buffer::with_capacity(capacity), + } + } +} + +impl AsyncPeekable { + /// Tries to peek some bytes directly into the given `buf` in asynchronous + /// manner, returning a future type. + /// + /// The returned future will resolve to the number of bytes read once the read + /// operation is completed. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// use peekable::future::AsyncPeekExt; + /// + /// + /// let mut peekable = Cursor::new([1, 2, 3, 4]).peekable(); + /// let mut output = [0u8; 5]; + /// + /// let bytes = peekable.peek(&mut output[..3]).await?; + /// + /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous + /// // reader. In a real system you could get anywhere from 1 to + /// // `output.len()` bytes in a single read. + /// assert_eq!(bytes, 3); + /// assert_eq!(output, [1, 2, 3, 0, 0]); + /// + /// // you can peek mutiple times + /// + /// let bytes = peekable.peek(&mut output[..]).await?; + /// assert_eq!(bytes, 4); + /// assert_eq!(output, [1, 2, 3, 4, 0]); + /// + /// // you can read after peek + /// let mut output = [0u8; 5]; + /// let bytes = peekable.read(&mut output[..2]).await?; + /// assert_eq!(bytes, 2); + /// assert_eq!(output, [1, 2, 0, 0, 0]); + /// + /// // peek after read + /// let mut output = [0u8; 5]; + /// let bytes = peekable.peek(&mut output[..]).await?; + /// assert_eq!(bytes, 2); + /// assert_eq!(output, [3, 4, 0, 0, 0]); + /// + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + pub fn peek<'a>(&'a mut self, buf: &'a mut [u8]) -> Peek<'a, R> + where + Self: Unpin, + { + assert_future(Peek::new(self, buf)) + } + + /// Creates a future which will peek from the `AsyncPeek` into `bufs` using vectored + /// IO operations. + /// + /// The returned future will resolve to the number of bytes read once the read + /// operation is completed. + pub fn peek_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> PeekVectored<'a, R> + where + Self: Unpin, + { + assert_future(PeekVectored::new(self, bufs)) + } + + /// Creates a future which will peek exactly enough bytes to fill `buf`, + /// returning an error if end of file (EOF) is hit sooner. + /// + /// The returned future will resolve once the read operation is completed. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + /// + /// # Examples + /// + /// ```rust + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// use peekable::future::AsyncPeekExt; + /// + /// let mut peekable = Cursor::new([1, 2, 3, 4]).peekable(); + /// let mut output = [0u8; 4]; + /// + /// peekable.peek_exact(&mut output).await?; + /// + /// assert_eq!(output, [1, 2, 3, 4]); + /// + /// // read after peek + /// let mut output = [0u8; 2]; + /// + /// peekable.read_exact(&mut output[..]).await?; + /// + /// assert_eq!(output, [1, 2]); + /// + /// // peek after read + /// let mut output = [0u8; 2]; + /// peekable.peek_exact(&mut output).await?; + /// + /// assert_eq!(output, [3, 4]); + /// + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + /// + /// ## EOF is hit before `buf` is filled + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{self, AsyncReadExt, Cursor}; + /// use peekable::future::AsyncPeekExt; + /// + /// let mut peekable = Cursor::new([1, 2, 3, 4]).peekable(); + /// let mut output = [0u8; 5]; + + /// let result = peekable.peek_exact(&mut output).await; + /// assert_eq!( + /// result.unwrap_err().kind(), + /// std::io::ErrorKind::UnexpectedEof + /// ); + + /// let result = peekable.peek_exact(&mut output[..4]).await; + /// assert!(result.is_ok()); + /// assert_eq!(output, [1, 2, 3, 4, 0]); + /// # }); + /// ``` + pub fn peek_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> PeekExact<'a, R> + where + Self: Unpin, + { + assert_future::, _>(PeekExact::new(self, buf)) + } + + /// Creates a future which will peek all the bytes from this `AsyncPeek`. + /// + /// On success the total number of bytes peek is returned. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// use peekable::future::AsyncPeekExt; + /// + /// let mut peekable = Cursor::new([1, 2, 3, 4]).peekable(); + /// let mut output = Vec::with_capacity(4); + /// + /// let bytes = peekable.peek_to_end(&mut output).await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(output, vec![1, 2, 3, 4]); + /// + /// // read after peek + /// let mut output = Vec::with_capacity(4); + /// + /// let bytes = peekable.read_to_end(&mut output).await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(output, vec![1, 2, 3, 4]); + /// + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + pub fn peek_to_end<'a>(&'a mut self, buf: &'a mut Vec) -> PeekToEnd<'a, R> + where + Self: Unpin, + { + assert_future::, _>(PeekToEnd::new(self, buf)) + } + + /// Creates a future which will peek all the bytes from this `AsyncPeek`. + /// + /// On success the total number of bytes peek is returned. + /// + /// # Examples + /// + /// ``` + /// # futures::executor::block_on(async { + /// use futures::io::{AsyncReadExt, Cursor}; + /// use peekable::future::AsyncPeekExt; + /// + /// let mut peekable = Cursor::new(&b"1234"[..]).peekable(); + /// let mut buffer = String::with_capacity(4); + /// + /// let bytes = peekable.peek_to_string(&mut buffer).await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(buffer, String::from("1234")); + /// + /// // read after peek + /// let mut buffer = String::with_capacity(4); + /// let bytes = peekable.peek_to_string(&mut buffer).await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(buffer, String::from("1234")); + /// + /// # Ok::<(), Box>(()) }).unwrap(); + /// ``` + pub fn peek_to_string<'a>(&'a mut self, buf: &'a mut String) -> PeekToString<'a, R> + where + Self: Unpin, + { + assert_future::, _>(PeekToString::new(self, buf)) + } +} + +/// An extension trait which adds peek related utility methods to [`AsyncRead`] types +pub trait AsyncPeekExt: AsyncRead { + /// Creates a new `AsyncPeekable` which will wrap the given reader. + fn peekable(self) -> AsyncPeekable + where + Self: Sized, + { + AsyncPeekable { + reader: self, + buffer: Buffer::new(), + } + } +} + +impl AsyncPeekExt for R {} + +// impl AsyncPeekExt for R {} + +// Just a helper function to ensure the futures we're returning all have the +// right implementations. +pub(crate) fn assert_future(future: F) -> F +where + F: Future, +{ + future +} diff --git a/src/future/peek.rs b/src/future/peek.rs new file mode 100644 index 0000000..73f993c --- /dev/null +++ b/src/future/peek.rs @@ -0,0 +1,34 @@ +use futures_util::AsyncRead; + +use super::{AsyncPeek, AsyncPeekable}; +use std::{ + future::Future, + io, + pin::Pin, + task::{Context, Poll}, +}; + +/// Future for the [`peek`](AsyncPeekable::peek) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct Peek<'a, R> { + peekable: &'a mut AsyncPeekable, + buf: &'a mut [u8], +} + +impl Unpin for Peek<'_, R> {} + +impl<'a, R: AsyncRead + Unpin> Peek<'a, R> { + pub(super) fn new(peekable: &'a mut AsyncPeekable, buf: &'a mut [u8]) -> Self { + Self { peekable, buf } + } +} + +impl Future for Peek<'_, R> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + Pin::new(&mut this.peekable).poll_peek(cx, this.buf) + } +} diff --git a/src/future/peek_exact.rs b/src/future/peek_exact.rs new file mode 100644 index 0000000..631dbcb --- /dev/null +++ b/src/future/peek_exact.rs @@ -0,0 +1,48 @@ +use futures_util::{AsyncRead, AsyncReadExt, FutureExt}; + +use super::AsyncPeekable; +use std::{ + future::Future, + io, + pin::Pin, + task::{Context, Poll}, +}; + +/// Future for the [`peek_exact`](super::AsyncPeekable::peek_exact) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PeekExact<'a, P> { + peekable: &'a mut AsyncPeekable

, + buf: &'a mut [u8], +} + +impl Unpin for PeekExact<'_, P> {} + +impl<'a, P: AsyncRead + Unpin> PeekExact<'a, P> { + pub(super) fn new(peekable: &'a mut AsyncPeekable

, buf: &'a mut [u8]) -> Self { + Self { peekable, buf } + } +} + +impl Future for PeekExact<'_, P> { + type Output = io::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + + let buf_len = this.buf.len(); + let peek_buf_len = this.peekable.buffer.len(); + + if buf_len <= peek_buf_len { + this.buf.copy_from_slice(&this.peekable.buffer[..buf_len]); + return Poll::Ready(Ok(())); + } + + this.buf[..peek_buf_len].copy_from_slice(&this.peekable.buffer); + let mut fut = this + .peekable + .read_exact(&mut this.buf[peek_buf_len..]); + + fut.poll_unpin(cx) + } +} diff --git a/src/future/peek_to_end.rs b/src/future/peek_to_end.rs new file mode 100644 index 0000000..52f687f --- /dev/null +++ b/src/future/peek_to_end.rs @@ -0,0 +1,53 @@ +use futures_util::{AsyncRead, AsyncReadExt, FutureExt}; + +use super::AsyncPeekable; +use std::{ + future::Future, + io, + pin::Pin, + task::{Context, Poll}, +}; + +/// Future for the [`peek_to_end`](super::AsyncPeekable::peek_to_end) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PeekToEnd<'a, P> { + peekable: &'a mut AsyncPeekable

, + buf: &'a mut Vec, +} + +impl Unpin for PeekToEnd<'_, P> {} + +impl<'a, P: AsyncRead + Unpin> PeekToEnd<'a, P> { + pub(super) fn new(peekable: &'a mut AsyncPeekable

, buf: &'a mut Vec) -> Self { + Self { peekable, buf } + } +} + +impl Future for PeekToEnd<'_, A> +where + A: AsyncRead + Unpin, +{ + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + let inbuf = this.peekable.buffer.len(); + + let original_buf_len = this.buf.len(); + this.buf.extend_from_slice(&this.peekable.buffer); + + let mut fut = this.peekable.reader.read_to_end(this.buf); + match fut.poll_unpin(cx) { + Poll::Ready(Ok(read)) => { + this.peekable.buffer.extend_from_slice(&this.buf[original_buf_len + inbuf..]); + Poll::Ready(Ok(read + inbuf)) + } + Poll::Ready(Err(e)) => { + this.peekable.buffer.extend_from_slice(&this.buf[original_buf_len + inbuf..]); + Poll::Ready(Err(e)) + } + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/future/peek_to_string.rs b/src/future/peek_to_string.rs new file mode 100644 index 0000000..e1fed51 --- /dev/null +++ b/src/future/peek_to_string.rs @@ -0,0 +1,62 @@ +use futures_util::{AsyncRead, AsyncReadExt, FutureExt}; + +use super::AsyncPeekable; +use std::{ + future::Future, + io, + pin::Pin, + task::{Context, Poll}, +}; + +/// Future for the [`peek_to_string`](super::AsyncPeekable::peek_to_string) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PeekToString<'a, P> { + reader: &'a mut AsyncPeekable

, + buf: &'a mut String, +} + +impl Unpin for PeekToString<'_, P> {} + +impl<'a, P: AsyncRead + Unpin> PeekToString<'a, P> { + pub(super) fn new(reader: &'a mut AsyncPeekable

, buf: &'a mut String) -> Self { + Self { reader, buf } + } +} + +impl Future for PeekToString<'_, A> +where + A: AsyncRead + Unpin, +{ + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let Self { reader, buf } = &mut *self; + let s = match core::str::from_utf8(&reader.buffer) { + Ok(s) => s, + Err(_) => { + return Poll::Ready(Err(io::Error::new( + io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + ))) + } + }; + + let original_buf_len = buf.len(); + buf.push_str(s); + + let inbuf = reader.buffer.len(); + let mut fut = reader.reader.read_to_string(buf); + match fut.poll_unpin(cx) { + Poll::Ready(Ok(read)) => { + reader.buffer.extend_from_slice(&buf.as_bytes()[original_buf_len + inbuf..]); + Poll::Ready(Ok(read + inbuf)) + } + Poll::Ready(Err(e)) => { + reader.buffer.extend_from_slice(&buf.as_bytes()[original_buf_len + inbuf..]); + Poll::Ready(Err(e)) + } + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/future/peek_vectored.rs b/src/future/peek_vectored.rs new file mode 100644 index 0000000..4ec3dcb --- /dev/null +++ b/src/future/peek_vectored.rs @@ -0,0 +1,34 @@ +use futures_util::{io::IoSliceMut, AsyncRead}; + +use super::{AsyncPeek, AsyncPeekable}; +use std::{ + future::Future, + io, + pin::Pin, + task::{Context, Poll}, +}; + +/// Future for the [`peek_vectored`](AsyncPeekable::peek_vectored) method. +#[derive(Debug)] +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct PeekVectored<'a, R> { + peekable: &'a mut AsyncPeekable, + bufs: &'a mut [IoSliceMut<'a>], +} + +impl Unpin for PeekVectored<'_, R> {} + +impl<'a, R: AsyncRead + Unpin> PeekVectored<'a, R> { + pub(super) fn new(peekable: &'a mut AsyncPeekable, bufs: &'a mut [IoSliceMut<'a>]) -> Self { + Self { peekable, bufs } + } +} + +impl Future for PeekVectored<'_, R> { + type Output = io::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + Pin::new(&mut this.peekable).poll_peek_vectored(cx, this.bufs) + } +} diff --git a/src/lib.rs b/src/lib.rs index dbdf2a0..8167f4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,19 +1,460 @@ -//! A template for creating Rust open-source repo on GitHub +//! Peakable peeker and async peeker +//! +//! #![cfg_attr(docsrs, feature(doc_cfg))] #![cfg_attr(docsrs, allow(unused_attributes))] #![deny(missing_docs)] -/// template -pub fn it_works() -> usize { - 4 +use std::{cmp, io::{IoSliceMut, Read, Result}}; + +#[cfg(feature = "smallvec")] +type Buffer = smallvec::SmallVec<[u8; 64]>; + +#[cfg(not(feature = "smallvec"))] +type Buffer = Vec; + +/// Extracts the successful type of a `Poll`. +/// +/// This macro bakes in propagation of `Pending` signals by returning early. +#[cfg(any(feature = "tokio", feature = "future"))] +#[macro_export] +macro_rules! ready { + ($e:expr $(,)?) => { + match $e { + ::std::task::Poll::Ready(t) => t, + ::std::task::Poll::Pending => return ::std::task::Poll::Pending, + } + }; } -#[cfg(test)] -mod tests { - use super::*; +/// Asynchronous peek I/O +/// +/// This crate contains the `AsyncPeek` and `AsyncPeekExt` +/// traits, the asynchronous analogs to +/// `peekable::{Peek, Peekable}`. The primary difference is +/// that these traits integrate with the asynchronous task system. +/// +/// All items of this library are only available when the `future` feature of this +/// library is activated, and it is not activated by default. +#[cfg(feature = "future")] +#[cfg_attr(docsrs, doc(cfg(feature = "future")))] +pub mod future; + +/// Traits, helpers, and type definitions for asynchronous peekable I/O functionality. +/// +/// This module is the asynchronous version of `peekable::{Peek, Peekable}`. Primarily, it +/// defines one trait, [`AsyncPeek`], which is asynchronous +/// version of the [`Peek`] trait. +#[cfg(feature = "tokio")] +#[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] +pub mod tokio; + +/// A wrapper around an [`Read`] types to make them support [`Peek`] methods. +pub struct Peekable { + /// The inner peeker. + peeker: R, + /// The buffer used to store peeked bytes. + buffer: Buffer, +} + +impl Read for Peekable { + fn read(&mut self, mut buf: &mut [u8]) -> Result { + let this = self; + let want_peek = buf.len(); + + // check if the peek buffer has data + let buffer_len = this.buffer.len(); + if buffer_len > 0 { + return match want_peek.cmp(&buffer_len) { + cmp::Ordering::Less => { + buf.copy_from_slice(&this.buffer[..want_peek]); + this.buffer.drain(..want_peek); + return Ok(want_peek); + } + cmp::Ordering::Equal => { + buf.copy_from_slice(&this.buffer); + this.buffer.clear(); + return Ok(want_peek); + } + cmp::Ordering::Greater => { + buf[..buffer_len].copy_from_slice(&this.buffer); + buf = &mut buf[buffer_len..]; + match this.peeker.read(buf) { + Ok(bytes) => { + this.buffer.clear(); + Ok(bytes + buffer_len) + } + Err(e) => Err(e), + } + } + }; + } + + this.peeker.read(buf) + } +} + +impl From for Peekable { + fn from(peeker: R) -> Self { + Peekable { + peeker, + buffer: Buffer::new(), + } + } +} + +impl Peekable { + /// Creates a new peekable wrapper around the given reader. + pub fn new(reader: R) -> Self { + Self { + peeker: reader, + buffer: Buffer::new(), + } + } + + /// Creates a new peekable wrapper around the given reader with the specified + /// capacity for the peek buffer. + pub fn with_capacity(reader: R, capacity: usize) -> Self { + Self { + peeker: reader, + buffer: Buffer::with_capacity(capacity), + } + } +} + +impl Peekable { + /// Pull some bytes from this source into the specified buffer, returning + /// how many bytes were peeked. + /// + /// This function does not provide any guarantees about whether it blocks + /// waiting for data, but if an object needs to block for a peek and cannot, + /// it will typically signal this via an [`Err`] return value. + /// + /// If the return value of this method is [`Ok(n)`], then implementations must + /// guarantee that `0 <= n <= buf.len()`. A nonzero `n` value indicates + /// that the buffer `buf` has been filled in with `n` bytes of data from this + /// source. If `n` is `0`, then it can indicate one of two scenarios: + /// + /// 1. This peeker has reached its "end of file" and will likely no longer + /// be able to produce bytes. Note that this does not mean that the + /// peeker will *always* no longer be able to produce bytes. As an example, + /// on Linux, this method will call the `recv` syscall for a [`TcpStream`], + /// where returning zero indicates the connection was shut down correctly. While + /// for [`File`], it is possible to reach the end of file and get zero as result, + /// but if more data is appended to the file, future calls to `peek` will return + /// more data. + /// 2. The buffer specified was 0 bytes in length. + /// + /// It is not an error if the returned value `n` is smaller than the buffer size, + /// even when the peeker is not at the end of the stream yet. + /// This may happen for example because fewer bytes are actually available right now + /// (e. g. being close to end-of-file) or because peek() was interrupted by a signal. + /// + /// As this trait is safe to implement, callers in unsafe code cannot rely on + /// `n <= buf.len()` for safety. + /// Extra care needs to be taken when `unsafe` functions are used to access the peek bytes. + /// Callers have to ensure that no unchecked out-of-bounds accesses are possible even if + /// `n > buf.len()`. + /// + /// No guarantees are provided about the contents of `buf` when this + /// function is called, so implementations cannot rely on any property of the + /// contents of `buf` being true. It is recommended that *implementations* + /// only write data to `buf` instead of peeking its contents. + /// + /// Correspondingly, however, *callers* of this method in unsafe code must not assume + /// any guarantees about how the implementation uses `buf`. The trait is safe to implement, + /// so it is possible that the code that's supposed to write to the buffer might also peek + /// from it. It is your responsibility to make sure that `buf` is initialized + /// before calling `peek`. Calling `peek` with an uninitialized `buf` (of the kind one + /// obtains via [`MaybeUninit`]) is not safe, and can lead to undefined behavior. + /// + /// [`MaybeUninit`]: crate::mem::MaybeUninit + /// + /// # Errors + /// + /// If this function encounters any form of I/O or other error, an error + /// variant will be returned. If an error is returned then it must be + /// guaranteed that no bytes were peek. + /// + /// An error of the [`ErrorKind::Interrupted`] kind is non-fatal and the peek + /// operation should be retried if there is nothing else to do. + /// + /// # Examples + /// + /// [`File`]s implement `Read`: + /// + /// [`Ok(n)`]: Ok + /// [`File`]: std::fs::File + /// [`TcpStream`]: std::net::TcpStream + /// + /// ```no_run + /// use std::io; + /// use std::io::prelude::*; + /// use std::fs::File; + /// use peekable::PeekExt; + /// + /// fn main() -> io::Result<()> { + /// let mut f = File::open("foo.txt")?.peekable(); + /// let mut buffer = [0; 10]; + /// + /// // peek up to 10 bytes + /// let n = f.peek(&mut buffer[..])?; + /// + /// println!("The bytes: {:?}", &buffer[..n]); + /// Ok(()) + /// } + /// ``` + pub fn peek(&mut self, buf: &mut [u8]) -> std::io::Result { + let want_peek = buf.len(); + + // check if the peek buffer has data + let buffer_len = self.buffer.len(); - #[test] - fn test_works() { - assert_eq!(it_works(), 4); + if buffer_len > 0 { + return match want_peek.cmp(&buffer_len) { + cmp::Ordering::Less => { + buf.copy_from_slice(&self.buffer[..want_peek]); + Ok(want_peek) + } + cmp::Ordering::Equal => { + buf.copy_from_slice(&self.buffer); + Ok(want_peek) + } + cmp::Ordering::Greater => { + let this = self; + this.buffer.resize(want_peek, 0); + match this.peeker.read(&mut this.buffer[buffer_len..]) { + Ok(n) => { + this.buffer.truncate(n + buffer_len); + buf[..buffer_len + n].copy_from_slice(&this.buffer); + Ok(buffer_len + n) + } + Err(e) => Err(e), + } + } + }; + } + + let this = self; + match this.peeker.read(buf) { + Ok(bytes) => { + this.buffer.extend_from_slice(&buf[..bytes]); + Ok(bytes) + } + Err(e) => Err(e), + } + } + + /// Like `peek`, except that it peeks into a slice of buffers. + /// + /// Data is copied to fill each buffer in order, with the final buffer + /// written to possibly being only partially filled. This method must + /// behave equivalently to a single call to `peek` with concatenated + /// buffers. + /// + /// The default implementation calls `peek` with either the first nonempty + /// buffer provided, or an empty one if none exists. + pub fn peek_vectored(&mut self, bufs: &mut [IoSliceMut<'_>]) -> Result { + for b in bufs { + if !b.is_empty() { + return self.peek(b); + } + } + + self.peek(&mut []) + } + + /// Peek all bytes until EOF in this source, placing them into `buf`. + /// + /// All bytes peek from this source will be appended to the specified buffer + /// `buf`. This function will continuously call [`peek()`] to append more data to + /// `buf` until [`peek()`] returns either [`Ok(0)`] or an error of + /// non-[`ErrorKind::Interrupted`] kind. + /// + /// If successful, this function will return the total number of bytes peek. + /// + /// # Errors + /// + /// If this function encounters an error of the kind + /// [`ErrorKind::Interrupted`] then the error is ignored and the operation + /// will continue. + /// + /// If any other peek error is encountered then this function immediately + /// returns. Any bytes which have alpeeky been peek will be appended to + /// `buf`. + /// + /// # Examples + /// + /// [`File`]s implement `Peek`: + /// + /// [`peek()`]: Peek::peek + /// [`Ok(0)`]: Ok + /// [`File`]: crate::fs::File + /// + /// ```no_run + /// use std::io; + /// use std::io::prelude::*; + /// use std::fs::File; + /// use peekable::PeekExt; + /// + /// fn main() -> io::Result<()> { + /// let mut f = File::open("foo.txt")?.peekable(); + /// let mut buffer = Vec::new(); + /// + /// // peek the whole file + /// f.peek_to_end(&mut buffer)?; + /// Ok(()) + /// } + /// ``` + pub fn peek_to_end(&mut self, buf: &mut Vec) -> Result { + let this = &mut *self; + let inbuf = this.buffer.len(); + + let original_buf = buf.len(); + buf.extend_from_slice(&this.buffer); + + let fut = this.peeker.read_to_end(buf); + match fut { + Ok(read) => { + this.buffer.extend_from_slice(&buf[original_buf + inbuf..]); + Ok(read + inbuf) + } + Err(e) => Err(e), + } + } + + /// Peek all bytes until EOF in this source, appending them to `buf`. + /// + /// If successful, this function returns the number of bytes which were peek + /// and appended to `buf`. + /// + /// # Errors + /// + /// If the data in this stream is *not* valid UTF-8 then an error is + /// returned and `buf` is unchanged. + /// + /// See [`peek_to_end`] for other error semantics. + /// + /// [`peek_to_end`]: Peek::peek_to_end + /// + /// # Examples + /// + /// + /// ```no_run + /// use std::io; + /// use std::io::prelude::*; + /// use std::fs::File; + /// use peekable::PeekExt; + /// + /// fn main() -> io::Result<()> { + /// let mut f = File::open("foo.txt")?.peekable(); + /// let mut buffer = String::new(); + /// + /// f.peek_to_string(&mut buffer)?; + /// Ok(()) + /// } + /// ``` + pub fn peek_to_string(&mut self, buf: &mut String) -> Result { + let s = match core::str::from_utf8(&self.buffer) { + Ok(s) => s, + Err(_) => { + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "stream did not contain valid UTF-8", + )) + } + }; + + buf.push_str(s); + + let inbuf = self.buffer.len(); + let fut = self.peeker.read_to_string(buf); + match fut { + Ok(read) => { + self.buffer.extend_from_slice(&buf.as_bytes()[inbuf..]); + Ok(read + inbuf) + } + Err(e) => Err(e), + } + } + + /// Peek the exact number of bytes required to fill `buf`. + /// + /// This function peeks as many bytes as necessary to completely fill the + /// specified buffer `buf`. + /// + /// No guarantees are provided about the contents of `buf` when this + /// function is called, so implementations cannot rely on any property of the + /// contents of `buf` being true. It is recommended that implementations + /// only write data to `buf` instead of peeking its contents. The + /// documentation on [`peek`] has a more detailed explanation on this + /// subject. + /// + /// # Errors + /// + /// If this function encounters an error of the kind + /// [`ErrorKind::Interrupted`](std::io::ErrorKind::Interrupted) then the error is ignored and the operation + /// will continue. + /// + /// If this function encounters an "end of file" before completely filling + /// the buffer, it returns an error of the kind [`ErrorKind::UnexpectedEof`](std::io::ErrorKind::Interrupted). + /// The contents of `buf` are unspecified in this case. + /// + /// If any other peek error is encountered then this function immediately + /// returns. The contents of `buf` are unspecified in this case. + /// + /// If this function returns an error, it is unspecified how many bytes it + /// has peek, but it will never peek more than would be necessary to + /// completely fill the buffer. + /// + /// # Examples + /// + /// ```no_run + /// use std::io; + /// use std::io::prelude::*; + /// use std::fs::File; + /// use peekable::PeekExt; + /// + /// fn main() -> io::Result<()> { + /// let mut f = File::open("foo.txt")?.peekable(); + /// let mut buffer = [0; 10]; + /// + /// // peek exactly 10 bytes + /// f.peek_exact(&mut buffer)?; + /// Ok(()) + /// } + /// ``` + pub fn peek_exact(&mut self, buf: &mut [u8]) -> Result<()> { + let this = self; + + let buf_len = buf.len(); + let peek_buf_len = this.buffer.len(); + + if buf_len <= peek_buf_len { + buf.copy_from_slice(&this.buffer[..buf_len]); + return Ok(()); + } + + buf[..buf_len].copy_from_slice(&this.buffer); + this + .read_exact(buf) } } + +/// An extension trait which adds utility methods to [`Read`] types. +pub trait PeekExt: Read { + /// Wraps a [`Read`] type in a `Peekable` which provides a [`peek`] method. + /// + /// [`peek`]: Peek::peek + fn peekable(self) -> Peekable + where + Self: Sized, + { + Peekable { + peeker: self, + buffer: Buffer::new(), + } + } +} + +impl PeekExt for R {} diff --git a/src/tokio.rs b/src/tokio.rs new file mode 100644 index 0000000..679d607 --- /dev/null +++ b/src/tokio.rs @@ -0,0 +1,511 @@ +use std::{ + cmp, io, + mem::MaybeUninit, + ops::DerefMut, + pin::Pin, + task::{Context, Poll}, +}; + +use super::Buffer; +use ::tokio::io::{AsyncRead, ReadBuf}; + +mod peek; +pub use peek::*; + +mod peek_buf; +pub use peek_buf::*; + +mod peek_exact; +pub use peek_exact::*; + +mod peek_to_end; +pub use peek_to_end::*; + +mod peek_to_string; +pub use peek_to_string::*; + +/// Peeks bytes from a source. +/// +/// This trait is analogous to the [`Peek`] trait, but integrates with +/// the asynchronous task system. In particular, the [`poll_peek`] method, +/// unlike [`Peek::peek`], will automatically queue the current task for wakeup +/// and return if data is not yet available, rather than blocking the calling +/// thpeek. +/// +/// Specifically, this means that the `poll_peek` function will return one of +/// the following: +/// +/// * `Poll::Ready(Ok(()))` means that data was immediately peek and placed into +/// the output buffer. The amount of data peek can be determined by the +/// increase in the length of the slice returned by `ReadBuf::filled`. If the +/// difference is 0, EOF has been reached. +/// +/// * `Poll::Pending` means that no data was peek into the buffer +/// provided. The I/O object is not currently peekable but may become peekable +/// in the future. Most importantly, **the current future's task is scheduled +/// to get unparked when the object is peekable**. This means that like +/// `Future::poll` you'll receive a notification when the I/O object is +/// peekable again. +/// +/// * `Poll::Ready(Err(e))` for other errors are standard I/O errors coming from the +/// underlying object. +/// +/// This trait importantly means that the `peek` method only works in the +/// context of a future's task. The object may panic if used outside of a task. +/// +/// Utilities for working with `AsyncPeek` values are provided by +/// [`AsyncPeekExt`]. +/// +/// [`poll_peek`]: AsyncPeek::poll_peek +/// [`Peek`]: crate::Peek +/// [`Peek::peek`]: create::Peek::peek +pub trait AsyncPeek: ::tokio::io::AsyncRead { + /// Attempts to peek from the `AsyncPeek` into `buf`. + /// + /// On success, returns `Poll::Peeky(Ok(()))` and places data in the + /// unfilled portion of `buf`. If no data was peek (`buf.filled().len()` is + /// unchanged), it implies that EOF has been reached. + /// + /// If no data is available for peeking, the method returns `Poll::Pending` + /// and arranges for the current task (via `cx.waker()`) to receive a + /// notification when the object becomes peekable or is closed. + fn poll_peek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll>; +} + +macro_rules! deref_async_peek { + () => { + fn poll_peek( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + Pin::new(&mut **self).poll_peek(cx, buf) + } + }; +} + +impl AsyncPeek for Box { + deref_async_peek!(); +} + +impl AsyncPeek for &mut T { + deref_async_peek!(); +} + +impl

AsyncPeek for Pin

+where + P: DerefMut + Unpin, + P::Target: AsyncPeek, +{ + fn poll_peek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + self.get_mut().as_mut().poll_peek(cx, buf) + } +} + +pin_project_lite::pin_project! { + /// A wrapper around an [`AsyncRead`] types to make them support [`AsyncPeek`] methods. + #[derive(Debug)] + pub struct AsyncPeekable { + #[pin] + peeker: R, + buffer: Buffer, + } +} + +impl From for AsyncPeekable { + fn from(peeker: R) -> Self { + Self { + peeker, + buffer: Buffer::new(), + } + } +} + +impl AsyncRead for AsyncPeekable { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.project(); + let buffer_len = this.buffer.len(); + if buffer_len > 0 { + let available = buf.remaining(); + + return match available.cmp(&buffer_len) { + cmp::Ordering::Greater => { + // Continue peeking into the buffer if there's still space + buf.put_slice(this.buffer); + + match this.peeker.poll_read(cx, buf) { + Poll::Ready(Ok(())) => { + this.buffer.clear(); + Poll::Ready(Ok(())) + } + Poll::Ready(Err(e)) => { + let len = buf.filled().len(); + // Safety: len - buffer_len..len guarantees we uninit the exact number of bytes from peek buffer + unsafe { + buf.inner_mut()[len - buffer_len..len].fill(MaybeUninit::uninit()); + } + Poll::Ready(Err(e)) + } + Poll::Pending => Poll::Pending, + } + } + cmp::Ordering::Equal => { + buf.put_slice(this.buffer); + this.buffer.clear(); + return Poll::Ready(Ok(())); + } + cmp::Ordering::Less => { + buf.put_slice(&this.buffer[..available]); + this.buffer.drain(..available); + return Poll::Ready(Ok(())); + } + }; + } + + this.peeker.poll_read(cx, buf) + } +} + +impl AsyncPeek for AsyncPeekable { + fn poll_peek( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.project(); + let buffer_len = this.buffer.len(); + + // Check if the buffer has enough data to fill `buf` + if buffer_len > 0 { + let available = buf.remaining(); + if available > buffer_len { + // Not enough data in the buffer, need to peek more + buf.put_slice(this.buffer); + let cur = buf.filled().len(); + match this.peeker.poll_read(cx, buf) { + Poll::Ready(Ok(())) => { + let filled = buf.filled(); + let read = filled.len() - cur; + this.buffer.extend_from_slice(&filled[cur..cur + read]); + Poll::Ready(Ok(())) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => { + // Put what we have in the buffer into `buf` and return + buf.put_slice(&this.buffer[..buffer_len]); + Poll::Ready(Ok(())) + } + } + } else { + // Enough data in the buffer, copy it to `buf` + buf.put_slice(&this.buffer[..available]); + Poll::Ready(Ok(())) + } + } else { + // No data in the buffer, try to peek directly into `buf` + let cur = buf.filled().len(); + match this.peeker.poll_read(cx, buf) { + Poll::Ready(Ok(())) => { + let filled = buf.filled(); + let read = filled.len() - cur; + this.buffer.extend_from_slice(&filled[cur..cur + read]); + Poll::Ready(Ok(())) + } + Poll::Ready(Err(e)) => Poll::Ready(Err(e)), + Poll::Pending => Poll::Pending, + } + } + } +} + +impl AsyncPeekable { + /// Creates a new `AsyncPeekable` which will wrap the given peeker. + #[inline] + pub fn new(reader: R) -> Self { + Self { + peeker: reader, + buffer: Buffer::new(), + } + } + + /// Creates a new `AsyncPeekable` which will wrap the given peeker with the specified + /// capacity for the peek buffer. + #[inline] + pub fn with_capacity(reader: R, capacity: usize) -> Self { + Self { + peeker: reader, + buffer: Buffer::with_capacity(capacity), + } + } +} + +/// An extension trait which adds peek related utility methods to [`AsyncRead`] types +pub trait AsyncPeekExt: AsyncRead { + /// Creates a new `AsyncPeekable` which will wrap the given peeker. + fn peekable(self) -> AsyncPeekable + where + Self: Sized, + { + AsyncPeekable { + peeker: self, + buffer: Buffer::new(), + } + } +} + +impl AsyncPeekExt for R {} + +impl AsyncPeekable { + /// Creates a future which will peek exactly enough bytes to fill `buf`, + /// returning an error if end of file (EOF) is hit sooner. + /// + /// The returned future will resolve once the read operation is completed. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// use futures::io::Cursor; + /// use tokio::io::AsyncReadExt; + /// use tokio_util::compat::FuturesAsyncReadCompatExt; + /// use peekable::tokio::AsyncPeekExt; + /// + /// + /// let mut peekable = Cursor::new([1, 2, 3, 4]).compat().peekable(); + /// let mut output = [0u8; 5]; + /// + /// let bytes = peekable.peek(&mut output[..3]).await?; + /// + /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous + /// // reader. In a real system you could get anywhere from 1 to + /// // `output.len()` bytes in a single read. + /// assert_eq!(bytes, 3); + /// assert_eq!(output, [1, 2, 3, 0, 0]); + /// + /// // you can peek mutiple times + /// + /// let bytes = peekable.peek(&mut output[..]).await?; + /// assert_eq!(bytes, 4); + /// assert_eq!(output, [1, 2, 3, 4, 0]); + /// + /// // you can read after peek + /// let mut output = [0u8; 5]; + /// let bytes = peekable.read(&mut output[..2]).await?; + /// assert_eq!(bytes, 2); + /// assert_eq!(output, [1, 2, 0, 0, 0]); + /// + /// // peek after read + /// let mut output = [0u8; 5]; + /// let bytes = peekable.peek(&mut output[..]).await?; + /// assert_eq!(bytes, 2); + /// assert_eq!(output, [3, 4, 0, 0, 0]); + /// + /// # Ok::<(), std::io::Error>(()) + /// # } + /// ``` + pub fn peek<'a>(&'a mut self, buf: &'a mut [u8]) -> Peek<'a, R> + where + Self: Unpin, + { + peek(self, buf) + } + + /// Pulls some bytes from this source into the specified buffer, + /// advancing the buffer's internal cursor. + pub fn peek_buf<'a, B>(&'a mut self, buf: &'a mut B) -> PeekBuf<'a, R, B> + where + Self: Unpin, + B: bytes::BufMut + ?Sized, + { + peek_buf(self, buf) + } + + /// Creates a future which will peek exactly enough bytes to fill `buf`, + /// returning an error if end of file (EOF) is hit sooner. + /// + /// The returned future will resolve once the read operation is completed. + /// + /// In the case of an error the buffer and the object will be discarded, with + /// the error yielded. + /// + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// use futures::io::Cursor; + /// use tokio::io::AsyncReadExt; + /// use tokio_util::compat::FuturesAsyncReadCompatExt; + /// use peekable::tokio::AsyncPeekExt; + /// + /// let mut peekable = Cursor::new([1, 2, 3, 4]).compat().peekable(); + /// let mut output = [0u8; 4]; + /// + /// peekable.peek_exact(&mut output).await?; + /// + /// assert_eq!(output, [1, 2, 3, 4]); + /// + /// // read after peek + /// let mut output = [0u8; 2]; + /// + /// peekable.read_exact(&mut output[..]).await?; + /// + /// assert_eq!(output, [1, 2]); + /// + /// // peek after read + /// let mut output = [0u8; 2]; + /// peekable.peek_exact(&mut output).await?; + /// + /// assert_eq!(output, [3, 4]); + /// + /// # Ok::<(), std::io::Error>(()) + /// # } + /// ``` + /// + /// ## EOF is hit before `buf` is filled + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// + /// use futures::io::Cursor; + /// use tokio::io::AsyncReadExt; + /// use tokio_util::compat::FuturesAsyncReadCompatExt; + /// use peekable::tokio::AsyncPeekExt; + /// + /// let mut peekable = Cursor::new([1, 2, 3, 4]).compat().peekable(); + /// let mut output = [0u8; 5]; + /// + /// let result = peekable.peek_exact(&mut output).await; + /// + /// assert_eq!(result.unwrap_err().kind(), std::io::ErrorKind::UnexpectedEof); + /// + /// let result = peekable.peek_exact(&mut output[..4]).await; + /// + /// assert!(result.is_ok()); + /// assert_eq!(output, [1, 2, 3, 4, 0]); + /// # Ok::<(), std::io::Error>(()) + /// # } + /// ``` + pub fn peek_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> PeekExact<'a, R> + where + Self: Unpin, + { + peek_exact(self, buf) + } + + /// Creates a future which will peek all the bytes from this `AsyncPeek`. + /// + /// On success the total number of bytes peek is returned. + /// + /// Equivalent to: + /// + /// ```ignore + /// async fn peek_to_end(&mut self, buf: &mut Vec) -> io::Result; + /// ``` + /// + /// All bytes peek from this source will be appended to the specified + /// buffer `buf`. This function will continuously call [`peek()`] to + /// append more data to `buf` until [`peek()`] returns `Ok(0)`. + /// + /// If successful, the total number of bytes read is returned. + /// + /// [`peek()`]: AsyncPeekable::peek + /// + /// # Errors + /// + /// If a peek error is encountered then the `peek_to_end` operation + /// immediately completes. Any bytes which have already been peek will + /// be appended to `buf`. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// use futures::io::Cursor; + /// use tokio::io::AsyncReadExt; + /// use tokio_util::compat::FuturesAsyncReadCompatExt; + /// use peekable::tokio::AsyncPeekExt; + /// + /// let mut peekable = Cursor::new([1, 2, 3, 4]).compat().peekable(); + /// let mut output = Vec::with_capacity(4); + /// + /// let bytes = peekable.peek_to_end(&mut output).await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(output, vec![1, 2, 3, 4]); + /// + /// // read after peek + /// let mut output = Vec::with_capacity(4); + /// + /// let bytes = peekable.read_to_end(&mut output).await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(output, vec![1, 2, 3, 4]); + /// # Ok::<(), std::io::Error>(()) + /// # } + /// ``` + pub fn peek_to_end<'a>(&'a mut self, buf: &'a mut Vec) -> PeekToEnd<'a, R> + where + Self: Unpin, + { + peek_to_end(self, buf) + } + + /// Creates a future which will peek all the bytes from this `AsyncPeek`. + /// + /// On success the total number of bytes peek is returned. + /// + /// # Examples + /// + /// ``` + /// # #[tokio::main] + /// # async fn main() -> std::io::Result<()> { + /// use futures::io::Cursor; + /// use tokio::io::AsyncReadExt; + /// use tokio_util::compat::FuturesAsyncReadCompatExt; + /// use peekable::tokio::AsyncPeekExt; + /// + /// let mut peekable = Cursor::new(&b"1234"[..]).compat().peekable(); + /// let mut buffer = String::with_capacity(4); + /// + /// let bytes = peekable.peek_to_string(&mut buffer).await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(buffer, String::from("1234")); + /// + /// // read after peek + /// let mut buffer = String::with_capacity(4); + /// let bytes = peekable.peek_to_string(&mut buffer).await?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(buffer, String::from("1234")); + /// + /// # Ok::<(), std::io::Error>(()) + /// # } + /// ``` + pub fn peek_to_string<'a>(&'a mut self, dst: &'a mut String) -> PeekToString<'a, R> + where + Self: Unpin, + { + peek_to_string(self, dst) + } +} diff --git a/src/tokio/peek.rs b/src/tokio/peek.rs new file mode 100644 index 0000000..816b5ba --- /dev/null +++ b/src/tokio/peek.rs @@ -0,0 +1,57 @@ +use ::tokio::io::{AsyncRead, ReadBuf}; +use pin_project_lite::pin_project; + +use std::future::Future; +use std::io; +use std::marker::PhantomPinned; +use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use super::{AsyncPeek, AsyncPeekable}; + +/// Tries to read some bytes directly into the given `buf` in asynchronous +/// manner, returning a future type. +/// +/// The returned future will resolve to both the I/O stream and the buffer +/// as well as the number of bytes read once the read operation is completed. +pub(crate) fn peek<'a, R>(reader: &'a mut AsyncPeekable, buf: &'a mut [u8]) -> Peek<'a, R> +where + R: AsyncRead + Unpin, +{ + Peek { + reader, + buf, + _pin: PhantomPinned, + } +} + +pin_project! { + /// A future which can be used to easily read available number of bytes to fill + /// a buffer. + /// + /// Created by the [`peek`] function. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Peek<'a, R> { + reader: &'a mut AsyncPeekable, + buf: &'a mut [u8], + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl Future for Peek<'_, R> +where + R: AsyncRead + Unpin, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let me = self.project(); + let mut buf = ReadBuf::new(me.buf); + ready!(Pin::new(me.reader).poll_peek(cx, &mut buf))?; + Poll::Ready(Ok(buf.filled().len())) + } +} diff --git a/src/tokio/peek_buf.rs b/src/tokio/peek_buf.rs new file mode 100644 index 0000000..e903974 --- /dev/null +++ b/src/tokio/peek_buf.rs @@ -0,0 +1,75 @@ +use super::{AsyncPeek, AsyncPeekable, AsyncRead}; + +use bytes::BufMut; +use pin_project_lite::pin_project; +use std::future::Future; +use std::io; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pub(crate) fn peek_buf<'a, R, B>( + peeker: &'a mut AsyncPeekable, + buf: &'a mut B, +) -> PeekBuf<'a, R, B> +where + R: AsyncRead + Unpin, + B: BufMut + ?Sized, +{ + PeekBuf { + peeker, + buf, + _pin: PhantomPinned, + } +} + +pin_project! { + /// Future returned by [`peek_buf`](crate::io::AsyncReadExt::peek_buf). + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PeekBuf<'a, R, B: ?Sized> { + peeker: &'a mut AsyncPeekable, + buf: &'a mut B, + #[pin] + _pin: PhantomPinned, + } +} + +impl Future for PeekBuf<'_, R, B> +where + R: AsyncRead + Unpin, + B: BufMut + ?Sized, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + use ::tokio::io::ReadBuf; + use std::mem::MaybeUninit; + + let me = self.project(); + + if !me.buf.has_remaining_mut() { + return Poll::Ready(Ok(0)); + } + + let n = { + let dst = me.buf.chunk_mut(); + let dst = unsafe { &mut *(dst as *mut _ as *mut [MaybeUninit]) }; + let mut buf = ReadBuf::uninit(dst); + let ptr = buf.filled().as_ptr(); + ready!(Pin::new(me.peeker).poll_peek(cx, &mut buf)?); + + // Ensure the pointer does not change from under us + assert_eq!(ptr, buf.filled().as_ptr()); + buf.filled().len() + }; + + // Safety: This is guaranteed to be the number of initialized (and peek) + // bytes due to the invariants provided by `PeekBuf::filled`. + unsafe { + me.buf.advance_mut(n); + } + + Poll::Ready(Ok(n)) + } +} diff --git a/src/tokio/peek_exact.rs b/src/tokio/peek_exact.rs new file mode 100644 index 0000000..04be198 --- /dev/null +++ b/src/tokio/peek_exact.rs @@ -0,0 +1,73 @@ +use ::tokio::io::{AsyncRead, ReadBuf}; +use pin_project_lite::pin_project; +use std::future::Future; +use std::io; +use std::marker::PhantomPinned; +use std::marker::Unpin; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::io::AsyncReadExt; + +use super::AsyncPeekable; + +/// A future which can be used to easily read exactly enough bytes to fill +/// a buffer. +/// +/// Created by the [`AsyncPeekExt::peek_exact`][peek_exact]. +/// [peek_exact]: [super::AsyncPeekExt::peek_exact] +pub(crate) fn peek_exact<'a, A>( + reader: &'a mut AsyncPeekable, + buf: &'a mut [u8], +) -> PeekExact<'a, A> +where + A: AsyncRead + Unpin, +{ + PeekExact { + reader, + buf: ReadBuf::new(buf), + _pin: PhantomPinned, + } +} + +pin_project! { + /// Creates a future which will read exactly enough bytes to fill `buf`, + /// returning an error if EOF is hit sooner. + /// + /// On success the number of bytes is returned + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PeekExact<'a, A> { + reader: &'a mut AsyncPeekable, + buf: ReadBuf<'a>, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +impl Future for PeekExact<'_, A> +where + A: AsyncRead + Unpin, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let me = self.project(); + + let buf_len = me.buf.remaining(); + let peek_buf_len = me.reader.buffer.len(); + + if buf_len <= peek_buf_len { + me.buf.put_slice(&me.reader.buffer[..buf_len]); + return Poll::Ready(Ok(buf_len)); + } + + me.buf.put_slice(&me.reader.buffer); + + let fut = me + .reader + .read_exact(me.buf.initialize_unfilled()); + ::tokio::pin!(fut); + fut.poll(cx) + } +} diff --git a/src/tokio/peek_to_end.rs b/src/tokio/peek_to_end.rs new file mode 100644 index 0000000..9e72352 --- /dev/null +++ b/src/tokio/peek_to_end.rs @@ -0,0 +1,66 @@ +use ::tokio::io::{AsyncRead, AsyncReadExt}; +use pin_project_lite::pin_project; + +use std::future::Future; +use std::io; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use super::AsyncPeekable; + +pin_project! { + /// Peek to end + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PeekToEnd<'a, R> { + reader: &'a mut AsyncPeekable, + buf: &'a mut Vec, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +pub(crate) fn peek_to_end<'a, R>( + reader: &'a mut AsyncPeekable, + buffer: &'a mut Vec, +) -> PeekToEnd<'a, R> +where + R: AsyncRead + Unpin, +{ + PeekToEnd { + reader, + buf: buffer, + _pin: PhantomPinned, + } +} + +impl Future for PeekToEnd<'_, A> +where + A: AsyncRead + Unpin, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + + let original_buf_len = me.buf.len(); + let peek_buf_len = me.reader.buffer.len(); + me.buf.extend_from_slice(&me.reader.buffer); + + let fut = me.reader.peeker.read_to_end(me.buf); + tokio::pin!(fut); + match fut.poll(cx) { + Poll::Ready(Ok(read)) => { + me.reader.buffer.extend_from_slice(&me.buf[original_buf_len + peek_buf_len..]); + Poll::Ready(Ok(peek_buf_len + read)) + } + Poll::Ready(Err(e)) => { + me.reader.buffer.extend_from_slice(&me.buf[original_buf_len + peek_buf_len..]); + Poll::Ready(Err(e)) + }, + Poll::Pending => Poll::Pending, + } + } +} diff --git a/src/tokio/peek_to_string.rs b/src/tokio/peek_to_string.rs new file mode 100644 index 0000000..9bac39f --- /dev/null +++ b/src/tokio/peek_to_string.rs @@ -0,0 +1,75 @@ +use super::AsyncPeekable; +use ::tokio::io::{AsyncRead, AsyncReadExt}; + +use pin_project_lite::pin_project; +use std::future::Future; +use std::io; +use std::marker::PhantomPinned; +use std::pin::Pin; +use std::task::{Context, Poll}; + +pin_project! { + /// Future for the [`peek_to_string`](super::AsyncPeekExt::peek_to_string) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PeekToString<'a, R> { + reader: &'a mut AsyncPeekable, + // This is the buffer we were provided. It will be replaced with an empty string + // while reading to postpone utf-8 handling until after reading. + output: &'a mut String, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } +} + +pub(crate) fn peek_to_string<'a, R>( + reader: &'a mut AsyncPeekable, + string: &'a mut String, +) -> PeekToString<'a, R> +where + R: AsyncRead + Unpin, +{ + PeekToString { + reader, + output: string, + _pin: PhantomPinned, + } +} + +impl Future for PeekToString<'_, A> +where + A: AsyncRead + Unpin, +{ + type Output = io::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let me = self.project(); + + let s = match core::str::from_utf8(&me.reader.buffer) { + Ok(s) => s, + Err(e) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::InvalidData, e))), + }; + + let original_buf_len = me.output.len(); + let peek_buf_len = me.reader.buffer.len(); + me.output.push_str(s); + let fut = me.reader.peeker.read_to_string(me.output); + ::tokio::pin!(fut); + match fut.poll(cx) { + Poll::Ready(Ok(read)) => { + me.reader + .buffer + .extend_from_slice(&me.output.as_bytes()[original_buf_len + peek_buf_len..]); + Poll::Ready(Ok(peek_buf_len + read)) + } + Poll::Ready(Err(e)) => { + me.reader + .buffer + .extend_from_slice(&me.output.as_bytes()[original_buf_len + peek_buf_len..]); + Poll::Ready(Err(e)) + }, + Poll::Pending => Poll::Pending, + } + } +} diff --git a/tests/foo.rs b/tests/foo.rs deleted file mode 100644 index 8b13789..0000000 --- a/tests/foo.rs +++ /dev/null @@ -1 +0,0 @@ - From 39319ca99245a96300670305cf752af2aadcb6d3 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Thu, 25 Jan 2024 01:15:58 +0800 Subject: [PATCH 2/4] WIP --- src/future/peek_exact.rs | 4 +--- src/future/peek_to_end.rs | 10 ++++++++-- src/future/peek_to_string.rs | 8 ++++++-- src/lib.rs | 8 +++++--- src/tokio.rs | 4 ++-- src/tokio/peek_exact.rs | 4 +--- src/tokio/peek_to_end.rs | 10 +++++++--- src/tokio/peek_to_string.rs | 2 +- 8 files changed, 31 insertions(+), 19 deletions(-) diff --git a/src/future/peek_exact.rs b/src/future/peek_exact.rs index 631dbcb..5baa8ee 100644 --- a/src/future/peek_exact.rs +++ b/src/future/peek_exact.rs @@ -39,9 +39,7 @@ impl Future for PeekExact<'_, P> { } this.buf[..peek_buf_len].copy_from_slice(&this.peekable.buffer); - let mut fut = this - .peekable - .read_exact(&mut this.buf[peek_buf_len..]); + let mut fut = this.peekable.read_exact(&mut this.buf[peek_buf_len..]); fut.poll_unpin(cx) } diff --git a/src/future/peek_to_end.rs b/src/future/peek_to_end.rs index 52f687f..22c840e 100644 --- a/src/future/peek_to_end.rs +++ b/src/future/peek_to_end.rs @@ -40,11 +40,17 @@ where let mut fut = this.peekable.reader.read_to_end(this.buf); match fut.poll_unpin(cx) { Poll::Ready(Ok(read)) => { - this.peekable.buffer.extend_from_slice(&this.buf[original_buf_len + inbuf..]); + this + .peekable + .buffer + .extend_from_slice(&this.buf[original_buf_len + inbuf..]); Poll::Ready(Ok(read + inbuf)) } Poll::Ready(Err(e)) => { - this.peekable.buffer.extend_from_slice(&this.buf[original_buf_len + inbuf..]); + this + .peekable + .buffer + .extend_from_slice(&this.buf[original_buf_len + inbuf..]); Poll::Ready(Err(e)) } Poll::Pending => Poll::Pending, diff --git a/src/future/peek_to_string.rs b/src/future/peek_to_string.rs index e1fed51..b0f0a50 100644 --- a/src/future/peek_to_string.rs +++ b/src/future/peek_to_string.rs @@ -49,11 +49,15 @@ where let mut fut = reader.reader.read_to_string(buf); match fut.poll_unpin(cx) { Poll::Ready(Ok(read)) => { - reader.buffer.extend_from_slice(&buf.as_bytes()[original_buf_len + inbuf..]); + reader + .buffer + .extend_from_slice(&buf.as_bytes()[original_buf_len + inbuf..]); Poll::Ready(Ok(read + inbuf)) } Poll::Ready(Err(e)) => { - reader.buffer.extend_from_slice(&buf.as_bytes()[original_buf_len + inbuf..]); + reader + .buffer + .extend_from_slice(&buf.as_bytes()[original_buf_len + inbuf..]); Poll::Ready(Err(e)) } Poll::Pending => Poll::Pending, diff --git a/src/lib.rs b/src/lib.rs index 8167f4b..d795c2f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,10 @@ #![cfg_attr(docsrs, allow(unused_attributes))] #![deny(missing_docs)] -use std::{cmp, io::{IoSliceMut, Read, Result}}; +use std::{ + cmp, + io::{IoSliceMut, Read, Result}, +}; #[cfg(feature = "smallvec")] type Buffer = smallvec::SmallVec<[u8; 64]>; @@ -436,8 +439,7 @@ impl Peekable { } buf[..buf_len].copy_from_slice(&this.buffer); - this - .read_exact(buf) + this.read_exact(buf) } } diff --git a/src/tokio.rs b/src/tokio.rs index 679d607..524e665 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -414,7 +414,7 @@ impl AsyncPeekable { /// Creates a future which will peek all the bytes from this `AsyncPeek`. /// /// On success the total number of bytes peek is returned. - /// + /// /// Equivalent to: /// /// ```ignore @@ -434,7 +434,7 @@ impl AsyncPeekable { /// If a peek error is encountered then the `peek_to_end` operation /// immediately completes. Any bytes which have already been peek will /// be appended to `buf`. - /// + /// /// # Examples /// /// ``` diff --git a/src/tokio/peek_exact.rs b/src/tokio/peek_exact.rs index 04be198..e23bb7f 100644 --- a/src/tokio/peek_exact.rs +++ b/src/tokio/peek_exact.rs @@ -64,9 +64,7 @@ where me.buf.put_slice(&me.reader.buffer); - let fut = me - .reader - .read_exact(me.buf.initialize_unfilled()); + let fut = me.reader.read_exact(me.buf.initialize_unfilled()); ::tokio::pin!(fut); fut.poll(cx) } diff --git a/src/tokio/peek_to_end.rs b/src/tokio/peek_to_end.rs index 9e72352..cbb9689 100644 --- a/src/tokio/peek_to_end.rs +++ b/src/tokio/peek_to_end.rs @@ -53,13 +53,17 @@ where tokio::pin!(fut); match fut.poll(cx) { Poll::Ready(Ok(read)) => { - me.reader.buffer.extend_from_slice(&me.buf[original_buf_len + peek_buf_len..]); + me.reader + .buffer + .extend_from_slice(&me.buf[original_buf_len + peek_buf_len..]); Poll::Ready(Ok(peek_buf_len + read)) } Poll::Ready(Err(e)) => { - me.reader.buffer.extend_from_slice(&me.buf[original_buf_len + peek_buf_len..]); + me.reader + .buffer + .extend_from_slice(&me.buf[original_buf_len + peek_buf_len..]); Poll::Ready(Err(e)) - }, + } Poll::Pending => Poll::Pending, } } diff --git a/src/tokio/peek_to_string.rs b/src/tokio/peek_to_string.rs index 9bac39f..af6715a 100644 --- a/src/tokio/peek_to_string.rs +++ b/src/tokio/peek_to_string.rs @@ -68,7 +68,7 @@ where .buffer .extend_from_slice(&me.output.as_bytes()[original_buf_len + peek_buf_len..]); Poll::Ready(Err(e)) - }, + } Poll::Pending => Poll::Pending, } } From 37f780ae3d157dc639b2fbf4132537e7f9276dac Mon Sep 17 00:00:00 2001 From: Al Liu Date: Thu, 25 Jan 2024 01:56:21 +0800 Subject: [PATCH 3/4] Finish basic functionalities --- README.md | 2 +- src/future/peek_exact.rs | 21 ++++- src/lib.rs | 187 +++++++++++++++++++++++++++++---------- src/tokio/peek_exact.rs | 25 +++++- 4 files changed, 178 insertions(+), 57 deletions(-) diff --git a/README.md b/README.md index 9ab4e15..32ea38a 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@

-WIP: Peakable reader and async reader +Peakable reader and async reader [github][Github-url] [Build][CI-url] diff --git a/src/future/peek_exact.rs b/src/future/peek_exact.rs index 5baa8ee..09a1390 100644 --- a/src/future/peek_exact.rs +++ b/src/future/peek_exact.rs @@ -1,9 +1,9 @@ -use futures_util::{AsyncRead, AsyncReadExt, FutureExt}; +use futures_util::AsyncRead; use super::AsyncPeekable; use std::{ future::Future, - io, + io, mem, pin::Pin, task::{Context, Poll}, }; @@ -39,8 +39,21 @@ impl Future for PeekExact<'_, P> { } this.buf[..peek_buf_len].copy_from_slice(&this.peekable.buffer); - let mut fut = this.peekable.read_exact(&mut this.buf[peek_buf_len..]); - fut.poll_unpin(cx) + let mut readed = peek_buf_len; + while !this.buf.is_empty() { + let n = ready!(Pin::new(&mut this.peekable.reader).poll_read(cx, this.buf))?; + { + let (read, rest) = mem::take(&mut this.buf).split_at_mut(n); + this.peekable.buffer.extend_from_slice(read); + readed += n; + this.buf = rest; + } + if n == 0 && readed != buf_len { + return Poll::Ready(Err(io::ErrorKind::UnexpectedEof.into())); + } + } + + Poll::Ready(Ok(())) } } diff --git a/src/lib.rs b/src/lib.rs index d795c2f..f83a009 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ use std::{ cmp, io::{IoSliceMut, Read, Result}, + mem, }; #[cfg(feature = "smallvec")] @@ -184,28 +185,48 @@ impl Peekable { /// /// # Examples /// - /// [`File`]s implement `Read`: /// /// [`Ok(n)`]: Ok /// [`File`]: std::fs::File /// [`TcpStream`]: std::net::TcpStream /// - /// ```no_run + /// ```rust /// use std::io; - /// use std::io::prelude::*; - /// use std::fs::File; + /// use std::io::{Cursor, Read}; /// use peekable::PeekExt; /// - /// fn main() -> io::Result<()> { - /// let mut f = File::open("foo.txt")?.peekable(); - /// let mut buffer = [0; 10]; + /// # fn main() -> io::Result<()> { /// - /// // peek up to 10 bytes - /// let n = f.peek(&mut buffer[..])?; + /// let mut peekable = Cursor::new([1, 2, 3, 4]).peekable(); + /// let mut output = [0u8; 5]; /// - /// println!("The bytes: {:?}", &buffer[..n]); - /// Ok(()) - /// } + /// let bytes = peekable.peek(&mut output[..3])?; + /// + /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous + /// // reader. In a real system you could get anywhere from 1 to + /// // `output.len()` bytes in a single read. + /// assert_eq!(bytes, 3); + /// assert_eq!(output, [1, 2, 3, 0, 0]); + /// + /// // you can peek mutiple times + /// + /// let bytes = peekable.peek(&mut output[..])?; + /// assert_eq!(bytes, 4); + /// assert_eq!(output, [1, 2, 3, 4, 0]); + /// + /// // you can read after peek + /// let mut output = [0u8; 5]; + /// let bytes = peekable.read(&mut output[..2])?; + /// assert_eq!(bytes, 2); + /// assert_eq!(output, [1, 2, 0, 0, 0]); + /// + /// // peek after read + /// let mut output = [0u8; 5]; + /// let bytes = peekable.peek(&mut output[..])?; + /// assert_eq!(bytes, 2); + /// assert_eq!(output, [3, 4, 0, 0, 0]); + /// # Ok(()) + /// # } /// ``` pub fn peek(&mut self, buf: &mut [u8]) -> std::io::Result { let want_peek = buf.len(); @@ -288,26 +309,32 @@ impl Peekable { /// /// # Examples /// - /// [`File`]s implement `Peek`: - /// - /// [`peek()`]: Peek::peek + /// [`peek()`]: Peekable::peek /// [`Ok(0)`]: Ok - /// [`File`]: crate::fs::File /// - /// ```no_run + /// ```rust /// use std::io; - /// use std::io::prelude::*; - /// use std::fs::File; + /// use std::io::{Cursor, Read}; /// use peekable::PeekExt; /// - /// fn main() -> io::Result<()> { - /// let mut f = File::open("foo.txt")?.peekable(); - /// let mut buffer = Vec::new(); + /// # fn main() -> io::Result<()> { + /// let mut peekable = Cursor::new([1, 2, 3, 4]).peekable(); + /// let mut output = Vec::with_capacity(4); + /// + /// let bytes = peekable.peek_to_end(&mut output)?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(output, vec![1, 2, 3, 4]); + /// + /// // read after peek + /// let mut output = Vec::with_capacity(4); /// - /// // peek the whole file - /// f.peek_to_end(&mut buffer)?; - /// Ok(()) - /// } + /// let bytes = peekable.read_to_end(&mut output)?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(output, vec![1, 2, 3, 4]); + /// # Ok(()) + /// # } /// ``` pub fn peek_to_end(&mut self, buf: &mut Vec) -> Result { let this = &mut *self; @@ -343,19 +370,29 @@ impl Peekable { /// # Examples /// /// - /// ```no_run + /// ```rust /// use std::io; - /// use std::io::prelude::*; - /// use std::fs::File; + /// use std::io::{Cursor, Read}; /// use peekable::PeekExt; /// - /// fn main() -> io::Result<()> { - /// let mut f = File::open("foo.txt")?.peekable(); - /// let mut buffer = String::new(); + /// # fn main() -> io::Result<()> { + /// + /// let mut peekable = Cursor::new(&b"1234"[..]).peekable(); + /// let mut buffer = String::with_capacity(4); + /// + /// let bytes = peekable.peek_to_string(&mut buffer)?; + /// + /// assert_eq!(bytes, 4); + /// assert_eq!(buffer, String::from("1234")); + /// + /// // read after peek + /// let mut buffer = String::with_capacity(4); + /// let bytes = peekable.peek_to_string(&mut buffer)?; /// - /// f.peek_to_string(&mut buffer)?; - /// Ok(()) - /// } + /// assert_eq!(bytes, 4); + /// assert_eq!(buffer, String::from("1234")); + /// # Ok(()) + /// # } /// ``` pub fn peek_to_string(&mut self, buf: &mut String) -> Result { let s = match core::str::from_utf8(&self.buffer) { @@ -412,22 +449,62 @@ impl Peekable { /// /// # Examples /// - /// ```no_run + /// ```rust + /// use std::io; + /// use std::io::{Cursor, Read}; + /// use peekable::PeekExt; + /// + /// # fn main() -> io::Result<()> { + /// + /// let mut peekable = Cursor::new([1, 2, 3, 4]).peekable(); + /// let mut output = [0u8; 4]; + /// + /// peekable.peek_exact(&mut output)?; + /// + /// assert_eq!(output, [1, 2, 3, 4]); + /// + /// // read after peek + /// let mut output = [0u8; 2]; + /// + /// peekable.read_exact(&mut output[..])?; + /// + /// assert_eq!(output, [1, 2]); + /// + /// // peek after read + /// let mut output = [0u8; 2]; + /// peekable.peek_exact(&mut output)?; + /// + /// assert_eq!(output, [3, 4]); + /// # Ok(()) + /// # } + /// ``` + /// + /// ## EOF is hit before `buf` is filled + /// + /// ``` /// use std::io; - /// use std::io::prelude::*; - /// use std::fs::File; + /// use std::io::{Cursor, Read}; /// use peekable::PeekExt; /// - /// fn main() -> io::Result<()> { - /// let mut f = File::open("foo.txt")?.peekable(); - /// let mut buffer = [0; 10]; + /// # fn main() -> io::Result<()> { + /// + /// let mut peekable = Cursor::new([1, 2, 3, 4]).peekable(); + /// let mut output = [0u8; 5]; + /// + /// let result = peekable.peek_exact(&mut output); + /// assert_eq!( + /// result.unwrap_err().kind(), + /// std::io::ErrorKind::UnexpectedEof + /// ); + /// + /// let result = peekable.peek_exact(&mut output[..4]); + /// assert!(result.is_ok()); + /// assert_eq!(output, [1, 2, 3, 4, 0]); /// - /// // peek exactly 10 bytes - /// f.peek_exact(&mut buffer)?; - /// Ok(()) - /// } + /// # Ok(()) + /// # } /// ``` - pub fn peek_exact(&mut self, buf: &mut [u8]) -> Result<()> { + pub fn peek_exact(&mut self, mut buf: &mut [u8]) -> Result<()> { let this = self; let buf_len = buf.len(); @@ -438,8 +515,22 @@ impl Peekable { return Ok(()); } - buf[..buf_len].copy_from_slice(&this.buffer); - this.read_exact(buf) + buf[..peek_buf_len].copy_from_slice(&this.buffer); + let mut readed = peek_buf_len; + while !buf.is_empty() { + let n = this.peeker.read(buf)?; + { + let (read, rest) = mem::take(&mut buf).split_at_mut(n); + this.buffer.extend_from_slice(read); + readed += n; + buf = rest; + } + if n == 0 && readed != buf_len { + return Err(std::io::ErrorKind::UnexpectedEof.into()); + } + } + + Ok(()) } } diff --git a/src/tokio/peek_exact.rs b/src/tokio/peek_exact.rs index e23bb7f..b1bd080 100644 --- a/src/tokio/peek_exact.rs +++ b/src/tokio/peek_exact.rs @@ -6,7 +6,6 @@ use std::marker::PhantomPinned; use std::marker::Unpin; use std::pin::Pin; use std::task::{Context, Poll}; -use tokio::io::AsyncReadExt; use super::AsyncPeekable; @@ -45,6 +44,10 @@ pin_project! { } } +fn eof() -> io::Error { + io::Error::new(io::ErrorKind::UnexpectedEof, "early eof") +} + impl Future for PeekExact<'_, A> where A: AsyncRead + Unpin, @@ -63,9 +66,23 @@ where } me.buf.put_slice(&me.reader.buffer); + let mut readed = me.reader.buffer.len(); + + while me.buf.remaining() != 0 { + let before = me.buf.filled().len(); + ready!(Pin::new(&mut me.reader.peeker).poll_read(cx, me.buf))?; + let after = me.buf.filled().len(); + let n = after - before; + readed += n; + me.reader + .buffer + .extend_from_slice(&me.buf.filled()[before..after]); + + if n == 0 && readed != buf_len { + return Err(eof()).into(); + } + } - let fut = me.reader.read_exact(me.buf.initialize_unfilled()); - ::tokio::pin!(fut); - fut.poll(cx) + Poll::Ready(Ok(me.buf.capacity())) } } From 0dcdaac244610da9ad4a73f0fbd5d881623bb859 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Thu, 25 Jan 2024 02:04:44 +0800 Subject: [PATCH 4/4] Polish --- src/future/peek_to_string.rs | 18 ++++++++-------- src/lib.rs | 30 +++++++++++++------------- src/tokio.rs | 22 +++++++++---------- src/tokio/peek.rs | 32 +++++++++++++-------------- src/tokio/peek_buf.rs | 18 ++++++++-------- src/tokio/peek_exact.rs | 42 ++++++++++++++++++------------------ src/tokio/peek_to_end.rs | 36 +++++++++++++++---------------- src/tokio/peek_to_string.rs | 38 ++++++++++++++++---------------- 8 files changed, 118 insertions(+), 118 deletions(-) diff --git a/src/future/peek_to_string.rs b/src/future/peek_to_string.rs index b0f0a50..a2b8450 100644 --- a/src/future/peek_to_string.rs +++ b/src/future/peek_to_string.rs @@ -12,15 +12,15 @@ use std::{ #[derive(Debug)] #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct PeekToString<'a, P> { - reader: &'a mut AsyncPeekable

, + peekable: &'a mut AsyncPeekable

, buf: &'a mut String, } impl Unpin for PeekToString<'_, P> {} impl<'a, P: AsyncRead + Unpin> PeekToString<'a, P> { - pub(super) fn new(reader: &'a mut AsyncPeekable

, buf: &'a mut String) -> Self { - Self { reader, buf } + pub(super) fn new(peekable: &'a mut AsyncPeekable

, buf: &'a mut String) -> Self { + Self { peekable, buf } } } @@ -31,8 +31,8 @@ where type Output = io::Result; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let Self { reader, buf } = &mut *self; - let s = match core::str::from_utf8(&reader.buffer) { + let Self { peekable, buf } = &mut *self; + let s = match core::str::from_utf8(&peekable.buffer) { Ok(s) => s, Err(_) => { return Poll::Ready(Err(io::Error::new( @@ -45,17 +45,17 @@ where let original_buf_len = buf.len(); buf.push_str(s); - let inbuf = reader.buffer.len(); - let mut fut = reader.reader.read_to_string(buf); + let inbuf = peekable.buffer.len(); + let mut fut = peekable.reader.read_to_string(buf); match fut.poll_unpin(cx) { Poll::Ready(Ok(read)) => { - reader + peekable .buffer .extend_from_slice(&buf.as_bytes()[original_buf_len + inbuf..]); Poll::Ready(Ok(read + inbuf)) } Poll::Ready(Err(e)) => { - reader + peekable .buffer .extend_from_slice(&buf.as_bytes()[original_buf_len + inbuf..]); Poll::Ready(Err(e)) diff --git a/src/lib.rs b/src/lib.rs index f83a009..d8b3bc5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -53,10 +53,10 @@ pub mod future; #[cfg_attr(docsrs, doc(cfg(feature = "tokio")))] pub mod tokio; -/// A wrapper around an [`Read`] types to make them support [`Peek`] methods. +/// A wrapper around an [`Read`] types to make them support peek related methods. pub struct Peekable { - /// The inner peeker. - peeker: R, + /// The inner reader. + reader: R, /// The buffer used to store peeked bytes. buffer: Buffer, } @@ -83,7 +83,7 @@ impl Read for Peekable { cmp::Ordering::Greater => { buf[..buffer_len].copy_from_slice(&this.buffer); buf = &mut buf[buffer_len..]; - match this.peeker.read(buf) { + match this.reader.read(buf) { Ok(bytes) => { this.buffer.clear(); Ok(bytes + buffer_len) @@ -94,14 +94,14 @@ impl Read for Peekable { }; } - this.peeker.read(buf) + this.reader.read(buf) } } impl From for Peekable { - fn from(peeker: R) -> Self { + fn from(reader: R) -> Self { Peekable { - peeker, + reader, buffer: Buffer::new(), } } @@ -111,7 +111,7 @@ impl Peekable { /// Creates a new peekable wrapper around the given reader. pub fn new(reader: R) -> Self { Self { - peeker: reader, + reader, buffer: Buffer::new(), } } @@ -120,7 +120,7 @@ impl Peekable { /// capacity for the peek buffer. pub fn with_capacity(reader: R, capacity: usize) -> Self { Self { - peeker: reader, + reader, buffer: Buffer::with_capacity(capacity), } } @@ -247,7 +247,7 @@ impl Peekable { cmp::Ordering::Greater => { let this = self; this.buffer.resize(want_peek, 0); - match this.peeker.read(&mut this.buffer[buffer_len..]) { + match this.reader.read(&mut this.buffer[buffer_len..]) { Ok(n) => { this.buffer.truncate(n + buffer_len); buf[..buffer_len + n].copy_from_slice(&this.buffer); @@ -260,7 +260,7 @@ impl Peekable { } let this = self; - match this.peeker.read(buf) { + match this.reader.read(buf) { Ok(bytes) => { this.buffer.extend_from_slice(&buf[..bytes]); Ok(bytes) @@ -343,7 +343,7 @@ impl Peekable { let original_buf = buf.len(); buf.extend_from_slice(&this.buffer); - let fut = this.peeker.read_to_end(buf); + let fut = this.reader.read_to_end(buf); match fut { Ok(read) => { this.buffer.extend_from_slice(&buf[original_buf + inbuf..]); @@ -408,7 +408,7 @@ impl Peekable { buf.push_str(s); let inbuf = self.buffer.len(); - let fut = self.peeker.read_to_string(buf); + let fut = self.reader.read_to_string(buf); match fut { Ok(read) => { self.buffer.extend_from_slice(&buf.as_bytes()[inbuf..]); @@ -518,7 +518,7 @@ impl Peekable { buf[..peek_buf_len].copy_from_slice(&this.buffer); let mut readed = peek_buf_len; while !buf.is_empty() { - let n = this.peeker.read(buf)?; + let n = this.reader.read(buf)?; { let (read, rest) = mem::take(&mut buf).split_at_mut(n); this.buffer.extend_from_slice(read); @@ -544,7 +544,7 @@ pub trait PeekExt: Read { Self: Sized, { Peekable { - peeker: self, + reader: self, buffer: Buffer::new(), } } diff --git a/src/tokio.rs b/src/tokio.rs index 524e665..9ac2e79 100644 --- a/src/tokio.rs +++ b/src/tokio.rs @@ -115,15 +115,15 @@ pin_project_lite::pin_project! { #[derive(Debug)] pub struct AsyncPeekable { #[pin] - peeker: R, + reader: R, buffer: Buffer, } } impl From for AsyncPeekable { - fn from(peeker: R) -> Self { + fn from(reader: R) -> Self { Self { - peeker, + reader, buffer: Buffer::new(), } } @@ -145,7 +145,7 @@ impl AsyncRead for AsyncPeekable { // Continue peeking into the buffer if there's still space buf.put_slice(this.buffer); - match this.peeker.poll_read(cx, buf) { + match this.reader.poll_read(cx, buf) { Poll::Ready(Ok(())) => { this.buffer.clear(); Poll::Ready(Ok(())) @@ -174,7 +174,7 @@ impl AsyncRead for AsyncPeekable { }; } - this.peeker.poll_read(cx, buf) + this.reader.poll_read(cx, buf) } } @@ -194,7 +194,7 @@ impl AsyncPeek for AsyncPeekable { // Not enough data in the buffer, need to peek more buf.put_slice(this.buffer); let cur = buf.filled().len(); - match this.peeker.poll_read(cx, buf) { + match this.reader.poll_read(cx, buf) { Poll::Ready(Ok(())) => { let filled = buf.filled(); let read = filled.len() - cur; @@ -216,7 +216,7 @@ impl AsyncPeek for AsyncPeekable { } else { // No data in the buffer, try to peek directly into `buf` let cur = buf.filled().len(); - match this.peeker.poll_read(cx, buf) { + match this.reader.poll_read(cx, buf) { Poll::Ready(Ok(())) => { let filled = buf.filled(); let read = filled.len() - cur; @@ -235,7 +235,7 @@ impl AsyncPeekable { #[inline] pub fn new(reader: R) -> Self { Self { - peeker: reader, + reader, buffer: Buffer::new(), } } @@ -245,7 +245,7 @@ impl AsyncPeekable { #[inline] pub fn with_capacity(reader: R, capacity: usize) -> Self { Self { - peeker: reader, + reader, buffer: Buffer::with_capacity(capacity), } } @@ -253,13 +253,13 @@ impl AsyncPeekable { /// An extension trait which adds peek related utility methods to [`AsyncRead`] types pub trait AsyncPeekExt: AsyncRead { - /// Creates a new `AsyncPeekable` which will wrap the given peeker. + /// Creates a new `AsyncPeekable` which will wrap the given reader. fn peekable(self) -> AsyncPeekable where Self: Sized, { AsyncPeekable { - peeker: self, + reader: self, buffer: Buffer::new(), } } diff --git a/src/tokio/peek.rs b/src/tokio/peek.rs index 816b5ba..bbc0db0 100644 --- a/src/tokio/peek.rs +++ b/src/tokio/peek.rs @@ -15,31 +15,31 @@ use super::{AsyncPeek, AsyncPeekable}; /// /// The returned future will resolve to both the I/O stream and the buffer /// as well as the number of bytes read once the read operation is completed. -pub(crate) fn peek<'a, R>(reader: &'a mut AsyncPeekable, buf: &'a mut [u8]) -> Peek<'a, R> +pub(crate) fn peek<'a, R>(peekable: &'a mut AsyncPeekable, buf: &'a mut [u8]) -> Peek<'a, R> where R: AsyncRead + Unpin, { Peek { - reader, + peekable, buf, _pin: PhantomPinned, } } pin_project! { - /// A future which can be used to easily read available number of bytes to fill - /// a buffer. - /// - /// Created by the [`peek`] function. - #[derive(Debug)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct Peek<'a, R> { - reader: &'a mut AsyncPeekable, - buf: &'a mut [u8], - // Make this future `!Unpin` for compatibility with async trait methods. - #[pin] - _pin: PhantomPinned, - } + /// A future which can be used to easily read available number of bytes to fill + /// a buffer. + /// + /// Created by the [`peek`] function. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Peek<'a, R> { + peekable: &'a mut AsyncPeekable, + buf: &'a mut [u8], + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } } impl Future for Peek<'_, R> @@ -51,7 +51,7 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let me = self.project(); let mut buf = ReadBuf::new(me.buf); - ready!(Pin::new(me.reader).poll_peek(cx, &mut buf))?; + ready!(Pin::new(me.peekable).poll_peek(cx, &mut buf))?; Poll::Ready(Ok(buf.filled().len())) } } diff --git a/src/tokio/peek_buf.rs b/src/tokio/peek_buf.rs index e903974..f576519 100644 --- a/src/tokio/peek_buf.rs +++ b/src/tokio/peek_buf.rs @@ -24,15 +24,15 @@ where } pin_project! { - /// Future returned by [`peek_buf`](crate::io::AsyncReadExt::peek_buf). - #[derive(Debug)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct PeekBuf<'a, R, B: ?Sized> { - peeker: &'a mut AsyncPeekable, - buf: &'a mut B, - #[pin] - _pin: PhantomPinned, - } + /// Future returned by [`peek_buf`](crate::io::AsyncReadExt::peek_buf). + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PeekBuf<'a, R, B: ?Sized> { + peeker: &'a mut AsyncPeekable, + buf: &'a mut B, + #[pin] + _pin: PhantomPinned, + } } impl Future for PeekBuf<'_, R, B> diff --git a/src/tokio/peek_exact.rs b/src/tokio/peek_exact.rs index b1bd080..ce538bc 100644 --- a/src/tokio/peek_exact.rs +++ b/src/tokio/peek_exact.rs @@ -15,33 +15,33 @@ use super::AsyncPeekable; /// Created by the [`AsyncPeekExt::peek_exact`][peek_exact]. /// [peek_exact]: [super::AsyncPeekExt::peek_exact] pub(crate) fn peek_exact<'a, A>( - reader: &'a mut AsyncPeekable, + peekable: &'a mut AsyncPeekable, buf: &'a mut [u8], ) -> PeekExact<'a, A> where A: AsyncRead + Unpin, { PeekExact { - reader, + peekable, buf: ReadBuf::new(buf), _pin: PhantomPinned, } } pin_project! { - /// Creates a future which will read exactly enough bytes to fill `buf`, - /// returning an error if EOF is hit sooner. - /// - /// On success the number of bytes is returned - #[derive(Debug)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct PeekExact<'a, A> { - reader: &'a mut AsyncPeekable, - buf: ReadBuf<'a>, - // Make this future `!Unpin` for compatibility with async trait methods. - #[pin] - _pin: PhantomPinned, - } + /// Creates a future which will read exactly enough bytes to fill `buf`, + /// returning an error if EOF is hit sooner. + /// + /// On success the number of bytes is returned + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PeekExact<'a, A> { + peekable: &'a mut AsyncPeekable, + buf: ReadBuf<'a>, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } } fn eof() -> io::Error { @@ -58,23 +58,23 @@ where let me = self.project(); let buf_len = me.buf.remaining(); - let peek_buf_len = me.reader.buffer.len(); + let peek_buf_len = me.peekable.buffer.len(); if buf_len <= peek_buf_len { - me.buf.put_slice(&me.reader.buffer[..buf_len]); + me.buf.put_slice(&me.peekable.buffer[..buf_len]); return Poll::Ready(Ok(buf_len)); } - me.buf.put_slice(&me.reader.buffer); - let mut readed = me.reader.buffer.len(); + me.buf.put_slice(&me.peekable.buffer); + let mut readed = me.peekable.buffer.len(); while me.buf.remaining() != 0 { let before = me.buf.filled().len(); - ready!(Pin::new(&mut me.reader.peeker).poll_read(cx, me.buf))?; + ready!(Pin::new(&mut me.peekable.reader).poll_read(cx, me.buf))?; let after = me.buf.filled().len(); let n = after - before; readed += n; - me.reader + me.peekable .buffer .extend_from_slice(&me.buf.filled()[before..after]); diff --git a/src/tokio/peek_to_end.rs b/src/tokio/peek_to_end.rs index cbb9689..7cfcb95 100644 --- a/src/tokio/peek_to_end.rs +++ b/src/tokio/peek_to_end.rs @@ -10,27 +10,27 @@ use std::task::{Context, Poll}; use super::AsyncPeekable; pin_project! { - /// Peek to end - #[derive(Debug)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct PeekToEnd<'a, R> { - reader: &'a mut AsyncPeekable, - buf: &'a mut Vec, - // Make this future `!Unpin` for compatibility with async trait methods. - #[pin] - _pin: PhantomPinned, - } + /// Peek to end + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PeekToEnd<'a, R> { + peekable: &'a mut AsyncPeekable, + buf: &'a mut Vec, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } } pub(crate) fn peek_to_end<'a, R>( - reader: &'a mut AsyncPeekable, + peekable: &'a mut AsyncPeekable, buffer: &'a mut Vec, ) -> PeekToEnd<'a, R> where R: AsyncRead + Unpin, { PeekToEnd { - reader, + peekable, buf: buffer, _pin: PhantomPinned, } @@ -45,21 +45,21 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - let original_buf_len = me.buf.len(); - let peek_buf_len = me.reader.buffer.len(); - me.buf.extend_from_slice(&me.reader.buffer); + let original_buf_len: usize = me.buf.len(); + let peek_buf_len = me.peekable.buffer.len(); + me.buf.extend_from_slice(&me.peekable.buffer); - let fut = me.reader.peeker.read_to_end(me.buf); + let fut = me.peekable.reader.read_to_end(me.buf); tokio::pin!(fut); match fut.poll(cx) { Poll::Ready(Ok(read)) => { - me.reader + me.peekable .buffer .extend_from_slice(&me.buf[original_buf_len + peek_buf_len..]); Poll::Ready(Ok(peek_buf_len + read)) } Poll::Ready(Err(e)) => { - me.reader + me.peekable .buffer .extend_from_slice(&me.buf[original_buf_len + peek_buf_len..]); Poll::Ready(Err(e)) diff --git a/src/tokio/peek_to_string.rs b/src/tokio/peek_to_string.rs index af6715a..4bba1ed 100644 --- a/src/tokio/peek_to_string.rs +++ b/src/tokio/peek_to_string.rs @@ -9,29 +9,29 @@ use std::pin::Pin; use std::task::{Context, Poll}; pin_project! { - /// Future for the [`peek_to_string`](super::AsyncPeekExt::peek_to_string) method. - #[derive(Debug)] - #[must_use = "futures do nothing unless you `.await` or poll them"] - pub struct PeekToString<'a, R> { - reader: &'a mut AsyncPeekable, - // This is the buffer we were provided. It will be replaced with an empty string - // while reading to postpone utf-8 handling until after reading. - output: &'a mut String, - // Make this future `!Unpin` for compatibility with async trait methods. - #[pin] - _pin: PhantomPinned, - } + /// Future for the [`peek_to_string`](super::AsyncPeekExt::peek_to_string) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct PeekToString<'a, R> { + peekable: &'a mut AsyncPeekable, + // This is the buffer we were provided. It will be replaced with an empty string + // while reading to postpone utf-8 handling until after reading. + output: &'a mut String, + // Make this future `!Unpin` for compatibility with async trait methods. + #[pin] + _pin: PhantomPinned, + } } pub(crate) fn peek_to_string<'a, R>( - reader: &'a mut AsyncPeekable, + peekable: &'a mut AsyncPeekable, string: &'a mut String, ) -> PeekToString<'a, R> where R: AsyncRead + Unpin, { PeekToString { - reader, + peekable, output: string, _pin: PhantomPinned, } @@ -46,25 +46,25 @@ where fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let me = self.project(); - let s = match core::str::from_utf8(&me.reader.buffer) { + let s = match core::str::from_utf8(&me.peekable.buffer) { Ok(s) => s, Err(e) => return Poll::Ready(Err(io::Error::new(io::ErrorKind::InvalidData, e))), }; let original_buf_len = me.output.len(); - let peek_buf_len = me.reader.buffer.len(); + let peek_buf_len = me.peekable.buffer.len(); me.output.push_str(s); - let fut = me.reader.peeker.read_to_string(me.output); + let fut = me.peekable.reader.read_to_string(me.output); ::tokio::pin!(fut); match fut.poll(cx) { Poll::Ready(Ok(read)) => { - me.reader + me.peekable .buffer .extend_from_slice(&me.output.as_bytes()[original_buf_len + peek_buf_len..]); Poll::Ready(Ok(peek_buf_len + read)) } Poll::Ready(Err(e)) => { - me.reader + me.peekable .buffer .extend_from_slice(&me.output.as_bytes()[original_buf_len + peek_buf_len..]); Poll::Ready(Err(e))