Skip to content

Commit

Permalink
Minor: Refactor memory size estimation for HashTable (#10748)
Browse files Browse the repository at this point in the history
* refactor: extract estimate_memory_size

* refactor: cap at usize::MAX

* refactor: use estimate_memory_size

* chore: add examples

* refactor: return Result<usize>; add testcase

* fix: docs

* fix: remove unnecessary checked_div

* fix: remove additional and_then
  • Loading branch information
marvinlanhenke committed Jun 3, 2024
1 parent eabbd28 commit a92f803
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 42 deletions.
134 changes: 134 additions & 0 deletions datafusion/common/src/utils/memory.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! This module provides a function to estimate the memory size of a HashTable prior to allocation

use crate::{DataFusionError, Result};

/// Estimates the memory size required for a hash table prior to allocation.
///
/// # Parameters
/// - `num_elements`: The number of elements expected in the hash table.
/// - `fixed_size`: A fixed overhead size associated with the collection
/// (e.g., HashSet or HashTable).
/// - `T`: The type of elements stored in the hash table.
///
/// # Details
/// This function calculates the estimated memory size by considering:
/// - An overestimation of buckets to keep approximately 1/8 of them empty.
/// - The total memory size is computed as:
/// - The size of each entry (`T`) multiplied by the estimated number of
/// buckets.
/// - One byte overhead for each bucket.
/// - The fixed size overhead of the collection.
/// - If the estimation overflows, we return a [`DataFusionError`]
///
/// # Examples
/// ---
///
/// ## From within a struct
///
/// ```rust
/// # use datafusion_common::utils::memory::estimate_memory_size;
/// # use datafusion_common::Result;
///
/// struct MyStruct<T> {
/// values: Vec<T>,
/// other_data: usize,
/// }
///
/// impl<T> MyStruct<T> {
/// fn size(&self) -> Result<usize> {
/// let num_elements = self.values.len();
/// let fixed_size = std::mem::size_of_val(self) +
/// std::mem::size_of_val(&self.values);
///
/// estimate_memory_size::<T>(num_elements, fixed_size)
/// }
/// }
/// ```
/// ---
/// ## With a simple collection
///
/// ```rust
/// # use datafusion_common::utils::memory::estimate_memory_size;
/// # use std::collections::HashMap;
///
/// let num_rows = 100;
/// let fixed_size = std::mem::size_of::<HashMap<u64, u64>>();
/// let estimated_hashtable_size =
/// estimate_memory_size::<(u64, u64)>(num_rows,fixed_size)
/// .expect("Size estimation failed");
/// ```
pub fn estimate_memory_size<T>(num_elements: usize, fixed_size: usize) -> Result<usize> {
    // hashbrown typically overestimates the bucket count so that roughly
    // 1/8 of the buckets stay empty. Model that by scaling the element
    // count by 8/7 (~1.14) before rounding up to a power of two. This
    // overallocates for small tables (< 8 elements) but is fine overall.
    let estimated = match num_elements.checked_mul(8) {
        Some(scaled) => {
            let estimated_buckets = (scaled / 7).next_power_of_two();
            // Total size is:
            //   size of one entry (`T`) * number of buckets
            // + one control byte per bucket
            // + the fixed overhead of the collection (HashSet/HashTable)
            std::mem::size_of::<T>()
                .checked_mul(estimated_buckets)
                .and_then(|bytes| bytes.checked_add(estimated_buckets))
                .and_then(|bytes| bytes.checked_add(fixed_size))
        }
        None => None,
    };

    // Any overflow along the way surfaces as an execution error.
    estimated.ok_or_else(|| {
        DataFusionError::Execution(
            "usize overflow while estimating the number of buckets".to_string(),
        )
    })
}

#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::estimate_memory_size;

    #[test]
    fn test_estimate_memory() {
        // Fixed overhead of the collection itself: 48 bytes for HashSet<u32>.
        let fixed_size = std::mem::size_of::<HashSet<u32>>();

        // 8 elements -> (8 * 8 / 7).next_power_of_two() = 16 buckets
        // -> 16 * 4 (entries) + 16 (control bytes) + 48 (fixed) = 128
        assert_eq!(estimate_memory_size::<u32>(8, fixed_size).unwrap(), 128);

        // 40 elements -> (40 * 8 / 7).next_power_of_two() = 64 buckets
        // -> 64 * 4 + 64 + 48 = 368
        assert_eq!(estimate_memory_size::<u32>(40, fixed_size).unwrap(), 368);
    }

    #[test]
    fn test_estimate_memory_overflow() {
        // usize::MAX elements cannot be scaled by 8 without overflowing,
        // so the estimation must return an error instead of wrapping.
        let fixed_size = std::mem::size_of::<HashSet<u32>>();
        let estimated = estimate_memory_size::<u32>(usize::MAX, fixed_size);

        assert!(estimated.is_err());
    }
}
1 change: 1 addition & 0 deletions datafusion/common/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! This module provides the bisect function, which implements binary search.

