Skip to content

Commit

Permalink
feat(meta): record total size of files in version (#2965)
Browse files Browse the repository at this point in the history
* add size for level

* add pending size for handler

* fix test

* merge from main

* update grafana

Signed-off-by: Little-Wallace <[email protected]>
  • Loading branch information
Little-Wallace authored Jun 7, 2022
1 parent c2a84f2 commit 1fad3e9
Show file tree
Hide file tree
Showing 11 changed files with 176 additions and 105 deletions.
2 changes: 1 addition & 1 deletion grafana/risingwave-dashboard.json

Large diffs are not rendered by default.

43 changes: 26 additions & 17 deletions grafana/risingwave-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,14 @@ def timeseries(self, title, targets):

def timeseries_count(self, title, targets):
gridPos = self.layout.next_half_width_graph()
return TimeSeries(title=title, targets=targets, gridPos=gridPos, fillOpacity=10)
return TimeSeries(title=title, targets=targets, gridPos=gridPos, fillOpacity=10, legendDisplayMode="table",
legendPlacement="right")

def timeseries_latency(self, title, targets):
gridPos = self.layout.next_half_width_graph()
return TimeSeries(title=title, targets=targets, gridPos=gridPos, unit="s", fillOpacity=10)
return TimeSeries(title=title, targets=targets, gridPos=gridPos, unit="s", fillOpacity=10,
legendDisplayMode="table", legendPlacement="right", legendCalcs=["max"])


def timeseries_bytes_per_sec(self, title, targets):
gridPos = self.layout.next_half_width_graph()
Expand All @@ -86,7 +89,8 @@ def timeseries_bytes(self, title, targets):

def timeseries_kilobytes(self, title, targets):
gridPos = self.layout.next_half_width_graph()
return TimeSeries(title=title, targets=targets, gridPos=gridPos, unit="deckbytes", fillOpacity=10)
return TimeSeries(title=title, targets=targets, gridPos=gridPos, unit="deckbytes", fillOpacity=10,
legendDisplayMode="table", legendPlacement="right", legendCalcs=["max"])

def timeseries_dollar(self, title, targets):
gridPos = self.layout.next_half_width_graph()
Expand Down Expand Up @@ -140,7 +144,12 @@ def section_compaction(panels):
panels.row("Compaction"),
panels.timeseries_count("SST Counts", [
panels.target(
"sum(storage_level_sst_num) by (instance, level_index)", "{{level_index}}"
"sum(storage_level_sst_num) by (instance, level_index)", "L{{level_index}}"
),
]),
panels.timeseries_kilobytes("KBs level sst", [
panels.target(
"sum(storage_level_total_file_size) by (instance, level_index)", "L{{level_index}}"
),
]),
panels.timeseries_count("Compaction Count", [
Expand Down Expand Up @@ -184,7 +193,7 @@ def section_compaction(panels):
]),
panels.timeseries_count("Compacting SST Count", [
panels.target(
"storage_level_compact_cnt", "{{level_index}}"
"storage_level_compact_cnt", "L{{level_index}}"
),
]),
panels.timeseries_bytes("Hummock Version Size", [
Expand All @@ -194,50 +203,50 @@ def section_compaction(panels):
]),
panels.timeseries_bytes("GBs Read from Next Level", [
panels.target(
"histogram_quantile(0.5, sum(rate(storage_level_compact_read_next_bucket[1m])) by (le, level_index))", "{{level_index}} read bytes p50"
"histogram_quantile(0.5, sum(rate(storage_level_compact_read_next_bucket[1m])) by (le, level_index))", "L{{level_index}} read bytes p50"
),
panels.target(
"histogram_quantile(0.99, sum(rate(storage_level_compact_read_next_bucket[1m])) by (le, level_index))", "{{level_index}} read bytes p99"
"histogram_quantile(0.99, sum(rate(storage_level_compact_read_next_bucket[1m])) by (le, level_index))", "L{{level_index}} read bytes p99"
),
]),
panels.timeseries_bytes("GBs Read from Current Level", [
panels.target(
"histogram_quantile(0.5, sum(rate(storage_level_compact_read_curr_bucket[1m])) by (le, level_index))", "{{level_index}} read bytes p50"
"histogram_quantile(0.5, sum(rate(storage_level_compact_read_curr_bucket[1m])) by (le, level_index))", "L{{level_index}} read bytes p50"
),
panels.target(
"histogram_quantile(0.99, sum(rate(storage_level_compact_read_curr_bucket[1m])) by (le, level_index))", "{{level_index}} read bytes p99"
"histogram_quantile(0.99, sum(rate(storage_level_compact_read_curr_bucket[1m])) by (le, level_index))", "L{{level_index}} read bytes p99"
),
]),
panels.timeseries_count("Count of SSTs Read from Current Level", [
panels.target(
"histogram_quantile(0.5, sum(rate(storage_level_compact_read_sstn_curr_bucket[1m])) by (le, level_index))", "{{level_index}} p50"
"histogram_quantile(0.5, sum(rate(storage_level_compact_read_sstn_curr_bucket[1m])) by (le, level_index))", "L{{level_index}} p50"
),
panels.target(
"histogram_quantile(0.99, sum(rate(storage_level_compact_read_sstn_curr_bucket[1m])) by (le, level_index))", "{{level_index}} p99"
"histogram_quantile(0.99, sum(rate(storage_level_compact_read_sstn_curr_bucket[1m])) by (le, level_index))", "L{{level_index}} p99"
),
]),
panels.timeseries_bytes("GBs Written to Next Level", [
panels.target(
"histogram_quantile(0.5, sum(rate(storage_level_compact_write_bucket[1m])) by (le, level_index))", "{{level_index}} write bytes p50"
"histogram_quantile(0.5, sum(rate(storage_level_compact_write_bucket[1m])) by (le, level_index))", "L{{level_index}} write bytes p50"
),
panels.target(
"histogram_quantile(0.99, sum(rate(storage_level_compact_write_bucket[1m])) by (le, level_index))", "{{level_index}} write bytes p99"
"histogram_quantile(0.99, sum(rate(storage_level_compact_write_bucket[1m])) by (le, level_index))", "L{{level_index}} write bytes p99"
),
]),
panels.timeseries_count("Count of SSTs Written to Next Level", [
panels.target(
"histogram_quantile(0.5, sum(rate(storage_level_compact_write_sstn_bucket[1m])) by (le, level_index))", "{{level_index}} write count p50"
"histogram_quantile(0.5, sum(rate(storage_level_compact_write_sstn_bucket[1m])) by (le, level_index))", "L{{level_index}} write count p50"
),
panels.target(
"histogram_quantile(0.99, sum(rate(storage_level_compact_write_sstn_bucket[1m])) by (le, level_index))", "{{level_index}} write count p99"
"histogram_quantile(0.99, sum(rate(storage_level_compact_write_sstn_bucket[1m])) by (le, level_index))", "L{{level_index}} write count p99"
),
]),
panels.timeseries_count("Count of SSTs Read from Next Level", [
panels.target(
"histogram_quantile(0.5, sum(rate(storage_level_compact_read_sstn_next_bucket[1m])) by (le, level_index))", "{{level_index}} read count p50"
"histogram_quantile(0.5, sum(rate(storage_level_compact_read_sstn_next_bucket[1m])) by (le, level_index))", "L{{level_index}} read count p50"
),
panels.target(
"histogram_quantile(0.99, sum(rate(storage_level_compact_read_sstn_next_bucket[1m])) by (le, level_index))", "{{level_index}} read count p99"
"histogram_quantile(0.99, sum(rate(storage_level_compact_read_sstn_next_bucket[1m])) by (le, level_index))", "L{{level_index}} read count p99"
),
]),
]
Expand Down
2 changes: 2 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ message Level {
uint32 level_idx = 1;
LevelType level_type = 2;
repeated SstableInfo table_infos = 3;
uint64 total_file_size = 4;
}

