Skip to content

Commit

Permalink
Upgrade tokio utils to 0.3 (#138)
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede authored Jul 19, 2020
1 parent a9b5a7b commit 334c985
Show file tree
Hide file tree
Showing 20 changed files with 83 additions and 1,053 deletions.
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
members = [
"actix-codec",
"actix-connect",
"actix-ioframe",
"actix-rt",
"actix-macros",
"actix-service",
Expand All @@ -19,7 +18,6 @@ members = [
[patch.crates-io]
actix-codec = { path = "actix-codec" }
actix-connect = { path = "actix-connect" }
actix-ioframe = { path = "actix-ioframe" }
actix-rt = { path = "actix-rt" }
actix-macros = { path = "actix-macros" }
actix-server = { path = "actix-server" }
Expand Down
6 changes: 4 additions & 2 deletions actix-codec/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# Changes

* Use `.advance()` intead of `.split_to()`
## Unreleased - 2020-xx-xx
* Use `.advance()` instead of `.split_to()`.
* Upgrade `tokio-util` to `0.3`.

## [0.2.0] - 2019-12-10

* Use specific futures dependencies

## [0.2.0-alpha.4]

* Fix buffer remaining capacity calcualtion
* Fix buffer remaining capacity calculation

## [0.2.0-alpha.3]

Expand Down
4 changes: 2 additions & 2 deletions actix-codec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ bitflags = "1.2.1"
bytes = "0.5.2"
futures-core = { version = "0.3.4", default-features = false }
futures-sink = { version = "0.3.4", default-features = false }
tokio = { version = "0.2.4", default-features=false }
tokio-util = { version = "0.2.0", default-features=false, features=["codec"] }
tokio = { version = "0.2.5", default-features = false }
tokio-util = { version = "0.3.1", default-features = false, features = ["codec"] }
log = "0.4"
pin-project = "0.4.17"
3 changes: 1 addition & 2 deletions actix-codec/src/bcodec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@ use super::{Decoder, Encoder};
#[derive(Debug, Copy, Clone)]
pub struct BytesCodec;

impl Encoder for BytesCodec {
type Item = Bytes;
impl Encoder<Bytes> for BytesCodec {
type Error = io::Error;

fn encode(&mut self, item: Bytes, dst: &mut BytesMut) -> Result<(), Self::Error> {
Expand Down
50 changes: 22 additions & 28 deletions actix-codec/src/framed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ use pin_project::pin_project;

use crate::{AsyncRead, AsyncWrite, Decoder, Encoder};

/// Low-water mark
const LW: usize = 1024;
/// High-water mark
const HW: usize = 8 * 1024;

bitflags::bitflags! {
Expand All @@ -34,7 +36,7 @@ pub struct Framed<T, U> {
impl<T, U> Framed<T, U>
where
T: AsyncRead + AsyncWrite,
U: Decoder + Encoder,
U: Decoder,
{
/// Provides a `Stream` and `Sink` interface for reading and writing to this
/// `Io` object, using `Decode` and `Encode` to read and write the raw data.
Expand Down Expand Up @@ -129,7 +131,7 @@ impl<T, U> Framed<T, U> {
}

/// Consume the `Frame`, returning `Frame` with different codec.
pub fn into_framed<U2>(self, codec: U2) -> Framed<T, U2> {
pub fn into_framed<U2, I2>(self, codec: U2) -> Framed<T, U2> {
Framed {
codec,
io: self.io,
Expand All @@ -140,7 +142,7 @@ impl<T, U> Framed<T, U> {
}

/// Consume the `Frame`, returning `Frame` with different io.
pub fn map_io<F, T2>(self, f: F) -> Framed<T2, U>
pub fn map_io<F, T2, I2>(self, f: F) -> Framed<T2, U>
where
F: Fn(T) -> T2,
{
Expand All @@ -154,7 +156,7 @@ impl<T, U> Framed<T, U> {
}

/// Consume the `Frame`, returning `Frame` with different codec.
pub fn map_codec<F, U2>(self, f: F) -> Framed<T, U2>
pub fn map_codec<F, U2, I2>(self, f: F) -> Framed<T, U2>
where
F: Fn(U) -> U2,
{
Expand Down Expand Up @@ -186,10 +188,10 @@ impl<T, U> Framed<T, U> {

impl<T, U> Framed<T, U> {
/// Serialize item and Write to the inner buffer
pub fn write(mut self: Pin<&mut Self>, item: <U as Encoder>::Item) -> Result<(), <U as Encoder>::Error>
pub fn write<I>(mut self: Pin<&mut Self>, item: I) -> Result<(), <U as Encoder<I>>::Error>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let this = self.as_mut().project();
let remaining = this.write_buf.capacity() - this.write_buf.len();
Expand All @@ -209,7 +211,10 @@ impl<T, U> Framed<T, U> {
}

/// Try to read underlying I/O stream and decode item.
pub fn next_item(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Result<U::Item, U::Error>>>
pub fn next_item(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<<U as Decoder>::Item, U::Error>>>
where
T: AsyncRead,
U: Decoder,
Expand Down Expand Up @@ -266,20 +271,18 @@ impl<T, U> Framed<T, U> {
}

/// Flush write buffer to underlying I/O stream.
pub fn flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
pub fn flush<I>(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let mut this = self.as_mut().project();
log::trace!("flushing framed transport");

while !this.write_buf.is_empty() {
log::trace!("writing; remaining={}", this.write_buf.len());

let n = ready!(
this.io.as_mut().poll_write(cx, this.write_buf)
)?;
let n = ready!(this.io.as_mut().poll_write(cx, this.write_buf))?;

if n == 0 {
return Poll::Ready(Err(io::Error::new(
Expand All @@ -301,10 +304,10 @@ impl<T, U> Framed<T, U> {
}

/// Flush write buffer and shutdown underlying I/O stream.
pub fn close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
pub fn close<I>(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), U::Error>>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
{
let mut this = self.as_mut().project();
ready!(this.io.as_mut().poll_flush(cx))?;
Expand All @@ -325,10 +328,10 @@ where
}
}

impl<T, U> Sink<U::Item> for Framed<T, U>
impl<T, U, I> Sink<I> for Framed<T, U>
where
T: AsyncWrite,
U: Encoder,
U: Encoder<I>,
U::Error: From<io::Error>,
{
type Error = U::Error;
Expand All @@ -341,24 +344,15 @@ where
}
}

fn start_send(
self: Pin<&mut Self>,
item: <U as Encoder>::Item,
) -> Result<(), Self::Error> {
fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> {
self.write(item)
}

fn poll_flush(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.flush(cx)
}

fn poll_close(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), Self::Error>> {
fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.close(cx)
}
}
Expand Down
2 changes: 1 addition & 1 deletion actix-codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! [`AsyncWrite`]: AsyncWrite
//! [`Sink`]: futures_sink::Sink
//! [`Stream`]: futures_core::Stream
#![deny(rust_2018_idioms, warnings)]
#![deny(rust_2018_idioms)]

mod bcodec;
mod framed;
Expand Down
33 changes: 0 additions & 33 deletions actix-ioframe/CHANGES.md

This file was deleted.

33 changes: 0 additions & 33 deletions actix-ioframe/Cargo.toml

This file was deleted.

1 change: 0 additions & 1 deletion actix-ioframe/LICENSE-APACHE

This file was deleted.

1 change: 0 additions & 1 deletion actix-ioframe/LICENSE-MIT

This file was deleted.

3 changes: 3 additions & 0 deletions actix-ioframe/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# actix-ioframe

**This crate has been deprecated and removed.**
Loading

0 comments on commit 334c985

Please sign in to comment.