Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

335 changes: 213 additions & 122 deletions rust/lance/src/dataset/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,162 @@ impl AddAssign for CompactionMetrics {
}
}

/// Trait for implementing custom compaction planning strategies.
///
/// This trait allows users to define their own compaction strategies by implementing
/// the `plan` method. The default implementation is provided by [`DefaultCompactionPlanner`].
#[async_trait::async_trait]
pub trait CompactionPlanner: Send + Sync {
/// Build compaction plan.
///
/// This method analyzes the dataset's fragments and generates a [`CompactionPlan`]
/// containing a list of compaction tasks to execute.
///
/// # Arguments
///
/// * `dataset` - Reference to the dataset to be compacted
/// * `options` - Compaction options including target row count, deletion thresholds, etc.
Comment thread
zhangyue19921010 marked this conversation as resolved.
Outdated
async fn plan(&self, dataset: &Dataset) -> Result<CompactionPlan>;
}

/// Formulate a plan to compact the files in a dataset
///
/// The compaction plan will contain a list of tasks to execute. Each task
/// will contain approximately `target_rows_per_fragment` rows and will be
/// rewriting fragments that are adjacent in the dataset's fragment list. Some
/// tasks may contain a single fragment when that fragment has deletions that
/// are being materialized and doesn't have any neighbors that need to be
/// compacted.
#[derive(Debug, Clone, Default)]
pub struct DefaultCompactionPlanner {
options: CompactionOptions,
}

impl DefaultCompactionPlanner {
pub fn new(mut options: CompactionOptions) -> Self {
options.validate();
Self { options }
}
}

#[async_trait::async_trait]
impl CompactionPlanner for DefaultCompactionPlanner {
async fn plan(&self, dataset: &Dataset) -> Result<CompactionPlan> {
// get_fragments should be returning fragments in sorted order (by id)
// and fragment ids should be unique
let fragments = dataset.get_fragments();

debug_assert!(
fragments.windows(2).all(|w| w[0].id() < w[1].id()),
"fragments in manifest are not sorted"
);
let mut fragment_metrics = futures::stream::iter(fragments)
.map(|fragment| async move {
match collect_metrics(&fragment).await {
Ok(metrics) => Ok((fragment.metadata, metrics)),
Err(e) => Err(e),
}
})
.buffered(dataset.object_store().io_parallelism());

let index_fragmaps = load_index_fragmaps(dataset).await?;
let indices_containing_frag = |frag_id: u32| {
index_fragmaps
.iter()
.enumerate()
.filter(|(_, bitmap)| bitmap.contains(frag_id))
.map(|(pos, _)| pos)
.collect::<Vec<_>>()
};

let mut candidate_bins: Vec<CandidateBin> = Vec::new();
let mut current_bin: Option<CandidateBin> = None;
let mut i = 0;

while let Some(res) = fragment_metrics.next().await {
let (fragment, metrics) = res?;

let candidacy = if self.options.materialize_deletions
&& metrics.deletion_percentage() > self.options.materialize_deletions_threshold
{
Some(CompactionCandidacy::CompactItself)
} else if metrics.physical_rows < self.options.target_rows_per_fragment {
// Only want to compact if their are neighbors to compact such that
Comment thread
zhangyue19921010 marked this conversation as resolved.
// we can get a larger fragment.
Some(CompactionCandidacy::CompactWithNeighbors)
} else {
// Not a candidate
None
};

let indices = indices_containing_frag(fragment.id as u32);

match (candidacy, &mut current_bin) {
(None, None) => {} // keep searching
(Some(candidacy), None) => {
// Start a new bin
current_bin = Some(CandidateBin {
fragments: vec![fragment],
pos_range: i..(i + 1),
candidacy: vec![candidacy],
row_counts: vec![metrics.num_rows()],
indices,
});
}
(Some(candidacy), Some(bin)) => {
// We cannot mix "indexed" and "non-indexed" fragments and so we only consider
// the existing bin if it contains the same indices
if bin.indices == indices {
// Add to current bin
bin.fragments.push(fragment);
bin.pos_range.end += 1;
bin.candidacy.push(candidacy);
bin.row_counts.push(metrics.num_rows());
} else {
// Index set is different. Complete previous bin and start new one
candidate_bins.push(current_bin.take().unwrap());
current_bin = Some(CandidateBin {
fragments: vec![fragment],
pos_range: i..(i + 1),
candidacy: vec![candidacy],
row_counts: vec![metrics.num_rows()],
indices,
});
}
}
(None, Some(_)) => {
// Bin is complete
candidate_bins.push(current_bin.take().unwrap());
}
}

i += 1;
}

// Flush the last bin
if let Some(bin) = current_bin {
candidate_bins.push(bin);
}

let final_bins = candidate_bins
.into_iter()
.filter(|bin| !bin.is_noop())
.flat_map(|bin| bin.split_for_size(self.options.target_rows_per_fragment))
.map(|bin| TaskData {
fragments: bin.fragments,
});

let mut compaction_plan =
CompactionPlan::new(dataset.manifest.version, self.options.clone());
compaction_plan.extend_tasks(final_bins);

Ok(compaction_plan)
}
}

