34 changes: 27 additions & 7 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion Cargo.toml
@@ -158,7 +158,8 @@ rpath = false
# If there are dependencies that need patching, they can be listed below.
# For example:
# arrow-format = { git = "https://github.com/datafuse-extras/arrow-format", rev = "78dacc1" }

arrow2 = { git = "https://github.com/jorgecarleitao/arrow2", rev = "211be21" }
-parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "fb08b72" }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", rev = "ed0e1ff" }
limits-rs = { git = "https://github.com/datafuse-extras/limits-rs", rev = "abfcf7b" }
metrics = { git = "https://github.com/datafuse-extras/metrics.git", rev = "bc49d03" }
2 changes: 1 addition & 1 deletion src/common/arrow/Cargo.toml
@@ -43,6 +43,6 @@ arrow = { package = "arrow2", version = "0.15.0", default-features = false, feat
arrow-format = { version = "0.8.0", features = ["flight-data", "flight-service", "ipc"] }
futures = "0.3.24"
native = { package = "strawboat", version = "0.1.0" }
-parquet2 = { version = "0.17.0", default_features = false }
+parquet2 = { version = "0.17.0", default_features = false, features = ["serde_types"] }

[dev-dependencies]
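The new `serde_types` feature makes parquet2's own types (e.g. `Compression`, `Interval`) serde-serializable, which is what lets this PR drop the hand-rolled `Compression` mirror in `parquet_part.rs` further down. A minimal sketch of the idea, assuming `serde_json` is available as a dev-dependency:

```rust
// Sketch only: with parquet2's `serde_types` feature enabled, its types can
// be serialized directly, so no local mirror enum is needed.
use common_arrow::parquet::compression::Compression;

fn compression_roundtrip() -> serde_json::Result<()> {
    let c = Compression::Zstd;
    let encoded = serde_json::to_string(&c)?; // e.g. "\"Zstd\""
    let decoded: Compression = serde_json::from_str(&encoded)?;
    assert_eq!(c, decoded);
    Ok(())
}
```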
7 changes: 1 addition & 6 deletions src/query/storages/common/index/src/range_filter.rs
@@ -12,9 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

-use std::sync::Arc;
-
-use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::type_check::check_function;
use common_expression::types::nullable::NullableDomain;
@@ -46,7 +43,7 @@ pub struct RangeFilter {
}
impl RangeFilter {
pub fn try_create(
-ctx: Arc<dyn TableContext>,
+func_ctx: FunctionContext,
exprs: &[Expr<String>],
schema: TableSchemaRef,
) -> Result<Self> {
@@ -58,8 +55,6 @@ impl RangeFilter {
})
.unwrap();

-let func_ctx = ctx.try_get_function_context()?;

let (new_expr, _) = ConstantFolder::fold(&conjunction, func_ctx, &BUILTIN_FUNCTIONS);

Ok(Self {
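The signature change above inverts the dependency: `RangeFilter` no longer reaches back into the catalog to resolve its `FunctionContext`; callers resolve it once and pass it by value. A caller-side sketch (names mirror the call sites updated later in this diff):

```rust
// Sketch of the new calling convention: the caller owns the TableContext,
// resolves the FunctionContext, and hands it to the filter.
let func_ctx = ctx.try_get_function_context()?;
let range_filter = RangeFilter::try_create(func_ctx, &exprs, schema.clone())?;
```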
1 change: 0 additions & 1 deletion src/query/storages/common/pruner/Cargo.toml
@@ -12,7 +12,6 @@ doctest = false
test = false

[dependencies]
-common-catalog = { path = "../../../../query/catalog" }
common-exception = { path = "../../../../common/exception" }
common-expression = { path = "../../../expression" }

6 changes: 3 additions & 3 deletions src/query/storages/common/pruner/src/range_pruner.rs
@@ -14,9 +14,9 @@

use std::sync::Arc;

-use common_catalog::table_context::TableContext;
use common_exception::Result;
use common_expression::Expr;
+use common_expression::FunctionContext;
use common_expression::TableSchemaRef;
use storages_common_index::RangeFilter;
use storages_common_table_meta::meta::StatisticsOfColumns;
@@ -62,13 +62,13 @@ impl RangePrunerCreator {
///
/// Note: the schema should be the schema of the table, not the schema of the input.
pub fn try_create<'a>(
-ctx: &Arc<dyn TableContext>,
+func_ctx: FunctionContext,
filter_expr: Option<&'a [Expr<String>]>,
schema: &'a TableSchemaRef,
) -> Result<Arc<dyn RangePruner + Send + Sync>> {
Ok(match filter_expr {
Some(exprs) if !exprs.is_empty() => {
-let range_filter = RangeFilter::try_create(ctx.clone(), exprs, schema.clone())?;
+let range_filter = RangeFilter::try_create(func_ctx, exprs, schema.clone())?;
match range_filter.try_eval_const() {
Ok(v) => {
if v {
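The `try_eval_const` arm (truncated above) is a fast path worth spelling out: if the folded predicate is a constant, per-block statistics never need to be consulted. A hedged sketch, with hypothetical `AlwaysKeep` and `NeverKeep` types standing in for the crate's internal no-op pruners:

```rust
// Sketch only: dispatch on whether the conjunction folds to a constant.
let pruner: Arc<dyn RangePruner + Send + Sync> = match range_filter.try_eval_const() {
    Ok(true) => Arc::new(AlwaysKeep),  // hypothetical: keep every block
    Ok(false) => Arc::new(NeverKeep),  // hypothetical: prune every block
    Err(_) => Arc::new(range_filter),  // not constant: evaluate per block
};
```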
6 changes: 5 additions & 1 deletion src/query/storages/fuse/src/pruning/pruning_executor.rs
@@ -93,7 +93,11 @@ impl BlockPruner {

// prepare the range filter.
// if filter_expression is none, a dummy pruner will be returned, which prunes nothing
-let range_pruner = RangePrunerCreator::try_create(ctx, filter_exprs.as_deref(), &schema)?;
+let range_pruner = RangePrunerCreator::try_create(
+    ctx.try_get_function_context()?,
+    filter_exprs.as_deref(),
+    &schema,
+)?;

// prepare the filter.
// None will be returned, if filter is not applicable (e.g. unsuitable filter expression, index not available, etc.)
7 changes: 5 additions & 2 deletions src/query/storages/hive/hive/src/hive_partition_pruner.rs
@@ -74,8 +74,11 @@ impl HivePartitionPruner {
}

pub fn prune(&self, partitions: Vec<String>) -> Result<Vec<String>> {
-let range_filter =
-    RangeFilter::try_create(self.ctx.clone(), &self.filters, self.full_schema.clone())?;
+let range_filter = RangeFilter::try_create(
+    self.ctx.try_get_function_context()?,
+    &self.filters,
+    self.full_schema.clone(),
+)?;
let column_stats = self.get_column_stats(&partitions)?;
let mut filted_partitions = vec![];
for (idx, stats) in column_stats.into_iter().enumerate() {
2 changes: 1 addition & 1 deletion src/query/storages/hive/hive/src/hive_table.rs
@@ -116,7 +116,7 @@ impl HiveTable {
});
let range_filter = match filter_expressions {
Some(exprs) if !exprs.is_empty() => Some(RangeFilter::try_create(
-ctx.clone(),
+ctx.try_get_function_context()?,
&exprs,
self.table_info.schema(),
)?),
3 changes: 3 additions & 0 deletions src/query/storages/parquet/Cargo.toml
@@ -32,3 +32,6 @@ glob = "0.3.0"
opendal = { workspace = true }
serde = { workspace = true }
typetag = "0.2.3"

+[dev-dependencies]
+common-sql = { path = "../../sql" }
8 changes: 4 additions & 4 deletions src/query/storages/parquet/src/lib.rs
@@ -15,13 +15,13 @@
#![allow(clippy::uninlined_format_args)]
#![deny(unused_crate_dependencies)]

-mod parquet_column;
mod parquet_part;
mod parquet_reader;
mod parquet_source;
+mod pruning;
mod read_options;
mod statistics;
mod table_function;

pub use parquet_part::ParquetLocationPart;
pub use parquet_reader::ParquetReader;
pub use parquet_source::ParquetSource;
pub use read_options::ReadOptions;
pub use table_function::ParquetTable;
57 changes: 0 additions & 57 deletions src/query/storages/parquet/src/parquet_column.rs

This file was deleted.

48 changes: 5 additions & 43 deletions src/query/storages/parquet/src/parquet_part.rs
@@ -19,7 +19,8 @@ use std::hash::Hash;
use std::hash::Hasher;
use std::sync::Arc;

-use common_arrow::parquet::compression::Compression as ParquetCompression;
+use common_arrow::parquet::compression::Compression;
+use common_arrow::parquet::indexes::Interval;
use common_catalog::plan::PartInfo;
use common_catalog::plan::PartInfoPtr;
use common_exception::ErrorCode;
@@ -65,48 +66,6 @@ impl ParquetLocationPart {
}
}

-#[derive(serde::Serialize, serde::Deserialize, Debug, Eq, PartialEq, Hash, Clone, Copy)]
-pub enum Compression {
-    Uncompressed,
-    Snappy,
-    Gzip,
-    Lzo,
-    Brotli,
-    Lz4,
-    Zstd,
-    Lz4Raw,
-}
-
-impl From<Compression> for ParquetCompression {
-    fn from(value: Compression) -> Self {
-        match value {
-            Compression::Uncompressed => ParquetCompression::Uncompressed,
-            Compression::Snappy => ParquetCompression::Snappy,
-            Compression::Gzip => ParquetCompression::Gzip,
-            Compression::Lzo => ParquetCompression::Lzo,
-            Compression::Brotli => ParquetCompression::Brotli,
-            Compression::Lz4 => ParquetCompression::Lz4,
-            Compression::Zstd => ParquetCompression::Zstd,
-            Compression::Lz4Raw => ParquetCompression::Lz4Raw,
-        }
-    }
-}
-
-impl From<ParquetCompression> for Compression {
-    fn from(value: ParquetCompression) -> Self {
-        match value {
-            ParquetCompression::Uncompressed => Compression::Uncompressed,
-            ParquetCompression::Snappy => Compression::Snappy,
-            ParquetCompression::Gzip => Compression::Gzip,
-            ParquetCompression::Lzo => Compression::Lzo,
-            ParquetCompression::Brotli => Compression::Brotli,
-            ParquetCompression::Lz4 => Compression::Lz4,
-            ParquetCompression::Zstd => Compression::Zstd,
-            ParquetCompression::Lz4Raw => Compression::Lz4Raw,
-        }
-    }
-}
-
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
pub struct ColumnMeta {
pub offset: u64,
@@ -119,6 +78,7 @@ pub struct ParquetRowGroupPart {
pub location: String,
pub num_rows: usize,
pub column_metas: HashMap<usize, ColumnMeta>,
+pub row_selection: Option<Vec<Interval>>,
}

#[typetag::serde(name = "parquet_row_group")]
@@ -146,11 +106,13 @@ impl ParquetRowGroupPart {
location: String,
num_rows: usize,
column_metas: HashMap<usize, ColumnMeta>,
+row_selection: Option<Vec<Interval>>,
) -> Arc<Box<dyn PartInfo>> {
Arc::new(Box::new(ParquetRowGroupPart {
location,
num_rows,
column_metas,
+row_selection,
}))
}

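The new `row_selection` field carries page-level intervals (from parquet2's column-index machinery) so a row group can be read only partially. A construction sketch with made-up values; `Interval::new(start, length)` is assumed from `common_arrow::parquet::indexes`:

```rust
// Sketch only: a part covering a 700-row group, of which just rows
// 0..100 and 500..600 survive pruning (column metas omitted for brevity).
use std::collections::HashMap;

use common_arrow::parquet::indexes::Interval;

let part = ParquetRowGroupPart::create(
    "data/t.parquet".to_string(),
    700,
    HashMap::new(),
    Some(vec![Interval::new(0, 100), Interval::new(500, 100)]),
);
```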