Conversation

@2010YOUY01 2010YOUY01 commented Mar 3, 2025

Which issue does this PR close?

  • Closes #.

Rationale for this change

For memory-limited queries, executors might write temporary results to disk to reduce memory load. It's important to have a configuration option to limit the maximum disk usage, in case a query bloats the disk and causes other issues.

DuckDB provides a similar configuration:

max_temp_directory_size | The maximum amount of data stored inside the 'temp_directory' (when set) (e.g., 1GB)

This PR adds a configuration option max_temp_directory_size to the disk manager (defaulting to 100 GB). If the combined size of all spilled files exceeds the limit, an error is returned. Currently, it can only be configured by constructing DiskManager with an additional argument.
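The enforcement idea can be illustrated with a small self-contained sketch. Note that `DiskUsageTracker` and `try_reserve` are illustrative names, not the actual DataFusion API: an atomic counter tracks the total bytes spilled across all files, and a reservation fails once it would exceed the configured limit.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical sketch of the limit check this PR adds to the disk manager.
struct DiskUsageTracker {
    max_temp_directory_size: u64,
    used_disk_space: AtomicU64,
}

impl DiskUsageTracker {
    fn new(max_temp_directory_size: u64) -> Self {
        Self {
            max_temp_directory_size,
            used_disk_space: AtomicU64::new(0),
        }
    }

    /// Reserve `bytes` for a new spill file; error out once the total
    /// across all spilled files would exceed the configured limit.
    fn try_reserve(&self, bytes: u64) -> Result<(), String> {
        let previous = self.used_disk_space.fetch_add(bytes, Ordering::SeqCst);
        if previous + bytes > self.max_temp_directory_size {
            // Roll back the optimistic addition before reporting the error.
            self.used_disk_space.fetch_sub(bytes, Ordering::SeqCst);
            return Err(format!(
                "Resources exhausted: spilling exceeded the disk limit of {} bytes. \
                 Try increasing `max_temp_directory_size`.",
                self.max_temp_directory_size
            ));
        }
        Ok(())
    }
}

fn main() {
    let tracker = DiskUsageTracker::new(100); // tiny 100-byte limit for the demo
    assert!(tracker.try_reserve(60).is_ok());
    assert!(tracker.try_reserve(60).is_err()); // 60 + 60 would exceed the limit
    assert!(tracker.try_reserve(40).is_ok()); // exactly at the limit is allowed
}
```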

See example in datafusion-cli (Manually modified datafusion-cli's startup code to change the disk limit to 1GB)

yongting@Mac ~/C/d/datafusion-cli (spill-disk-limit *)> cargo run --profile release-nonlto -- --mem-pool-type fair -m 100M
DataFusion CLI v46.0.0
> select * from generate_series(1, 1000000000000) as t1(v1) order by v1;
Resources exhausted: The used disk space during the spilling process has exceeded the allowable limit of 1024.0 MB. Try increasing the `max_temp_directory_size` in the disk manager configuration.

What changes are included in this PR?

This PR is mostly refactoring; the actual change is small. I made one big refactor into a separate commit, so you can look at just the second commit for the related changes.

Changes

1. refactor: Move metrics module from datafusion_physical_plan crate to datafusion_execution crate

Current code organization:

  • datafusion_physical_plan depends on datafusion_execution
  • datafusion_physical_plan includes physical executors like SortExec
  • datafusion_execution includes memory reservation and disk manager

This PR requires adding metrics-tracking ability to DiskManager. I also believe that in the future we can add more metrics to MemoryPool and DiskManager for aggregated statistics across different executors, so this PR makes this refactoring change first.

2. refactor: Group all spilling-related metrics in executors into a single struct SpillMetrics

This way we can ensure the metrics across different operators are kept consistent.
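As a rough illustration of the idea (the field and method names are assumptions, not the exact `SpillMetrics` definition in this PR), grouping the counters in one struct gives every operator a single update path:

```rust
/// Sketch of grouping all spilling-related counters into one struct,
/// so operators cannot update them inconsistently.
#[derive(Default, Debug)]
struct SpillMetrics {
    spill_file_count: usize,
    spilled_bytes: usize,
    spilled_rows: usize,
}

impl SpillMetrics {
    /// Record one finished spill file; every operator goes through
    /// this method so all three counters stay in sync.
    fn record_spill(&mut self, bytes: usize, rows: usize) {
        self.spill_file_count += 1;
        self.spilled_bytes += bytes;
        self.spilled_rows += rows;
    }
}

fn main() {
    let mut metrics = SpillMetrics::default();
    metrics.record_spill(1024, 100);
    metrics.record_spill(2048, 200);
    assert_eq!(metrics.spill_file_count, 2);
    assert_eq!(metrics.spilled_bytes, 3072);
    assert_eq!(metrics.spilled_rows, 300);
}
```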

3. refactor: Move IPCStreamWriter from datafusion_physical_plan to datafusion_execution::DiskManager

4. Implement the utility function to spill batches inside DiskManager

try_spill_record_batches is implemented to replace the old utility function spill_record_batches. This way the total spilled file size can be tracked, and the interface for external operators to use is a little simpler.

Are these changes tested?

Yes, an integration test is included for queries that exceed and do not exceed the disk limit.

Are there any user-facing changes?

No. However, there is one deprecation, which I'll explain separately.

@github-actions github-actions bot added core Core DataFusion crate execution Related to the execution crate labels Mar 3, 2025
since = "46.0.0",
note = "This function is deprecated. Use `datafusion_execution::DiskManager::try_spill_record_batch_by_size` instead. Note this method is mainly used within DataFusion for spilling operators. If you only want the functionality of writing `RecordBatch`es to disk, consider using `arrow::ipc::writer::StreamWriter` instead."
)]
Contributor Author

This function doesn't have to be public; see spill_record_batches(), which is also non-public.
I believe this utility function is mainly used for tracking temp files and metrics for DataFusion executors' internal use. The major functionality for writing RecordBatches to disk is in arrow::ipc, so I expect deprecating it won't cause major issues.

let disk_manager = DiskManager::try_new_without_arc(DiskManagerConfig::NewOs)?
.with_max_temp_directory_size(disk_limit)?;

let runtime = RuntimeEnvBuilder::new()
Contributor Author

We can't use the builder to specify the disk limit for now, because DiskManagerConfig is an enum instead of a struct, so the setup routine is a bit hacky.
I think changing it will inevitably cause an API change, so I prefer to leave it to a separate PR.

Contributor

I think you could do this instead of adding a new function:

    let disk_manager = DiskManager::try_new(DiskManagerConfig::NewOs)?;
    let disk_manager = Arc::try_unwrap(disk_manager)
        .expect("DiskManager should be unique")
        .with_max_temp_directory_size(disk_limit)?;



comphead commented Mar 3, 2025

Nice PR, thanks @2010YOUY01. I'm planning to review it today.

@2010YOUY01 2010YOUY01 changed the title feat: Add one config to limit max disk usage for spilling queries feat: Add config max_temp_directory_size to limit max disk usage for spilling queries Mar 4, 2025
@adriangb
Contributor

This makes total sense to me, I’ve certainly wanted this feature!

@comphead
Contributor

Sorry, it totally slipped my mind


@alamb alamb left a comment


Thanks @2010YOUY01 -- I really like where this is heading. Sorry for the very late review.

I think we should try to simplify the configuration system a bit (I left some comments), as well as perhaps avoid adding new methods to the DiskManager.

I feel like some good follow-on work would be (I can file tickets):

  1. Connect this setting to datafusion-cli so we can test it more easily
  2. Make the DiskManagerConfig more builder-style

Something like

let manager = DiskManagerBuilder::new()
  .with_max_temp_directory_size(100*1024*1024)
  .build_arc();

})
}

/// Write record batches to a temporary file, and return the spill file handle.
Contributor

I am not sure about putting this method on the DiskManager itself -- I think managing the contents of the spill files is something that will likely get more sophisticated (e.g. if we want to explore using mmap and other techniques to speed up reading)

I have been meaning to file tickets about this and I will do so shortly.

What would you think about introducing a new struct that would be responsible for managing the spill files for a particular operation?

Something like this perhaps:

struct SpillFiles {
    env: Arc<RuntimeEnv>,
    files: Vec<RefCountedTempFile>,
    ...
}

impl SpillFiles {
    pub fn try_spill_record_batches(
        &self,
        batches: &[RecordBatch],
        request_description: &str,
        caller_spill_metrics: &mut SpillMetrics,
    ) -> Result<()> {..}
}

}
}

pub fn try_new_without_arc(config: DiskManagerConfig) -> Result<Self> {
Contributor

I don't understand the need for this function and find the name confusing. It seems like the only difference is that it will error if it doesn't have exclusive access to the disk manager. I have an alternate suggestion above

}

/// Set the maximum amount of data (in bytes) stored inside the temporary directories.
pub fn with_max_temp_directory_size(
Contributor

I would expect that this function should be on the DiskManagerConfig rather than DiskManager 🤔

Then you won't have to add the new try_unwrap either.


2010YOUY01 commented Mar 20, 2025

@alamb Thanks for the review, I agree there are two things we can improve:

  1. Create a new struct for spilling-related utilities, instead of putting it inside DiskManager
  2. An easier-to-use builder for DiskManager for configuring max_temp_directory_size (@Standing-Man has already started helping in Make DiskManagerBuilder to construct DiskManagers #15319, thank you 🙏🏼)

I also found another limitation in this spill interface; for certain cases we want:

let in_progress_temp_file = spiller.create_in_progress_temp_file();
in_progress_temp_file.append(batch);
...
in_progress_temp_file.append(batch);
let temp_file = in_progress_temp_file.finish();

Instead of always writing batches to a spill file at once.
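A self-contained sketch of such an incremental interface, using plain bytes instead of Arrow `RecordBatch`es (all names here are hypothetical, not part of DataFusion):

```rust
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;

/// Sketch of the incremental spill interface described above: append
/// data batch by batch, then `finish()` to get the completed file.
/// A real version would append Arrow `RecordBatch`es via an IPC
/// stream writer instead of raw bytes.
struct InProgressSpillFile {
    path: PathBuf,
    file: File,
    bytes_written: u64,
}

impl InProgressSpillFile {
    fn create(path: PathBuf) -> std::io::Result<Self> {
        let file = File::create(&path)?;
        Ok(Self { path, file, bytes_written: 0 })
    }

    /// Append one batch; the running size lets the caller also check
    /// a disk limit after each append rather than only at the end.
    fn append(&mut self, batch: &[u8]) -> std::io::Result<()> {
        self.file.write_all(batch)?;
        self.bytes_written += batch.len() as u64;
        Ok(())
    }

    /// Flush and return the finished file path with its final size.
    fn finish(mut self) -> std::io::Result<(PathBuf, u64)> {
        self.file.flush()?;
        Ok((self.path, self.bytes_written))
    }
}

fn main() -> std::io::Result<()> {
    let mut spill =
        InProgressSpillFile::create(std::env::temp_dir().join("spill_demo.bin"))?;
    spill.append(b"batch-1")?;
    spill.append(b"batch-2")?;
    let (path, size) = spill.finish()?;
    assert_eq!(size, 14); // two 7-byte "batches"
    std::fs::remove_file(path)?;
    Ok(())
}
```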

I plan to address point 1 and the above-mentioned limitation in another PR, and after that rework this PR to add the disk limit feature, so I'm closing this PR for now.

@2010YOUY01 2010YOUY01 closed this Mar 20, 2025

alamb commented Mar 20, 2025

I plan to address point 1 and the above-mentioned limitation in another PR, and after that rework this PR to add disk limit feature. So closed this PR for now.

Thank you -- I think consolidating the spilling-related utilities and making the interface clear will make it much easier to add new features as we go forward.

Maybe we can take inspiration from comet's spill / shuffle writer code: https://github.com/apache/datafusion-comet/blob/382ac938f2d10666eefc08ec5c1c82025ddf3726/native/core/src/execution/shuffle/shuffle_writer.rs
