Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 172 additions & 11 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,7 @@ enum BatchPartitionerState {
Hash {
exprs: Vec<Arc<dyn PhysicalExpr>>,
num_partitions: usize,
partition_reducer: StrengthReducedU64,
hash_buffer: Vec<u64>,
indices: Vec<Vec<u32>>,
},
Expand All @@ -438,6 +439,70 @@ enum BatchPartitionerState {
/// executions and runs.
pub const REPARTITION_RANDOM_STATE: SeededRandomState = SeededRandomState::with_seed(0);

#[derive(Debug, Clone, Copy)]
enum StrengthReducedU64 {
PowerOfTwo { mask: u64 },
Reciprocal { divisor: u64, reciprocal: u128 },
}

impl StrengthReducedU64 {
fn new(divisor: u64) -> Self {
debug_assert!(divisor > 0);

if divisor.is_power_of_two() {
Self::PowerOfTwo { mask: divisor - 1 }
} else {
Self::Reciprocal {
divisor,
// ceil(2^128 / divisor), computed without representing 2^128
reciprocal: u128::MAX / u128::from(divisor) + 1,
}
}
}

fn partition_indices(self, hash_buffer: &[u64], indices: &mut [Vec<u32>]) {
match self {
Self::PowerOfTwo { mask } => {
for (index, hash) in hash_buffer.iter().enumerate() {
indices[(*hash & mask) as usize].push(index as u32);
}
}
Self::Reciprocal {
divisor,
reciprocal,
} => {
for (index, hash) in hash_buffer.iter().enumerate() {
let quotient = Self::quotient(*hash, reciprocal);
let partition = *hash - quotient * divisor;
indices[partition as usize].push(index as u32);
}
}
}
}

#[cfg(test)]
fn remainder(self, value: u64) -> u64 {
match self {
Self::PowerOfTwo { mask } => value & mask,
Self::Reciprocal {
divisor,
reciprocal,
} => value - Self::quotient(value, reciprocal) * divisor,
}
}

#[inline]
fn quotient(value: u64, reciprocal: u128) -> u64 {
let reciprocal_low = reciprocal as u64;
let reciprocal_high = (reciprocal >> 64) as u64;
let low_product = u128::from(value) * u128::from(reciprocal_low);
let high_product = u128::from(value) * u128::from(reciprocal_high);
let carry = ((high_product & u128::from(u64::MAX)) + (low_product >> 64)) >> 64;

((high_product >> 64) + carry) as u64
}
}

impl BatchPartitioner {
/// Create a new [`BatchPartitioner`] for hash-based repartitioning.
///
Expand All @@ -446,22 +511,27 @@ impl BatchPartitioner {
/// - `num_partitions`: Total number of output partitions.
/// - `timer`: Metric used to record time spent during repartitioning.
///
/// # Notes
/// This constructor cannot fail and performs no validation.
/// # Errors
/// Returns an error if `num_partitions` is zero.
pub fn new_hash_partitioner(
exprs: Vec<Arc<dyn PhysicalExpr>>,
num_partitions: usize,
timer: metrics::Time,
) -> Self {
Self {
) -> Result<Self> {
Comment thread
Dandandan marked this conversation as resolved.
if num_partitions == 0 {
return internal_err!("Hash repartition requires at least one partition");
}

Ok(Self {
state: BatchPartitionerState::Hash {
exprs,
num_partitions,
partition_reducer: StrengthReducedU64::new(num_partitions as u64),
hash_buffer: vec![],
indices: vec![vec![]; num_partitions],
},
timer,
}
})
}
Comment thread
Dandandan marked this conversation as resolved.

/// Create a new [`BatchPartitioner`] for round-robin repartitioning.
Expand Down Expand Up @@ -510,7 +580,7 @@ impl BatchPartitioner {
) -> Result<Self> {
match partitioning {
Partitioning::Hash(exprs, num_partitions) => {
Ok(Self::new_hash_partitioner(exprs, num_partitions, timer))
Self::new_hash_partitioner(exprs, num_partitions, timer)
}
Partitioning::RoundRobinBatch(num_partitions) => {
Ok(Self::new_round_robin_partitioner(
Expand Down Expand Up @@ -575,7 +645,8 @@ impl BatchPartitioner {
}
BatchPartitionerState::Hash {
exprs,
num_partitions: partitions,
num_partitions: _,
Comment thread
Dandandan marked this conversation as resolved.
Outdated
partition_reducer,
hash_buffer,
indices,
} => {
Expand All @@ -596,9 +667,7 @@ impl BatchPartitioner {

indices.iter_mut().for_each(|v| v.clear());

for (index, hash) in hash_buffer.iter().enumerate() {
indices[(*hash % *partitions as u64) as usize].push(index as u32);
}
partition_reducer.partition_indices(hash_buffer, indices);

// Finished building index-arrays for output partitions
timer.done();
Expand Down Expand Up @@ -1359,7 +1428,7 @@ impl RepartitionExec {
exprs.clone(),
*num_partitions,
metrics.repartition_time.clone(),
)
)?
}
Partitioning::RoundRobinBatch(num_partitions) => {
BatchPartitioner::new_round_robin_partitioner(
Expand Down Expand Up @@ -1785,6 +1854,98 @@ mod tests {
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
use insta::assert_snapshot;

#[test]
fn strength_reduced_u64_remainder_matches_modulo() {
let divisors = [
1,
2,
3,
4,
5,
7,
8,
10,
16,
31,
32,
63,
64,
65,
97,
u64::from(u32::MAX),
u64::from(u32::MAX) + 1,
1_u64 << 32,
(1_u64 << 63) - 1,
1_u64 << 63,
u64::MAX - 1,
u64::MAX,
];
let values = [
0,
1,
2,
3,
4,
5,
31,
32,
33,
63,
64,
65,
u64::from(u32::MAX) - 1,
u64::from(u32::MAX),
u64::from(u32::MAX) + 1,
(1_u64 << 32) - 1,
1_u64 << 32,
(1_u64 << 32) + 1,
(1_u64 << 63) - 1,
1_u64 << 63,
(1_u64 << 63) + 1,
u64::MAX - 1,
u64::MAX,
];

for divisor in divisors {
let reducer = StrengthReducedU64::new(divisor);
for value in values {
assert_eq!(
reducer.remainder(value),
value % divisor,
"value={value} divisor={divisor}"
);
}

let mut value = 0x1234_5678_9abc_def0 ^ divisor;
for _ in 0..10_000 {
value = value
.wrapping_mul(6_364_136_223_846_793_005)
.wrapping_add(1_442_695_040_888_963_407);
assert_eq!(
reducer.remainder(value),
value % divisor,
"value={value} divisor={divisor}"
);
}
}
}

#[test]
fn hash_partitioner_requires_nonzero_partitions() {
let metrics = ExecutionPlanMetricsSet::new();
let timer = MetricBuilder::new(&metrics).subset_time("test", 0);

let err = BatchPartitioner::new_hash_partitioner(vec![], 0, timer)
.err()
.expect("zero hash partitions should fail")
.to_string();

assert!(
err.contains("Hash repartition requires at least one partition"),
"actual: {err}"
);
}

#[tokio::test]
async fn one_to_many_round_robin() -> Result<()> {
// define input partitions
Expand Down
Loading