Skip to content

Commit

Permalink
feat: add vacuum table result table (#14830)
Browse files Browse the repository at this point in the history
* feat: add vacuum table result table

* feat: add vacuum table result table

* feat: add vacuum table result table, fix test failure
  • Loading branch information
lichuang authored Mar 5, 2024
1 parent 6517e89 commit dbe6830
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 3 deletions.
79 changes: 78 additions & 1 deletion src/query/service/src/interpreters/interpreter_table_vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ use chrono::Duration;
use databend_common_catalog::table::TableExt;
use databend_common_exception::Result;
use databend_common_expression::types::StringType;
use databend_common_expression::types::UInt64Type;
use databend_common_expression::DataBlock;
use databend_common_expression::FromData;
use databend_common_license::license::Feature::Vacuum;
use databend_common_license::license_manager::get_license_manager;
use databend_common_sql::plans::VacuumTablePlan;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_fuse::FUSE_TBL_BLOCK_PREFIX;
use databend_common_storages_fuse::FUSE_TBL_SEGMENT_PREFIX;
use databend_common_storages_fuse::FUSE_TBL_SNAPSHOT_PREFIX;
use databend_common_storages_fuse::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX;
use databend_enterprise_vacuum_handler::get_vacuum_handler;
use opendal::Metakey;

use crate::interpreters::Interpreter;
use crate::pipelines::PipelineBuildResult;
Expand All @@ -36,10 +42,57 @@ pub struct VacuumTableInterpreter {
plan: VacuumTablePlan,
}

/// Per-category file statistic: `(file_count, total_bytes)`.
type FileStat = (u64, u64);

/// Aggregated on-disk footprint of a fuse table, grouped by the four
/// storage prefixes (snapshots, segments, blocks, bloom indexes).
/// Each entry is a `(count, bytes)` pair; see [`FileStat`].
#[derive(Debug, Default)]
struct Statistics {
    // Files under FUSE_TBL_SNAPSHOT_PREFIX.
    pub snapshot_files: FileStat,
    // Files under FUSE_TBL_SEGMENT_PREFIX.
    pub segment_files: FileStat,
    // Files under FUSE_TBL_BLOCK_PREFIX.
    pub block_files: FileStat,
    // Files under FUSE_TBL_XOR_BLOOM_INDEX_PREFIX.
    pub index_files: FileStat,
}

impl VacuumTableInterpreter {
pub fn try_create(ctx: Arc<QueryContext>, plan: VacuumTablePlan) -> Result<Self> {
Ok(VacuumTableInterpreter { ctx, plan })
}

async fn get_statistics(&self, fuse_table: &FuseTable) -> Result<Statistics> {
let operator = fuse_table.get_operator();
let table_data_prefix = format!("/{}", fuse_table.meta_location_generator().prefix());

let mut snapshot_files = (0, 0);
let mut segment_files = (0, 0);
let mut block_files = (0, 0);
let mut index_files = (0, 0);

let prefix_with_stats = vec![
(FUSE_TBL_SNAPSHOT_PREFIX, &mut snapshot_files),
(FUSE_TBL_SEGMENT_PREFIX, &mut segment_files),
(FUSE_TBL_BLOCK_PREFIX, &mut block_files),
(FUSE_TBL_XOR_BLOOM_INDEX_PREFIX, &mut index_files),
];

for (dir_prefix, stat) in prefix_with_stats {
for entry in operator
.list_with(&format!("{}/{}/", table_data_prefix, dir_prefix))
.metakey(Metakey::ContentLength)
.await?
{
if entry.metadata().is_file() {
stat.0 += 1;
stat.1 += entry.metadata().content_length();
}
}
}

Ok(Statistics {
snapshot_files,
segment_files,
block_files,
index_files,
})
}
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -83,7 +136,31 @@ impl Interpreter for VacuumTableInterpreter {
.await?;

match purge_files_opt {
None => return Ok(PipelineBuildResult::create()),
None => {
return {
let stat = self.get_statistics(fuse_table).await?;
let total_files = stat.snapshot_files.0
+ stat.segment_files.0
+ stat.block_files.0
+ stat.index_files.0;
let total_size = stat.snapshot_files.1
+ stat.segment_files.1
+ stat.block_files.1
+ stat.index_files.1;
PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![
UInt64Type::from_data(vec![stat.snapshot_files.0]),
UInt64Type::from_data(vec![stat.snapshot_files.1]),
UInt64Type::from_data(vec![stat.segment_files.0]),
UInt64Type::from_data(vec![stat.segment_files.1]),
UInt64Type::from_data(vec![stat.block_files.0]),
UInt64Type::from_data(vec![stat.block_files.1]),
UInt64Type::from_data(vec![stat.index_files.0]),
UInt64Type::from_data(vec![stat.index_files.1]),
UInt64Type::from_data(vec![total_files]),
UInt64Type::from_data(vec![total_size]),
])])
};
}
Some(purge_files) => PipelineBuildResult::from_blocks(vec![
DataBlock::new_from_columns(vec![StringType::from_data(purge_files)]),
]),
Expand Down
13 changes: 12 additions & 1 deletion src/query/sql/src/planner/plans/ddl/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,18 @@ impl VacuumTablePlan {
DataType::String,
)]))
} else {
Arc::new(DataSchema::empty())
Arc::new(DataSchema::new(vec![
DataField::new("snapshot_files", DataType::Number(NumberDataType::UInt64)),
DataField::new("snapshot_bytes", DataType::Number(NumberDataType::UInt64)),
DataField::new("segments_files", DataType::Number(NumberDataType::UInt64)),
DataField::new("segments_size", DataType::Number(NumberDataType::UInt64)),
DataField::new("block_files", DataType::Number(NumberDataType::UInt64)),
DataField::new("block_size", DataType::Number(NumberDataType::UInt64)),
DataField::new("index_files", DataType::Number(NumberDataType::UInt64)),
DataField::new("index_size", DataType::Number(NumberDataType::UInt64)),
DataField::new("total_files", DataType::Number(NumberDataType::UInt64)),
DataField::new("total_size", DataType::Number(NumberDataType::UInt64)),
]))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ bash ../scripts/ci/deploy/databend-query-standalone.sh >/dev/null 2>&1
# check if before and after vacuum table the table count matched
old_count=$(echo "select * from test_vacuum.a order by c" | $BENDSQL_CLIENT_CONNECT)

echo "set data_retention_time_in_days=0; vacuum table test_vacuum.a" | $BENDSQL_CLIENT_CONNECT
echo "set data_retention_time_in_days=0; vacuum table test_vacuum.a" | $BENDSQL_CLIENT_CONNECT >/dev/null
#echo "optimize table test_vacuum.a all" | $BENDSQL_CLIENT_CONNECT
count=$(echo "select * from test_vacuum.a order by c" | $BENDSQL_CLIENT_CONNECT)

Expand Down
9 changes: 9 additions & 0 deletions tests/suites/5_ee/01_vacuum/01_0002_ee_vacuum_drop_table.sh
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ echo "select * from test_vacuum_drop_4.b" | $BENDSQL_CLIENT_CONNECT
echo "set data_retention_time_in_days=0; vacuum drop table" | $BENDSQL_CLIENT_CONNECT
echo "select * from test_vacuum_drop_4.b" | $BENDSQL_CLIENT_CONNECT

## test vacuum drop table output
# The vacuum result row has 10 columns (snapshot/segment/block/index
# files and sizes, then total_files, total_size); awk field $9 is the
# total_files column.
echo "create table test_vacuum_drop_4.c(c int)" | $BENDSQL_CLIENT_CONNECT
echo "INSERT INTO test_vacuum_drop_4.c VALUES (1)" | $BENDSQL_CLIENT_CONNECT
count=$(echo "set data_retention_time_in_days=0; vacuum table test_vacuum_drop_4.c" | $BENDSQL_CLIENT_CONNECT | awk '{print $9}')
# A single-insert table should hold exactly one snapshot, one segment,
# one block and one bloom-index file, so total_files must be 4.
if [[ "$count" != "4" ]]; then
    echo "vacuum table, count:$count"
    exit 1
fi

echo "drop database if exists test_vacuum_drop_4" | $BENDSQL_CLIENT_CONNECT

## Drop table
Expand Down

0 comments on commit dbe6830

Please sign in to comment.