Skip to content

Commit

Permalink
refactor: Bump OpenDAL 0.46, arrow 51, tonic 0.11, reqwest 0.12, hyper 1, http 1 (#15442)
Browse files Browse the repository at this point in the history

* Save current work

Signed-off-by: Xuanwo <[email protected]>

* Refactor layer

Signed-off-by: Xuanwo <[email protected]>

* Save work

Signed-off-by: Xuanwo <[email protected]>

* Save current work

Signed-off-by: Xuanwo <[email protected]>

* Build pass

Signed-off-by: Xuanwo <[email protected]>

* cargo fix

Signed-off-by: Xuanwo <[email protected]>

* cargo check pass

Signed-off-by: Xuanwo <[email protected]>

* Cleanup deps

Signed-off-by: Xuanwo <[email protected]>

* Format files

Signed-off-by: Xuanwo <[email protected]>

* Fix bytes reader use too small range

Signed-off-by: Xuanwo <[email protected]>

* Split read offset and consume offset

Signed-off-by: Xuanwo <[email protected]>

* Fix input pipeline

Signed-off-by: Xuanwo <[email protected]>

* leave a todo here

Signed-off-by: Xuanwo <[email protected]>

* Fix eof not calculated correctly

Signed-off-by: Xuanwo <[email protected]>

* Fix offset check

Signed-off-by: Xuanwo <[email protected]>

* Try concurrent load

Signed-off-by: Xuanwo <[email protected]>

* format toml

Signed-off-by: Xuanwo <[email protected]>

* Let's try use opendal's async read

Signed-off-by: Xuanwo <[email protected]>

* reduce to 2 concurrent

Signed-off-by: Xuanwo <[email protected]>

* Also fix support for input pipeline

Signed-off-by: Xuanwo <[email protected]>

* try 4 concurrent

Signed-off-by: Xuanwo <[email protected]>

* Remove an extra head

Signed-off-by: Xuanwo <[email protected]>

---------

Signed-off-by: Xuanwo <[email protected]>
  • Loading branch information
Xuanwo authored May 11, 2024
1 parent 5698d16 commit 220787d
Show file tree
Hide file tree
Showing 107 changed files with 1,389 additions and 1,284 deletions.
834 changes: 538 additions & 296 deletions Cargo.lock

Large diffs are not rendered by default.

55 changes: 33 additions & 22 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,22 @@ members = [
# databend maintains
jsonb = { git = "https://github.com/datafuselabs/jsonb", rev = "3fe3acd" }

opendal = { version = "0.45.1", features = [
opendal = { version = "0.46.0", features = [
"layers-minitrace",
"layers-prometheus-client",
"layers-async-backtrace",
"services-s3",
"services-fs",
"services-gcs",
"services-cos",
"services-obs",
"services-oss",
"services-azblob",
"services-azdls",
"services-ipfs",
"services-http",
"services-moka",
"services-webhdfs",
"services-huggingface",
] }
sled = { git = "https://github.com/datafuse-extras/sled", tag = "v0.34.7-datafuse.1", default-features = false }
Expand All @@ -135,6 +145,7 @@ openraft = { version = "0.9.9", features = [
] }

# Core crates and utilities
base64 = "0.22"
async-backtrace = "0.2"
async-trait = { version = "0.1.77", package = "async-trait-fn" }
bincode = { version = "2.0.0-rc.3", features = ["serde", "std", "alloc"] }
Expand Down Expand Up @@ -164,23 +175,25 @@ poem = { version = "~1.3.57", features = ["rustls", "multipart", "compression"]
prometheus-client = "0.22"
rand = { version = "0.8.5", features = ["small_rng"] }
regex = "1.8.1"
reqwest = { version = "0.11.19", default-features = false, features = [
reqwest = { version = "0.12", default-features = false, features = [
"json",
"http2",
"rustls-tls",
"rustls-tls-native-roots",
] }
reqwest-hickory-resolver = "0.0.2"
reqwest-hickory-resolver = "0.1"
semver = "1.0.14"
serfig = "0.1.0"
tantivy = "0.22.0"
tokio = { version = "1.35.0", features = ["full"] }
tokio-stream = "0.1.11"
tonic = { version = "0.10.2", features = ["transport", "codegen", "prost", "tls-roots", "tls"] }
tonic-reflection = { version = "0.10.2" }
tonic = { version = "0.11.0", features = ["transport", "codegen", "prost", "tls-roots", "tls"] }
tonic-reflection = { version = "0.11.0" }
typetag = "0.2.3"
uuid = { version = "1.1.2", features = ["serde", "v4"] }
walkdir = "2.3.2"
derive-visitor = "0.3.0"
http = "1"

# Synchronization
dashmap = "5.4.0"
Expand All @@ -200,19 +213,19 @@ anyhow = { version = "1.0.65" }
thiserror = { version = "1" }

# Crates from arrow-rs
arrow = { version = "50" }
arrow-array = { version = "50" }
arrow-buffer = { version = "50" }
arrow-cast = { version = "50", features = ["prettyprint"] }
arrow-data = { version = "50" }
arrow-flight = { version = "50", features = ["flight-sql-experimental", "tls"] }
arrow = { version = "51" }
arrow-array = { version = "51" }
arrow-buffer = { version = "51" }
arrow-cast = { version = "51", features = ["prettyprint"] }
arrow-data = { version = "51" }
arrow-flight = { version = "51", features = ["flight-sql-experimental", "tls"] }
arrow-format = { version = "0.8.1", features = ["flight-data", "flight-service", "ipc"] }
arrow-ipc = { version = "50" }
arrow-ord = { version = "50" }
arrow-schema = { version = "50", features = ["serde"] }
arrow-select = { version = "50" }
parquet = { version = "50", features = ["async"] }
parquet_rs = { package = "parquet", version = "50" }
arrow-ipc = { version = "51" }
arrow-ord = { version = "51" }
arrow-schema = { version = "51", features = ["serde"] }
arrow-select = { version = "51" }
parquet = { version = "51", features = ["async"] }
parquet_rs = { package = "parquet", version = "51" }

# Crates from risingwavelabs
arrow-udf-js = { package = "arrow-udf-js", git = "https://github.com/datafuse-extras/arrow-udf", rev = "a8fdfdd" }
Expand Down Expand Up @@ -272,15 +285,13 @@ rpath = false

[patch.crates-io]
# If there are dependencies that need patching, they can be listed below.
arrow-format = { git = "https://github.com/everpcpc/arrow-format", rev = "ad8f2dd" }
# Changes to upstream: update to tonic 0.11
arrow-format = { git = "https://github.com/Xuanwo/arrow-format", rev = "be633a0" }
metrics = { git = "https://github.com/datafuse-extras/metrics.git", rev = "fc2ecd1" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "54fd72f" }
icelake = { git = "https://github.com/icelake-io/icelake", rev = "be8b2c2" }
micromarshal = { git = "https://github.com/ariesdevil/opensrv", rev = "6c96813" }
async-backtrace = { git = "https://github.com/zhang2014/async-backtrace.git", rev = "dea4553" }
z3 = { git = "https://github.com/prove-rs/z3.rs", rev = "247d308" }
z3-sys = { git = "https://github.com/prove-rs/z3.rs", rev = "247d308" }
geozero = { git = "https://github.com/georust/geozero", rev = "1d78b36" }
# Hot fix for cos, remove this during opendal 0.46 upgrade.
opendal = { git = "https://github.com/Xuanwo/opendal", rev = "53377ca" }
reqsign = { git = "https://github.com/Xuanwo/reqsign", rev = "122dc12" }
# proj = { git = "https://github.com/ariesdevil/proj", rev = "51e1c60" }
10 changes: 5 additions & 5 deletions src/binaries/tool/table_meta_inspector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ use serde::Deserialize;
use serde::Serialize;
use serfig::collectors::from_file;
use serfig::parsers::Toml;
use tokio::io::AsyncReadExt;

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Parser)]
#[clap(about, version = &**DATABEND_COMMIT_VERSION, author)]
Expand Down Expand Up @@ -78,7 +77,6 @@ fn parse_output(config: &InspectorConfig) -> Result<Box<dyn Write>> {
}

async fn parse_input_data(config: &InspectorConfig) -> Result<Vec<u8>> {
let mut buffer: Vec<u8> = vec![];
match &config.input {
Some(input) => {
let op = match &config.config {
Expand All @@ -98,15 +96,17 @@ async fn parse_input_data(config: &InspectorConfig) -> Result<Vec<u8>> {
Operator::new(builder)?.finish()
}
};
op.reader(input).await?.read_to_end(&mut buffer).await?;
let buf = op.read(input).await?.to_vec();
Ok(buf)
}
None => {
let mut buffer: Vec<u8> = vec![];
let stdin = io::stdin();
let handle = stdin.lock();
io::BufReader::new(handle).read_to_end(&mut buffer)?;
Ok(buffer)
}
};
Ok(buffer)
}
}

async fn run(config: &InspectorConfig) -> Result<()> {
Expand Down
1 change: 1 addition & 0 deletions src/common/arrow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ bytes = "^1"
indexmap = "2.2.3"
log = { workspace = true }
num = { version = "0.4", default-features = false, features = ["std"] }
opendal = { workspace = true }
ordered-float = "3.7.0"
ringbuffer = "0.14.2"
roaring = "0.10.1"
Expand Down
2 changes: 1 addition & 1 deletion src/common/arrow/src/arrow/datatypes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl From<arrow_schema::DataType> for DataType {
}
DataType::Decimal128(precision, scale) => Self::Decimal(precision as _, scale as _),
DataType::Decimal256(precision, scale) => Self::Decimal256(precision as _, scale as _),
DataType::RunEndEncoded(_, _) => panic!("Run-end encoding not supported by arrow2"),
v => panic!("{:?} encoding not supported by arrow2", v),
}
}
}
Expand Down
10 changes: 2 additions & 8 deletions src/common/arrow/src/arrow/io/parquet/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ pub use deserialize::NestedState;
pub use deserialize::StructIterator;
pub use file::FileReader;
pub use file::RowGroupReader;
#[cfg(feature = "io_parquet_async")]
use futures::AsyncRead;
#[cfg(feature = "io_parquet_async")]
use futures::AsyncSeek;
pub use parquet2::error::Error as ParquetError;
pub use parquet2::fallible_streaming_iterator;
pub use parquet2::metadata::ColumnChunkMetaData;
Expand Down Expand Up @@ -103,10 +99,8 @@ pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
/// Reads parquets' metadata asynchronously.
///
/// Thin wrapper: the arguments are forwarded unchanged to
/// `_read_metadata_async`, and the `?` lets that function's error be converted
/// into the error type of this module's `Result`.
///
/// NOTE(review): `file_size` is presumably the total length in bytes of the
/// object behind `reader` — confirm against `_read_metadata_async`.
#[cfg(feature = "io_parquet_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_parquet_async")))]
pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
reader: &mut R,
) -> Result<FileMetaData> {
Ok(_read_metadata_async(reader).await?)
pub async fn read_metadata_async(reader: opendal::Reader, file_size: u64) -> Result<FileMetaData> {
Ok(_read_metadata_async(reader, file_size).await?)
}

fn convert_days_ms(value: &[u8]) -> crate::arrow::types::days_ms {
Expand Down
63 changes: 34 additions & 29 deletions src/common/arrow/src/arrow/temporal_conversions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use chrono::format::parse;
use chrono::format::Parsed;
use chrono::format::StrftimeItems;
use chrono::DateTime;
use chrono::Datelike;
use chrono::Duration;
use chrono::FixedOffset;
Expand Down Expand Up @@ -56,7 +57,7 @@ pub fn date32_to_datetime(v: i32) -> NaiveDateTime {
/// converts a `i32` representing a `date32` to [`NaiveDateTime`]
///
/// `v` is widened to `i64` and multiplied by `SECONDS_IN_DAY` to obtain a
/// seconds timestamp; the nanosecond part is always `0`.
/// Returns `None` whenever the chrono constructor does (i.e. it could not
/// build a value from that timestamp) instead of panicking.
#[inline]
pub fn date32_to_datetime_opt(v: i32) -> Option<NaiveDateTime> {
NaiveDateTime::from_timestamp_opt(v as i64 * SECONDS_IN_DAY, 0)
DateTime::from_timestamp(v as i64 * SECONDS_IN_DAY, 0).map(|v| v.naive_utc())
}

/// converts a `i32` representing a `date32` to [`NaiveDate`]
Expand All @@ -74,13 +75,14 @@ pub fn date32_to_date_opt(days: i32) -> Option<NaiveDate> {
/// converts a `i64` representing a `date64` to [`NaiveDateTime`]
///
/// `v` is split into whole seconds (`v / MILLISECONDS`) and a sub-second
/// remainder that is scaled to nanoseconds before being handed to chrono.
///
/// # Panics
///
/// Panics with "invalid or out-of-range datetime" when chrono cannot build a
/// value from the computed seconds/nanoseconds pair.
///
/// NOTE(review): for a negative `v` that is not a whole number of seconds the
/// remainder is negative, so the `as u32` cast wraps instead of borrowing a
/// second the way `timestamp_ms_to_datetime_opt` does. That looks like it ends
/// in the panic above — confirm, and consider delegating to that function.
#[inline]
pub fn date64_to_datetime(v: i64) -> NaiveDateTime {
NaiveDateTime::from_timestamp_opt(
DateTime::from_timestamp(
// extract seconds from milliseconds
v / MILLISECONDS,
// discard extracted seconds and convert milliseconds to nanoseconds
(v % MILLISECONDS * MICROSECONDS) as u32,
)
.expect("invalid or out-of-range datetime")
.naive_utc()
}

/// converts a `i64` representing a `date64` to [`NaiveDate`]
Expand Down Expand Up @@ -175,7 +177,7 @@ pub fn timestamp_s_to_datetime(seconds: i64) -> NaiveDateTime {
/// converts a `i64` representing a `timestamp(s)` to [`NaiveDateTime`]
///
/// `seconds` is passed through as-is with a nanosecond part of `0`.
/// Returns `None` whenever the chrono constructor does (i.e. it could not
/// build a value from that timestamp) instead of panicking.
#[inline]
pub fn timestamp_s_to_datetime_opt(seconds: i64) -> Option<NaiveDateTime> {
NaiveDateTime::from_timestamp_opt(seconds, 0)
DateTime::from_timestamp(seconds, 0).map(|v| v.naive_utc())
}

/// converts a `i64` representing a `timestamp(ms)` to [`NaiveDateTime`]
Expand All @@ -187,8 +189,8 @@ pub fn timestamp_ms_to_datetime(v: i64) -> NaiveDateTime {
/// converts a `i64` representing a `timestamp(ms)` to [`NaiveDateTime`]
#[inline]
pub fn timestamp_ms_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
if v >= 0 {
NaiveDateTime::from_timestamp_opt(
let t = if v >= 0 {
DateTime::from_timestamp(
// extract seconds from milliseconds
v / MILLISECONDS,
// discard extracted seconds and convert milliseconds to nanoseconds
Expand All @@ -198,16 +200,18 @@ pub fn timestamp_ms_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
let secs_rem = (v / MILLISECONDS, v % MILLISECONDS);
if secs_rem.1 == 0 {
// whole/integer seconds; no adjustment required
NaiveDateTime::from_timestamp_opt(secs_rem.0, 0)
DateTime::from_timestamp(secs_rem.0, 0)
} else {
// negative values with fractional seconds require 'div_floor' rounding behaviour.
// (which isn't yet stabilised: https://github.com/rust-lang/rust/issues/88581)
NaiveDateTime::from_timestamp_opt(
DateTime::from_timestamp(
secs_rem.0 - 1,
(NANOSECONDS + (v % MILLISECONDS * MICROSECONDS)) as u32,
)
}
}
};

t.map(|v| v.naive_utc())
}

/// converts a `i64` representing a `timestamp(us)` to [`NaiveDateTime`]
Expand All @@ -219,8 +223,8 @@ pub fn timestamp_us_to_datetime(v: i64) -> NaiveDateTime {
/// converts a `i64` representing a `timestamp(us)` to [`NaiveDateTime`]
#[inline]
pub fn timestamp_us_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
if v >= 0 {
NaiveDateTime::from_timestamp_opt(
let t = if v >= 0 {
DateTime::from_timestamp(
// extract seconds from microseconds
v / MICROSECONDS,
// discard extracted seconds and convert microseconds to nanoseconds
Expand All @@ -230,16 +234,18 @@ pub fn timestamp_us_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
let secs_rem = (v / MICROSECONDS, v % MICROSECONDS);
if secs_rem.1 == 0 {
// whole/integer seconds; no adjustment required
NaiveDateTime::from_timestamp_opt(secs_rem.0, 0)
DateTime::from_timestamp(secs_rem.0, 0)
} else {
// negative values with fractional seconds require 'div_floor' rounding behaviour.
// (which isn't yet stabilised: https://github.com/rust-lang/rust/issues/88581)
NaiveDateTime::from_timestamp_opt(
DateTime::from_timestamp(
secs_rem.0 - 1,
(NANOSECONDS + (v % MICROSECONDS * MILLISECONDS)) as u32,
)
}
}
};

t.map(|v| v.naive_utc())
}

/// converts a `i64` representing a `timestamp(ns)` to [`NaiveDateTime`]
Expand All @@ -251,8 +257,8 @@ pub fn timestamp_ns_to_datetime(v: i64) -> NaiveDateTime {
/// converts a `i64` representing a `timestamp(ns)` to [`NaiveDateTime`]
#[inline]
pub fn timestamp_ns_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
if v >= 0 {
NaiveDateTime::from_timestamp_opt(
let t = if v >= 0 {
DateTime::from_timestamp(
// extract seconds from nanoseconds
v / NANOSECONDS,
// discard extracted seconds
Expand All @@ -262,16 +268,15 @@ pub fn timestamp_ns_to_datetime_opt(v: i64) -> Option<NaiveDateTime> {
let secs_rem = (v / NANOSECONDS, v % NANOSECONDS);
if secs_rem.1 == 0 {
// whole/integer seconds; no adjustment required
NaiveDateTime::from_timestamp_opt(secs_rem.0, 0)
DateTime::from_timestamp(secs_rem.0, 0)
} else {
// negative values with fractional seconds require 'div_floor' rounding behaviour.
// (which isn't yet stabilised: https://github.com/rust-lang/rust/issues/88581)
NaiveDateTime::from_timestamp_opt(
secs_rem.0 - 1,
(NANOSECONDS + (v % NANOSECONDS)) as u32,
)
DateTime::from_timestamp(secs_rem.0 - 1, (NANOSECONDS + (v % NANOSECONDS)) as u32)
}
}
};

t.map(|v| v.naive_utc())
}

/// Converts a timestamp in `time_unit` and `timezone` into [`chrono::DateTime`].
Expand Down Expand Up @@ -405,10 +410,10 @@ pub fn utf8_to_naive_timestamp_scalar(value: &str, fmt: &str, tu: &TimeUnit) ->
parsed
.to_naive_datetime_with_offset(0)
.map(|x| match tu {
TimeUnit::Second => x.timestamp(),
TimeUnit::Millisecond => x.timestamp_millis(),
TimeUnit::Microsecond => x.timestamp_micros(),
TimeUnit::Nanosecond => x.timestamp_nanos_opt().unwrap(),
TimeUnit::Second => x.and_utc().timestamp(),
TimeUnit::Millisecond => x.and_utc().timestamp_millis(),
TimeUnit::Microsecond => x.and_utc().timestamp_micros(),
TimeUnit::Nanosecond => x.and_utc().timestamp_nanos_opt().unwrap(),
})
.ok()
}
Expand Down Expand Up @@ -531,10 +536,10 @@ pub fn add_naive_interval(timestamp: i64, time_unit: TimeUnit, interval: months_

// convert back to the target unit
match time_unit {
TimeUnit::Second => new_datetime_tz.timestamp_millis() / 1000,
TimeUnit::Millisecond => new_datetime_tz.timestamp_millis(),
TimeUnit::Microsecond => new_datetime_tz.timestamp_nanos_opt().unwrap() / 1000,
TimeUnit::Nanosecond => new_datetime_tz.timestamp_nanos_opt().unwrap(),
TimeUnit::Second => new_datetime_tz.and_utc().timestamp_millis() / 1000,
TimeUnit::Millisecond => new_datetime_tz.and_utc().timestamp_millis(),
TimeUnit::Microsecond => new_datetime_tz.and_utc().timestamp_nanos_opt().unwrap() / 1000,
TimeUnit::Nanosecond => new_datetime_tz.and_utc().timestamp_nanos_opt().unwrap(),
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/common/arrow/src/native/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@ impl<R: std::io::Read> NativeReadBuf for BufReader<R> {
}
}

// Lets a `bytes::buf::Reader` wrapping any `bytes::Buf` be used wherever a
// `NativeReadBuf` is required.
impl<R: bytes::Buf> NativeReadBuf for bytes::buf::Reader<R> {
// Borrows the wrapped buffer's current chunk; nothing is consumed here.
// NOTE(review): `Buf::chunk` may return only the first contiguous segment of
// a non-contiguous buffer, so the slice can be shorter than the bytes still
// remaining — confirm callers do not assume it covers everything.
fn buffer_bytes(&self) -> &[u8] {
self.get_ref().chunk()
}
}

impl NativeReadBuf for &[u8] {
fn buffer_bytes(&self) -> &[u8] {
self
Expand Down
7 changes: 0 additions & 7 deletions src/common/arrow/src/native/read/read_basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
use std::convert::TryInto;
use std::io::Read;

use futures::AsyncRead;
use futures::AsyncReadExt;
use parquet2::encoding::hybrid_rle::BitmapIter;
use parquet2::encoding::hybrid_rle::Decoder;
use parquet2::encoding::hybrid_rle::HybridEncoded;
Expand Down Expand Up @@ -188,8 +186,3 @@ pub fn read_u64<R: Read>(r: &mut R, buf: &mut [u8]) -> Result<u64> {
r.read_exact(buf)?;
Ok(u64::from_le_bytes(buf.try_into().unwrap()))
}

pub async fn read_u32_async<R: AsyncRead + Unpin>(r: &mut R, buf: &mut [u8]) -> Result<u32> {
r.read_exact(buf).await?;
Ok(u32::from_le_bytes(buf.try_into().unwrap()))
}
Loading

0 comments on commit 220787d

Please sign in to comment.