diff --git a/native/core/src/execution/shuffle/shuffle_writer.rs b/native/core/src/execution/shuffle/shuffle_writer.rs
index 2c597b059b..25c836a6ab 100644
--- a/native/core/src/execution/shuffle/shuffle_writer.rs
+++ b/native/core/src/execution/shuffle/shuffle_writer.rs
@@ -186,7 +186,6 @@ impl ExecutionPlan for ShuffleWriterExec {
             futures::stream::once(
                 external_shuffle(
                     input,
-                    partition,
                     self.output_data_file.clone(),
                     self.output_index_file.clone(),
                     self.partitioning.clone(),
@@ -205,7 +204,6 @@ impl ExecutionPlan for ShuffleWriterExec {
 #[allow(clippy::too_many_arguments)]
 async fn external_shuffle(
     mut input: SendableRecordBatchStream,
-    partition_id: usize,
     output_data_file: String,
     output_index_file: String,
     partitioning: Partitioning,
@@ -216,7 +214,6 @@ async fn external_shuffle(
 ) -> Result<SendableRecordBatchStream> {
     let schema = input.schema();
     let mut repartitioner = ShuffleRepartitioner::try_new(
-        partition_id,
         output_data_file,
         output_index_file,
         Arc::clone(&schema),
@@ -288,13 +285,11 @@ struct ShuffleRepartitioner {
     output_index_file: String,
     schema: SchemaRef,
     buffered_partitions: Vec<PartitionBuffer>,
-    /// Sort expressions
     /// Partitioning scheme to use
     partitioning: Partitioning,
     num_output_partitions: usize,
     runtime: Arc<RuntimeEnv>,
     metrics: ShuffleRepartitionerMetrics,
-    reservation: MemoryReservation,
     /// Hashes for each row in the current batch
     hashes_buf: Vec<u32>,
     /// Partition ids for each row in the current batch
@@ -306,7 +301,6 @@ struct ShuffleRepartitioner {
 impl ShuffleRepartitioner {
     #[allow(clippy::too_many_arguments)]
     pub fn try_new(
-        partition_id: usize,
         output_data_file: String,
         output_index_file: String,
         schema: SchemaRef,
@@ -318,9 +312,6 @@ impl ShuffleRepartitioner {
         enable_fast_encoding: bool,
     ) -> Result<Self> {
         let num_output_partitions = partitioning.partition_count();
-        let reservation = MemoryConsumer::new(format!("ShuffleRepartitioner[{}]", partition_id))
-            .with_can_spill(true)
-            .register(&runtime.memory_pool);
 
         let mut hashes_buf = Vec::with_capacity(batch_size);
         let mut partition_ids = Vec::with_capacity(batch_size);
@@ -352,7 +343,6 @@ impl ShuffleRepartitioner {
             num_output_partitions,
             runtime,
             metrics,
-            reservation,
             hashes_buf,
             partition_ids,
             batch_size,
@@ -472,41 +462,12 @@ impl ShuffleRepartitioner {
                     .enumerate()
                     .filter(|(_, (start, end))| start < end)
                 {
-                    let mut mem_diff = self
-                        .append_rows_to_partition(
-                            input.columns(),
-                            &shuffled_partition_ids[start..end],
-                            partition_id,
-                        )
-                        .await?;
-
-                    if mem_diff > 0 {
-                        let mem_increase = mem_diff as usize;
-
-                        let try_grow = {
-                            let mut mempool_timer = self.metrics.mempool_time.timer();
-                            let result = self.reservation.try_grow(mem_increase);
-                            mempool_timer.stop();
-                            result
-                        };

-                        if try_grow.is_err() {
-                            self.spill().await?;
-                            let mut mempool_timer = self.metrics.mempool_time.timer();
-                            self.reservation.free();
-                            self.reservation.try_grow(mem_increase)?;
-                            mempool_timer.stop();
-                            mem_diff = 0;
-                        }
-                    }
-
-                    if mem_diff < 0 {
-                        let mem_used = self.reservation.size();
-                        let mem_decrease = mem_used.min(-mem_diff as usize);
-                        let mut mempool_timer = self.metrics.mempool_time.timer();
-                        self.reservation.shrink(mem_decrease);
-                        mempool_timer.stop();
-                    }
+                    self.append_rows_to_partition(
+                        input.columns(),
+                        &shuffled_partition_ids[start..end],
+                        partition_id,
+                    )
+                    .await?;
                 }
             }
             Partitioning::UnknownPartitioning(n) if *n == 1 => {
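Note on the hunk above: the structural change is one of ownership. `ShuffleRepartitioner` no longer holds an operator-wide `MemoryReservation` and no longer reconciles `mem_diff` deltas in the hot loop; each `PartitionBuffer` grows and shrinks its own reservation as builders are allocated, flushed, and spilled. An abridged sketch of the resulting layout (only fields visible in this diff and its tests; the real struct has more):

    // Each output partition accounts for its own memory.
    struct PartitionBuffer {
        // grown when `active` builders are allocated, adjusted on flush,
        // freed on spill
        reservation: MemoryReservation,
        // in-progress array builders holding appended but unflushed rows
        active: Vec<Box<dyn ArrayBuilder>>,
        // estimated memory needed by one set of active builders
        active_slots_mem_size: usize,
        // compressed, encoded bytes produced by flush()
        frozen: Vec<u8>,
    }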
batch + // https://github.com/apache/datafusion-comet/issues/1453 let indices = (0..input.num_rows()).collect::>(); self.append_rows_to_partition(input.columns(), &indices, 0) @@ -593,10 +557,6 @@ impl ShuffleRepartitioner { write_time.stop(); - let mut mempool_timer = self.metrics.mempool_time.timer(); - self.reservation.free(); - mempool_timer.stop(); - elapsed_compute.stop(); // shuffle writer always has empty output @@ -608,7 +568,10 @@ impl ShuffleRepartitioner { } fn used(&self) -> usize { - self.reservation.size() + self.buffered_partitions + .iter() + .map(|b| b.reservation.size()) + .sum() } fn spilled_bytes(&self) -> usize { @@ -639,7 +602,6 @@ impl ShuffleRepartitioner { for p in &mut self.buffered_partitions { spilled_bytes += p.spill(&self.runtime, &self.metrics)?; } - self.reservation.free(); self.metrics.spill_count.add(1); self.metrics.spilled_bytes.add(spilled_bytes); @@ -652,34 +614,28 @@ impl ShuffleRepartitioner { columns: &[ArrayRef], indices: &[usize], partition_id: usize, - ) -> Result { - let mut mem_diff = 0; - + ) -> Result<()> { let output = &mut self.buffered_partitions[partition_id]; // If the range of indices is not big enough, just appending the rows into // active array builders instead of directly adding them as a record batch. let mut start_index: usize = 0; - let mut output_ret = output.append_rows(columns, indices, start_index, &self.metrics); + let mut output_ret = output.append_rows(columns, indices, start_index, &self.metrics)?; loop { match output_ret { - AppendRowStatus::MemDiff(l) => { - mem_diff += l?; + AppendRowStatus::Appended => { break; } AppendRowStatus::StartIndex(new_start) => { - // Cannot allocate enough memory for the array builders in the partition, - // spill partitions and retry. + // Cannot allocate enough memory for the array builders in this partition, + // spill all partitions and retry. self.spill().await?; - let mut mempool_timer = self.metrics.mempool_time.timer(); - self.reservation.free(); - mempool_timer.stop(); - start_index = new_start; let output = &mut self.buffered_partitions[partition_id]; - output_ret = output.append_rows(columns, indices, start_index, &self.metrics); + output_ret = + output.append_rows(columns, indices, start_index, &self.metrics)?; if let AppendRowStatus::StartIndex(new_start) = output_ret { if new_start == start_index { @@ -695,7 +651,7 @@ impl ShuffleRepartitioner { } } - Ok(mem_diff) + Ok(()) } } @@ -713,9 +669,9 @@ impl Debug for ShuffleRepartitioner { /// The status of appending rows to a partition buffer. #[derive(Debug)] enum AppendRowStatus { - /// The difference in memory usage after appending rows - MemDiff(Result), - /// The index of the next row to append + /// Rows were appended + Appended, + /// Not all rows were appended due to lack of available memory StartIndex(usize), } @@ -782,9 +738,7 @@ impl PartitionBuffer { /// Initializes active builders if necessary. /// Returns error if memory reservation fails. 
@@ -782,9 +738,7 @@ impl PartitionBuffer {
 
     /// Initializes active builders if necessary.
     /// Returns error if memory reservation fails.
-    fn init_active_if_necessary(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<isize> {
-        let mut mem_diff = 0;
-
+    fn allocate_active_builders(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<()> {
         if self.active.is_empty() {
             let mut mempool_timer = metrics.mempool_time.timer();
             self.reservation.try_grow(self.active_slots_mem_size)?;
@@ -793,10 +747,8 @@ impl PartitionBuffer {
             let mut repart_timer = metrics.repart_time.timer();
             self.active = new_array_builders(&self.schema, self.batch_size);
             repart_timer.stop();
-
-            mem_diff += self.active_slots_mem_size as isize;
         }
-        Ok(mem_diff)
+        Ok(())
     }
 
     /// Appends rows of specified indices from columns into active array builders.
@@ -806,20 +758,20 @@
         indices: &[usize],
         start_index: usize,
         metrics: &ShuffleRepartitionerMetrics,
-    ) -> AppendRowStatus {
-        let mut mem_diff = 0;
+    ) -> Result<AppendRowStatus> {
         let mut start = start_index;
 
-        // lazy init because some partition may be empty
-        let init = self.init_active_if_necessary(metrics);
-        if init.is_err() {
-            return AppendRowStatus::StartIndex(start);
-        }
-        mem_diff += init.unwrap();
-
+        // loop until all indices are processed
        while start < indices.len() {
             let end = (start + self.batch_size).min(indices.len());
 
+            // allocate builders
+            if self.allocate_active_builders(metrics).is_err() {
+                // could not allocate memory for builders, so abort
+                // and return the current index
+                return Ok(AppendRowStatus::StartIndex(start));
+            }
+
             let mut repart_timer = metrics.repart_time.timer();
             self.active
                 .iter_mut()
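Between these two hunks, note why builder allocation moved inside the `while` loop: `flush()` consumes the active builders via `std::mem::take`, so any iteration that follows a flush must re-allocate them and re-grow the reservation. The per-iteration flow, restated in comment form (a paraphrase of the surrounding hunks, not additional code):

    // 1. allocate_active_builders(): if `active` is empty (first use, or a
    //    previous iteration flushed), try_grow(active_slots_mem_size) and
    //    build fresh builders; on failure, return Ok(StartIndex(start)).
    // 2. append up to `batch_size` indices into the builders; start = end.
    // 3. if num_active_rows >= batch_size, flush(): encode the rows into
    //    `frozen` and, as the next hunk shows, swap the reservation from
    //    builder size to bytes actually written.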
@@ -829,54 +781,53 @@
                 });
             self.num_active_rows += end - start;
             repart_timer.stop();
+            start = end;
 
             if self.num_active_rows >= self.batch_size {
-                let flush = self.flush(metrics);
-                if let Err(e) = flush {
-                    return AppendRowStatus::MemDiff(Err(e));
-                }
-                mem_diff += flush.unwrap();
-
-                let init = self.init_active_if_necessary(metrics);
-                if init.is_err() {
-                    return AppendRowStatus::StartIndex(end);
-                }
-                mem_diff += init.unwrap();
+                self.flush(metrics)?;
             }
-            start = end;
         }
-        AppendRowStatus::MemDiff(Ok(mem_diff))
+        Ok(AppendRowStatus::Appended)
     }
 
     /// Flush active data into frozen bytes. This can reduce memory usage because the frozen
     /// bytes are compressed.
-    fn flush(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<isize> {
+    fn flush(&mut self, metrics: &ShuffleRepartitionerMetrics) -> Result<()> {
         if self.num_active_rows == 0 {
-            return Ok(0);
+            return Ok(());
         }
-        let mut mem_diff = 0isize;
 
         // active -> staging
         let active = std::mem::take(&mut self.active);
         let num_rows = self.num_active_rows;
         self.num_active_rows = 0;
 
-        let mut mempool_timer = metrics.mempool_time.timer();
-        self.reservation.try_shrink(self.active_slots_mem_size)?;
-        mempool_timer.stop();
-
         let mut repart_timer = metrics.repart_time.timer();
         let frozen_batch = make_batch(Arc::clone(&self.schema), active, num_rows)?;
         repart_timer.stop();
 
-        let frozen_capacity_old = self.frozen.capacity();
         let mut cursor = Cursor::new(&mut self.frozen);
         cursor.seek(SeekFrom::End(0))?;
-        self.shuffle_block_writer
-            .write_batch(&frozen_batch, &mut cursor, &metrics.encode_time)?;
+        let bytes_written = self.shuffle_block_writer.write_batch(
+            &frozen_batch,
+            &mut cursor,
+            &metrics.encode_time,
+        )?;
+
+        // we typically expect the frozen bytes to take up less memory than
+        // the builders due to compression but there could be edge cases where
+        // this is not the case
+        let mut mempool_timer = metrics.mempool_time.timer();
+        if self.active_slots_mem_size >= bytes_written {
+            self.reservation
+                .try_shrink(self.active_slots_mem_size - bytes_written)?;
+        } else {
+            self.reservation
+                .try_grow(bytes_written - self.active_slots_mem_size)?;
+        }
+        mempool_timer.stop();
 
-        mem_diff += (self.frozen.capacity() - frozen_capacity_old) as isize;
-        Ok(mem_diff)
+        Ok(())
     }
 
     fn spill(
@@ -1073,10 +1024,12 @@ mod test {
         assert!(buffer.spill_file.is_none());
 
         // append first batch - should fit in memory
-        let status = buffer.append_rows(batch.columns(), &indices, 0, &metrics);
+        let status = buffer
+            .append_rows(batch.columns(), &indices, 0, &metrics)
+            .unwrap();
         assert_eq!(
             format!("{status:?}"),
-            format!("{:?}", AppendRowStatus::MemDiff(Ok(106496)))
+            format!("{:?}", AppendRowStatus::Appended)
         );
         assert_eq!(900, buffer.num_active_rows);
         assert_eq!(106496, buffer.reservation.size());
@@ -1084,15 +1037,16 @@ mod test {
         assert!(buffer.spill_file.is_none());
 
         // append second batch - should trigger flush to frozen bytes
-        let status = buffer.append_rows(batch.columns(), &indices, 0, &metrics);
+        let status = buffer
+            .append_rows(batch.columns(), &indices, 0, &metrics)
+            .unwrap();
         assert_eq!(
             format!("{status:?}"),
-            format!("{:?}", AppendRowStatus::MemDiff(Ok(126316)))
+            format!("{:?}", AppendRowStatus::Appended)
         );
         assert_eq!(0, buffer.num_active_rows);
         assert_eq!(9914, buffer.frozen.len());
-        // note that the reservation does not include the frozen bytes
-        assert_eq!(106496, buffer.reservation.size());
+        assert_eq!(9914, buffer.reservation.size());
         assert!(buffer.spill_file.is_none());
 
         // spill
@@ -1104,14 +1058,15 @@ mod test {
         assert_eq!(9914, buffer.spill_file.as_ref().unwrap().file.len());
 
         // append after spill
-        let status = buffer.append_rows(batch.columns(), &indices, 0, &metrics);
+        let status = buffer
+            .append_rows(batch.columns(), &indices, 0, &metrics)
+            .unwrap();
         assert_eq!(
             format!("{status:?}"),
-            format!("{:?}", AppendRowStatus::MemDiff(Ok(0)))
+            format!("{:?}", AppendRowStatus::Appended)
         );
         assert_eq!(900, buffer.num_active_rows);
-        // TODO reservation should not be zero because there are active builders again
-        assert_eq!(0, buffer.reservation.size());
+        assert_eq!(106496, buffer.reservation.size());
         assert_eq!(0, buffer.frozen.len());
     }
 
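The middle section of the test above traces the new `flush` accounting directly: the builders had reserved 106496 bytes, encoding the 1800 rows buffered by the two appends wrote 9914 compressed bytes, and the reservation lands at exactly the frozen size. The arithmetic, spelled out (values taken from the assertions above, not new test code):

    let active_slots_mem_size: usize = 106_496; // reserved when builders were allocated
    let bytes_written: usize = 9_914;           // compressed bytes produced by flush()
    // builders cost more than the frozen bytes, so flush() takes the shrink branch:
    assert!(active_slots_mem_size >= bytes_written);
    let shrink = active_slots_mem_size - bytes_written; // 96_582
    // the remaining reservation covers exactly the frozen bytes:
    assert_eq!(active_slots_mem_size - shrink, 9_914); // buffer.reservation.size()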
@@ -1125,7 +1080,6 @@ mod test {
         let runtime_env = create_runtime(memory_limit);
         let metrics_set = ExecutionPlanMetricsSet::new();
         let mut repartitioner = ShuffleRepartitioner::try_new(
-            0,
             "/tmp/data.out".to_string(),
             "/tmp/index.out".to_string(),
             batch.schema(),
@@ -1145,11 +1099,6 @@ mod test {
         assert!(repartitioner.buffered_partitions[0].spill_file.is_none());
         assert!(repartitioner.buffered_partitions[1].spill_file.is_none());
 
-        // TODO: note that we are currently double counting the memory usage
-        // because we reserve the memory twice - once at the repartitioner level
-        // and then again in each PartitionBuffer
-        // https://github.com/apache/datafusion-comet/issues/1448
-        assert_eq!(212992, repartitioner.reservation.size());
         assert_eq!(
             106496,
             repartitioner.buffered_partitions[0].reservation.size()
@@ -1166,14 +1115,12 @@ mod test {
         assert!(repartitioner.buffered_partitions[1].spill_file.is_some());
 
         // after spill, all reservations should be freed
-        assert_eq!(0, repartitioner.reservation.size());
         assert_eq!(0, repartitioner.buffered_partitions[0].reservation.size());
         assert_eq!(0, repartitioner.buffered_partitions[1].reservation.size());
 
         // insert another batch after spilling
         repartitioner.insert_batch(batch.clone()).await.unwrap();
 
-        assert_eq!(212992, repartitioner.reservation.size());
         assert_eq!(
             106496,
             repartitioner.buffered_partitions[0].reservation.size()
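Taken together, the repartitioner-level test pins down the invariant this patch establishes: the operator's reported memory usage is the sum of the per-partition reservations at every stage of the lifecycle, with no repartitioner-level double counting (the removed TODO and issue #1448 describe the old behavior, where 212992 = 2 x 106496 was reserved a second time). A condensed restatement against the assertion values above (a sketch, not test code from the patch):

    // after the first insert_batch: each of the two partitions reserved
    // one set of builders
    assert_eq!(repartitioner.used(), 2 * 106_496);
    // after spill(): every partition buffer freed its reservation
    assert_eq!(repartitioner.used(), 0);
    // after another insert_batch: builders are re-allocated on demand
    assert_eq!(repartitioner.used(), 2 * 106_496);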