Skip to content

Commit

Permalink
update actix-codec/utils dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
robjtede committed Aug 19, 2020
1 parent 3892a95 commit 1932fd9
Show file tree
Hide file tree
Showing 23 changed files with 133 additions and 85 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
# Changes

## Unreleased- 2020-xx-xx
### Changed
* Update actix-codec and actix-utils dependencies.


## 3.0.0-beta.3 - 2020-08-17
### Changed
* Update `rustls` to 0.18


## 3.0.0-beta.2 - 2020-08-17
### Changed
* `PayloadConfig` is now also considered in `Bytes` and `String` extractors when set
Expand Down
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ name = "test_server"
required-features = ["compress"]

[dependencies]
actix-codec = "0.2.0"
actix-service = "1.0.2"
actix-utils = "1.0.6"
actix-codec = "0.3.0-beta.2"
actix-service = "1.0.6"
actix-utils = "2.0.0-beta.1"
actix-router = "0.2.4"
actix-rt = "1.1.1"
actix-server = "1.0.0"
Expand Down
3 changes: 2 additions & 1 deletion actix-http/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Changes

## Unreleased

### Changed
* Update actix-codec and actix-utils dependencies.

## [2.0.0-beta.3] - 2020-08-14

Expand Down
4 changes: 2 additions & 2 deletions actix-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ actors = ["actix"]

[dependencies]
actix-service = "1.0.5"
actix-codec = "0.2.0"
actix-codec = "0.3.0-beta.2"
actix-connect = "2.0.0-alpha.4"
actix-utils = "1.0.6"
actix-utils = "2.0.0-beta.1"
actix-rt = "1.0.0"
actix-threadpool = "0.3.1"
actix-tls = { version = "2.0.0-alpha.2", optional = true }
Expand Down
12 changes: 8 additions & 4 deletions actix-http/src/client/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,10 @@ pub trait Connection {

pub(crate) trait ConnectionLifetime: AsyncRead + AsyncWrite + 'static {
/// Close connection
fn close(&mut self);
fn close(self: Pin<&mut Self>);

/// Release connection to the connection pool
fn release(&mut self);
fn release(self: Pin<&mut Self>);
}

#[doc(hidden)]
Expand Down Expand Up @@ -195,11 +195,15 @@ where
match self {
EitherConnection::A(con) => con
.open_tunnel(head)
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::A))))
.map(|res| {
res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::A)))
})
.boxed_local(),
EitherConnection::B(con) => con
.open_tunnel(head)
.map(|res| res.map(|(head, framed)| (head, framed.map_io(EitherIo::B))))
.map(|res| {
res.map(|(head, framed)| (head, framed.into_map_io(EitherIo::B)))
})
.boxed_local(),
}
}
Expand Down
65 changes: 35 additions & 30 deletions actix-http/src/client/h1proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,32 +67,32 @@ where
};

// create Framed and send request
let mut framed = Framed::new(io, h1::ClientCodec::default());
framed.send((head, body.size()).into()).await?;
let mut framed_inner = Framed::new(io, h1::ClientCodec::default());
framed_inner.send((head, body.size()).into()).await?;

// send request body
match body.size() {
BodySize::None | BodySize::Empty | BodySize::Sized(0) => (),
_ => send_body(body, &mut framed).await?,
_ => send_body(body, Pin::new(&mut framed_inner)).await?,
};

// read response and init read body
let res = framed.into_future().await;
let res = Pin::new(&mut framed_inner).into_future().await;
let (head, framed) = if let (Some(result), framed) = res {
let item = result.map_err(SendRequestError::from)?;
(item, framed)
} else {
return Err(SendRequestError::from(ConnectError::Disconnected));
};

match framed.get_codec().message_type() {
match framed.codec_ref().message_type() {
h1::MessageType::None => {
let force_close = !framed.get_codec().keepalive();
let force_close = !framed.codec_ref().keepalive();
release_connection(framed, force_close);
Ok((head, Payload::None))
}
_ => {
let pl: PayloadStream = PlStream::new(framed).boxed_local();
let pl: PayloadStream = PlStream::new(framed_inner).boxed_local();
Ok((head, pl.into()))
}
}
Expand All @@ -119,35 +119,36 @@ where
}

