Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: fix/profiles 574 576 #577

Merged
merged 4 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions lib/bun_wrapper.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
const { Transform } = require('stream')
const log = require('./logger')
const { EventEmitter } = require('events')
const zlib = require('zlib')

class BodyStream extends Transform {
_transform (chunk, encoding, callback) {
Expand Down Expand Up @@ -121,6 +122,16 @@ const wrapper = (handler, parsers) => {
headers['Content-Type'] = 'application/json'
response = JSON.stringify(response)
}
if (response && (ctx.headers.get('accept-encoding') || '').indexOf('gzip') !== -1) {
if (response.on) {
const _r = zlib.createGzip()
response.pipe(_r)
response = _r
} else {
response = Bun.gzipSync(response)
}
headers['Content-Encoding'] = 'gzip'
}
return new Response(response, { status: status, headers: headers })
}
return res
Expand Down
5 changes: 3 additions & 2 deletions pyroscope/json_parsers.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ const labelNames = async (req, payload) => {
return {
getStart: () => body.start,
getEnd: () => body.end,
getName: () => body.name
getName: () => body.name,
getMatchersList: () => body.matchers
}
}

Expand All @@ -43,7 +44,7 @@ const labelValues = async (req, payload) => {
body = JSON.parse(body.toString())
return {
getName: () => body.name,
getMatchers: () => body.matchers,
getMatchersList: () => body.matchers,
getStart: () => body.start,
getEnd: () => body.end
}
Expand Down
218 changes: 184 additions & 34 deletions pyroscope/pyroscope.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,24 +53,60 @@ WHERE date >= toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)})) AND date <= toDa
}

const labelNames = async (req, res) => {
const body = req.body
const dist = clusterName ? '_dist' : ''
const fromTimeSec = Math.floor(req.body && req.body.getStart
? parseInt(req.body.getStart()) / 1000
: (Date.now() - HISTORY_TIMESPAN) / 1000)
const toTimeSec = Math.floor(req.body && req.body.getEnd
? parseInt(req.body.getEnd()) / 1000
: Date.now() / 1000)
const labelNames = await clickhouse.rawRequest(`SELECT DISTINCT key
FROM profiles_series_keys${dist}
WHERE date >= toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)})) AND date <= toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)})) FORMAT JSON`,
null, DATABASE_NAME())
if (!body.getMatchersList || body.getMatchersList().length === 0) {
const q = `SELECT DISTINCT key
FROM profiles_series_keys ${dist}
WHERE date >= toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))
AND date <= toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)})) FORMAT JSON`
console.log(q)
const labelNames = await clickhouse.rawRequest(q, null, DATABASE_NAME())
const resp = new types.LabelNamesResponse()
resp.setNamesList(labelNames.data.data.map(label => label.key))
return resp
}
const promises = []
for (const matcher of body.getMatchersList()) {
const specialMatchers = getSpecialMatchers(matcher)
const idxReq = matcherIdxRequest(matcher, specialMatchers, fromTimeSec, toTimeSec)
const withIdxReq = new Sql.With('idx', idxReq)
const specialClauses = specialMatchersQuery(specialMatchers.matchers,
'sample_types_units')
const serviceNameSelector = serviceNameSelectorQuery(matcher)
const req = (new Sql.Select()).with(withIdxReq)
.select('key')
.distinct(true)
.from(`profiles_series_gin${dist}`)
.where(Sql.And(
specialClauses,
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxReq))
))
promises.push(clickhouse.rawRequest(req.toString() + ' FORMAT JSON', null, DATABASE_NAME()))
}
const labelNames = await Promise.all(promises)
const labelNamesDedup = Object.fromEntries(
labelNames.flatMap(val => {
return val.data.data.map(row => [row.key, true])
})
)
const resp = new types.LabelNamesResponse()
resp.setNamesList(labelNames.data.data.map(label => label.key))
resp.setNamesList([...Object.keys(labelNamesDedup)])
return resp
}

