diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs
index 69e4c1b40056a..56a8a275c52e6 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_builder.rs
@@ -14,7 +14,9 @@
 
 use databend_common_exception::Result;
 use databend_common_expression::BlockThresholds;
+use databend_common_expression::Column;
 use databend_common_expression::DataBlock;
+use databend_common_expression::Value;
 use databend_common_pipeline_core::Pipeline;
 
 use crate::processors::AccumulatingTransform;
@@ -63,7 +65,7 @@ impl AccumulatingTransform for BlockCompactBuilder {
 
     fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
         let num_rows = data.num_rows();
-        let num_bytes = data.memory_size();
+        let num_bytes = memory_size(&data);
 
         if !self.thresholds.check_for_compact(num_rows, num_bytes) {
             // holding slices of blocks to merge later may lead to oom, so
@@ -112,3 +114,20 @@ impl AccumulatingTransform for BlockCompactBuilder {
         }
     }
 }
+
+/// Block size used for the compaction thresholds. A nullable column whose
+/// validity bitmap has no set bits (i.e. every value is NULL) counts as zero bytes.
+pub(crate) fn memory_size(data_block: &DataBlock) -> usize {
+    data_block
+        .columns()
+        .iter()
+        .map(|entry| {
+            if let Value::Column(Column::Nullable(col)) = &entry.value {
+                if col.validity.true_count() == 0 {
+                    return 0;
+                }
+            }
+            entry.memory_size()
+        })
+        .sum()
+}
diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs
index 85215b0b58b6f..adcd014acb56f 100644
--- a/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs
+++ b/src/query/pipeline/transforms/src/processors/transforms/transform_compact_no_split_builder.rs
@@ -79,7 +79,7 @@ impl AccumulatingTransform for BlockCompactNoSplitBuilder {
 
     fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
         self.accumulated_rows += data.num_rows();
-        self.accumulated_bytes += data.memory_size();
+        self.accumulated_bytes += crate::processors::memory_size(&data);
         if !self
             .thresholds
             .check_large_enough(self.accumulated_rows, self.accumulated_bytes)
diff --git a/src/query/storages/system/src/streams_table.rs b/src/query/storages/system/src/streams_table.rs
index 9a30db09adf21..f752186b84fea 100644
--- a/src/query/storages/system/src/streams_table.rs
+++ b/src/query/storages/system/src/streams_table.rs
@@ -185,7 +185,7 @@ impl AsyncSystemTable for StreamsTable {
 
         let mut handlers = Vec::new();
         for table in tables {
-            // If db1 is visible, do not means db1.table1 is visible. An user may have a grant about db1.table2, so db1 is visible
+            // If db1 is visible, it does not mean db1.table1 is visible. A user may have a grant on db1.table2, so db1 is visible
             // for her, but db1.table1 may be not visible. So we need an extra check about table here after db visibility check.
             let t_id = table.get_id();
            if visibility_checker.check_table_visibility(
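Note on the new `memory_size` helper: it only discounts nullable columns whose validity bitmap has no set bits; every other column falls through to `BlockEntry::memory_size`. The sketch below is a standalone illustration of that sizing rule, not databend code: the `Col` enum, the `block_memory_size` function, and the byte counts are hypothetical stand-ins introduced here for the example.

```rust
// Minimal sketch of the "ignore fully-NULL nullable columns" sizing rule.
// `Col` and `block_memory_size` are made-up names, not databend APIs.
#[derive(Debug)]
enum Col {
    // Plain column: `bytes` is the size of its data buffer.
    Plain { bytes: usize },
    // Nullable column: `bytes` covers data + validity buffers,
    // `non_null` is the number of set bits in the validity bitmap.
    Nullable { bytes: usize, non_null: usize },
}

// Mirrors the patched logic: a nullable column with zero non-null values
// contributes nothing to the block size used for the compaction threshold.
fn block_memory_size(cols: &[Col]) -> usize {
    cols.iter()
        .map(|c| match c {
            Col::Nullable { non_null: 0, .. } => 0,
            Col::Nullable { bytes, .. } | Col::Plain { bytes } => *bytes,
        })
        .sum()
}

fn main() {
    let cols = vec![
        Col::Plain { bytes: 4096 },
        // Fully NULL column: its 2048-byte buffer is discounted.
        Col::Nullable { bytes: 2048, non_null: 0 },
        Col::Nullable { bytes: 1024, non_null: 10 },
    ];
    // Prints 5120 rather than 7168: the all-NULL column is not counted.
    println!("{}", block_memory_size(&cols));
}
```

The effect on the two transforms above is that blocks whose size is dominated by allocated-but-all-NULL nullable buffers are no longer treated as "large enough" and remain eligible for compaction.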