Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use re-exported amadeus crates in amadeus-derive #97

Merged
merged 4 commits into from
Jul 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[package]
name = "amadeus"
version = "0.3.5"
version = "0.3.6"
license = "Apache-2.0"
authors = ["Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand Down Expand Up @@ -35,14 +35,14 @@ json = ["amadeus-serde", "amadeus-derive/serde"]
features = ["constellation", "aws", "commoncrawl", "parquet", "postgres", "csv", "json"]

[dependencies]
amadeus-core = { version = "=0.3.5", path = "amadeus-core" }
amadeus-derive = { version = "=0.3.5", path = "amadeus-derive" }
amadeus-types = { version = "=0.3.5", path = "amadeus-types" }
amadeus-aws = { version = "=0.3.5", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.3.5", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.3.5", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.3.5", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.3.5", path = "amadeus-serde", optional = true }
amadeus-core = { version = "=0.3.6", path = "amadeus-core" }
amadeus-derive = { version = "=0.3.6", path = "amadeus-derive" }
amadeus-types = { version = "=0.3.6", path = "amadeus-types" }
amadeus-aws = { version = "=0.3.6", path = "amadeus-aws", optional = true }
amadeus-commoncrawl = { version = "=0.3.6", path = "amadeus-commoncrawl", optional = true }
amadeus-parquet = { version = "=0.3.6", path = "amadeus-parquet", optional = true }
amadeus-postgres = { version = "=0.3.6", path = "amadeus-postgres", optional = true }
amadeus-serde = { version = "=0.3.6", path = "amadeus-serde", optional = true }
async-channel = "1.1"
constellation-rs = { version = "0.2.0-alpha.2", default-features = false, optional = true }
derive-new = "0.5"
Expand Down
12 changes: 6 additions & 6 deletions amadeus-aws/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-aws"
version = "0.3.5"
version = "0.3.6"
license = "Apache-2.0"
authors = ["Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -19,18 +19,18 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.3.5", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.5", path = "../amadeus-types" }
amadeus-core = { version = "=0.3.6", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.6", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
async-trait = "0.1"
chrono = { version = "0.4", default-features = false }
futures = { version = "0.3" }
futures-retry = "0.5"
http = "0.2"
once_cell = "1.0"
rusoto_core = "0.44"
rusoto_credential = "0.44"
rusoto_s3 = "0.44"
rusoto_core = "0.45"
rusoto_credential = "0.45"
rusoto_s3 = "0.45"
serde_closure = "0.3"
serde = { version = "1.0", features = ["derive"] }
tokio = "0.2"
Expand Down
10 changes: 5 additions & 5 deletions amadeus-aws/src/cloudfront.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ impl Cloudfront {
}
}

#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
type Output = std::pin::Pin<Box<dyn Stream<Item = Result<CloudfrontRow, AwsError>> + Send>>;
#[cfg(all(nightly, not(doc)))]
#[cfg(nightly)]
type Output = impl Stream<Item = Result<CloudfrontRow, AwsError>> + Send;

FnMutNamed! {
Expand Down Expand Up @@ -103,7 +103,7 @@ FnMutNamed! {
}
.flatten_stream()
.map(|x: Result<Result<CloudfrontRow, _>, _>| x.and_then(identity));
#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
let ret = ret.boxed();
ret
}
Expand All @@ -114,13 +114,13 @@ impl Source for Cloudfront {
type Error = AwsError;

type ParStream = DistParStream<Self::DistStream>;
#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
#[allow(clippy::type_complexity)]
type DistStream = amadeus_core::par_stream::FlatMap<
amadeus_core::into_par_stream::IterDistStream<std::vec::IntoIter<String>>,
Closure,
>;
#[cfg(all(nightly, not(doc)))]
#[cfg(nightly)]
type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;

fn par_stream(self) -> Self::ParStream {
Expand Down
2 changes: 1 addition & 1 deletion amadeus-aws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.5")]
#![doc(html_root_url = "https://docs.rs/amadeus-aws/0.3.6")]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
Expand Down
6 changes: 3 additions & 3 deletions amadeus-commoncrawl/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-commoncrawl"
version = "0.3.5"
version = "0.3.6"
license = "MIT OR Apache-2.0"
authors = ["Stephen Becker IV <[email protected]>", "Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand All @@ -19,8 +19,8 @@ azure-devops = { project = "alecmocatta/amadeus", pipeline = "tests", build = "2
maintenance = { status = "actively-developed" }

[dependencies]
amadeus-core = { version = "=0.3.5", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.5", path = "../amadeus-types" }
amadeus-core = { version = "=0.3.6", path = "../amadeus-core" }
amadeus-types = { version = "=0.3.6", path = "../amadeus-types" }
async-compression = { version = "0.3.3", features = ["gzip", "futures-bufread"] }
futures = "0.3"
nom = "4.2.3"
Expand Down
12 changes: 6 additions & 6 deletions amadeus-commoncrawl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. These types are re-exposed in [`amadeus::source`](https://docs.rs/amadeus/0.3/amadeus/source/index.html).

#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.5")]
#![doc(html_root_url = "https://docs.rs/amadeus-commoncrawl/0.3.6")]
#![cfg_attr(nightly, feature(type_alias_impl_trait))]
#![warn(
// missing_copy_implementations,
Expand Down Expand Up @@ -80,9 +80,9 @@ impl CommonCrawl {
}
}

#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
type Output = std::pin::Pin<Box<dyn Stream<Item = Result<Webpage<'static>, io::Error>> + Send>>;
#[cfg(all(nightly, not(doc)))]
#[cfg(nightly)]
type Output = impl Stream<Item = Result<Webpage<'static>, io::Error>> + Send;

FnMutNamed! {
Expand All @@ -99,7 +99,7 @@ FnMutNamed! {
WarcParser::new(body)
}
.flatten_stream();
#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
let ret = ret.boxed();
ret
}
Expand All @@ -110,13 +110,13 @@ impl Source for CommonCrawl {
type Error = io::Error;

type ParStream = DistParStream<Self::DistStream>;
#[cfg(not(all(nightly, not(doc))))]
#[cfg(not(nightly))]
#[allow(clippy::type_complexity)]
type DistStream = amadeus_core::par_stream::FlatMap<
amadeus_core::into_par_stream::IterDistStream<std::vec::IntoIter<String>>,
Closure,
>;
#[cfg(all(nightly, not(doc)))]
#[cfg(nightly)]
type DistStream = impl DistributedStream<Item = Result<Self::Item, Self::Error>>;

fn par_stream(self) -> Self::ParStream {
Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-core"
version = "0.3.5"
version = "0.3.6"
license = "Apache-2.0"
authors = ["Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand Down
3 changes: 1 addition & 2 deletions amadeus-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. All functionality is re-exposed in [`amadeus`](https://docs.rs/amadeus/0.3/amadeus/).

#![doc(html_root_url = "https://docs.rs/amadeus-core/0.3.5")]
#![doc(html_root_url = "https://docs.rs/amadeus-core/0.3.6")]
#![cfg_attr(nightly, feature(unboxed_closures))]
#![recursion_limit = "25600"]
#![warn(
Expand Down Expand Up @@ -66,7 +66,6 @@ macro_rules! impl_par_dist_rename {
IntoParallelStream IntoDistributedStream
ParStream DistStream
Send ProcessSend
ImplParallelStream ImplDistributedStream
IterParStream IterDistStream
into_par_stream into_dist_stream
par_stream dist_stream
Expand Down
2 changes: 2 additions & 0 deletions amadeus-core/src/par_sink/combine.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#![allow(clippy::option_if_let_else)]

use derive_new::new;
use educe::Educe;
use serde::{Deserialize, Serialize};
Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/src/par_sink/combiner.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#![allow(unused_imports,clippy::single_component_path_imports)]
#![allow(unused_imports, clippy::single_component_path_imports, clippy::option_if_let_else)]

use super::FolderSync;

Expand Down
38 changes: 20 additions & 18 deletions amadeus-core/src/par_sink/group_by.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,15 +181,16 @@ where
Sum2::A((k, u, r)) => {
let waker = cx.waker();
let stream = stream::poll_fn(|cx| {
if let Some(u) = u.take() {
Poll::Ready(Some(u))
} else {
let waker_ = cx.waker();
if !waker.will_wake(waker_) {
waker_.wake_by_ref();
}
Poll::Pending
}
u.take().map_or_else(
|| {
let waker_ = cx.waker();
if !waker.will_wake(waker_) {
waker_.wake_by_ref();
}
Poll::Pending
},
|u| Poll::Ready(Some(u)),
)
})
.fuse()
.pipe(self_.pipe.as_mut());
Expand Down Expand Up @@ -332,15 +333,16 @@ where
let mut v = Some(v);
let waker = cx.waker();
let stream = stream::poll_fn(|cx| {
if let Some(v) = v.take() {
Poll::Ready(Some(v))
} else {
let waker_ = cx.waker();
if !waker.will_wake(waker_) {
waker_.wake_by_ref();
}
Poll::Pending
}
v.take().map_or_else(
|| {
let waker_ = cx.waker();
if !waker.will_wake(waker_) {
waker_.wake_by_ref();
}
Poll::Pending
},
|v| Poll::Ready(Some(v)),
)
})
.fuse();
pin_mut!(stream);
Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/src/par_sink/tuple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ macro_rules! impl_tuple {
$({
let stream = Peekable{stream:stream.as_mut(),peeked:&mut *self_.peeked};
pin_mut!(stream);
let stream_ = substream(cx, stream, |item| if let $enum::$t(_) = item { true } else { false }, |item| { progress = true; if let $enum::$t(item) = item { item } else { unreachable!() } });
let stream_ = substream(cx, stream, |item| matches!(item, $enum::$t(_)), |item| { progress = true; if let $enum::$t(item) = item { item } else { unreachable!() } });
pin_mut!(stream_);
if self_.ready.$num.is_none() {
if let Poll::Ready(done) = self_.$t.as_mut().poll_forward(cx, stream_) {
Expand Down
57 changes: 1 addition & 56 deletions amadeus-core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use std::{
any::{Any, TypeId}, error, fmt, hash::{Hash, Hasher}, io, marker::PhantomData, pin::Pin, sync::Arc, task::{Context, Poll}
};

use crate::par_stream::{DistributedStream, ParallelStream, StreamTask};
use crate::par_stream::{DistributedStream, ParallelStream};

pub struct ResultExpand<T, E>(pub Result<T, E>);
impl<T, E> IntoIterator for ResultExpand<T, E>
Expand Down Expand Up @@ -110,61 +110,6 @@ where
}
}

#[doc(hidden)]
pub struct ImplDistributedStream<T>(PhantomData<fn() -> T>);
impl<T> ImplDistributedStream<T> {
pub fn new<U>(_drop: U) -> Self
where
U: DistributedStream<Item = T>,
{
Self(PhantomData)
}
}
impl<T: 'static> DistributedStream for ImplDistributedStream<T> {
type Item = T;
type Task = ImplTask<T>;

fn size_hint(&self) -> (usize, Option<usize>) {
unreachable!()
}
fn next_task(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Task>> {
unreachable!()
}
}
impl<T: 'static> ParallelStream for ImplDistributedStream<T> {
type Item = T;
type Task = ImplTask<T>;

fn size_hint(&self) -> (usize, Option<usize>) {
unreachable!()
}
fn next_task(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Task>> {
unreachable!()
}
}

#[doc(hidden)]
#[derive(Serialize, Deserialize)]
pub struct ImplTask<T>(PhantomData<fn() -> T>);
impl<T> StreamTask for ImplTask<T>
where
T: 'static,
{
type Item = T;
type Async = ImplTask<T>;

fn into_async(self) -> Self::Async {
self
}
}
impl<T: 'static> Stream for ImplTask<T> {
type Item = T;

fn poll_next(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Item>> {
unreachable!()
}
}

// This is a dumb hack to avoid triggering https://github.com/rust-lang/rust/issues/48214 in amadeus-derive: see https://github.com/taiki-e/pin-project/issues/102#issuecomment-540472282
#[doc(hidden)]
#[repr(transparent)]
Expand Down
2 changes: 1 addition & 1 deletion amadeus-derive/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "amadeus-derive"
version = "0.3.5"
version = "0.3.6"
license = "Apache-2.0"
authors = ["Alec Mocatta <[email protected]>"]
categories = ["concurrency", "science", "database", "parser-implementations", "text-processing"]
Expand Down
12 changes: 6 additions & 6 deletions amadeus-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
//!
//! This is a support crate of [Amadeus](https://github.com/constellation-rs/amadeus) and is not intended to be used directly. This macro is re-exposed as [`amadeus::data::Data`](https://docs.rs/amadeus/0.3/amadeus/data/derive.Data.html).

#![doc(html_root_url = "https://docs.rs/amadeus-derive/0.3.5")]
#![doc(html_root_url = "https://docs.rs/amadeus-derive/0.3.6")]
#![recursion_limit = "400"]
#![warn(
missing_copy_implementations,
Expand Down Expand Up @@ -262,7 +262,7 @@ fn impl_struct(
let mut parquet_derives = None;
if cfg!(feature = "parquet") {
parquet_includes = Some(quote! {
pub use ::amadeus_parquet::derive::{
pub use #amadeus_path::amadeus_parquet::derive::{
ParquetData, Repetition, ColumnReader, ParquetError, ParquetResult, ParquetSchema, Reader, DisplaySchemaGroup, ColumnPath, Type
};
});
Expand Down Expand Up @@ -384,7 +384,7 @@ fn impl_struct(
let mut postgres_derives = None;
if cfg!(feature = "postgres") {
postgres_includes = Some(quote! {
pub use ::amadeus_postgres::{Names,read_be_i32,read_value,_internal as postgres,PostgresData};
pub use #amadeus_path::amadeus_postgres::{Names,read_be_i32,read_value,_internal as postgres,PostgresData};
});
postgres_derives = Some(quote! {
#[automatically_derived]
Expand Down Expand Up @@ -436,7 +436,7 @@ fn impl_struct(
let mut serde_derives = None;
if cfg!(feature = "serde") {
serde_includes = Some(quote! {
pub use ::amadeus_serde::{SerdeData,_internal::{Serialize, Deserialize, Serializer, Deserializer}};
pub use #amadeus_path::amadeus_serde::{SerdeData, _internal::{Serialize, Deserialize, Serializer, Deserializer}};
pub use #amadeus_path::data::serde_data;
});
serde_derives = Some(quote! {
Expand Down Expand Up @@ -471,8 +471,8 @@ fn impl_struct(
#parquet_includes
#postgres_includes
#serde_includes
pub use ::amadeus_core::util::Wrapper;
pub use ::amadeus_types::{AmadeusOrd, Data as CoreData, DowncastFrom, Downcast, DowncastError, Value, Group, SchemaIncomplete, ListVec, __internal::{Serialize as Serialize_, Deserialize as Deserialize_, Serializer as Serializer_, Deserializer as Deserializer_, SerializeTuple, Error as SerdeError, Visitor, SeqAccess}};
pub use #amadeus_path::amadeus_core::util::Wrapper;
pub use #amadeus_path::amadeus_types::{AmadeusOrd, Data as CoreData, DowncastFrom, Downcast, DowncastError, Value, Group, SchemaIncomplete, ListVec, __internal::{Serialize as Serialize_, Deserialize as Deserialize_, Serializer as Serializer_, Deserializer as Deserializer_, SerializeTuple, Error as SerdeError, Visitor, SeqAccess}};
pub use #amadeus_path::data::Data;
pub use ::std::{borrow::ToOwned, boxed::Box, clone::Clone, collections::HashMap, convert::{From, Into}, cmp::{Ordering, PartialEq}, default::Default, error::Error, fmt::{self, Debug, Write}, format, hash::{Hash, Hasher}, iter::{ExactSizeIterator, IntoIterator, Iterator}, marker::{PhantomData, Send, Sized, Sync}, result::Result::{self, Ok, Err}, string::String, panic, vec, vec::{IntoIter, Vec}, option::Option::{self, Some, None}};
}
Expand Down
Loading