const labelValues = async (req, res) => {
const dist = clusterName ? '_dist' : ''
const body = req.body;
const name = req.body && req.body.getName
? req.body.getName()
: ''
Expand All @@ -83,13 +119,45 @@ const labelValues = async (req, res) => {
if (!name) {
throw new Error('No name provided')
}
const labelValues = await clickhouse.rawRequest(`SELECT DISTINCT val
if (!body.getMatchersList || body.getMatchersList().length === 0) {
const labelValues = await clickhouse.rawRequest(`SELECT DISTINCT val
FROM profiles_series_gin${dist}
WHERE key = ${Sql.quoteVal(name)} AND
date >= toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)})) AND
date <= toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)})) FORMAT JSON`, null, DATABASE_NAME())
const resp = new types.LabelValuesResponse()
resp.setNamesList(labelValues.data.data.map(label => label.val))
return resp
}
const promises = []
for (const matcher of body.getMatchersList()) {
const specialMatchers = getSpecialMatchers(matcher)
const idxReq = matcherIdxRequest(matcher, specialMatchers, fromTimeSec, toTimeSec)
const withIdxReq = new Sql.With('idx', idxReq)
const specialClauses = specialMatchersQuery(specialMatchers.matchers,
'sample_types_units')
const serviceNameSelector = serviceNameSelectorQuery(matcher)
const req = (new Sql.Select()).with(withIdxReq)
.select('val')
.distinct(true)
.from(`profiles_series_gin${dist}`)
.where(Sql.And(
specialClauses,
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
Sql.Eq('key', name),
new Sql.In('fingerprint', 'IN', new Sql.WithReference(withIdxReq))
))
console.log(req.toString())
promises.push(clickhouse.rawRequest(req.toString() + ' FORMAT JSON', null, DATABASE_NAME()))
}
const labelValues = await Promise.all(promises)
const labelValuesDedup = Object.fromEntries(
labelValues.flatMap(val => val.data.data.map(row => [row.val, true]))
)
const resp = new types.LabelValuesResponse()
resp.setNamesList(labelValues.data.data.map(label => label.val))
resp.setNamesList([...Object.keys(labelValuesDedup)])
return resp
}

Expand Down Expand Up @@ -244,6 +312,36 @@ const selectMergeProfile = async (req, res) => {
}
}

/**
*
* @param labelSelector {string}
* @param specialMatchers {object || undefined}
* @param fromTimeSec {number}
* @param toTimeSec {number}
* @returns {Sql.Select}
*/
const matcherIdxRequest = (labelSelector, specialMatchers, fromTimeSec, toTimeSec) => {
specialMatchers = specialMatchers || getSpecialMatchers(labelSelector)
const specialClauses = specialMatchersQuery(specialMatchers.matchers,
'sample_types_units')
const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
const idxReq = (new Sql.Select())
.select(new Sql.Raw('fingerprint'))
.from(`${DATABASE_NAME()}.profiles_series_gin`)
.where(
Sql.And(
specialClauses,
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
)
)
if (!specialMatchers.query.match(/^[{} ]*$/)) {
labelSelectorQuery(idxReq, specialMatchers.query)
}
return idxReq
}

const series = async (req, res) => {
const _req = req.body
const fromTimeSec = Math.floor(_req.getStart && _req.getStart()
Expand All @@ -256,19 +354,40 @@ const series = async (req, res) => {
const promises = []
for (const labelSelector of _req.getMatchersList() || []) {
const specialMatchers = getSpecialMatchers(labelSelector)
const specialClauses = specialMatchersQuery(specialMatchers.matchers)
// Special matchers -> query clauses
const sampleTypesUnitsFieldName = '_sample_types_units'
const clauses = []
if (specialMatchers.__name__) {
clauses.push(matcherClause("splitByChar(':', type_id)[1]", specialMatchers.__name__))
}
if (specialMatchers.__period_type__) {
clauses.push(matcherClause("splitByChar(':', type_id)[2]", specialMatchers.__period_type__))
}
if (specialMatchers.__period_unit__) {
clauses.push(matcherClause("splitByChar(':', type_id)[3]", specialMatchers.__period_unit__))
}
if (specialMatchers.__sample_type__) {
clauses.push(matcherClause(`${sampleTypesUnitsFieldName}.1`, specialMatchers.__sample_type__))
}
if (specialMatchers.__sample_unit__) {
clauses.push(matcherClause(`${sampleTypesUnitsFieldName}.2`, specialMatchers.__sample_unit__))
}
if (specialMatchers.__profile_type__) {
clauses.push(matcherClause(
`format('{}:{}:{}:{}:{}', (splitByChar(':', type_id) as _parts)[1], ${sampleTypesUnitsFieldName}.1, ${sampleTypesUnitsFieldName}.2, _parts[2], _parts[3])`,
specialMatchers.__profile_type__))
}
let specialClauses = null
if (clauses.length === 0) {
specialClauses = Sql.Eq(new Sql.Raw('1'), 1)
} else if (clauses.length === 1) {
specialClauses = clauses[0]
} else {
specialClauses = Sql.And(...clauses)
}
//
const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
const idxReq = (new Sql.Select())
.select(new Sql.Raw('fingerprint'))
.from(`${DATABASE_NAME()}.profiles_series_gin`)
.where(
Sql.And(
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
)
)
labelSelectorQuery(idxReq, specialMatchers.query)
const idxReq = matcherIdxRequest(labelSelector, specialMatchers, fromTimeSec, toTimeSec)
const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
const labelsReq = (new Sql.Select())
.with(withIdxReq)
Expand Down Expand Up @@ -349,8 +468,21 @@ const series = async (req, res) => {
}

/**
* returns special matchers and sanitized query without them as following:
* @example
* {
* "matchers": {
* "__name__": ["=", "foo"],
* "__period_type__": ["=~", "bar"],
* },
* "query": "{service_name=\"abc\", job=\"def\"}"
* }
*
* @param query {string}
* @returns {{
* matchers: { [fieldName: string]: [operator: string, value: string] },
* query: string
* }}
*/
const getSpecialMatchers = (query) => {
if (query.length <= 2) {
Expand Down Expand Up @@ -395,27 +527,45 @@ const matcherClause = (field, matcher) => {
return valRul
}

const specialMatchersQuery = (matchers) => {
/**
* @example
* specialMatchersQuery({
* "__name__": ["=", "foo"],
* "__period_type__": ["=~", "bar"],
* })
*
* @param specialMatchers {Object}
* @returns {Sql.Condition}
*/
const specialMatchersQuery = (specialMatchers) => {
const sampleTypesUnitsFieldName = 'sample_types_units'
const clauses = []
if (matchers.__name__) {
clauses.push(matcherClause("splitByChar(':', type_id)[1]", matchers.__name__))
if (specialMatchers.__name__) {
clauses.push(matcherClause("splitByChar(':', type_id)[1]", specialMatchers.__name__))
}
if (matchers.__period_type__) {
clauses.push(matcherClause("splitByChar(':', type_id)[2]", matchers.__period_type__))
if (specialMatchers.__period_type__) {
clauses.push(matcherClause("splitByChar(':', type_id)[2]", specialMatchers.__period_type__))
}
if (matchers.__period_unit__) {
clauses.push(matcherClause("splitByChar(':', type_id)[3]", matchers.__period_unit__))
if (specialMatchers.__period_unit__) {
clauses.push(matcherClause("splitByChar(':', type_id)[3]", specialMatchers.__period_unit__))
}
const arrayExists = (field) => {
const arrayExists = Sql.Condition(null, null, null)
arrayExists.toString = () => {
return `arrayExists(x -> ${field}, ${sampleTypesUnitsFieldName})`
}
return arrayExists
}
if (matchers.__sample_type__) {
clauses.push(matcherClause('_sample_types_units.1', matchers.__sample_type__))
if (specialMatchers.__sample_type__) {
clauses.push(arrayExists(matcherClause('x.1', specialMatchers.__sample_type__)))
}
if (matchers.__sample_unit__) {
clauses.push(matcherClause('_sample_types_units.2', matchers.__sample_unit__))
if (specialMatchers.__sample_unit__) {
clauses.push(arrayExists(matcherClause('x.2', specialMatchers.__sample_unit__)))
}
if (matchers.__profile_type__) {
clauses.push(matcherClause(
"format('{}:{}:{}:{}:{}', (splitByChar(':', type_id) as _parts)[1], _sample_types_units.1, _sample_types_units.2, _parts[2], _parts[3])",
matchers.__profile_type__))
if (specialMatchers.__profile_type__) {
clauses.push(arrayExists(matcherClause(
"format('{}:{}:{}:{}:{}', (splitByChar(':', type_id) as _parts)[1], x.1, x.2, _parts[2], _parts[3])",
specialMatchers.__profile_type__)))
}
if (clauses.length === 0) {
return Sql.Eq(new Sql.Raw('1'), 1)
Expand Down
4 changes: 2 additions & 2 deletions pyroscope/render.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const render = async (req, res) => {
? Math.floor(parseInt(req.query.until) / 1000)
: Math.floor((Date.now() - 1000 * 60 * 60 * 48) / 1000)
if (!parsedQuery) {
return res.sendStatus(400).send('Invalid query')
return res.code(400).send('Invalid query')
}
const groupBy = req.query.groupBy || []
let agg = ''
Expand All @@ -26,7 +26,7 @@ const render = async (req, res) => {
break
}
if (req.query.format === 'dot') {
return res.sendStatus(400).send('Dot format is not supported')
return res.code(400).send('Dot format is not supported')
}
const promises = []
promises.push(mergeStackTraces(
Expand Down
2 changes: 1 addition & 1 deletion test/e2e
Submodule e2e updated from 376a7d to 7d7767
Loading