Commit d75c74f

Merge pull request #459 from metrico/pre-release
Pre release
akvlad authored Feb 6, 2024
2 parents b00eeb6 + 4d8f0f5 commit d75c74f
Showing 5 changed files with 60 additions and 20 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/node-clickhouse-cluster.js.yml
@@ -49,4 +49,4 @@ jobs:
        CLICKHOUSE_TSDB: qryn
        INTEGRATION_E2E: 1
        CLOKI_EXT_URL: 127.0.0.1:3100
-     run: CLUSTER_NAME=test_cluster_two_shards node qryn.mjs >/dev/stdout & npm run test --forceExit
+     run: CLUSTER_NAME=test_cluster_two_shards node qryn.mjs >/dev/stdout & sleep 10 && npm run test --forceExit
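
The added `sleep 10` gives the backgrounded qryn process time to start listening on 127.0.0.1:3100 before the test suite fires its first request. A fixed sleep is the simplest fix; a readiness poll in Node would remove the guesswork. A minimal sketch (a hypothetical helper, not part of this PR):

    // wait-for-qryn.js: hypothetical readiness poll (not part of this PR).
    // Retries a TCP connect to qryn until it accepts, instead of a fixed sleep.
    const net = require('net')

    const waitForPort = (port, host, timeoutMs) => new Promise((resolve, reject) => {
      const deadline = Date.now() + timeoutMs
      const attempt = () => {
        const sock = net.connect(port, host)
        sock.once('connect', () => { sock.destroy(); resolve() })
        sock.once('error', () => {
          sock.destroy()
          if (Date.now() > deadline) return reject(new Error('qryn did not start in time'))
          setTimeout(attempt, 500)
        })
      }
      attempt()
    })

    waitForPort(3100, '127.0.0.1', 30000).then(() => process.exit(0), () => process.exit(1))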
22 changes: 21 additions & 1 deletion lib/db/maintain/index.js
@@ -103,7 +103,8 @@ module.exports.rotate = async (opts) => {
    { type: 'rotate', name: 'v3_time_series_days' },
    { type: 'rotate', name: 'v3_storage_policy' },
    { type: 'rotate', name: 'v1_traces_days' },
-   { type: 'rotate', name: 'v1_traces_storage_policy' }
+   { type: 'rotate', name: 'v1_traces_storage_policy' },
+   { type: 'rotate', name: 'v1_profiles_days' }
  ], db.db)
  const _update = (req) => {
    return upgradeRequest({ db: db.db, useDefaultDB: true }, req)
@@ -161,5 +162,24 @@ module.exports.rotate = async (opts) => {
    await _update(alterSm, null, db.db)
    await client.addSetting('rotate', 'v1_traces_storage_policy', db.storage_policy, db.db)
  }
+ if (db.samples_days + '' !== settings.v1_profiles_days) {
+   let alterTable = 'ALTER TABLE profiles {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+   let rotateTable = `ALTER TABLE profiles {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
+   await _update(alterTable, null, db.db)
+   await _update(rotateTable, null, db.db)
+   alterTable = 'ALTER TABLE profiles_series {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+   rotateTable = `ALTER TABLE profiles_series {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+   await _update(alterTable, null, db.db)
+   await _update(rotateTable, null, db.db)
+   alterTable = 'ALTER TABLE profiles_series_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+   rotateTable = `ALTER TABLE profiles_series_gin {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+   await _update(alterTable, null, db.db)
+   await _update(rotateTable, null, db.db)
+   alterTable = 'ALTER TABLE profiles_series_keys {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+   rotateTable = `ALTER TABLE profiles_series_keys {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+   await _update(alterTable, null, db.db)
+   await _update(rotateTable, null, db.db)
+   await client.addSetting('rotate', 'v1_profiles_days', db.samples_days + '', db.db)
+ }
  }
}
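
For reference, the four per-table blocks above differ only in the table name and the TTL expression, so the same rotation could be driven by data. A sketch of an equivalent loop (not the committed code):

    // Equivalent, data-driven form of the profiles rotation above (sketch):
    const profileTtls = [
      ['profiles', `toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`],
      ['profiles_series', `date + INTERVAL ${db.samples_days} DAY`],
      ['profiles_series_gin', `date + INTERVAL ${db.samples_days} DAY`],
      ['profiles_series_keys', `date + INTERVAL ${db.samples_days} DAY`]
    ]
    for (const [table, ttl] of profileTtls) {
      await _update(`ALTER TABLE ${table} {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192`, null, db.db)
      await _update(`ALTER TABLE ${table} {{{OnCluster}}} MODIFY TTL ${ttl}`, null, db.db)
    }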
8 changes: 4 additions & 4 deletions lib/db/maintain/scripts.js
@@ -306,7 +306,7 @@ module.exports.profiles = [
      values_agg Array(Tuple(String, Int64, Int32)) CODEC(ZSTD(1))
    ) Engine {{MergeTree}}()
    ORDER BY (type_id, service_name, timestamp_ns)
-   PARTITION BY toDate(FROM_UNIXTIME(intDiv(timestamp_ns, 1000000000)))`,
+   PARTITION BY toDate(FROM_UNIXTIME(intDiv(timestamp_ns, 1000000000))) {{{CREATE_SETTINGS}}}`,

`CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_mv {{{OnCluster}}} TO profiles AS
SELECT
@@ -335,7 +335,7 @@ module.exports.profiles = [
tags Array(Tuple(String, String)) CODEC(ZSTD(1)),
) Engine {{ReplacingMergeTree}}()
ORDER BY (date, type_id, fingerprint)
-   PARTITION BY date`,
+   PARTITION BY date {{{CREATE_SETTINGS}}}`,

`CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_series_mv {{{OnCluster}}} TO profiles_series AS
SELECT
@@ -362,7 +362,7 @@ module.exports.profiles = [
fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1)),
) Engine {{ReplacingMergeTree}}()
ORDER BY (date, key, val, type_id, fingerprint)
-   PARTITION BY date`,
+   PARTITION BY date {{{CREATE_SETTINGS}}}`,

`CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_series_gin_mv {{{OnCluster}}} TO profiles_series_gin AS
SELECT
@@ -382,7 +382,7 @@ module.exports.profiles = [
val_id UInt64
) Engine {{ReplacingMergeTree}}()
ORDER BY (date, key, val_id)
-   PARTITION BY date`,
+   PARTITION BY date {{{CREATE_SETTINGS}}}`,

`CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_series_keys_mv {{{OnCluster}}} TO profiles_series_keys AS
SELECT
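
The new `{{{CREATE_SETTINGS}}}` placeholder appended to each `PARTITION BY` clause is substituted into the DDL before it is sent to ClickHouse, alongside `{{{OnCluster}}}` and the engine placeholders. Assuming it expands to a `SETTINGS ...` clause, or to an empty string depending on deployment mode, the substitution step looks roughly like this; names and values below are illustrative, not qryn's actual API:

    // Hypothetical rendering of a DDL template (illustrative only):
    const renderDDL = (template, vars) =>
      template.replace(/\{\{\{?(\w+)\}?\}\}/g, (_, key) => vars[key] ?? '')

    const ddl = renderDDL(
      'CREATE TABLE IF NOT EXISTS profiles_series {{{OnCluster}}} (...) ' +
      'Engine {{ReplacingMergeTree}}() ORDER BY (date, type_id, fingerprint) ' +
      'PARTITION BY date {{{CREATE_SETTINGS}}}',
      {
        OnCluster: 'ON CLUSTER `test_cluster_two_shards`',   // empty in single-node mode
        ReplacingMergeTree: 'ReplacingMergeTree',
        CREATE_SETTINGS: 'SETTINGS index_granularity = 8192' // assumed expansion
      }
    )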
46 changes: 33 additions & 13 deletions pyroscope/pyroscope.js
@@ -12,6 +12,24 @@ const { clusterName} = require('../common')

const HISTORY_TIMESPAN = 1000 * 60 * 60 * 24 * 7

+/**
+ *
+ * @param typeId {string}
+ */
+const parseTypeId = (typeId) => {
+  const typeParts = typeId.match(/^([^:]+):([^:]+):([^:]+):([^:]+):([^:]+)$/)
+  if (!typeParts) {
+    throw new QrynBadRequest('invalid type id')
+  }
+  return {
+    type: typeParts[1],
+    sampleType: typeParts[2],
+    sampleUnit: typeParts[3],
+    periodType: typeParts[4],
+    periodUnit: typeParts[5]
+  }
+}
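
Pyroscope type ids have the shape `<type>:<sampleType>:<sampleUnit>:<periodType>:<periodUnit>`; the new helper splits one into named fields and rejects anything else. A quick check (the id below is a typical CPU-profile value, shown for illustration):

    parseTypeId('process_cpu:cpu:nanoseconds:cpu:nanoseconds')
    // => { type: 'process_cpu', sampleType: 'cpu', sampleUnit: 'nanoseconds',
    //      periodType: 'cpu', periodUnit: 'nanoseconds' }

    parseTypeId('process_cpu:cpu:nanoseconds')
    // throws QrynBadRequest('invalid type id'); the five-part form is required

The handlers below reuse the parsed fields both for the `sampleType:sampleUnit` key passed to pprofBin and for reassembling the stored `type:periodType:periodUnit` type_id.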

const profileTypesHandler = async (req, res) => {
const dist = clusterName ? '_dist' : ''
const _res = new messages.ProfileTypesResponse()
@@ -28,7 +46,9 @@ WHERE date >= toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)})) AND date <= toDa
  _res.setProfileTypesList(profileTypes.data.data.map(profileType => {
    const pt = new types.ProfileType()
    const [name, periodType, periodUnit] = profileType.type_id.split(':')
-   pt.setId(profileType.type_id + ':' + profileType.sample_type_unit[0] + ':' + profileType.sample_type_unit[1])
+   const typeIdParts = profileType.type_id.match(/^([^:]+):(.*)$/)
+   pt.setId(typeIdParts[1] + ':' + profileType.sample_type_unit[0] + ':' + profileType.sample_type_unit[1] +
+     ':' + typeIdParts[2])
    pt.setName(name)
    pt.setSampleType(profileType.sample_type_unit[0])
    pt.setSampleUnit(profileType.sample_type_unit[1])
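
The stored `type_id` is the three-part `<type>:<periodType>:<periodUnit>`, while clients expect the five-part id that `parseTypeId` consumes, so the sample type and unit now get spliced in after the first segment rather than appended. With illustrative values:

    // type_id = 'process_cpu:cpu:nanoseconds', sample_type_unit = ['samples', 'count']
    // before: 'process_cpu:cpu:nanoseconds:samples:count'  (sample parts appended, wrong slots)
    // after:  'process_cpu:samples:count:cpu:nanoseconds'  (type:sampleType:sampleUnit:periodType:periodUnit)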
@@ -149,7 +169,7 @@ const labelSelectorQuery = (query, labelSelector) =>

const selectMergeStacktraces = async (req, res) => {
  const dist = clusterName ? '_dist' : ''
- const typeRe = req.body.getProfileTypeid().match(/^(.+):([^:]+):([^:]+)$/)
+ const typeRegex = parseTypeId(req.body.getProfileTypeid())
  const sel = req.body.getLabelSelector()
  const fromTimeSec = req.body && req.body.getStart()
    ? Math.floor(parseInt(req.body.getStart()) / 1000)
@@ -162,8 +182,8 @@ const selectMergeStacktraces = async (req, res) => {
    .from(`${DATABASE_NAME()}.profiles_series_gin`)
    .where(
      Sql.And(
-       Sql.Eq(new Sql.Raw(`has(sample_types_units, (${Sql.quoteVal(typeRe[2])},${Sql.quoteVal(typeRe[3])}))`), 1),
-       Sql.Eq('type_id', Sql.val(typeRe[1])),
+       Sql.Eq(new Sql.Raw(`has(sample_types_units, (${Sql.quoteVal(typeRegex.sampleType)},${Sql.quoteVal(typeRegex.sampleUnit)}))`), 1),
+       Sql.Eq('type_id', Sql.val(`${typeRegex.type}:${typeRegex.periodType}:${typeRegex.periodUnit}`)),
        Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
        Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`))
      )
@@ -198,7 +218,7 @@ const selectMergeStacktraces = async (req, res) => {
    i += size + shift
    promises.push(new Promise((resolve, reject) => setTimeout(() => {
      try {
-       pprofBin.merge_tree(_ctxIdx, uarray, `${typeRe[2]}:${typeRe[3]}`)
+       pprofBin.merge_tree(_ctxIdx, uarray, `${typeRegex.sampleType}:${typeRegex.sampleUnit}`)
        resolve()
      } catch (e) {
        reject(e)
@@ -208,7 +228,7 @@ const selectMergeStacktraces = async (req, res) => {
  let sResp = null
  try {
    await Promise.all(promises)
-   sResp = pprofBin.export_tree(_ctxIdx, `${typeRe[2]}:${typeRe[3]}`)
+   sResp = pprofBin.export_tree(_ctxIdx, `${typeRegex.sampleType}:${typeRegex.sampleUnit}`)
  } finally {
    try { pprofBin.drop_tree(_ctxIdx) } catch (e) { req.log.error(e) }
  }
@@ -223,16 +243,16 @@ const selectSeries = async (req, res) => {
  const toTimeSec = Math.floor(req.getEnd && req.getEnd()
    ? parseInt(req.getEnd()) / 1000
    : Date.now() / 1000)
- let typeId = _req.getProfileTypeid && _req.getProfileTypeid()
- if (!typeId) {
+ let typeID = _req.getProfileTypeid && _req.getProfileTypeid()
+ if (!typeID) {
    throw new QrynBadRequest('No type provided')
  }
- typeId = typeId.match(/^(.+):([^:]+):([^:]+)$/)
- if (!typeId) {
+ typeID = parseTypeId(typeID)
+ if (!typeID) {
    throw new QrynBadRequest('Invalid type provided')
  }
  const dist = clusterName ? '_dist' : ''
- const sampleTypeId = typeId[2] + ':' + typeId[3]
+ const sampleTypeId = typeID.sampleType + ':' + typeID.sampleUnit
  const labelSelector = _req.getLabelSelector && _req.getLabelSelector()
  let groupBy = _req.getGroupByList && _req.getGroupByList()
  groupBy = groupBy && groupBy.length ? groupBy : null
@@ -247,11 +267,11 @@ const selectSeries = async (req, res) => {
    .from(`${DATABASE_NAME()}.profiles_series_gin`)
    .where(
      Sql.And(
-       Sql.Eq('type_id', Sql.val(typeId[1])),
+       Sql.Eq('type_id', Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`)),
        Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
        Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
        Sql.Eq(new Sql.Raw(
-         `has(sample_types_units, (${Sql.quoteVal(typeId[2])}, ${Sql.quoteVal(typeId[3])}))`),
+         `has(sample_types_units, (${Sql.quoteVal(typeID.sampleType)}, ${Sql.quoteVal(typeID.sampleUnit)}))`),
          1)
      )
    )
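
One behavioral nuance: `parseTypeId` throws `QrynBadRequest('invalid type id')` on a malformed id instead of returning a falsy value, so the retained `if (!typeID)` guard earlier in selectSeries can no longer fire; the helper's own error surfaces instead of 'Invalid type provided':

    // parseTypeId either returns the five named fields or throws; it never
    // returns null/undefined, unlike the String.match() call it replaces:
    try {
      parseTypeId('not-a-five-part-id')
    } catch (e) {
      // QrynBadRequest('invalid type id')
    }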
2 changes: 1 addition & 1 deletion test/e2e
Submodule e2e updated from 02f8c2 to d45fb2
