Skip to content

Commit

Permalink
Merge pull request #97 from constellation-rs/fix
Browse files Browse the repository at this point in the history
Use re-exported amadeus crates in amadeus-derive
  • Loading branch information
alecmocatta authored Jul 26, 2020
2 parents 3699c7c + 057df4b commit 59937f0
Show file tree
Hide file tree
Showing 28 changed files with 182 additions and 347 deletions.
18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "amadeus"
version = "0.3.5"
version = "0.3.6"
license = "Apache-2.0"
authors = ["Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand Down Expand Up @@ -35,14 +35,14 @@ json = ["amadeus-serde", "amadeus-derive/serde"]
features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"]

[dependencies]
amadeus-core = { version = "=0.3.5", path = "amadeus-core" }
amadeus-derive = { version = "=0.3.5", path = "amadeus-derive" }
amadeus-types = { version = "=0.3.5", path = "amadeus-types" }
amadeus-aws = { version = "=0.3.5", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.3.5", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.3.5", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.3.5", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.3.5", path = "amadeus-serde", optional = true }
amadeus-core = { version = "=0.3.6", path = "amadeus-core" }
amadeus-derive = { version = "=0.3.6", path = "amadeus-derive" }
amadeus-types = { version = "=0.3.6", path = "amadeus-types" }
amadeus-aws = { version = "=0.3.6", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.3.6", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.3.6", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.3.6", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.3.6", path = "amadeus-serde", optional = true }
async-channel = "1.1"
constellation-rs = { version = "0.2.0-alpha.2", default-features = false, optional = true }
derive-new = "0.5"
Expand Down
12 changes: 6 additions & 6 deletions amadeus-aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-aws"
version = "0.3.5"
version = "0.3.6"
license = "Apache-2.0"
authors = ["Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -19,18 +19,18 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.3.5", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.5", path = "../amadeus-types" }
amadeus-core = { version = "=0.3.6", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.6", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
async-trait = "0.1"
chrono = { version = "0.4", default-features = false }
futures = { version = "0.3" }
futures-retry = "0.5"
http = "0.2"
once_cell = "1.0"
rusoto_core = "0.44"
rusoto_credential = "0.44"
rusoto_s3 = "0.44"
rusoto_core = "0.45"
rusoto_credential = "0.45"
rusoto_s3 = "0.45"
serde_closure = "0.3"
serde = { version = "1.0", features = ["derive"] }
tokio = "0.2"
Expand Down
10 changes: 5 additions & 5 deletions amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ impl Cloudfront {
}
}

#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
type Output = std::pin::Pin<Box<dyn Stream<Item = Result<CloudfrontRow, AwsError>> + Send>>;
#[cfg(all(nightly, not(doc)))]
#[cfg(nightly)]
type Output = impl Stream<Item = Result<CloudfrontRow, AwsError>> + Send;

FnMutNamed! {
Expand Down Expand Up @@ -103,7 +103,7 @@ FnMutNamed! {
}
.flatten_stream()
.map(|x: Result<Result<CloudfrontRow, _>, _>| x.and_then(identity));
#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
let ret = ret.boxed();
ret
}
Expand All @@ -114,13 +114,13 @@ impl Source for Cloudfront {
type Error = AwsError;

type ParStream = DistParStream<Self::DistStream>;
#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
#[allow(clippy::type_complexity)]
type DistStream = amadeus_core::par_stream::FlatMap<
amadeus_core::into_par_stream::IterDistStream<std::vec::IntoIter<String>>,
Closure,
>;
#[cfg(all(nightly, not(doc)))]
#[cfg(nightly)]
type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;

fn par_stream(self) -> Self::ParStream {
Expand Down
2 changes: 1 addition & 1 deletion amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.5")]
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.6")]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
Expand Down
6 changes: 3 additions & 3 deletions amadeus-commoncrawl/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-commoncrawl"
version = "0.3.5"
version = "0.3.6"
license = "MIT OR Apache-2.0"
authors = ["Stephen Becker IV <[email protected]>", "Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.3.5", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.5", path = "../amadeus-types" }
amadeus-core = { version = "=0.3.6", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.6", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
futures = "0.3"
nom = "4.2.3"
Expand Down
12 changes: 6 additions & 6 deletions amadeus-commoncrawl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).
#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.5")]
#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.6")]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
Expand Down Expand Up @@ -80,9 +80,9 @@ impl CommonCrawl {
}
}

#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
type Output = std::pin::Pin<Box<dyn Stream<Item = Result<Webpage<'static>, io::Error>> + Send>>;
#[cfg(all(nightly, not(doc)))]
#[cfg(nightly)]
type Output = impl Stream<Item = Result<Webpage<'static>, io::Error>> + Send;

FnMutNamed! {
Expand All @@ -99,7 +99,7 @@ FnMutNamed! {
WarcParser::new(body)
}
.flatten_stream();
#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
let ret = ret.boxed();
ret
}
Expand All @@ -110,13 +110,13 @@ impl Source for CommonCrawl {
type Error = io::Error;

type ParStream = DistParStream<Self::DistStream>;
#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
#[allow(clippy::type_complexity)]
type DistStream = amadeus_core::par_stream::FlatMap<
amadeus_core::into_par_stream::IterDistStream<std::vec::IntoIter<String>>,
Closure,
>;
#[cfg(all(nightly, not(doc)))]
#[cfg(nightly)]
type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;

fn par_stream(self) -> Self::ParStream {
Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-core"
version = "0.3.5"
version = "0.3.6"
license = "Apache-2.0"
authors = ["Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand Down
3 changes: 1 addition & 2 deletions amadeus-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. All functionality is re-exposed in [`amadeus`](https://docs.rs/amadeus/0.3/amadeus/).
#![doc(html_root_url = "https://docs.rs/amadeus-core/0.3.5")]
#![doc(html_root_url = "https://docs.rs/amadeus-core/0.3.6")]
#![cfg_attr(nightly, feature(unboxed_closures))]
#![recursion_limit = "25600"]
#![warn(
Expand Down Expand Up @@ -66,7 +66,6 @@ macro_rules! impl_par_dist_rename {
IntoParallelStream IntoDistributedStream
ParStream DistStream
Send ProcessSend
ImplParallelStream ImplDistributedStream
IterParStream IterDistStream
into_par_stream into_dist_stream
par_stream dist_stream
Expand Down
2 changes: 2 additions & 0 deletions amadeus-core/src/par_sink/combine.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::option_if_let_else)]

use derive_new::new;
use educe::Educe;
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/src/par_sink/combiner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![allow(unused_imports,clippy::single_component_path_imports)]
#![allow(unused_imports, clippy::single_component_path_imports, clippy::option_if_let_else)]

use super::FolderSync;

Expand Down
38 changes: 20 additions & 18 deletions amadeus-core/src/par_sink/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,16 @@ where
Sum2::A((k, u, r)) => {
let waker = cx.waker();
let stream = stream::poll_fn(|cx| {
if let Some(u) = u.take() {
Poll::Ready(Some(u))
} else {
let waker_ = cx.waker();
if !waker.will_wake(waker_) {
waker_.wake_by_ref();
}
Poll::Pending
}
u.take().map_or_else(
|| {
let waker_ = cx.waker();
if !waker.will_wake(waker_) {
waker_.wake_by_ref();
}
Poll::Pending
},
|u| Poll::Ready(Some(u)),
)
})
.fuse()
.pipe(self_.pipe.as_mut());
Expand Down Expand Up @@ -332,15 +333,16 @@ where
let mut v = Some(v);
let waker = cx.waker();
let stream = stream::poll_fn(|cx| {
if let Some(v) = v.take() {
Poll::Ready(Some(v))
} else {
let waker_ = cx.waker();
if !waker.will_wake(waker_) {
waker_.wake_by_ref();
}
Poll::Pending
}
v.take().map_or_else(
|| {
let waker_ = cx.waker();
if !waker.will_wake(waker_) {
waker_.wake_by_ref();
}
Poll::Pending
},
|v| Poll::Ready(Some(v)),
)
})
.fuse();
pin_mut!(stream);
Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/src/par_sink/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ macro_rules! impl_tuple {
$({
let stream = Peekable{stream:stream.as_mut(),peeked:&mut *self_.peeked};
pin_mut!(stream);
let stream_ = substream(cx, stream, |item| if let $enum::$t(_) = item { true } else { false }, |item| { progress = true; if let $enum::$t(item) = item { item } else { unreachable!() } });
let stream_ = substream(cx, stream, |item| matches!(item, $enum::$t(_)), |item| { progress = true; if let $enum::$t(item) = item { item } else { unreachable!() } });
pin_mut!(stream_);
if self_.ready.$num.is_none() {
if let Poll::Ready(done) = self_.$t.as_mut().poll_forward(cx, stream_) {
Expand Down
57 changes: 1 addition & 56 deletions amadeus-core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
any::{Any, TypeId}, error, fmt, hash::{Hash, Hasher}, io, marker::PhantomData, pin::Pin, sync::Arc, task::{Context, Poll}
};

use crate::par_stream::{DistributedStream, ParallelStream, StreamTask};
use crate::par_stream::{DistributedStream, ParallelStream};

pub struct ResultExpand<T, E>(pub Result<T, E>);
impl<T, E> IntoIterator for ResultExpand<T, E>
Expand Down Expand Up @@ -110,61 +110,6 @@ where
}
}

#[doc(hidden)]
pub struct ImplDistributedStream<T>(PhantomData<fn() -> T>);
impl<T> ImplDistributedStream<T> {
pub fn new<U>(_drop: U) -> Self
where
U: DistributedStream<Item = T>,
{
Self(PhantomData)
}
}
impl<T: 'static> DistributedStream for ImplDistributedStream<T> {
type Item = T;
type Task = ImplTask<T>;

fn size_hint(&self) -> (usize, Option<usize>) {
unreachable!()
}
fn next_task(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Task>> {
unreachable!()
}
}
impl<T: 'static> ParallelStream for ImplDistributedStream<T> {
type Item = T;
type Task = ImplTask<T>;

fn size_hint(&self) -> (usize, Option<usize>) {
unreachable!()
}
fn next_task(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Task>> {
unreachable!()
}
}

#[doc(hidden)]
#[derive(Serialize, Deserialize)]
pub struct ImplTask<T>(PhantomData<fn() -> T>);
impl<T> StreamTask for ImplTask<T>
where
T: 'static,
{
type Item = T;
type Async = ImplTask<T>;

fn into_async(self) -> Self::Async {
self
}
}
impl<T: 'static> Stream for ImplTask<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
unreachable!()
}
}

// This is a dumb hack to avoid triggering https://github.com/rust-lang/rust/issues/48214 in amadeus-derive: see https://github.com/taiki-e/pin-project/issues/102#issuecomment-540472282
#[doc(hidden)]
#[repr(transparent)]
Expand Down
2 changes: 1 addition & 1 deletion amadeus-derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-derive"
version = "0.3.5"
version = "0.3.6"
license = "Apache-2.0"
authors = ["Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand Down
12 changes: 6 additions & 6 deletions amadeus-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. This macro is re-exposed as [`amadeus::data::Data`](https://docs.rs/amadeus/0.3/amadeus/data/derive.Data.html).
#![doc(html_root_url = "https://docs.rs/amadeus-derive/0.3.5")]
#![doc(html_root_url = "https://docs.rs/amadeus-derive/0.3.6")]
#![recursion_limit = "400"]
#![warn(
missing_copy_implementations,
Expand Down Expand Up @@ -262,7 +262,7 @@ fn impl_struct(
let mut parquet_derives = None;
if cfg!(feature = "parquet") {
parquet_includes = Some(quote! {
pub use ::amadeus_parquet::derive::{
pub use #amadeus_path::amadeus_parquet::derive::{
ParquetData, Repetition, ColumnReader, ParquetError, ParquetResult, ParquetSchema, Reader, DisplaySchemaGroup, ColumnPath, Type
};
});
Expand Down Expand Up @@ -384,7 +384,7 @@ fn impl_struct(
let mut postgres_derives = None;
if cfg!(feature = "postgres") {
postgres_includes = Some(quote! {
pub use ::amadeus_postgres::{Names,read_be_i32,read_value,_internal as postgres,PostgresData};
pub use #amadeus_path::amadeus_postgres::{Names,read_be_i32,read_value,_internal as postgres,PostgresData};
});
postgres_derives = Some(quote! {
#[automatically_derived]
Expand Down Expand Up @@ -436,7 +436,7 @@ fn impl_struct(
let mut serde_derives = None;
if cfg!(feature = "serde") {
serde_includes = Some(quote! {
pub use ::amadeus_serde::{SerdeData,_internal::{Serialize, Deserialize, Serializer, Deserializer}};
pub use #amadeus_path::amadeus_serde::{SerdeData, _internal::{Serialize, Deserialize, Serializer, Deserializer}};
pub use #amadeus_path::data::serde_data;
});
serde_derives = Some(quote! {
Expand Down Expand Up @@ -471,8 +471,8 @@ fn impl_struct(
#parquet_includes
#postgres_includes
#serde_includes
pub use ::amadeus_core::util::Wrapper;
pub use ::amadeus_types::{AmadeusOrd, Data as CoreData, DowncastFrom, Downcast, DowncastError, Value, Group, SchemaIncomplete, ListVec, __internal::{Serialize as Serialize_, Deserialize as Deserialize_, Serializer as Serializer_, Deserializer as Deserializer_, SerializeTuple, Error as SerdeError, Visitor, SeqAccess}};
pub use #amadeus_path::amadeus_core::util::Wrapper;
pub use #amadeus_path::amadeus_types::{AmadeusOrd, Data as CoreData, DowncastFrom, Downcast, DowncastError, Value, Group, SchemaIncomplete, ListVec, __internal::{Serialize as Serialize_, Deserialize as Deserialize_, Serializer as Serializer_, Deserializer as Deserializer_, SerializeTuple, Error as SerdeError, Visitor, SeqAccess}};
pub use #amadeus_path::data::Data;
pub use ::std::{borrow::ToOwned, boxed::Box, clone::Clone, collections::HashMap, convert::{From, Into}, cmp::{Ordering, PartialEq}, default::Default, error::Error, fmt::{self, Debug, Write}, format, hash::{Hash, Hasher}, iter::{ExactSizeIterator, IntoIterator, Iterator}, marker::{PhantomData, Send, Sized, Sync}, result::Result::{self, Ok, Err}, string::String, panic, vec, vec::{IntoIter, Vec}, option::Option::{self, Some, None}};
}
Expand Down
Loading

0 comments on commit 59937f0

Please sign in to comment.