diff --git a/lib/db/maintain/scripts.js b/lib/db/maintain/scripts.js
index ad0b4dd3..7c2aa097 100644
--- a/lib/db/maintain/scripts.js
+++ b/lib/db/maintain/scripts.js
@@ -390,7 +390,42 @@ module.exports.profiles = [
         key,
         val,
         cityHash64(val) % 50000 as val_id
-      FROM profiles_series_gin`
+      FROM profiles_series_gin`,
+
+  `ALTER TABLE profiles_input {{{OnCluster}}}
+    ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
+    ADD COLUMN IF NOT EXISTS \`functions\` Array(Tuple(UInt64, String))`,
+
+  `ALTER TABLE profiles {{{OnCluster}}}
+    ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
+    ADD COLUMN IF NOT EXISTS \`functions\` Array(Tuple(UInt64, String))`,
+
+  'RENAME TABLE IF EXISTS profiles_mv TO profiles_mv_bak {{{OnCluster}}}',
+
+  `CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_mv {{{OnCluster}}} TO profiles AS
+    SELECT
+      timestamp_ns,
+      cityHash64(arraySort(arrayConcat(
+        profiles_input.tags, [
+          ('__type__', concatWithSeparator(':', type, period_type, period_unit) as _type_id),
+          ('__sample_types_units__', arrayStringConcat(arrayMap(x -> x.1 || ':' || x.2, arraySort(sample_types_units)), ';')),
+          ('service_name', service_name)
+        ])) as _tags) as fingerprint,
+      _type_id as type_id,
+      sample_types_units,
+      service_name,
+      duration_ns,
+      payload_type,
+      payload,
+      values_agg,
+      tree,
+      functions
+    FROM profiles_input`,
+
+  'DROP TABLE IF EXISTS profiles_mv_bak {{{OnCluster}}}',
+
+  "INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('profiles_v2'), 'update', " +
+  "'profiles_v2', toString(toUnixTimestamp(NOW())), NOW())"
 ]
 
 module.exports.profiles_dist = [
@@ -427,5 +462,18 @@ module.exports.profiles_dist = [
     key String,
     val String,
     val_id UInt64
-  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand());`
+  ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand());`,
+
+  `ALTER TABLE profiles_dist {{{OnCluster}}}
+    ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
+    ADD COLUMN IF NOT EXISTS \`functions\` Array(Tuple(UInt64, String))`,
+
+  `ALTER TABLE profiles_dist {{{OnCluster}}}
+    ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`,
+
+  `ALTER TABLE profiles_series_dist {{{OnCluster}}}
+    ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`,
+
+  `ALTER TABLE profiles_series_gin_dist {{{OnCluster}}}
+    ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`
 ]
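Note: the final `INSERT INTO settings` statement stamps a `profiles_v2` marker row so readers can decide, per time window, whether rows were written by the new materialized view (with `tree`/`functions` populated) or by the old one. A minimal sketch of the read-side gate, assuming the same `checkVersion` helper that `pyroscope.js` imports further down; `fromTimeSec` and the two query builders are hypothetical stand-ins:

```js
const { checkVersion } = require('./lib/utils')

// true only if the 'profiles_v2' migration ran before the queried window
const v2 = checkVersion('profiles_v2', (fromTimeSec - 3600) * 1000)
const sqlReq = v2 ? buildTreeQuery() : buildLegacyPayloadQuery() // hypothetical helpers
```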
diff --git a/pyroscope/pprof-bin/Cargo.toml b/pyroscope/pprof-bin/Cargo.toml
index 07b59dcb..daf87289 100644
--- a/pyroscope/pprof-bin/Cargo.toml
+++ b/pyroscope/pprof-bin/Cargo.toml
@@ -5,6 +5,7 @@ authors = ["akvlad90@gmail.com"]
 edition = "2018"
 build = "build.rs"
 
+
 [lib]
 crate-type = ["cdylib", "rlib"]
 
@@ -30,6 +31,11 @@ wasm-bindgen-test = "0.3.34"
 [profile.release]
 # Tell `rustc` to optimize for small code size.
 opt-level = "s"
+overflow-checks = false
+
+[profile.dev]
+overflow-checks = false
+
 [build-dependencies]
 prost-build = { version = "0.12.3" }
 
diff --git a/pyroscope/pprof-bin/pkg/pprof_bin.d.ts b/pyroscope/pprof-bin/pkg/pprof_bin.d.ts
index f9cc13cc..87b92058 100644
--- a/pyroscope/pprof-bin/pkg/pprof_bin.d.ts
+++ b/pyroscope/pprof-bin/pkg/pprof_bin.d.ts
@@ -5,13 +5,17 @@
 * @param {Uint8Array} bytes
 * @param {string} sample_type
 */
-export function merge_tree(id: number, bytes: Uint8Array, sample_type: string): void;
+export function merge_prof(id: number, bytes: Uint8Array, sample_type: string): void;
+/**
+* @param {number} id
+* @param {Uint8Array} bytes
+*/
+export function merge_tree(id: number, bytes: Uint8Array): void;
 /**
 * @param {number} id
-* @param {string} sample_type
 * @returns {Uint8Array}
 */
-export function export_tree(id: number, sample_type: string): Uint8Array;
+export function export_tree(id: number): Uint8Array;
 /**
 * @param {number} id
 */
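Note: `overflow-checks = false` in both profiles matters for the new `ch64` port further down, which relies on Go-style wrapping `+` in several places that would otherwise panic under debug overflow checks. The regenerated bindings split the old `merge_tree(id, bytes, sample_type)` into two entry points. A usage sketch from Node, following the signatures above (the two input buffers are placeholders you would obtain elsewhere):

```js
const pprofBin = require('./pyroscope/pprof-bin/pkg/pprof_bin')

const id = 1                                            // any u32 merge-session id
pprofBin.merge_prof(id, pprofBytes, 'cpu:nanoseconds')  // fold in a legacy pprof payload
pprofBin.merge_tree(id, treeBytes)                      // fold in a pre-aggregated binary tree
const resp = pprofBin.export_tree(id)                   // SelectMergeStacktracesResponse bytes
pprofBin.drop_tree(id)                                  // always release the session
```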
diff --git a/pyroscope/pprof-bin/pkg/pprof_bin.js b/pyroscope/pprof-bin/pkg/pprof_bin.js
index 0d27eae1..6d0e9559 100644
--- a/pyroscope/pprof-bin/pkg/pprof_bin.js
+++ b/pyroscope/pprof-bin/pkg/pprof_bin.js
@@ -88,6 +88,7 @@ function passStringToWasm0(arg, malloc, realloc) {
         const ret = encodeString(arg, view);
 
         offset += ret.written;
+        ptr = realloc(ptr, len, offset, 1) >>> 0;
     }
 
     WASM_VECTOR_LEN = offset;
@@ -98,12 +99,22 @@ function passStringToWasm0(arg, malloc, realloc) {
 * @param {Uint8Array} bytes
 * @param {string} sample_type
 */
-module.exports.merge_tree = function(id, bytes, sample_type) {
+module.exports.merge_prof = function(id, bytes, sample_type) {
     const ptr0 = passArray8ToWasm0(bytes, wasm.__wbindgen_malloc);
     const len0 = WASM_VECTOR_LEN;
     const ptr1 = passStringToWasm0(sample_type, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
     const len1 = WASM_VECTOR_LEN;
-    wasm.merge_tree(id, ptr0, len0, ptr1, len1);
+    wasm.merge_prof(id, ptr0, len0, ptr1, len1);
+};
+
+/**
+* @param {number} id
+* @param {Uint8Array} bytes
+*/
+module.exports.merge_tree = function(id, bytes) {
+    const ptr0 = passArray8ToWasm0(bytes, wasm.__wbindgen_malloc);
+    const len0 = WASM_VECTOR_LEN;
+    wasm.merge_tree(id, ptr0, len0);
 };
 
 let cachedInt32Memory0 = null;
@@ -121,20 +132,17 @@ function getArrayU8FromWasm0(ptr, len) {
 }
 /**
 * @param {number} id
-* @param {string} sample_type
 * @returns {Uint8Array}
 */
-module.exports.export_tree = function(id, sample_type) {
+module.exports.export_tree = function(id) {
     try {
         const retptr = wasm.__wbindgen_add_to_stack_pointer(-16);
-        const ptr0 = passStringToWasm0(sample_type, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc);
-        const len0 = WASM_VECTOR_LEN;
-        wasm.export_tree(retptr, id, ptr0, len0);
+        wasm.export_tree(retptr, id);
         var r0 = getInt32Memory0()[retptr / 4 + 0];
        var r1 = getInt32Memory0()[retptr / 4 + 1];
-        var v2 = getArrayU8FromWasm0(r0, r1).slice();
+        var v1 = getArrayU8FromWasm0(r0, r1).slice();
         wasm.__wbindgen_free(r0, r1 * 1, 1);
-        return v2;
+        return v1;
     } finally {
         wasm.__wbindgen_add_to_stack_pointer(16);
     }
diff --git a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm
index 04fee2a8..4162f86e 100644
Binary files a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm and b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm differ
diff --git a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts
index e2fcf635..10694398 100644
--- a/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts
+++ b/pyroscope/pprof-bin/pkg/pprof_bin_bg.wasm.d.ts
@@ -1,8 +1,9 @@
 /* tslint:disable */
 /* eslint-disable */
 export const memory: WebAssembly.Memory;
-export function merge_tree(a: number, b: number, c: number, d: number, e: number): void;
-export function export_tree(a: number, b: number, c: number, d: number): void;
+export function merge_prof(a: number, b: number, c: number, d: number, e: number): void;
+export function merge_tree(a: number, b: number, c: number): void;
+export function export_tree(a: number, b: number): void;
 export function drop_tree(a: number): void;
 export function init_panic_hook(): void;
 export function __wbindgen_malloc(a: number, b: number): number;
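Note: the new `ch64.rs` below ports ClickHouse's `cityHash64` (the CityHash 1.0.2 flavour) so hashes computed inside the wasm agree with the ones ClickHouse computes during ingestion, e.g. `cityHash64(...)` in the materialized view above. A spot-check sketch from Node, reusing the same request helper `pyroscope.js` uses (response shape depends on the helper, so inspect `rs.data`):

```js
const clickhouse = require('./lib/db/clickhouse')
const { DATABASE_NAME } = require('./lib/utils')

// the printed value has to equal ch64::city_hash_64(b"total") in the wasm module
async function hashParity () {
  const rs = await clickhouse.rawRequest("SELECT cityHash64('total') FORMAT JSON", null, DATABASE_NAME())
  console.log(rs.data)
}
```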
diff --git a/pyroscope/pprof-bin/src/ch64.rs b/pyroscope/pprof-bin/src/ch64.rs
new file mode 100644
index 00000000..eb11e98c
--- /dev/null
+++ b/pyroscope/pprof-bin/src/ch64.rs
@@ -0,0 +1,181 @@
+#![allow(non_snake_case, non_upper_case_globals)]
+
+pub fn read_uint64_le(bytes: &[u8]) -> u64 {
+    let mut res: u64 = 0;
+    for i in 0..8 {
+        res |= (bytes[i] as u64) << (i * 8);
+    }
+    res
+}
+
+const kMul: u64 = 0x9ddfea08eb382d69;
+
+pub fn hash_128_to_64(l: u64, h: u64) -> u64 {
+    let mut a = (l ^ h).wrapping_mul(kMul);
+    a ^= a >> 47;
+    let mut b = (h ^ a).wrapping_mul(kMul);
+    b ^= b >> 47;
+    b = b.wrapping_mul(kMul);
+    b
+}
+
+const k0: u64 = 0xc3a5c85c97cb3127;
+const k2: u64 = 0x9ae16a3b2f90404f;
+const k1: u64 = 0xb492b66fbe98f273;
+const k3: u64 = 0xc949d7c7509e6557;
+
+fn ch16(u: u64, v: u64) -> u64 {
+    hash_128_to_64(u, v)
+}
+
+fn rot64(val: u64, shift: usize) -> u64 {
+    if shift == 0 {
+        return val
+    }
+    (val >> shift) | (val << (64 - shift))
+}
+
+fn shiftMix(val: u64) -> u64 {
+    val ^ (val >> 47)
+}
+
+fn hash16(u: u64, v: u64) -> u64 {
+    hash_128_to_64(u, v)
+}
+
+fn fetch32(p: &[u8]) -> u32 {
+    let mut res: u32 = 0;
+    for i in 0..4 {
+        res |= (p[i] as u32) << (i * 8);
+    }
+    res
+}
+
+fn ch33to64(s: &[u8], length: usize) -> u64 {
+    let mut z = read_uint64_le(&s[24..]);
+    let mut a = read_uint64_le(s)
+        + (length as u64 + read_uint64_le(&s[length - 16..])).wrapping_mul(k0);
+    let mut b = rot64(a + z, 52);
+    let mut c = rot64(a, 37);
+    a += read_uint64_le(&s[8..]);
+    c += rot64(a, 7);
+    a += read_uint64_le(&s[16..]);
+    let vf = a + z;
+    let vs = b + rot64(a, 31) + c;
+
+    a = read_uint64_le(&s[16..]) + read_uint64_le(&s[length - 32..]);
+    z = read_uint64_le(&s[length - 8..]);
+    b = rot64(a + z, 52);
+    c = rot64(a, 37);
+    a += read_uint64_le(&s[length - 24..]);
+    c += rot64(a, 7);
+    a += read_uint64_le(&s[length - 16..]);
+
+    let wf = a + z;
+    let ws = b + rot64(a, 31) + c;
+    let r = shiftMix((vf + ws).wrapping_mul(k2) + (wf + vs).wrapping_mul(k0));
+    shiftMix(r.wrapping_mul(k0) + vs).wrapping_mul(k2)
+}
+
+fn ch17to32(s: &[u8], length: usize) -> u64 {
+    let a = read_uint64_le(s).wrapping_mul(k1);
+    let b = read_uint64_le(&s[8..]);
+    let c = read_uint64_le(&s[length - 8..]).wrapping_mul(k2);
+    let d = read_uint64_le(&s[length - 16..]).wrapping_mul(k0);
+    hash16(
+        rot64(a - b, 43) + rot64(c, 30) + d,
+        a + rot64(b ^ k3, 20) - c + (length as u64),
+    )
+}
+
+fn ch0to16(s: &[u8], length: usize) -> u64 {
+    if length > 8 {
+        let a = read_uint64_le(s);
+        let b = read_uint64_le(&s[length - 8..]);
+        return ch16(a, rot64(b + (length as u64), length)) ^ b;
+    }
+    if length >= 4 {
+        let a = fetch32(s) as u64;
+        return ch16((length as u64) + (a << 3), fetch32(&s[length - 4..]) as u64);
+    }
+    if length > 0 {
+        let a = s[0];
+        let b = s[length >> 1];
+        let c = s[length - 1];
+        let y = (a as u32) + ((b as u32) << 8);
+        let z = (length as u32) + ((c as u32) << 2);
+        return shiftMix(
+            (y as u64).wrapping_mul(k2) ^ (z as u64).wrapping_mul(k3)
+        ).wrapping_mul(k2);
+    }
+    k2
+}
+
+fn weakHash32Seeds(w: u64, x: u64, y: u64, z: u64, _a: u64, _b: u64) -> (u64, u64) {
+    let mut a = _a + w;
+    let mut b = rot64(_b + a + z, 21);
+    let c = a;
+    a += x;
+    a += y;
+    b += rot64(a, 44);
+    (a + z, b + c)
+}
+
+// Return a 16-byte hash for s[0] ... s[31], a, and b. Quick and dirty.
+fn weakHash32SeedsByte(s: &[u8], a: u64, b: u64) -> (u64, u64) {
+    _ = s[31];
+    weakHash32Seeds(
+        read_uint64_le(&s[0..0 + 8]),
+        read_uint64_le(&s[8..8 + 8]),
+        read_uint64_le(&s[16..16 + 8]),
+        read_uint64_le(&s[24..24 + 8]),
+        a,
+        b,
+    )
+}
+
+fn nearestMultiple64(b: &[u8]) -> usize {
+    (b.len() - 1) & !63
+}
+
+// city_hash_64 returns the ClickHouse version of Hash64.
+pub fn city_hash_64(s: &[u8]) -> u64 {
+    let length = s.len();
+    if length <= 16 {
+        return ch0to16(s, length)
+    }
+    if length <= 32 {
+        return ch17to32(s, length)
+    }
+    if length <= 64 {
+        return ch33to64(s, length)
+    }
+
+    let x = read_uint64_le(s);
+    let y = read_uint64_le(&s[length - 16..]) ^ k1;
+    let mut z = read_uint64_le(&s[length - 56..]) ^ k0;
+
+    let mut v = weakHash32SeedsByte(&s[length - 64..], length as u64, y);
+    let mut w = weakHash32SeedsByte(&s[length - 32..], (length as u64).wrapping_mul(k1), k0);
+    z += shiftMix(v.1).wrapping_mul(k1);
+    let mut x = rot64(z + x, 39).wrapping_mul(k1);
+    let mut y = rot64(y, 33).wrapping_mul(k1);
+    // Decrease len to the nearest multiple of 64, and operate on 64-byte chunks.
+    let mut _s = &s[..nearestMultiple64(s)];
+    while _s.len() > 0 {
+        x = rot64(x + y + v.0 + read_uint64_le(&_s[16..]), 37).wrapping_mul(k1);
+        y = rot64(y + v.1 + read_uint64_le(&_s[48..]), 42).wrapping_mul(k1);
+        x ^= w.1;
+        y ^= v.0;
+
+        z = rot64(z ^ w.0, 33);
+        v = weakHash32SeedsByte(_s, v.1.wrapping_mul(k1), x + w.0);
+        w = weakHash32SeedsByte(&_s[32..], z + w.1, y);
+        (z, x) = (x, z);
+        _s = &_s[64..];
+    }
+    ch16(
+        ch16(v.0, w.0) + shiftMix(y).wrapping_mul(k1) + z,
+        ch16(v.1, w.1) + x,
+    )
+}
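Note: `lib.rs` below replaces the recursive `TreeNode` with a flat, parent-keyed adjacency map of `TreeNodeV2`, so trees merged from many profiles can also be merged with pre-aggregated trees coming from ClickHouse. Node identity is derived from `(parent_id, name_hash)` with the stack depth packed into the top bits. A JS mirror of `get_node_id` for reference, where `cityHash64` stands for any ClickHouse-compatible 64-bit CityHash (assumed helper, not part of this repo):

```js
// bytes 0..8: parent id (LE), bytes 8..16: function-name hash (LE);
// the node id keeps the 55 low bits of the pair hash and stores the
// stack depth (capped at 511) in the top 9 bits
const getNodeId = (parentId, nameHash, level) => {
  const bytes = Buffer.alloc(16)
  bytes.writeBigUInt64LE(BigInt(parentId), 0)
  bytes.writeBigUInt64LE(BigInt(nameHash), 8)
  const lvl = BigInt(Math.min(level, 511))
  return (cityHash64(bytes) >> 9n) | (lvl << 55n) // cityHash64: assumed helper
}
```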
diff --git a/pyroscope/pprof-bin/src/lib.rs b/pyroscope/pprof-bin/src/lib.rs
index 4d03363d..da5af679 100644
--- a/pyroscope/pprof-bin/src/lib.rs
+++ b/pyroscope/pprof-bin/src/lib.rs
@@ -1,18 +1,23 @@
 #![allow(unused_assignments)]
 mod utils;
+mod ch64;
 
 use lazy_static::lazy_static;
 use pprof_pb::google::v1::Function;
 use pprof_pb::google::v1::Location;
 use pprof_pb::google::v1::Profile;
-use pprof_pb::querier::v1::FlameGraph;
 use pprof_pb::querier::v1::Level;
+use pprof_pb::querier::v1::FlameGraph;
 use pprof_pb::querier::v1::SelectMergeStacktracesResponse;
+use std::panic;
 use prost::Message;
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
+use std::slice::SliceIndex;
 use std::sync::Mutex;
 use std::vec::Vec;
 use wasm_bindgen::prelude::*;
+use ch64::city_hash_64;
+use ch64::read_uint64_le;
 
 
 pub mod pprof_pb {
@@ -33,35 +38,51 @@ pub mod pprof_pb {
     }
 }
 
-struct TreeNode {
-    name_idx: usize,
-    prepend: i64,
-    total: i64,
-    _self: i64,
-    children: Vec<TreeNode>,
-}
-
-impl TreeNode {
-    fn append_child(&mut self, _name_idx: usize) {
-        self.children.push(TreeNode {
-            name_idx: _name_idx,
-            prepend: 0,
-            total: 0,
-            _self: 0,
-            children: Vec::new(),
-        });
-    }
+struct TreeNodeV2 {
+    parent_id: u64,
+    fn_id: u64,
+    node_id: u64,
+    slf: u64,
+    total: u64
 }
 
 struct Tree {
     names: Vec<String>,
-    names_map: HashMap<String, usize>,
-    root: TreeNode,
+    names_map: HashMap<u64, usize>,
+    nodes: HashMap<u64, Vec<TreeNodeV2>>,
     sample_type: String,
     max_self: i64,
+    nodes_num: i32
+}
+
+fn find_node(id: u64, nodes: &Vec<TreeNodeV2>) -> i32 {
+    let mut n: i32 = -1;
+    for c in 0..nodes.len() {
+        let _c = &nodes[c];
+        if _c.node_id == id {
+            n = c as i32;
+            break;
+        }
+    }
+    n
+}
+
+fn get_node_id(parent_id: u64, name_hash: u64, level: u16) -> u64 {
+    let mut node_bytes: [u8; 16] = [0; 16];
+    for i in 0..8 {
+        node_bytes[i] = ((parent_id >> (i * 8)) & 0xFF) as u8;
+    }
+    for i in 0..8 {
+        node_bytes[i + 8] = ((name_hash >> (i * 8)) & 0xFF) as u8;
+    }
+    let mut _level = level;
+    if _level > 511 {
+        _level = 511;
+    }
+    (city_hash_64(&node_bytes[0..]) >> 9) | ((_level as u64) << 55)
 }
 
-unsafe fn merge(tree: &mut Tree, p: &Profile) {
+fn merge(tree: &mut Tree, p: &Profile) {
     let mut functions: HashMap<u64, &Function> = HashMap::new();
     for f in p.function.iter() {
         functions.insert(f.id, &f);
@@ -87,50 +108,61 @@ fn merge(tree: &mut Tree, p: &Profile) {
     if value_idx == -1 {
         return;
     }
-    for i in 0..p.location.len() {
-        let l = &p.location[i];
+    let u_value_idx = value_idx as usize;
+    for l in p.location.iter() {
         let line = &p.string_table[functions[&l.line[0].function_id].name as usize];
-        if tree.names_map.contains_key(line) {
+        let line_hash = city_hash_64(line.as_bytes());
+        if tree.names_map.contains_key(&line_hash) {
             continue;
         }
         tree.names.push(line.clone());
-        tree.names_map.insert(line.clone(), tree.names.len() - 1);
+        tree.names_map.insert(line_hash, tree.names.len() - 1);
     }
 
     for s in p.sample.iter() {
-        let mut node = &mut tree.root;
+        let mut parent_id: u64 = 0;
         for i in (0..s.location_id.len()).rev() {
            let location = locations[&s.location_id[i]];
-            let name_idx = tree.names_map
-                [&p.string_table[functions[&location.line[0].function_id].name as usize]];
-            let mut node_idx: i32 = -1;
-            for j in 0..node.children.len() {
-                if node.children[j].name_idx == name_idx {
-                    node_idx = j as i32;
-                    break;
-                }
+            let name = &p.string_table[functions[&location.line[0].function_id].name as usize];
+            let name_hash = city_hash_64(name.as_bytes());
+            let node_id = get_node_id(
+                parent_id, name_hash, (s.location_id.len() - i) as u16
+            );
+            if !tree.nodes.contains_key(&parent_id) && tree.nodes_num < 2000000 {
+                tree.nodes.insert(parent_id, Vec::new());
             }
-            if node_idx == -1 {
-                node.append_child(name_idx);
-                node_idx = (node.children.len() as i32) - 1;
-            }
-            node = &mut node.children[node_idx as usize];
-            node.total += s.value[value_idx as usize];
+            let mut slf: u64 = 0;
             if i == 0 {
-                node._self += s.value[value_idx as usize];
-                if node._self > tree.max_self {
-                    tree.max_self = node._self
-                }
+                slf = s.value[u_value_idx] as u64;
             }
+            if tree.max_self < slf as i64 {
+                tree.max_self = slf as i64;
+            }
+            let mut fake_children: Vec<TreeNodeV2> = Vec::new();
+            let children = tree.nodes
+                .get_mut(&parent_id)
+                .unwrap_or(&mut fake_children);
+            let n = find_node(node_id, children);
+            if n == -1 && tree.nodes_num < 2000000 {
+                children.push(TreeNodeV2 {
+                    parent_id,
+                    fn_id: name_hash,
+                    node_id,
+                    slf,
+                    total: s.value[u_value_idx] as u64
+                });
+                tree.nodes_num += 1;
+            } else if n != -1 {
+                children.get_mut(n as usize).unwrap().total += s.value[u_value_idx] as u64;
+                children.get_mut(n as usize).unwrap().slf += slf;
+            }
+
+            parent_id = node_id;
         }
     }
-    tree.root.total = 0;
-    for c in tree.root.children.iter() {
-        tree.root.total += c.total;
-    }
 }
 
-/*fn read_uleb128(bytes: &[u8]) -> (usize, usize) {
+fn read_uleb128(bytes: &[u8]) -> (usize, usize) {
     let mut result = 0;
     let mut shift = 0;
     loop {
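Note: `merge()` picks the pprof value column whose `sampleType:sampleUnit` key equals `tree.sample_type`, and the same two composite keys tie the SQL schema to the wasm module. Illustrative values only:

```js
// for the Pyroscope profile type id 'process_cpu:cpu:nanoseconds:cpu:nanoseconds':
const typeId = 'process_cpu:cpu:nanoseconds' // `${type}:${periodType}:${periodUnit}` -> profiles.type_id
const sampleTypeUnit = 'cpu:nanoseconds'     // `${sampleType}:${sampleUnit}` -> key into the per-type value tuples
```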
@@ -142,50 +174,72 @@ unsafe fn merge(tree: &mut Tree, p: &Profile) {
         }
     }
     (result, shift)
-}*/
-
-static mut INTS: [i64; 4000000] = [0; 4000000];
-
-unsafe fn bfs(t: &mut Tree, res: &mut Vec<&[i64]>) {
-    let mut valid_refs = true;
-    // suppress irrelevant warnings
-    let mut prepend: i64 = 0;
-    let mut k = 4;
-    INTS[0] = 0;
-    INTS[1] = t.root.total;
-    INTS[2] = t.root._self;
-    INTS[3] = t.root.name_idx as i64;
-    res.push(&INTS[0..4]);
-    let mut refs: Vec<*mut TreeNode> = vec![&mut t.root];
-    let mut _refs: Vec<*mut TreeNode> = vec![];
-    while valid_refs {
-        valid_refs = false;
-        prepend = 0;
-        let _k = k;
-        for i in 0..refs.len() {
-            let _ref = refs[i];
-            prepend += (*_ref).prepend;
-            for j in 0..(*_ref).children.len() {
-                valid_refs = true;
-                (*_ref).children[j].prepend += prepend;
-                INTS[k] = (*_ref).children[j].prepend;
-                INTS[k + 1] = (*_ref).children[j].total;
-                INTS[k + 2] = (*_ref).children[j]._self;
-                INTS[k + 3] = (*_ref).children[j].name_idx as i64;
-                prepend = 0;
-                _refs.push(&mut (*_ref).children[j]);
-                k += 4;
-            }
-            if (*_ref).children.len() == 0 {
-                prepend += (*_ref).total;
-            } else {
-                prepend += (*_ref)._self
-            }
+}
+
+fn bfs(t: &Tree, res: &mut Vec<Level>) {
+    let mut total: u64 = 0;
+    for i in t.nodes.get(&(0u64)).unwrap().iter() {
+        total += i.total;
+    }
+    let mut lvl = Level::default();
+    lvl.values.extend([0, total as i64, 0, 0]);
+    res.push(lvl);
+
+    let totalNode: TreeNodeV2 = TreeNodeV2 {
+        slf: 0,
+        total: total,
+        node_id: 0,
+        fn_id: 0,
+        parent_id: 0
+    };
+    let mut prepend_map: HashMap<u64, u64> = HashMap::new();
+
+    let mut reviewed: HashSet<u64> = HashSet::new();
+
+    let mut refs: Vec<&TreeNodeV2> = vec![&totalNode];
+    let mut refLen: usize = 1;
+    let mut i = 0;
+    while refLen > 0 {
+        i += 1;
+        let mut prepend: u64 = 0;
+        let _refs = refs.clone();
+        refs.clear();
+        lvl = Level::default();
+        for parent in _refs.iter() {
+            prepend += prepend_map.get(&parent.node_id).unwrap_or(&0);
+            let opt = t.nodes.get(&parent.node_id);
+
+            if opt.is_none() {
+                prepend += parent.total;
+                continue;
+            }
+            let mut totalSum: u64 = 0;
+            for n in opt.unwrap().iter() {
+                if reviewed.contains(&n.node_id) {
+                    // PANIC!!! WE FOUND A LOOP
+                    return;
+                } else {
+                    reviewed.insert(n.node_id);
+                }
+                prepend_map.insert(n.node_id, prepend);
+                refs.push(n);
+                totalSum += n.total;
+                lvl.values.extend(
+                    [
+                        prepend as i64,
+                        n.total as i64,
+                        n.slf as i64,
+                        *t.names_map.get(&n.fn_id).unwrap_or(&1) as i64
+                    ]
+                );
+                prepend = 0;
+            }
+            prepend += parent.slf;
         }
-        res.push(&INTS[_k..k]);
-        std::mem::swap(&mut refs, &mut _refs);
-        _refs.clear();
+        res.push(lvl.clone());
+        refLen = refs.len();
     }
 }
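Note: each `Level` produced by `bfs()` is a flat run of 4-int groups in the flamebearer style: `[offsetDelta, total, self, nameIdx]`, where `offsetDelta` (the accumulated `prepend`) is relative to the end of the previous bar on the same level. A minimal consumer-side decoding sketch:

```js
const walkLevel = (values, names) => {
  const bars = []
  let x = 0
  for (let i = 0; i < values.length; i += 4) {
    x += values[i]                  // re-apply the accumulated gap ("prepend")
    const total = values[i + 1]
    const slf = values[i + 2]
    bars.push({ x, total, self: slf, name: names[values[i + 3]] })
    x += total                      // the next sibling starts after this bar
  }
  return bars
}
```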
@@ -193,77 +247,151 @@ lazy_static! {
     static ref CTX: Mutex<HashMap<u32, Tree>> = Mutex::new(HashMap::new());
 }
 
-#[wasm_bindgen]
-pub unsafe fn merge_tree(id: u32, bytes: &[u8], sample_type: String) {
-    let mut ctx = CTX.lock().unwrap();
+fn upsert_tree(ctx: &mut HashMap<u32, Tree>, id: u32) {
     if !ctx.contains_key(&id) {
         ctx.insert(
             id,
             Tree {
-                names: Vec::new(),
+                names: vec!["total".to_string(), "n/a".to_string()],
                 names_map: HashMap::new(),
-                root: TreeNode {
-                    name_idx: 0,
-                    _self: 0,
-                    children: vec![],
-                    prepend: 0,
-                    total: 0,
-                },
-                sample_type,
+                nodes: HashMap::new(),
+                sample_type: "".to_string(),
                 max_self: 0,
+                nodes_num: 1
             },
         );
     }
+}
 
-    let mut tree = ctx.get_mut(&id).unwrap();
-    tree.names.push("total".to_string());
-    tree.names_map.insert("total".to_string(), 0);
+fn merge_trie(tree: &mut Tree, bytes: &[u8]) {
+    let mut size = 0;
+    let mut offs = 0;
+    (size, offs) = read_uleb128(bytes);
+    for _i in 0..size {
+        let id = read_uint64_le(&bytes[offs..]);
+        offs += 8;
+        let mut _offs: usize = 0;
+        let mut _size: usize = 0;
+        (_size, _offs) = read_uleb128(&bytes[offs..]);
+        offs += _offs;
+        if !tree.names_map.contains_key(&id) && tree.names.len() < 2000000 {
+            tree.names.push(String::from_utf8_lossy(&bytes[offs..offs + _size]).to_string());
+            tree.names_map.insert(id, tree.names.len() - 1);
+        }
+        offs += _size;
+    }
 
-    let prof = Profile::decode(bytes).unwrap();
-    merge(&mut tree, &prof);
+    let mut _offs: usize = 0;
+    (size, _offs) = read_uleb128(&bytes[offs..]);
+    offs += _offs;
+    for _i in 0..size {
+        let parent_id = read_uint64_le(&bytes[offs..]);
+        offs += 8;
+        let fn_id = read_uint64_le(&bytes[offs..]);
+        offs += 8;
+        let node_id = read_uint64_le(&bytes[offs..]);
+        offs += 8;
+        let slf = read_uint64_le(&bytes[offs..]);
+        offs += 8;
+        let total = read_uint64_le(&bytes[offs..]);
+        if tree.max_self < slf as i64 {
+            tree.max_self = slf as i64;
+        }
+        offs += 8;
+        if tree.nodes.contains_key(&parent_id) {
+            let n = find_node(node_id, tree.nodes.get(&parent_id).unwrap());
+            if n != -1 {
+                tree.nodes.get_mut(&parent_id).unwrap().get_mut(n as usize).unwrap().total += total;
+                tree.nodes.get_mut(&parent_id).unwrap().get_mut(n as usize).unwrap().slf += slf;
+            } else if tree.nodes_num < 2000000 {
+                tree.nodes.get_mut(&parent_id).unwrap().push(TreeNodeV2 {
+                    fn_id,
+                    parent_id,
+                    node_id,
+                    slf,
+                    total
+                });
+                tree.nodes_num += 1;
+            }
+
+        } else if tree.nodes_num < 2000000 {
+            tree.nodes.insert(parent_id, Vec::new());
+            tree.nodes.get_mut(&parent_id).unwrap().push(TreeNodeV2 {
+                fn_id,
+                parent_id,
+                node_id,
+                slf,
+                total
+            });
+            tree.nodes_num += 1;
+        }
+    }
 }
 
 #[wasm_bindgen]
-pub unsafe fn export_tree(id: u32, sample_type: String) -> Vec<u8> {
-    let mut ctx = CTX.lock().unwrap();
-    let mut res = SelectMergeStacktracesResponse::default();
-    let mut tree = &mut Tree {
-        names: Vec::new(),
-        names_map: HashMap::new(),
-        root: TreeNode {
-            name_idx: 0,
-            _self: 0,
-            children: vec![],
-            prepend: 0,
-            total: 0,
-        },
-        sample_type,
-        max_self: 0,
-    };
-    tree.names.push("total".to_string());
-    tree.names_map.insert("total".to_string(), 0);
-    if ctx.contains_key(&id) {
-        tree = (*ctx).get_mut(&id).unwrap();
+pub fn merge_prof(id: u32, bytes: &[u8], sample_type: String) {
+    let p = panic::catch_unwind(|| {
+        let mut ctx = CTX.lock().unwrap();
+        upsert_tree(&mut ctx, id);
+        let mut tree = ctx.get_mut(&id).unwrap();
+        tree.sample_type = sample_type;
+
+        let prof = Profile::decode(bytes).unwrap();
+        merge(&mut tree, &prof);
+    });
+    match p {
+        Ok(_) => {}
+        Err(err) => panic::resume_unwind(err)
+    }
+}
+
+#[wasm_bindgen]
+pub fn merge_tree(id: u32, bytes: &[u8]) {
+    let result = panic::catch_unwind(|| {
+        let mut ctx = CTX.lock().unwrap();
+        upsert_tree(&mut ctx, id);
+        let mut tree = ctx.get_mut(&id).unwrap();
+        merge_trie(&mut tree, bytes);
+        0
+    });
+    match result {
+        Ok(_) => {}
+        Err(err) => panic::resume_unwind(err)
     }
-    let mut fg = FlameGraph::default();
-    fg.names = tree.names.clone();
-    let mut levels: Vec<&[i64]> = Vec::new();
-    bfs(tree, &mut levels);
-    for l in levels {
-        let mut level = Level::default();
-        for v in l.iter() {
-            level.values.push(*v);
+}
+
+#[wasm_bindgen]
+pub fn export_tree(id: u32) -> Vec<u8> {
+    let p = panic::catch_unwind(|| {
+        let mut ctx = CTX.lock().unwrap();
+        let mut res = SelectMergeStacktracesResponse::default();
+        if !ctx.contains_key(&id) {
+            return res.encode_to_vec();
+        }
+        let tree = ctx.get(&id).unwrap();
+        if tree.nodes.len() == 0 {
+            return res.encode_to_vec();
+        }
+        let mut fg = FlameGraph::default();
+        fg.names = tree.names.clone();
+        fg.max_self = tree.max_self;
+        fg.total = 0;
+        for n in tree.nodes.get(&(0u64)).unwrap().iter() {
+            fg.total += n.total as i64;
         }
-        fg.levels.push(level);
+        bfs(tree, &mut fg.levels);
+        res.flamegraph = Some(fg);
+        return res.encode_to_vec();
+    });
+    match p {
+        Ok(res) => return res,
+        Err(err) => panic::resume_unwind(err)
     }
-    fg.total = tree.root.total;
-    fg.max_self = tree.max_self;
-    res.flamegraph = Some(fg);
-    res.encode_to_vec()
 }
 
 #[wasm_bindgen]
-pub unsafe fn drop_tree(id: u32) {
+pub fn drop_tree(id: u32) {
     let mut ctx = CTX.lock().unwrap();
     if ctx.contains_key(&id) {
         ctx.remove(&id);
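Note: `merge_trie()` defines the binary wire format that `merge_tree()` consumes: a uleb128 count of `(u64 id, uleb128 length, utf8 name)` entries, then a uleb128 count of nodes, each node five little-endian u64s (`parent_id`, `fn_id`, `node_id`, `self`, `total`). A minimal Node-side decoder sketch for the same layout, assuming `buf` is a Buffer (names map BigInt id to string):

```js
const readULeb = (buf, ofs) => {
  // unsigned LEB128, 32-bit range (same limits as readULeb32 in ./pprof)
  let res = 0
  let shift = 0
  let i = ofs
  for (;;) {
    const b = buf[i++]
    res |= (b & 0x7f) << shift
    shift += 7
    if ((b & 0x80) === 0) return [res, i - ofs]
  }
}

const u64 = (buf, ofs) => buf.readBigUInt64LE(ofs) // ids are full 64-bit values

const decodeTree = (buf) => {
  let [count, sz] = readULeb(buf, 0)
  let ofs = sz
  const names = new Map()
  for (let i = 0; i < count; i++) {
    const id = u64(buf, ofs); ofs += 8
    const [len, lenSz] = readULeb(buf, ofs); ofs += lenSz
    names.set(id, buf.toString('utf8', ofs, ofs + len)); ofs += len
  }
  ;[count, sz] = readULeb(buf, ofs); ofs += sz
  const nodes = []
  for (let i = 0; i < count; i++) {
    nodes.push({
      parentId: u64(buf, ofs),
      fnId: u64(buf, ofs + 8),
      nodeId: u64(buf, ofs + 16),
      self: u64(buf, ofs + 24),
      total: u64(buf, ofs + 32)
    })
    ofs += 40
  }
  return { names, nodes }
}
```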
diff --git a/pyroscope/pyroscope.js b/pyroscope/pyroscope.js
index d6debca4..1d4b8c1a 100644
--- a/pyroscope/pyroscope.js
+++ b/pyroscope/pyroscope.js
@@ -2,7 +2,7 @@ const messages = require('./querier_pb')
 const types = require('./types/v1/types_pb')
 const services = require('./querier_grpc_pb')
 const clickhouse = require('../lib/db/clickhouse')
-const { DATABASE_NAME } = require('../lib/utils')
+const { DATABASE_NAME, checkVersion } = require('../lib/utils')
 const Sql = require('@cloki/clickhouse-sql')
 const compiler = require('../parser/bnf')
 const { readULeb32 } = require('./pprof')
@@ -167,7 +167,54 @@ const labelSelectorQuery = (query, labelSelector) => {
   ))
 }
 
+const serviceNameSelectorQuery = (labelSelector) => {
+  const empty = Sql.Eq(new Sql.Raw('1'), new Sql.Raw('1'))
+  if (!labelSelector || !labelSelector.length || labelSelector === '{}') {
+    return empty
+  }
+  const labelSelectorScript = compiler.ParseScript(labelSelector).rootToken
+  let conds = null
+  for (const rule of labelSelectorScript.Children('log_stream_selector_rule')) {
+    const label = rule.Child('label').value
+    if (label !== 'service_name') {
+      continue
+    }
+    const val = JSON.parse(rule.Child('quoted_str').value)
+    let valRul = null
+    switch (rule.Child('operator').value) {
+      case '=':
+        valRul = Sql.Eq(new Sql.Raw('service_name'), Sql.val(val))
+        break
+      case '!=':
+        valRul = Sql.Ne(new Sql.Raw('service_name'), Sql.val(val))
+        break
+      case '=~':
+        valRul = Sql.Eq(new Sql.Raw(`match(service_name, ${Sql.quoteVal(val)})`), 1)
+        break
+      case '!~':
+        valRul = Sql.Ne(new Sql.Raw(`match(service_name, ${Sql.quoteVal(val)})`), 1)
+    }
+    conds = valRul
+  }
+  return conds || empty
+}
+
 const selectMergeStacktraces = async (req, res) => {
+  return await selectMergeStacktracesV2(req, res)
+}
+
+const sqlWithReference = (ref) => {
+  const res = new Sql.WithReference(ref)
+  res.toString = function () {
+    if (this.ref.inline) {
+      return `(${this.ref.query.toString()}) as ${this.ref.alias}`
+    }
+    return this.ref.alias
+  }
+  return res
+}
+
+const selectMergeStacktracesV2 = async (req, res) => {
   const dist = clusterName ? '_dist' : ''
   const typeRegex = parseTypeId(req.body.getProfileTypeid())
   const sel = req.body.getLabelSelector()
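Note: `serviceNameSelectorQuery` extracts only the `service_name` matcher from the label selector and renders a single ClickHouse condition, so it can be pushed down into every subquery. Expected renderings, illustrative (the function is module-internal):

```js
// '{service_name="api"}'     -> service_name = 'api'
// '{service_name!="api"}'    -> service_name != 'api'
// '{service_name=~"api-.*"}' -> match(service_name, 'api-.*') = 1
// '{service_name!~"api-.*"}' -> match(service_name, 'api-.*') != 1
// '{}' or no selector        -> 1 = 1 (no-op condition)
```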
@@ -177,31 +224,101 @@ const selectMergeStacktraces = async (req, res) => {
   const toTimeSec = req.body && req.body.getEnd()
     ? Math.floor(parseInt(req.body.getEnd()) / 1000)
     : Math.floor(Date.now() / 1000)
+  const v2 = checkVersion('profiles_v2', (fromTimeSec - 3600) * 1000)
+  const serviceNameSelector = serviceNameSelectorQuery(sel)
+  const typeIdSelector = Sql.Eq(
+    'type_id',
+    Sql.val(`${typeRegex.type}:${typeRegex.periodType}:${typeRegex.periodUnit}`)
+  )
   const idxSelect = (new Sql.Select())
     .select('fingerprint')
     .from(`${DATABASE_NAME()}.profiles_series_gin`)
     .where(
       Sql.And(
         Sql.Eq(new Sql.Raw(`has(sample_types_units, (${Sql.quoteVal(typeRegex.sampleType)},${Sql.quoteVal(typeRegex.sampleUnit)}))`), 1),
-        Sql.Eq('type_id', Sql.val(`${typeRegex.type}:${typeRegex.periodType}:${typeRegex.periodUnit}`)),
+        typeIdSelector,
         Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
-        Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
+        Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
+        serviceNameSelector
       )
     ).groupBy('fingerprint')
   labelSelectorQuery(idxSelect, sel)
-  const sqlReq = (new Sql.Select())
-    .select('payload')
+  const withIdxSelect = new Sql.With('idx', idxSelect, !!clusterName)
+  const rawReq = (new Sql.Select()).with(withIdxSelect)
+    .select([
+      new Sql.Raw(`arrayMap(x -> (x.1, x.2, x.3, (arrayFirst(y -> y.1 == ${Sql.quoteVal(`${typeRegex.sampleType}:${typeRegex.sampleUnit}`)}, x.4) as af).2, af.3), tree)`),
+      'tree'
+    ], 'functions')
     .from(`${DATABASE_NAME()}.profiles${dist}`)
     .where(
       Sql.And(
         Sql.Gte('timestamp_ns', new Sql.Raw(Math.floor(fromTimeSec) + '000000000')),
         Sql.Lte('timestamp_ns', new Sql.Raw(Math.floor(toTimeSec) + '000000000')),
-        new Sql.In('fingerprint', 'IN', idxSelect)
+        new Sql.In('fingerprint', 'IN', sqlWithReference(withIdxSelect)),
+        typeIdSelector,
+        serviceNameSelector
      ))
   if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
-    sqlReq.orderBy(['timestamp_ns', 'desc']).limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
+    rawReq.orderBy(['timestamp_ns', 'desc']).limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
   }
+  const withRawReq = new Sql.With('raw', rawReq, !!clusterName)
+  const joinedReq = (new Sql.Select()).with(withRawReq).select([
+    new Sql.Raw('(raw.tree.1, raw.tree.2, raw.tree.3, sum(raw.tree.4), sum(raw.tree.5))'),
+    'tree2'
+  ]).from(sqlWithReference(withRawReq))
+    .join('raw.tree', 'array')
+    .groupBy(new Sql.Raw('raw.tree.1'), new Sql.Raw('raw.tree.2'), new Sql.Raw('raw.tree.3'))
+    .orderBy(new Sql.Raw('raw.tree.1')).limit(2000000)
+  const withJoinedReq = new Sql.With('joined', joinedReq, !!clusterName)
+  const joinedAggregatedReq = (new Sql.Select()).select(
+    [new Sql.Raw('groupArray(tree2)'), 'tree']).from(sqlWithReference(withJoinedReq))
+  const functionsReq = (new Sql.Select()).select(
+    [new Sql.Raw('groupUniqArray(raw.functions)'), 'functions2']
+  ).from(sqlWithReference(withRawReq)).join('raw.functions', 'array')
+
+  let brackLegacy = (new Sql.Select()).select(
+    [new Sql.Raw('[]::Array(String)'), 'legacy']
+  )
+  let withLegacy = null
+  if (!v2) {
+    const legacy = (new Sql.Select()).with(withIdxSelect)
+      .select('payload')
+      .from(`${DATABASE_NAME()}.profiles${dist}`)
+      .where(
+        Sql.And(
+          Sql.Gte('timestamp_ns', new Sql.Raw(Math.floor(fromTimeSec) + '000000000')),
+          Sql.Lte('timestamp_ns', new Sql.Raw(Math.floor(toTimeSec) + '000000000')),
+          new Sql.In('fingerprint', 'IN', sqlWithReference(withIdxSelect)),
+          Sql.Eq(new Sql.Raw('empty(tree)'), 1),
+          typeIdSelector,
+          serviceNameSelector
+        ))
+    if (process.env.ADVANCED_PROFILES_MERGE_LIMIT) {
+      legacy.orderBy(['timestamp_ns', 'desc']).limit(parseInt(process.env.ADVANCED_PROFILES_MERGE_LIMIT))
+    }
+    withLegacy = new Sql.With('legacy', legacy, !!clusterName)
+    brackLegacy = (new Sql.Select())
+      .select([new Sql.Raw('groupArray(payload)'), 'payloads'])
+      .from(sqlWithReference(withLegacy))
+  }
+  brackLegacy = new Sql.Raw(`(${brackLegacy.toString()})`)
+  const brack1 = new Sql.Raw(`(${joinedAggregatedReq.toString()})`)
+  const brack2 = new Sql.Raw(`(${functionsReq.toString()})`)
+
+  const sqlReq = (new Sql.Select())
+    .select(
+      [brackLegacy, 'legacy'],
+      [brack2, 'functions'],
+      [brack1, 'tree']
+    )
+  if (v2) {
+    sqlReq.with(withJoinedReq, withRawReq)
+  } else {
+    sqlReq.with(withJoinedReq, withRawReq, withLegacy)
+  }
+
   let start = Date.now()
+  console.log(sqlReq.toString())
   const profiles = await clickhouse.rawRequest(sqlReq.toString() + ' FORMAT RowBinary',
     null,
     DATABASE_NAME(),
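Note: the `arrayMap` in `rawReq` projects each stored tree tuple down to the one sample type being queried, so the later `sum(...)`/`groupBy` merge stays scalar. Sketch of the shapes involved, values illustrative (the sub-tuples follow the `Array(Tuple(String, Int64, Int64))` column as `(sampleType:unit, self, total)`):

```js
// one stored cell of profiles.tree:
const stored = [
  // (parent_id, fn_id, node_id, [(sampleType:unit, self, total), ...])
  [0n, 111n, 222n, [['cpu:nanoseconds', 10n, 150n], ['wall:nanoseconds', 1n, 20n]]]
]
// after arrayMap(x -> (x.1, x.2, x.3, af.2, af.3), tree) with y.1 == 'cpu:nanoseconds':
const projected = [
  [0n, 111n, 222n, 10n, 150n] // (parent_id, fn_id, node_id, self, total)
]
```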
@@ -210,40 +327,35 @@
     })
   const binData = Uint8Array.from(profiles.data)
   req.log.debug(`selectMergeStacktraces: profiles downloaded: ${binData.length / 1025}kB in ${Date.now() - start}ms`)
-  start = Date.now()
   require('./pprof-bin/pkg/pprof_bin').init_panic_hook()
-  const promises = []
   const _ctxIdx = ++ctxIdx
-  let mergeTreeLat = BigInt(0)
-  let exportTreeLat = BigInt(0)
-  for (let i = 0; i < binData.length;) {
-    const [size, shift] = readULeb32(binData, i)
-    const uarray = Uint8Array.from(profiles.data.slice(i + shift, i + size + shift))
-    i += size + shift
-    promises.push(new Promise((resolve, reject) => setTimeout(() => {
-      try {
-        const start = process.hrtime?.bigint ? process.hrtime.bigint() : 0
-        pprofBin.merge_tree(_ctxIdx, uarray, `${typeRegex.sampleType}:${typeRegex.sampleUnit}`)
-        mergeTreeLat += (process.hrtime?.bigint ? process.hrtime.bigint() : 0) - start
-        resolve()
-      } catch (e) {
-        reject(e)
-      }
-    }, 0)))
-  }
-  let sResp = null
+  const [legacyLen, shift] = readULeb32(binData, 0)
+  let ofs = shift
   try {
-    await Promise.all(promises)
-    const start = process.hrtime?.bigint ? process.hrtime.bigint() : 0
-    sResp = pprofBin.export_tree(_ctxIdx, `${typeRegex.sampleType}:${typeRegex.sampleUnit}`)
-    exportTreeLat += (process.hrtime?.bigint ? process.hrtime.bigint() : 0) - start
+    let mergePprofLat = BigInt(0)
+    for (let i = 0; i < legacyLen; i++) {
+      const [profLen, shift] = readULeb32(binData, ofs)
+      ofs += shift
+      start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
+      pprofBin.merge_prof(_ctxIdx,
+        Uint8Array.from(profiles.data.slice(ofs, ofs + profLen)),
+        `${typeRegex.sampleType}:${typeRegex.sampleUnit}`)
+      mergePprofLat += (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start
+      ofs += profLen
+    }
+    start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
+    pprofBin.merge_tree(_ctxIdx, Uint8Array.from(profiles.data.slice(ofs)))
+    const mergeTreeLat = (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start
+    start = process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)
+    const resp = pprofBin.export_tree(_ctxIdx)
+    const exportTreeLat = (process.hrtime?.bigint ? process.hrtime.bigint() : BigInt(0)) - start
+    req.log.debug(`merge_pprof: ${mergePprofLat / BigInt(1000000)}ms`)
+    req.log.debug(`merge_tree: ${mergeTreeLat / BigInt(1000000)}ms`)
+    req.log.debug(`export_tree: ${exportTreeLat / BigInt(1000000)}ms`)
+    return res.code(200).send(Buffer.from(resp))
   } finally {
-    req.log.debug(`selectMergeStacktraces: profiles processed: ${promises.length} in ${Date.now() - start}ms`)
-    req.log.debug(`selectMergeStacktraces: mergeTree: ${mergeTreeLat / BigInt(1000000)}ms`)
-    req.log.debug(`selectMergeStacktraces: export_tree: ${exportTreeLat / BigInt(1000000)}ms`)
-    try { pprofBin.drop_tree(_ctxIdx) } catch (e) { req.log.error(e) }
+    try { pprofBin.drop_tree(_ctxIdx) } catch (e) {}
   }
-  return res.code(200).send(Buffer.from(sResp))
 }
 
 const selectSeries = async (req, res) => {
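Note: the `selectSeries` hunks below push the same `type_id` and `service_name` conditions into every subquery, and they also flip the `new_fingerprint` ternary, which previously had its branches reversed. The intent, sketched (names as in the hunk below; `groupBy` comes from the request):

```js
// grouped request  -> tags are collapsed to the groupBy subset, so distinct
//                     series need a freshly derived fingerprint
// plain request    -> the stored fingerprint is already one-to-one
const newFingerprint = groupBy ? new Sql.Raw('cityHash64(tags)') : 'fingerprint'
```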
@@ -273,12 +385,18 @@ const selectSeries = async (req, res) => {
   }
   const aggregation = _req.getAggregation && _req.getAggregation()
 
+  const typeIdSelector = Sql.Eq(
+    'type_id',
+    Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`))
+  const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
+
   const idxReq = (new Sql.Select())
     .select(new Sql.Raw('fingerprint'))
     .from(`${DATABASE_NAME()}.profiles_series_gin`)
     .where(
       Sql.And(
-        Sql.Eq('type_id', Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`)),
+        typeIdSelector,
+        serviceNameSelector,
         Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
         Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
         Sql.Eq(new Sql.Raw(
@@ -298,12 +416,14 @@ const selectSeries = async (req, res) => {
   const labelsReq = (new Sql.Select()).with(withIdxReq).select(
     'fingerprint',
     [new Sql.Raw(tagsReq), 'tags'],
-    [groupBy ? 'fingerprint' : new Sql.Raw('cityHash64(tags)'), 'new_fingerprint']
+    [groupBy ? new Sql.Raw('cityHash64(tags)') : 'fingerprint', 'new_fingerprint']
   ).distinct(true).from([`${DATABASE_NAME()}.profiles_series`, 'p'])
     .where(Sql.And(
       new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
       Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
-      Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
+      Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
+      typeIdSelector,
+      serviceNameSelector
     ))
 
   const withLabelsReq = new Sql.With('labels', labelsReq, !!clusterName)
@@ -330,7 +450,9 @@ const selectSeries = async (req, res) => {
       Sql.And(
         new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
         Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
-        Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))
+        Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`)),
+        typeIdSelector,
+        serviceNameSelector
       )
     ).groupBy('timestamp_ns', 'fingerprint')
     .orderBy(['fingerprint', 'ASC'], ['timestamp_ns', 'ASC'])