Commit aed543e

Merge remote-tracking branch 'apache/main' into alamb/unified_rg_and_page

alamb committed Jun 3, 2024
2 parents: 3bd9b04 + e4f7b98

Showing 69 changed files with 2,921 additions and 1,594 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -109,7 +109,7 @@ parking_lot = "0.12"
 parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
 rand = "0.8"
 regex = "1.8"
-rstest = "0.20.0"
+rstest = "0.21.0"
 serde_json = "1"
 sqlparser = { version = "0.45.0", features = ["visitor"] }
 tempfile = "3"
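
The only substantive change here is bumping the rstest dev-dependency from 0.20.0 to 0.21.0. For context, rstest supplies the fixture and case-based test macros used in the workspace's tests; a minimal sketch of the style of test it enables (illustrative only, not code from this commit):

use rstest::rstest;

// Each #[case] attribute expands into its own test, so a failure
// reports exactly which inputs broke.
#[rstest]
#[case(2, 4)]
#[case(3, 9)]
#[case(4, 16)]
fn squares(#[case] input: u64, #[case] expected: u64) {
    assert_eq!(input * input, expected);
}
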
1 change: 1 addition & 0 deletions datafusion-cli/Cargo.lock

(Generated file; diff not rendered.)

5 changes: 4 additions & 1 deletion datafusion-examples/examples/advanced_udaf.rs
@@ -105,7 +105,10 @@ impl AggregateUDFImpl for GeoMeanUdaf {
         true
     }
 
-    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+    fn create_groups_accumulator(
+        &self,
+        _args: AccumulatorArgs,
+    ) -> Result<Box<dyn GroupsAccumulator>> {
         Ok(Box::new(GeometricMeanGroupsAccumulator::new()))
     }
 }
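
The edit above is purely a signature migration: `create_groups_accumulator` now receives an `AccumulatorArgs` describing the call site, which this example ignores (hence the `_args` binding). As a hedged sketch of why the parameter exists, an implementation might inspect it before choosing an accumulator (method shown out of its `impl` block for brevity; the `is_distinct` field is an assumption about `AccumulatorArgs` at this version, not something shown in this diff):

fn create_groups_accumulator(
    &self,
    args: AccumulatorArgs,
) -> Result<Box<dyn GroupsAccumulator>> {
    // Assumed field: bail out if the planner requested DISTINCT
    // semantics this accumulator cannot honor.
    if args.is_distinct {
        return Err(DataFusionError::NotImplemented(
            "DISTINCT GeoMean is not implemented".to_string(),
        ));
    }
    Ok(Box::new(GeometricMeanGroupsAccumulator::new()))
}
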
6 changes: 5 additions & 1 deletion datafusion-examples/examples/simplify_udaf_expression.rs
@@ -78,9 +78,13 @@ impl AggregateUDFImpl for BetterAvgUdaf {
         true
     }
 
-    fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
+    fn create_groups_accumulator(
+        &self,
+        _args: AccumulatorArgs,
+    ) -> Result<Box<dyn GroupsAccumulator>> {
         unimplemented!("should not get here");
     }
 
     // we override the method to return a new expression that substitutes
     // for the user-defined function call
     fn simplify(&self) -> Option<AggregateFunctionSimplification> {
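
The trailing comment refers to the `simplify` hook, which lets a UDAF hand the optimizer a replacement expression instead of ever being evaluated (which is why `create_groups_accumulator` is unreachable here). A rough sketch of the shape of such an override, shown out of its `impl` block; the closure signature of `AggregateFunctionSimplification` is an assumption for illustration:

fn simplify(&self) -> Option<AggregateFunctionSimplification> {
    // The optimizer calls this closure with the original aggregate call;
    // the expression it returns replaces that call in the plan.
    Some(Box::new(|aggregate_function, _info| {
        // A real rewrite would build the substitute expression here,
        // e.g. sum(x) / count(x) in place of a custom average. This
        // identity rewrite just returns the call unchanged.
        Ok(Expr::AggregateFunction(aggregate_function))
    }))
}
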
134 changes: 134 additions & 0 deletions datafusion/common/src/utils/memory.rs
@@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides a function to estimate the memory size of a hash table prior to allocation

use crate::{DataFusionError, Result};

/// Estimates the memory size required for a hash table prior to allocation.
///
/// # Parameters
/// - `num_elements`: The number of elements expected in the hash table.
/// - `fixed_size`: A fixed overhead size associated with the collection
///   (e.g., HashSet or HashTable).
/// - `T`: The type of elements stored in the hash table.
///
/// # Details
/// This function calculates the estimated memory size by considering:
/// - An overestimation of buckets to keep approximately 1/8 of them empty.
/// - The total memory size is computed as:
///   - The size of each entry (`T`) multiplied by the estimated number of
///     buckets.
///   - One byte overhead for each bucket.
///   - The fixed size overhead of the collection.
/// - If the estimation overflows, we return a [`DataFusionError`].
///
/// # Examples
/// ---
///
/// ## From within a struct
///
/// ```rust
/// # use datafusion_common::utils::memory::estimate_memory_size;
/// # use datafusion_common::Result;
///
/// struct MyStruct<T> {
///     values: Vec<T>,
///     other_data: usize,
/// }
///
/// impl<T> MyStruct<T> {
///     fn size(&self) -> Result<usize> {
///         let num_elements = self.values.len();
///         let fixed_size = std::mem::size_of_val(self) +
///             std::mem::size_of_val(&self.values);
///
///         estimate_memory_size::<T>(num_elements, fixed_size)
///     }
/// }
/// ```
/// ---
/// ## With a simple collection
///
/// ```rust
/// # use datafusion_common::utils::memory::estimate_memory_size;
/// # use std::collections::HashMap;
///
/// let num_rows = 100;
/// let fixed_size = std::mem::size_of::<HashMap<u64, u64>>();
/// let estimated_hashtable_size =
///     estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)
///         .expect("Size estimation failed");
/// ```
pub fn estimate_memory_size<T>(num_elements: usize, fixed_size: usize) -> Result<usize> {
    // For the majority of cases hashbrown overestimates the bucket quantity
    // to keep ~1/8 of them empty. We take this factor into account by
    // multiplying the number of elements with a fixed ratio of 8/7 (~1.14).
    // This formula leads to overallocation for small tables (< 8 elements)
    // but should be fine overall.
    num_elements
        .checked_mul(8)
        .and_then(|overestimate| {
            let estimated_buckets = (overestimate / 7).next_power_of_two();
            // + size of entry * number of buckets
            // + 1 byte for each bucket
            // + fixed size of collection (HashSet/HashTable)
            std::mem::size_of::<T>()
                .checked_mul(estimated_buckets)?
                .checked_add(estimated_buckets)?
                .checked_add(fixed_size)
        })
        .ok_or_else(|| {
            DataFusionError::Execution(
                "usize overflow while estimating the number of buckets".to_string(),
            )
        })
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::estimate_memory_size;

    #[test]
    fn test_estimate_memory() {
        // size (bytes): 48
        let fixed_size = std::mem::size_of::<HashSet<u32>>();

        // estimated buckets: 16 = (8 * 8 / 7).next_power_of_two()
        let num_elements = 8;
        // size (bytes): 128 = 16 * 4 + 16 + 48
        let estimated = estimate_memory_size::<u32>(num_elements, fixed_size).unwrap();
        assert_eq!(estimated, 128);

        // estimated buckets: 64 = (40 * 8 / 7).next_power_of_two()
        let num_elements = 40;
        // size (bytes): 368 = 64 * 4 + 64 + 48
        let estimated = estimate_memory_size::<u32>(num_elements, fixed_size).unwrap();
        assert_eq!(estimated, 368);
    }

    #[test]
    fn test_estimate_memory_overflow() {
        let num_elements = usize::MAX;
        let fixed_size = std::mem::size_of::<HashSet<u32>>();
        let estimated = estimate_memory_size::<u32>(num_elements, fixed_size);

        assert!(estimated.is_err());
    }
}
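
To make the estimate concrete, here is a worked version of the same arithmetic using only std types (a sketch: struct sizes are platform-dependent, and the 48-byte fixed overhead assumes a common 64-bit target):

use std::collections::HashSet;

fn main() {
    // 100 elements padded by 8/7: 100 * 8 = 800, 800 / 7 = 114,
    // rounded up to the next power of two = 128 buckets.
    let estimated_buckets = (100_usize * 8 / 7).next_power_of_two();
    assert_eq!(estimated_buckets, 128);

    // entry bytes * buckets + one control byte per bucket + fixed overhead
    let fixed_size = std::mem::size_of::<HashSet<u32>>();
    let estimate =
        std::mem::size_of::<u32>() * estimated_buckets + estimated_buckets + fixed_size;

    // 4 * 128 + 128 + 48 = 688 on targets where the fixed overhead is 48.
    println!("estimated size: {estimate} bytes");
}
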
1 change: 1 addition & 0 deletions datafusion/common/src/utils/mod.rs
@@ -17,6 +17,7 @@
 
 //! This module provides the bisect function, which implements binary search.
 
+pub mod memory;
 pub mod proxy;
 
 use crate::error::{_internal_datafusion_err, _internal_err};
8 changes: 4 additions & 4 deletions datafusion/core/src/dataframe/mod.rs
@@ -53,8 +53,9 @@ use datafusion_expr::{
     avg, count, max, min, stddev, utils::COUNT_STAR_EXPANSION,
     TableProviderFilterPushDown, UNNAMED_TABLE,
 };
-use datafusion_expr::{case, is_null, sum};
+use datafusion_expr::{case, is_null};
 use datafusion_functions_aggregate::expr_fn::median;
+use datafusion_functions_aggregate::expr_fn::sum;
 
 use async_trait::async_trait;
 
@@ -1593,9 +1594,8 @@ mod tests {
     use datafusion_common::{Constraint, Constraints};
     use datafusion_common_runtime::SpawnedTask;
     use datafusion_expr::{
-        array_agg, cast, count_distinct, create_udf, expr, lit, sum,
-        BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, WindowFrame,
-        WindowFunctionDefinition,
+        array_agg, cast, count_distinct, create_udf, expr, lit, BuiltInWindowFunction,
+        ScalarFunctionImplementation, Volatility, WindowFrame, WindowFunctionDefinition,
     };
     use datafusion_physical_expr::expressions::Column;
     use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
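
Both hunks track `sum` moving out of `datafusion_expr` and into the `datafusion-functions-aggregate` crate. A minimal sketch of the import migration for downstream code (the DataFrame and the "amount" column are hypothetical):

// Before this change: use datafusion_expr::sum;
use datafusion_functions_aggregate::expr_fn::sum;

use datafusion::error::Result;
use datafusion::prelude::*;

fn total_amount(df: DataFrame) -> Result<DataFrame> {
    // The call site is unchanged; only the import path moves.
    df.aggregate(vec![], vec![sum(col("amount"))])
}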