/// Compacts the files in the dataset without reordering them.
///
/// This does a few things:
/// By default, this does a few things:
/// * Removes deleted rows from fragments.
/// * Removes dropped columns from fragments.
/// * Merges fragments that are too small.
Expand All @@ -218,13 +371,20 @@ impl AddAssign for CompactionMetrics {
/// If no compaction is needed, this method will not make a new version of the table.
pub async fn compact_files(
dataset: &mut Dataset,
mut options: CompactionOptions,
options: CompactionOptions,
remap_options: Option<Arc<dyn IndexRemapperOptions>>, // These will be deprecated later

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

) -> Result<CompactionMetrics> {
info!(target: TRACE_DATASET_EVENTS, event=DATASET_COMPACTING_EVENT, uri = &dataset.uri);
options.validate();
let planner = DefaultCompactionPlanner::new(options);
compact_files_with_planner(dataset, remap_options, &planner).await
}

let compaction_plan: CompactionPlan = plan_compaction(dataset, &options).await?;
pub async fn compact_files_with_planner(
dataset: &mut Dataset,
remap_options: Option<Arc<dyn IndexRemapperOptions>>, // These will be deprecated later
planner: &dyn CompactionPlanner,
) -> Result<CompactionMetrics> {
let compaction_plan: CompactionPlan = planner.plan(dataset).await?;

// If nothing to compact, don't make a commit.
if compaction_plan.tasks().is_empty() {
Expand All @@ -234,16 +394,23 @@ pub async fn compact_files(
let dataset_ref = &dataset.clone();

let result_stream = futures::stream::iter(compaction_plan.tasks.into_iter())
.map(|task| rewrite_files(Cow::Borrowed(dataset_ref), task, &options))
.map(|task| rewrite_files(Cow::Borrowed(dataset_ref), task, &compaction_plan.options))
.buffer_unordered(
options
compaction_plan
.options
.num_threads
.unwrap_or_else(get_num_compute_intensive_cpus),
);

let completed_tasks: Vec<RewriteResult> = result_stream.try_collect().await?;
let remap_options = remap_options.unwrap_or(Arc::new(DatasetIndexRemapperOptions::default()));
let metrics = commit_compaction(dataset, completed_tasks, remap_options, &options).await?;
let metrics = commit_compaction(
dataset,
completed_tasks,
remap_options,
&compaction_plan.options,
)
.await?;

Ok(metrics)
}
Expand Down Expand Up @@ -458,125 +625,12 @@ async fn load_index_fragmaps(dataset: &Dataset) -> Result<Vec<RoaringBitmap>> {
Ok(index_fragmaps)
}

/// Formulate a plan to compact the files in a dataset
///
/// The compaction plan will contain a list of tasks to execute. Each task
/// will contain approximately `target_rows_per_fragment` rows and will be
/// rewriting fragments that are adjacent in the dataset's fragment list. Some
/// tasks may contain a single fragment when that fragment has deletions that
/// are being materialized and doesn't have any neighbors that need to be
/// compacted.
pub async fn plan_compaction(
dataset: &Dataset,
options: &CompactionOptions,
) -> Result<CompactionPlan> {
// get_fragments should be returning fragments in sorted order (by id)
// and fragment ids should be unique
let fragments = dataset.get_fragments();
debug_assert!(
fragments.windows(2).all(|w| w[0].id() < w[1].id()),
"fragments in manifest are not sorted"
);
let mut fragment_metrics = futures::stream::iter(fragments)
.map(|fragment| async move {
match collect_metrics(&fragment).await {
Ok(metrics) => Ok((fragment.metadata, metrics)),
Err(e) => Err(e),
}
})
.buffered(dataset.object_store().io_parallelism());

let index_fragmaps = load_index_fragmaps(dataset).await?;
let indices_containing_frag = |frag_id: u32| {
index_fragmaps
.iter()
.enumerate()
.filter(|(_, bitmap)| bitmap.contains(frag_id))
.map(|(pos, _)| pos)
.collect::<Vec<_>>()
};

let mut candidate_bins: Vec<CandidateBin> = Vec::new();
let mut current_bin: Option<CandidateBin> = None;
let mut i = 0;

while let Some(res) = fragment_metrics.next().await {
let (fragment, metrics) = res?;

let candidacy = if options.materialize_deletions
&& metrics.deletion_percentage() > options.materialize_deletions_threshold
{
Some(CompactionCandidacy::CompactItself)
} else if metrics.physical_rows < options.target_rows_per_fragment {
// Only want to compact if their are neighbors to compact such that
// we can get a larger fragment.
Some(CompactionCandidacy::CompactWithNeighbors)
} else {
// Not a candidate
None
};

let indices = indices_containing_frag(fragment.id as u32);

match (candidacy, &mut current_bin) {
(None, None) => {} // keep searching
(Some(candidacy), None) => {
// Start a new bin
current_bin = Some(CandidateBin {
fragments: vec![fragment],
pos_range: i..(i + 1),
candidacy: vec![candidacy],
row_counts: vec![metrics.num_rows()],
indices,
});
}
(Some(candidacy), Some(bin)) => {
// We cannot mix "indexed" and "non-indexed" fragments and so we only consider
// the existing bin if it contains the same indices
if bin.indices == indices {
// Add to current bin
bin.fragments.push(fragment);
bin.pos_range.end += 1;
bin.candidacy.push(candidacy);
bin.row_counts.push(metrics.num_rows());
} else {
// Index set is different. Complete previous bin and start new one
candidate_bins.push(current_bin.take().unwrap());
current_bin = Some(CandidateBin {
fragments: vec![fragment],
pos_range: i..(i + 1),
candidacy: vec![candidacy],
row_counts: vec![metrics.num_rows()],
indices,
});
}
}
(None, Some(_)) => {
// Bin is complete
candidate_bins.push(current_bin.take().unwrap());
}
}

i += 1;
}

// Flush the last bin
if let Some(bin) = current_bin {
candidate_bins.push(bin);
}

let final_bins = candidate_bins
.into_iter()
.filter(|bin| !bin.is_noop())
.flat_map(|bin| bin.split_for_size(options.target_rows_per_fragment))
.map(|bin| TaskData {
fragments: bin.fragments,
});

let mut compaction_plan = CompactionPlan::new(dataset.manifest.version, options.clone());
compaction_plan.extend_tasks(final_bins);

Ok(compaction_plan)
let planner = DefaultCompactionPlanner::new(options.clone());
planner.plan(dataset).await
}

/// The result of a single compaction task.
Expand Down Expand Up @@ -3580,4 +3634,41 @@ mod tests {
plan
);
}

#[tokio::test]
async fn test_default_compaction_planner() {
let test_dir = TempStrDir::default();
let test_uri = &test_dir;

let data = sample_data();
let schema = data.schema();

// Create dataset with multiple small fragments
let reader = RecordBatchIterator::new(vec![Ok(data.clone())], schema.clone());
let write_params = WriteParams {
max_rows_per_file: 2000,
..Default::default()
};
let dataset = Dataset::write(reader, test_uri, Some(write_params))
.await
.unwrap();

assert_eq!(dataset.get_fragments().len(), 5);

// Test default planner
let options = CompactionOptions {
target_rows_per_fragment: 5000,
materialize_deletions_threshold: 2.0,
..Default::default()
};

let planner = DefaultCompactionPlanner::new(options);
let plan = planner.plan(&dataset).await.unwrap();

// Should create tasks to compact small fragments
assert!(!plan.tasks.is_empty());
assert_eq!(plan.read_version, dataset.manifest.version);
// make sure options.validate() is worked
Comment thread
zhangyue19921010 marked this conversation as resolved.
Outdated
assert_eq!(plan.options.materialize_deletions, false);
}
}