Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
zhyass committed Jan 17, 2025
1 parent 1b6f1b2 commit 9d3f7ca
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@

use databend_common_exception::Result;
use databend_common_expression::BlockThresholds;
use databend_common_expression::Column;
use databend_common_expression::DataBlock;
use databend_common_expression::Value;
use databend_common_pipeline_core::Pipeline;

use crate::processors::AccumulatingTransform;
Expand Down Expand Up @@ -63,7 +65,7 @@ impl AccumulatingTransform for BlockCompactBuilder {

fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
let num_rows = data.num_rows();
let num_bytes = data.memory_size();
let num_bytes = memory_size(&data);

if !self.thresholds.check_for_compact(num_rows, num_bytes) {
// holding slices of blocks to merge later may lead to oom, so
Expand Down Expand Up @@ -112,3 +114,18 @@ impl AccumulatingTransform for BlockCompactBuilder {
}
}
}

pub(crate) fn memory_size(data_block: &DataBlock) -> usize {
data_block
.columns()
.iter()
.map(|entry| {
if let Value::Column(Column::Nullable(col)) = &entry.value {
if col.validity.true_count() == 0 {
return 0;
}
}
entry.memory_size()
})
.sum()
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl AccumulatingTransform for BlockCompactNoSplitBuilder {

fn transform(&mut self, data: DataBlock) -> Result<Vec<DataBlock>> {
self.accumulated_rows += data.num_rows();
self.accumulated_bytes += data.memory_size();
self.accumulated_bytes += crate::processors::memory_size(&data);
if !self
.thresholds
.check_large_enough(self.accumulated_rows, self.accumulated_bytes)
Expand Down
2 changes: 1 addition & 1 deletion src/query/storages/system/src/streams_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ impl<const T: bool> AsyncSystemTable for StreamsTable<T> {

let mut handlers = Vec::new();
for table in tables {
// If db1 is visible, do not means db1.table1 is visible. An user may have a grant about db1.table2, so db1 is visible
// If db1 is visible, do not mean db1.table1 is visible. A user may have a grant about db1.table2, so db1 is visible
// for her, but db1.table1 may be not visible. So we need an extra check about table here after db visibility check.
let t_id = table.get_id();
if visibility_checker.check_table_visibility(
Expand Down

0 comments on commit 9d3f7ca

Please sign in to comment.