/// send request body to the peer
pub(crate) async fn send_body<I, B>(
pub(crate) async fn send_body<T, B>(
body: B,
framed: &mut Framed<I, h1::ClientCodec>,
mut framed: Pin<&mut Framed<T, h1::ClientCodec>>,
) -> Result<(), SendRequestError>
where
I: ConnectionLifetime,
T: ConnectionLifetime + Unpin,
B: MessageBody,
{
let mut eof = false;
pin_mut!(body);

let mut eof = false;
while !eof {
while !eof && !framed.is_write_buf_full() {
while !eof && !framed.as_ref().is_write_buf_full() {
match poll_fn(|cx| body.as_mut().poll_next(cx)).await {
Some(result) => {
framed.write(h1::Message::Chunk(Some(result?)))?;
framed.as_mut().write(h1::Message::Chunk(Some(result?)))?;
}
None => {
eof = true;
framed.write(h1::Message::Chunk(None))?;
framed.as_mut().write(h1::Message::Chunk(None))?;
}
}
}

if !framed.is_write_buf_empty() {
poll_fn(|cx| match framed.flush(cx) {
if !framed.as_ref().is_write_buf_empty() {
poll_fn(|cx| match framed.as_mut().flush(cx) {
Poll::Ready(Ok(_)) => Poll::Ready(Ok(())),
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
Poll::Pending => {
if !framed.is_write_buf_full() {
if !framed.as_ref().is_write_buf_full() {
Poll::Ready(Ok(()))
} else {
Poll::Pending
Expand All @@ -158,13 +159,14 @@ where
}
}

SinkExt::flush(framed).await?;
SinkExt::flush(Pin::into_inner(framed)).await?;
Ok(())
}

#[doc(hidden)]
/// HTTP client connection
pub struct H1Connection<T> {
/// T should be `Unpin`
io: Option<T>,
created: time::Instant,
pool: Option<Acquired<T>>,
Expand All @@ -175,7 +177,7 @@ where
T: AsyncRead + AsyncWrite + Unpin + 'static,
{
/// Close connection
fn close(&mut self) {
fn close(mut self: Pin<&mut Self>) {
if let Some(mut pool) = self.pool.take() {
if let Some(io) = self.io.take() {
pool.close(IoConnection::new(
Expand All @@ -188,7 +190,7 @@ where
}

/// Release this connection to the connection pool
fn release(&mut self) {
fn release(mut self: Pin<&mut Self>) {
if let Some(mut pool) = self.pool.take() {
if let Some(io) = self.io.take() {
pool.release(IoConnection::new(
Expand Down Expand Up @@ -242,14 +244,18 @@ impl<T: AsyncRead + AsyncWrite + Unpin + 'static> AsyncWrite for H1Connection<T>
}
}

#[pin_project::pin_project]
pub(crate) struct PlStream<Io> {
#[pin]
framed: Option<Framed<Io, h1::ClientPayloadCodec>>,
}

impl<Io: ConnectionLifetime> PlStream<Io> {
fn new(framed: Framed<Io, h1::ClientCodec>) -> Self {
let framed = framed.into_map_codec(|codec| codec.into_payload_codec());

PlStream {
framed: Some(framed.map_codec(|codec| codec.into_payload_codec())),
framed: Some(framed),
}
}
}
Expand All @@ -261,16 +267,16 @@ impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let mut this = self.project();

match this.framed.as_mut().unwrap().next_item(cx)? {
match this.framed.as_mut().as_pin_mut().unwrap().next_item(cx)? {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(chunk)) => {
if let Some(chunk) = chunk {
Poll::Ready(Some(Ok(chunk)))
} else {
let framed = this.framed.take().unwrap();
let force_close = !framed.get_codec().keepalive();
let framed = this.framed.as_mut().as_pin_mut().unwrap();
let force_close = !framed.codec_ref().keepalive();
release_connection(framed, force_close);
Poll::Ready(None)
}
Expand All @@ -280,14 +286,13 @@ impl<Io: ConnectionLifetime> Stream for PlStream<Io> {
}
}

fn release_connection<T, U>(framed: Framed<T, U>, force_close: bool)
fn release_connection<T, U>(framed: Pin<&mut Framed<T, U>>, force_close: bool)
where
T: ConnectionLifetime,
{
let mut parts = framed.into_parts();
if !force_close && parts.read_buf.is_empty() && parts.write_buf.is_empty() {
parts.io.release()
if !force_close && framed.is_read_buf_empty() && framed.is_write_buf_empty() {
framed.io_pin().release()
} else {
parts.io.close()
framed.io_pin().close()
}
}
7 changes: 4 additions & 3 deletions actix-http/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Error and Result module
use std::cell::RefCell;
use std::io::Write;
use std::str::Utf8Error;
Expand All @@ -7,7 +8,7 @@ use std::{fmt, io, result};

use actix_codec::{Decoder, Encoder};
pub use actix_threadpool::BlockingError;
use actix_utils::framed::DispatcherError as FramedDispatcherError;
use actix_utils::dispatcher::DispatcherError as FramedDispatcherError;
use actix_utils::timeout::TimeoutError;
use bytes::BytesMut;
use derive_more::{Display, From};
Expand Down Expand Up @@ -452,10 +453,10 @@ impl ResponseError for ContentTypeError {
}
}

impl<E, U: Encoder + Decoder> ResponseError for FramedDispatcherError<E, U>
impl<E, U: Encoder<I> + Decoder, I> ResponseError for FramedDispatcherError<E, U, I>
where
E: fmt::Debug + fmt::Display,
<U as Encoder>::Error: fmt::Debug,
<U as Encoder<I>>::Error: fmt::Debug,
<U as Decoder>::Error: fmt::Debug,
{
}
Expand Down
5 changes: 2 additions & 3 deletions actix-http/src/h1/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,12 @@ impl Decoder for ClientPayloadCodec {
}
}

impl Encoder for ClientCodec {
type Item = Message<(RequestHeadType, BodySize)>;
impl Encoder<Message<(RequestHeadType, BodySize)>> for ClientCodec {
type Error = io::Error;

fn encode(
&mut self,
item: Self::Item,
item: Message<(RequestHeadType, BodySize)>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
match item {
Expand Down
5 changes: 2 additions & 3 deletions actix-http/src/h1/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,13 +144,12 @@ impl Decoder for Codec {
}
}

impl Encoder for Codec {
type Item = Message<(Response<()>, BodySize)>;
impl Encoder<Message<(Response<()>, BodySize)>> for Codec {
type Error = io::Error;

fn encode(
&mut self,
item: Self::Item,
item: Message<(Response<()>, BodySize)>,
dst: &mut BytesMut,
) -> Result<(), Self::Error> {
match item {
Expand Down
16 changes: 10 additions & 6 deletions actix-http/src/h1/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -548,10 +548,12 @@ where
}

#[doc(hidden)]
#[pin_project::pin_project]
pub struct OneRequestServiceResponse<T>
where
T: AsyncRead + AsyncWrite + Unpin,
{
#[pin]
framed: Option<Framed<T, Codec>>,
}

Expand All @@ -562,16 +564,18 @@ where
type Output = Result<(Request, Framed<T, Codec>), ParseError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match self.framed.as_mut().unwrap().next_item(cx) {
Poll::Ready(Some(Ok(req))) => match req {
let this = self.as_mut().project();

match ready!(this.framed.as_pin_mut().unwrap().next_item(cx)) {
Some(Ok(req)) => match req {
Message::Item(req) => {
Poll::Ready(Ok((req, self.framed.take().unwrap())))
let mut this = self.as_mut().project();
Poll::Ready(Ok((req, this.framed.take().unwrap())))
}
Message::Chunk(_) => unreachable!("Something is wrong"),
},
Poll::Ready(Some(Err(err))) => Poll::Ready(Err(err)),
Poll::Ready(None) => Poll::Ready(Err(ParseError::Incomplete)),
Poll::Pending => Poll::Pending,
Some(Err(err)) => Poll::Ready(Err(err)),
None => Poll::Ready(Err(ParseError::Incomplete)),
}
}
}
Loading

0 comments on commit 1932fd9

Please sign in to comment.