fix: potential missing index creation in merge process
dantengsky committed May 29, 2024
1 parent 67f360b commit c2dc0e4
Showing 7 changed files with 127 additions and 79 deletions.
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/io/mod.rs
@@ -44,6 +44,7 @@ pub use write::serialize_block;
pub use write::write_data;
pub use write::BlockBuilder;
pub use write::BlockSerialization;
pub use write::BlockWriter;
pub use write::CachedMetaWriter;
pub use write::InvertedIndexBuilder;
pub use write::InvertedIndexWriter;
78 changes: 78 additions & 0 deletions src/query/storages/fuse/src/io/write/block_writer.rs
@@ -32,14 +32,22 @@ use databend_common_expression::TableSchemaRef;
use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
use databend_common_io::constants::DEFAULT_BLOCK_INDEX_BUFFER_SIZE;
use databend_common_meta_app::schema::TableMeta;
use databend_common_metrics::storage::metrics_inc_block_index_write_bytes;
use databend_common_metrics::storage::metrics_inc_block_index_write_milliseconds;
use databend_common_metrics::storage::metrics_inc_block_index_write_nums;
use databend_common_metrics::storage::metrics_inc_block_inverted_index_generate_milliseconds;
use databend_common_metrics::storage::metrics_inc_block_inverted_index_write_bytes;
use databend_common_metrics::storage::metrics_inc_block_inverted_index_write_milliseconds;
use databend_common_metrics::storage::metrics_inc_block_inverted_index_write_nums;
use databend_common_metrics::storage::metrics_inc_block_write_bytes;
use databend_common_metrics::storage::metrics_inc_block_write_milliseconds;
use databend_common_metrics::storage::metrics_inc_block_write_nums;
use databend_storages_common_blocks::blocks_to_parquet;
use databend_storages_common_index::BloomIndex;
use databend_storages_common_table_meta::meta::BlockMeta;
use databend_storages_common_table_meta::meta::ClusterStatistics;
use databend_storages_common_table_meta::meta::ColumnMeta;
use databend_storages_common_table_meta::meta::Location;
use databend_storages_common_table_meta::table::TableCompression;
use log::info;
use opendal::Operator;

use crate::io::write::WriteSettings;
@@ -329,3 +337,73 @@ impl BlockBuilder {
        Ok(serialized)
    }
}

pub struct BlockWriter;

impl BlockWriter {
    pub async fn write_down(dal: &Operator, serialized: BlockSerialization) -> Result<BlockMeta> {
        let block_meta = serialized.block_meta;

        Self::write_down_data_block(dal, serialized.block_raw_data, &block_meta.location.0).await?;
        Self::write_down_bloom_index_state(dal, serialized.bloom_index_state).await?;
        Self::write_down_inverted_index_state(dal, serialized.inverted_index_states).await?;

        Ok(block_meta)
    }

    pub async fn write_down_data_block(
        dal: &Operator,
        raw_block_data: Vec<u8>,
        block_location: &str,
    ) -> Result<()> {
        let start = Instant::now();
        let size = raw_block_data.len();

        write_data(raw_block_data, dal, block_location).await?;

        metrics_inc_block_write_nums(1);
        metrics_inc_block_write_bytes(size as u64);
        metrics_inc_block_write_milliseconds(start.elapsed().as_millis() as u64);

        info!("wrote down block: {}", block_location);
        Ok(())
    }

    pub async fn write_down_bloom_index_state(
        dal: &Operator,
        bloom_index_state: Option<BloomIndexState>,
    ) -> Result<()> {
        if let Some(index_state) = bloom_index_state {
            let start = Instant::now();

            let location = &index_state.location.0;
            write_data(index_state.data, dal, location).await?;

            metrics_inc_block_index_write_nums(1);
            metrics_inc_block_index_write_bytes(index_state.size);
            metrics_inc_block_index_write_milliseconds(start.elapsed().as_millis() as u64);

            info!("wrote down bloom index: {}", location);
        }
        Ok(())
    }

    pub async fn write_down_inverted_index_state(
        dal: &Operator,
        inverted_index_states: Vec<InvertedIndexState>,
    ) -> Result<()> {
        for inverted_index_state in inverted_index_states {
            let start = Instant::now();

            let location = &inverted_index_state.location.0;
            let index_size = inverted_index_state.size;
            write_data(inverted_index_state.data, dal, location).await?;

            metrics_inc_block_inverted_index_write_nums(1);
            metrics_inc_block_inverted_index_write_bytes(index_size);
            metrics_inc_block_inverted_index_write_milliseconds(start.elapsed().as_millis() as u64);

            info!("wrote down inverted index: {}", location);
        }
        Ok(())
    }
}
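
For orientation, here is a minimal sketch of how a call site inside this crate might use the new helper. The wrapper name `persist_serialized_block` is illustrative only; `Operator`, `BlockSerialization`, `BlockMeta`, and `BlockWriter` are the types from the diff above.

use databend_common_exception::Result;
use databend_storages_common_table_meta::meta::BlockMeta;
use opendal::Operator;

use crate::io::BlockSerialization;
use crate::io::BlockWriter;

// Illustrative wrapper: a single call persists the data block, its bloom
// index (if any), and all inverted indexes, replacing the hand-rolled
// `write_data` sequences each call site previously repeated.
async fn persist_serialized_block(
    dal: &Operator,
    serialized: BlockSerialization,
) -> Result<BlockMeta> {
    BlockWriter::write_down(dal, serialized).await
}

The returned `BlockMeta` is what call sites then use for progress accounting and mutation logs, as the updated `TransformSerializeBlock` below shows.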
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/io/write/mod.rs
@@ -23,6 +23,7 @@ pub use block_writer::serialize_block;
pub use block_writer::write_data;
pub use block_writer::BlockBuilder;
pub use block_writer::BlockSerialization;
pub use block_writer::BlockWriter;
pub use block_writer::InvertedIndexBuilder;
pub(crate) use inverted_index_writer::create_index_schema;
pub(crate) use inverted_index_writer::create_tokenizer_manager;
@@ -14,7 +14,6 @@

use std::any::Any;
use std::sync::Arc;
use std::time::Instant;

use databend_common_base::base::ProgressValues;
use databend_common_catalog::table::Table;
@@ -25,7 +24,6 @@ use databend_common_expression::BlockMetaInfoDowncast;
use databend_common_expression::ComputedExpr;
use databend_common_expression::DataBlock;
use databend_common_expression::TableSchema;
use databend_common_metrics::storage::*;
use databend_common_pipeline_core::processors::Event;
use databend_common_pipeline_core::processors::InputPort;
use databend_common_pipeline_core::processors::OutputPort;
@@ -37,9 +35,9 @@ use databend_storages_common_index::BloomIndex;
use opendal::Operator;

use crate::io::create_inverted_index_builders;
use crate::io::write_data;
use crate::io::BlockBuilder;
use crate::io::BlockSerialization;
use crate::io::BlockWriter;
use crate::operations::common::BlockMetaIndex;
use crate::operations::common::MutationLogEntry;
use crate::operations::common::MutationLogs;
@@ -303,70 +301,19 @@ impl Processor for TransformSerializeBlock {
    async fn async_process(&mut self) -> Result<()> {
        match std::mem::replace(&mut self.state, State::Consume) {
            State::Serialized { serialized, index } => {
                let start = Instant::now();
                // write block data.
                let raw_block_data = serialized.block_raw_data;
                let data_size = raw_block_data.len();
                let path = serialized.block_meta.location.0.as_str();
                write_data(raw_block_data, &self.dal, path).await?;

                // Perf.
                {
                    metrics_inc_block_write_nums(1);
                    metrics_inc_block_write_bytes(data_size as u64);
                    metrics_inc_block_write_milliseconds(start.elapsed().as_millis() as u64);
                }

                // write index data.
                let bloom_index_state = serialized.bloom_index_state;
                if let Some(bloom_index_state) = bloom_index_state {
                    let start = Instant::now();
                    let index_size = bloom_index_state.size;
                    write_data(
                        bloom_index_state.data,
                        &self.dal,
                        &bloom_index_state.location.0,
                    )
                    .await?;
                    // Perf.
                    {
                        metrics_inc_block_index_write_nums(1);
                        metrics_inc_block_index_write_bytes(index_size);
                        metrics_inc_block_index_write_milliseconds(
                            start.elapsed().as_millis() as u64
                        );
                    }
                }

                // write inverted index
                for inverted_index_state in serialized.inverted_index_states {
                    let start = Instant::now();
                    let index_size = inverted_index_state.size;
                    write_data(
                        inverted_index_state.data,
                        &self.dal,
                        &inverted_index_state.location.0,
                    )
                    .await?;
                    // Perf.
                    {
                        metrics_inc_block_inverted_index_write_nums(1);
                        metrics_inc_block_inverted_index_write_bytes(index_size);
                        metrics_inc_block_inverted_index_write_milliseconds(
                            start.elapsed().as_millis() as u64,
                        );
                    }
                }
                let block_meta = BlockWriter::write_down(&self.dal, serialized).await?;

                let data_block = if let Some(index) = index {
                let mutation_log_data_block = if let Some(index) = index {
                    // we are replacing the block represented by the `index`
                    Self::mutation_logs(MutationLogEntry::ReplacedBlock {
                        index,
                        block_meta: Arc::new(serialized.block_meta),
                        block_meta: Arc::new(block_meta),
                    })
                } else {
                    // appending new data block
                    let progress_values = ProgressValues {
                        rows: serialized.block_meta.row_count as usize,
                        bytes: serialized.block_meta.block_size as usize,
                        rows: block_meta.row_count as usize,
                        bytes: block_meta.block_size as usize,
                    };
                    self.block_builder
                        .ctx
@@ -376,12 +323,12 @@ if let Some(tid) = self.table_id {
                    if let Some(tid) = self.table_id {
                        self.block_builder
                            .ctx
                            .update_multi_table_insert_status(tid, serialized.block_meta.row_count);
                            .update_multi_table_insert_status(tid, block_meta.row_count);
                    }

                    DataBlock::empty_with_meta(Box::new(serialized.block_meta))
                    DataBlock::empty_with_meta(Box::new(block_meta))
                };
                self.output_data = Some(data_block);
                self.output_data = Some(mutation_log_data_block);
            }
            _ => return Err(ErrorCode::Internal("It's a bug.")),
        }
@@ -48,9 +48,9 @@ use itertools::Itertools;
use log::info;
use opendal::Operator;

use crate::io::write_data;
use crate::io::BlockBuilder;
use crate::io::BlockReader;
use crate::io::BlockWriter;
use crate::io::CompactSegmentInfoReader;
use crate::io::MetaReaders;
use crate::io::ReadSettings;
@@ -464,11 +464,8 @@ impl AggregationContext {
        })??;

        // persistent data
        let new_block_meta = serialized.block_meta;
        let new_block_location = new_block_meta.location.0.clone();
        let new_block_raw_data = serialized.block_raw_data;
        let data_accessor = self.data_accessor.clone();
        write_data(new_block_raw_data, &data_accessor, &new_block_location).await?;

        let new_block_meta = BlockWriter::write_down(&self.data_accessor, serialized).await?;

        metrics_inc_merge_into_replace_blocks_counter(1);
        metrics_inc_merge_into_replace_blocks_rows_counter(origin_num_rows as u32);
@@ -58,9 +58,9 @@ use log::warn;
use opendal::Operator;

use crate::io::read::bloom::block_filter_reader::BloomBlockFilterReader;
use crate::io::write_data;
use crate::io::BlockBuilder;
use crate::io::BlockReader;
use crate::io::BlockWriter;
use crate::io::CompactSegmentInfoReader;
use crate::io::MetaReaders;
use crate::io::ReadSettings;
@@ -564,18 +564,11 @@ impl AggregationContext {
        })??;

        // persistent data
        let new_block_meta = serialized.block_meta;
        let new_block_location = new_block_meta.location.0.clone();
        let new_block_raw_data = serialized.block_raw_data;
        let data_accessor = self.data_accessor.clone();
        write_data(new_block_raw_data, &data_accessor, &new_block_location).await?;
        let new_block_meta = BlockWriter::write_down(&self.data_accessor, serialized).await?;

        metrics_inc_replace_block_number_write(1);
        metrics_inc_replace_row_number_write(new_block_meta.row_count);
        metrics_inc_replace_replaced_blocks_rows(num_rows as u64);
        if let Some(index_state) = serialized.bloom_index_state {
            write_data(index_state.data, &data_accessor, &index_state.location.0).await?;
        }

        // generate log
        let mutation = MutationLogEntry::ReplacedBlock {
@@ -0,0 +1,31 @@
statement ok
create or replace database test_15669;

statement ok
use test_15669;

statement ok
create table t (c int) as select * from numbers(1000);

statement ok
merge into t using (select 99 c ) s on t.c = s.c when matched then delete;


# expects that bloom pruning prunes 1 block: "bloom pruning: 1 to 0"
query T
explain select * from t where c = 99;
----
Filter
├── output columns: [t.c (#0)]
├── filters: [is_true(t.c (#0) = 99)]
├── estimated rows: 1.00
└── TableScan
├── table: default.test_15669.t
├── output columns: [c (#0)]
├── read rows: 0
├── read size: 0
├── partitions total: 1
├── partitions scanned: 0
├── pruning stats: [segments: <range pruning: 1 to 1>, blocks: <range pruning: 1 to 1, bloom pruning: 1 to 0>]
├── push downs: [filters: [is_true(t.c (#0) = 99)], limit: NONE]
└── estimated rows: 999.00