pub mod memory;
pub mod proxy;

use crate::error::{_internal_datafusion_err, _internal_err};
Expand Down
35 changes: 11 additions & 24 deletions datafusion/physical-expr/src/aggregate/count_distinct/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use arrow_schema::DataType;

use datafusion_common::cast::{as_list_array, as_primitive_array};
use datafusion_common::utils::array_into_list_array;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::ScalarValue;
use datafusion_expr::Accumulator;

Expand Down Expand Up @@ -115,18 +116,11 @@ where
}

fn size(&self) -> usize {
let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX)
/ 7)
.next_power_of_two();

// Size of accumulator
// + size of entry * number of buckets
// + 1 byte for each bucket
// + fixed size of HashSet
std::mem::size_of_val(self)
+ std::mem::size_of::<T::Native>() * estimated_buckets
+ estimated_buckets
+ std::mem::size_of_val(&self.values)
let num_elements = self.values.len();
let fixed_size =
std::mem::size_of_val(self) + std::mem::size_of_val(&self.values);

estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
}
}

Expand Down Expand Up @@ -202,17 +196,10 @@ where
}

fn size(&self) -> usize {
let estimated_buckets = (self.values.len().checked_mul(8).unwrap_or(usize::MAX)
/ 7)
.next_power_of_two();

// Size of accumulator
// + size of entry * number of buckets
// + 1 byte for each bucket
// + fixed size of HashSet
std::mem::size_of_val(self)
+ std::mem::size_of::<T::Native>() * estimated_buckets
+ estimated_buckets
+ std::mem::size_of_val(&self.values)
let num_elements = self.values.len();
let fixed_size =
std::mem::size_of_val(self) + std::mem::size_of_val(&self.values);

estimate_memory_size::<T::Native>(num_elements, fixed_size).unwrap()
}
}
25 changes: 7 additions & 18 deletions datafusion/physical-plan/src/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
//! [`HashJoinExec`] Partitioned Hash Join Operator

use std::fmt;
use std::mem::size_of;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
Expand Down Expand Up @@ -59,6 +58,7 @@ use arrow::record_batch::RecordBatch;
use arrow::util::bit_util;
use arrow_array::cast::downcast_array;
use arrow_schema::ArrowError;
use datafusion_common::utils::memory::estimate_memory_size;
use datafusion_common::{
internal_datafusion_err, internal_err, plan_err, project_schema, DataFusionError,
JoinSide, JoinType, Result,
Expand Down Expand Up @@ -875,23 +875,12 @@ async fn collect_left_input(

// Estimation of memory size, required for hashtable, prior to allocation.
// Final result can be verified using `RawTable.allocation_info()`
//
// For majority of cases hashbrown overestimates buckets qty to keep ~1/8 of them empty.
// This formula leads to overallocation for small tables (< 8 elements) but fine overall.
let estimated_buckets = (num_rows.checked_mul(8).ok_or_else(|| {
DataFusionError::Execution(
"usize overflow while estimating number of hasmap buckets".to_string(),
)
})? / 7)
.next_power_of_two();
// 16 bytes per `(u64, u64)`
// + 1 byte for each bucket
// + fixed size of JoinHashMap (RawTable + Vec)
let estimated_hastable_size =
16 * estimated_buckets + estimated_buckets + size_of::<JoinHashMap>();

reservation.try_grow(estimated_hastable_size)?;
metrics.build_mem_used.add(estimated_hastable_size);
let fixed_size = std::mem::size_of::<JoinHashMap>();
let estimated_hashtable_size =
estimate_memory_size::<(u64, u64)>(num_rows, fixed_size)?;

reservation.try_grow(estimated_hashtable_size)?;
metrics.build_mem_used.add(estimated_hashtable_size);

let mut hashmap = JoinHashMap::with_capacity(num_rows);
let mut hashes_buffer = Vec::new();
Expand Down

0 comments on commit a92f803

Please sign in to comment.