message UncommittedEpoch {
Expand Down Expand Up @@ -152,6 +153,7 @@ message LevelHandler {
message SstTask {
uint64 task_id = 1;
repeated uint64 ssts = 2;
uint64 total_file_size = 3;
}
uint32 level = 1;
repeated SstTask tasks = 3;
Expand Down
8 changes: 8 additions & 0 deletions src/meta/src/hummock/compaction/compaction_picker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,13 @@ impl CompactionPicker for MinOverlappingPicker {
level_idx: self.level as u32,
level_type: levels[self.level].level_type,
table_infos: select_input_ssts,
total_file_size: 0,
},
target_level: Level {
level_idx: target_level as u32,
level_type: levels[target_level].level_type,
table_infos: target_input_ssts,
total_file_size: 0,
},
split_ranges: vec![],
})
Expand All @@ -203,6 +205,7 @@ pub mod tests {
level_idx: 0,
level_type: LevelType::Overlapping as i32,
table_infos: vec![],
total_file_size: 0,
},
Level {
level_idx: 1,
Expand All @@ -212,6 +215,7 @@ pub mod tests {
generate_table(1, 1, 101, 200, 1),
generate_table(2, 1, 222, 300, 1),
],
total_file_size: 0,
},
Level {
level_idx: 2,
Expand All @@ -223,6 +227,7 @@ pub mod tests {
generate_table(7, 1, 501, 800, 1),
generate_table(8, 2, 301, 400, 1),
],
total_file_size: 0,
},
];
let mut levels_handler = vec![
Expand Down Expand Up @@ -271,6 +276,7 @@ pub mod tests {
level_idx: 0,
level_type: LevelType::Overlapping as i32,
table_infos: vec![],
total_file_size: 0,
},
Level {
level_idx: 1,
Expand All @@ -280,6 +286,7 @@ pub mod tests {
generate_table(1, 1, 100, 149, 2),
generate_table(2, 1, 150, 249, 2),
],
total_file_size: 0,
},
Level {
level_idx: 2,
Expand All @@ -288,6 +295,7 @@ pub mod tests {
generate_table(4, 1, 50, 199, 1),
generate_table(5, 1, 200, 399, 1),
],
total_file_size: 0,
},
];
let mut levels_handler = vec![
Expand Down
106 changes: 46 additions & 60 deletions src/meta/src/hummock/compaction/level_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,17 +122,13 @@ impl DynamicLevelSelector {

let mut l0_size = 0;
for level in levels.iter() {
let mut total_file_size = 0;
for table in &level.table_infos {
total_file_size += table.file_size;
}
if level.level_idx > 0 {
if total_file_size > 0 && first_non_empty_level == 0 {
if level.total_file_size > 0 && first_non_empty_level == 0 {
first_non_empty_level = level.level_idx as usize;
}
max_level_size = std::cmp::max(max_level_size, total_file_size);
max_level_size = std::cmp::max(max_level_size, level.total_file_size);
} else {
l0_size = total_file_size;
l0_size = level.total_file_size;
}
}

Expand Down Expand Up @@ -191,14 +187,9 @@ impl DynamicLevelSelector {
// The bottommost level can not be input level.
for level in &levels[..self.config.max_level] {
let level_idx = level.level_idx as usize;
let mut total_size = 0;
let mut idle_file_count = 0;
for table in &level.table_infos {
if !handlers[level_idx].is_pending_compact(&table.id) {
total_size += table.file_size;
idle_file_count += 1;
}
}
let idle_file_count =
(level.table_infos.len() - handlers[level_idx].get_pending_file_count()) as u64;
let total_size = level.total_file_size - handlers[level_idx].get_pending_file_size();
if total_size == 0 {
continue;
}
Expand Down Expand Up @@ -288,6 +279,16 @@ pub mod tests {
tables
}

/// Test helper: builds a `Nonoverlapping` level holding `table_infos`,
/// with `total_file_size` precomputed as the sum of each SST's `file_size`.
fn generate_level(level_idx: u32, table_infos: Vec<SstableInfo>) -> Level {
    // Accumulate the aggregate size before `table_infos` is moved into the struct.
    let mut total_file_size = 0;
    for sst in &table_infos {
        total_file_size += sst.file_size;
    }
    Level {
        level_idx,
        level_type: LevelType::Nonoverlapping as i32,
        table_infos,
        total_file_size,
    }
}

#[test]
fn test_dynamic_level() {
let config = CompactionConfig {
Expand All @@ -307,27 +308,12 @@ pub mod tests {
level_idx: 0,
level_type: LevelType::Overlapping as i32,
table_infos: vec![],
total_file_size: 0,
},
Level {
level_idx: 1,
level_type: LevelType::Nonoverlapping as i32,
table_infos: vec![],
},
Level {
level_idx: 2,
level_type: LevelType::Nonoverlapping as i32,
table_infos: generate_tables(0..5, 0..1000, 3, 10),
},
Level {
level_idx: 3,
level_type: LevelType::Nonoverlapping as i32,
table_infos: generate_tables(5..10, 0..1000, 2, 50),
},
Level {
level_idx: 4,
level_type: LevelType::Nonoverlapping as i32,
table_infos: generate_tables(10..15, 0..1000, 1, 200),
},
generate_level(1, vec![]),
generate_level(2, generate_tables(0..5, 0..1000, 3, 10)),
generate_level(3, generate_tables(5..10, 0..1000, 2, 50)),
generate_level(4, generate_tables(10..15, 0..1000, 1, 200)),
];
let ctx = selector.calculate_level_base_size(&levels);
assert_eq!(ctx.base_level, 2);
Expand All @@ -338,6 +324,12 @@ pub mod tests {
levels[4]
.table_infos
.append(&mut generate_tables(15..20, 2000..3000, 1, 400));
levels[4].total_file_size = levels[4]
.table_infos
.iter()
.map(|sst| sst.file_size)
.sum::<u64>();

let ctx = selector.calculate_level_base_size(&levels);
// data size increase, so we need increase one level to place more data.
assert_eq!(ctx.base_level, 1);
Expand All @@ -350,14 +342,27 @@ pub mod tests {
levels[0]
.table_infos
.append(&mut generate_tables(20..26, 0..1000, 1, 100));
levels[0].total_file_size = levels[0]
.table_infos
.iter()
.map(|sst| sst.file_size)
.sum::<u64>();

let ctx = selector.calculate_level_base_size(&levels);
assert_eq!(ctx.base_level, 2);
assert_eq!(ctx.level_max_bytes[2], 600);
assert_eq!(ctx.level_max_bytes[3], 605);
assert_eq!(ctx.level_max_bytes[4], 3025);

levels[0].table_infos.clear();
levels[0].total_file_size = 0;
levels[1].table_infos = generate_tables(26..32, 0..1000, 1, 100);
levels[1].total_file_size = levels[1]
.table_infos
.iter()
.map(|sst| sst.file_size)
.sum::<u64>();

let ctx = selector.calculate_level_base_size(&levels);
assert_eq!(ctx.base_level, 1);
assert_eq!(ctx.level_max_bytes[1], 100);
Expand All @@ -379,32 +384,13 @@ pub mod tests {
compaction_mode: RangeMode,
};
let mut levels = vec![
Level {
level_idx: 0,
level_type: LevelType::Overlapping as i32,
table_infos: generate_tables(15..25, 0..600, 3, 10),
},
Level {
level_idx: 1,
level_type: LevelType::Nonoverlapping as i32,
table_infos: vec![],
},
Level {
level_idx: 2,
level_type: LevelType::Nonoverlapping as i32,
table_infos: generate_tables(0..5, 0..1000, 3, 10),
},
Level {
level_idx: 3,
level_type: LevelType::Nonoverlapping as i32,
table_infos: generate_tables(5..10, 0..1000, 2, 50),
},
Level {
level_idx: 4,
level_type: LevelType::Nonoverlapping as i32,
table_infos: generate_tables(10..15, 0..1000, 1, 200),
},
generate_level(0, generate_tables(15..25, 0..600, 3, 10)),
generate_level(1, vec![]),
generate_level(2, generate_tables(0..5, 0..1000, 3, 10)),
generate_level(3, generate_tables(5..10, 0..1000, 2, 50)),
generate_level(4, generate_tables(10..15, 0..1000, 1, 200)),
];
levels[0].level_type = LevelType::Overlapping as i32;

let selector = DynamicLevelSelector::new(
Arc::new(config.clone()),
Expand Down
Loading

0 comments on commit 1fad3e9

Please sign in to comment.