Branch: fix/avoid-suppress-manual-compaction
 ### Add Support for Manual Compaction Requests

 - **Compaction Logic Enhancements**:
   - Updated `CompactionScheduler` in `compaction.rs` to handle manual compaction requests using `Options::StrictWindow`.
   - Introduced `PendingCompaction` struct to manage pending manual compaction requests.
   - Added logic to reschedule manual compaction requests once the current compaction task is completed.

 - **Testing**:
   - Added `test_manual_compaction_when_compaction_in_progress` to verify the handling of manual compaction requests during ongoing compaction processes.

 These changes enhance the compaction scheduling mechanism by allowing a manual compaction request to be queued while a compaction is already in progress and re-scheduled as soon as that compaction finishes.
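
For illustration, here is a minimal sketch of the call pattern this change enables, mirroring the new test below. The `scheduler`, `region_id`, `version_control`, `access_layer`, `manifest_ctx`, and `schema_metadata_manager` handles are assumed to be set up as in the test harness, and a regular compaction is assumed to already be in flight.

```rust
use api::v1::region::compact_request::Options;
use api::v1::region::StrictWindow;
use tokio::sync::oneshot;

// Manually triggered compaction with a 60-second strict window, issued while a
// regular compaction is still running for the same region.
let (tx, rx) = oneshot::channel();
scheduler
    .schedule_compaction(
        region_id,
        Options::StrictWindow(StrictWindow { window_seconds: 60 }),
        &version_control,
        &access_layer,
        OptionOutputTx::new(Some(OutputTx::new(tx))),
        &manifest_ctx,
        schema_metadata_manager.clone(),
    )
    .await?;
// Because a compaction is already in progress, the request is stored as the
// region's `pending_request` and re-scheduled from `on_compaction_finished`;
// `rx` resolves once the manual compaction eventually completes.
```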
v0y4g3r committed Jan 19, 2025
1 parent 87e2d1c commit bc38ed0
Showing 1 changed file with 176 additions and 6 deletions.
182 changes: 176 additions & 6 deletions src/mito2/src/compaction.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
use std::time::Instant;

use api::v1::region::compact_request;
use api::v1::region::compact_request::Options;
use common_base::Plugins;
use common_meta::key::SchemaMetadataManagerRef;
use common_telemetry::{debug, error, info, warn};
@@ -140,8 +141,20 @@ impl CompactionScheduler {
schema_metadata_manager: SchemaMetadataManagerRef,
) -> Result<()> {
if let Some(status) = self.region_status.get_mut(&region_id) {
// Region is compacting. Add the waiter to pending list.
status.merge_waiter(waiter);
match compact_options {
Options::Regular(_) => {
// Region is compacting. Add the waiter to pending list.
status.merge_waiter(waiter);
}
options @ Options::StrictWindow(_) => {
// Incoming compaction request is manually triggered.
status.set_pending_request(PendingCompaction { options, waiter });
info!(
"Region {} is compacting, manually compaction will be re-scheduled.",
region_id
);
}
}
return Ok(());
}

@@ -177,6 +190,30 @@ impl CompactionScheduler {
return;
};

if let Some(pending_request) = std::mem::take(&mut status.pending_request) {
let PendingCompaction { options, waiter } = pending_request;

let request = status.new_compaction_request(
self.request_sender.clone(),
waiter,
self.engine_config.clone(),
self.cache_manager.clone(),
manifest_ctx,
self.listener.clone(),
schema_metadata_manager,
);

if let Err(e) = self.schedule_compaction_request(request, options).await {
error!(e; "Failed to continue pending manual compaction for region id: {}", region_id);
} else {
debug!(
"Successfully scheduled manual compaction for region id: {}",
region_id
);
}
return;
}

// We should always try to compact the region until picker returns None.
let request = status.new_compaction_request(
self.request_sender.clone(),
@@ -445,6 +482,8 @@ struct CompactionStatus {
access_layer: AccessLayerRef,
/// Pending waiters for compaction.
waiters: Vec<OutputTx>,
/// Pending compaction that is supposed to run as soon as the current compaction task finishes.
pending_request: Option<PendingCompaction>,
}

impl CompactionStatus {
@@ -459,6 +498,7 @@ impl CompactionStatus {
version_control,
access_layer,
waiters: Vec::new(),
pending_request: None,
}
}

@@ -469,9 +509,21 @@ impl CompactionStatus {
}
}

/// Sets the pending compaction request, replacing the current value if one already exists.
fn set_pending_request(&mut self, pending: PendingCompaction) {
if let Some(prev) = self.pending_request.replace(pending) {
debug!(
"Replace pending compaction options with new request {:?} for region: {}",
prev.options, self.region_id
);
}
}

fn on_failure(mut self, err: Arc<Error>) {
for waiter in self.waiters.drain(..) {
waiter.send(Err(err.clone()).context(CompactRegionSnafu { region_id:self.region_id }));
waiter.send(Err(err.clone()).context(CompactRegionSnafu {
region_id: self.region_id,
}));
}
}

@@ -647,9 +699,20 @@ fn get_expired_ssts(
.collect()
}

/// Pending compaction request that is supposed to run after the current task finishes,
/// typically used for manual compactions.
struct PendingCompaction {
/// Compaction options. Currently, it can only be [StrictWindow].
pub(crate) options: compact_request::Options,
/// Waiter of the pending request.
pub(crate) waiter: OptionOutputTx,
}

#[cfg(test)]
mod tests {
use api::v1::region::StrictWindow;
use tokio::sync::oneshot;

use super::*;
use crate::test_util::mock_schema_metadata_manager;
use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
@@ -802,7 +865,8 @@ mod tests {
.region_status
.get(&builder.region_id())
.unwrap()
.waiters.is_empty());
.waiters
.is_empty());

// On compaction finished and schedule next compaction.
scheduler
@@ -811,7 +875,6 @@
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());


// 5 files for next compaction.
apply_edit(
&version_control,
@@ -837,6 +900,113 @@
assert!(!scheduler
.region_status
.get(&builder.region_id())
.unwrap().waiters.is_empty());
.unwrap()
.waiters
.is_empty());
}

#[tokio::test]
async fn test_manual_compaction_when_compaction_in_progress() {
common_telemetry::init_default_ut_logging();
let job_scheduler = Arc::new(VecScheduler::default());
let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
let (tx, _rx) = mpsc::channel(4);
let mut scheduler = env.mock_compaction_scheduler(tx);
let mut builder = VersionControlBuilder::new();
let purger = builder.file_purger();
let region_id = builder.region_id();

let (schema_metadata_manager, kv_backend) = mock_schema_metadata_manager();
schema_metadata_manager
.register_region_table_info(
builder.region_id().table_id(),
"test_table",
"test_catalog",
"test_schema",
None,
kv_backend,
)
.await;

// 5 files to compact.
let end = 1000 * 1000;
let version_control = Arc::new(
builder
.push_l0_file(0, end)
.push_l0_file(10, end)
.push_l0_file(50, end)
.push_l0_file(80, end)
.push_l0_file(90, end)
.build(),
);
let manifest_ctx = env
.mock_manifest_context(version_control.current().version.metadata.clone())
.await;

let file_metas: Vec<_> = version_control.current().version.ssts.levels()[0]
.files
.values()
.map(|file| file.meta_ref().clone())
.collect();

// 5 files for next compaction and removes old files.
apply_edit(
&version_control,
&[(0, end), (20, end), (40, end), (60, end), (80, end)],
&file_metas,
purger.clone(),
);

scheduler
.schedule_compaction(
region_id,
compact_request::Options::Regular(Default::default()),
&version_control,
&env.access_layer,
OptionOutputTx::none(),
&manifest_ctx,
schema_metadata_manager.clone(),
)
.await
.unwrap();
// Should schedule 1 compaction.
assert_eq!(1, scheduler.region_status.len());
assert_eq!(1, job_scheduler.num_jobs());
assert!(scheduler
.region_status
.get(&region_id)
.unwrap()
.pending_request
.is_none());

// Schedule another manual compaction.
let (tx, _rx) = oneshot::channel();
scheduler
.schedule_compaction(
region_id,
compact_request::Options::StrictWindow(StrictWindow { window_seconds: 60 }),
&version_control,
&env.access_layer,
OptionOutputTx::new(Some(OutputTx::new(tx))),
&manifest_ctx,
schema_metadata_manager.clone(),
)
.await
.unwrap();
assert_eq!(1, scheduler.region_status.len());
// Current job num should be 1 since compaction is in progress.
assert_eq!(1, job_scheduler.num_jobs());
let status = scheduler.region_status.get(&builder.region_id()).unwrap();
assert!(status.pending_request.is_some());

// On compaction finished and schedule next compaction.
scheduler
.on_compaction_finished(region_id, &manifest_ctx, schema_metadata_manager.clone())
.await;
assert_eq!(1, scheduler.region_status.len());
assert_eq!(2, job_scheduler.num_jobs());

let status = scheduler.region_status.get(&builder.region_id()).unwrap();
assert!(status.pending_request.is_none());
}
}
