diff --git a/apps/api/src/routes/v1_analytics_getVerifications.ts b/apps/api/src/routes/v1_analytics_getVerifications.ts index 2496d45607..083743095c 100644 --- a/apps/api/src/routes/v1_analytics_getVerifications.ts +++ b/apps/api/src/routes/v1_analytics_getVerifications.ts @@ -222,21 +222,21 @@ export const registerV1AnalyticsGetVerifications = (app: App) => const tables = { hour: { - name: "default.key_verifications_per_hour_v2", + name: "default.key_verifications_per_hour_v3", fill: `WITH FILL FROM toStartOfHour(fromUnixTimestamp64Milli({ start: Int64 })) TO toStartOfHour(fromUnixTimestamp64Milli({ end: Int64 })) STEP INTERVAL 1 HOUR`, }, day: { - name: "default.key_verifications_per_day_v2", + name: "default.key_verifications_per_day_v3", fill: `WITH FILL FROM toDate(toStartOfDay(fromUnixTimestamp64Milli({ start: Int64 }))) TO toDate(toStartOfDay(fromUnixTimestamp64Milli({ end: Int64 }))) STEP INTERVAL 1 DAY`, }, month: { - name: "default.key_verifications_per_month_v2", + name: "default.key_verifications_per_month_v3", fill: `WITH FILL FROM toDate(toStartOfMonth(fromUnixTimestamp64Milli({ start: Int64 }))) TO toDate(toStartOfMonth(fromUnixTimestamp64Milli({ end: Int64 }))) diff --git a/apps/engineering/content/docs/architecture/services/analytics.mdx b/apps/engineering/content/docs/architecture/services/analytics.mdx index 19625ac427..ea1c467456 100644 --- a/apps/engineering/content/docs/architecture/services/analytics.mdx +++ b/apps/engineering/content/docs/architecture/services/analytics.mdx @@ -65,10 +65,10 @@ Users query against friendly table names that map to actual ClickHouse tables: ```go TableAliases: map[string]string{ "key_verifications_v1": "default.key_verifications_raw_v2", - "key_verifications_per_minute_v1": "default.key_verifications_per_minute_v2", - "key_verifications_per_hour_v1": "default.key_verifications_per_hour_v2", - "key_verifications_per_day_v1": "default.key_verifications_per_day_v2", - "key_verifications_per_month_v1": "default.key_verifications_per_month_v2", + "key_verifications_per_minute_v1": "default.key_verifications_per_minute_v3", + "key_verifications_per_hour_v1": "default.key_verifications_per_hour_v3", + "key_verifications_per_day_v1": "default.key_verifications_per_day_v3", + "key_verifications_per_month_v1": "default.key_verifications_per_month_v3", } ``` diff --git a/go/apps/api/routes/v2_analytics_get_verifications/handler.go b/go/apps/api/routes/v2_analytics_get_verifications/handler.go index a4e1bb2cc4..7169eeb404 100644 --- a/go/apps/api/routes/v2_analytics_get_verifications/handler.go +++ b/go/apps/api/routes/v2_analytics_get_verifications/handler.go @@ -30,18 +30,18 @@ type ResponseData = openapi.V2AnalyticsGetVerificationsResponseData var ( tableAliases = map[string]string{ "key_verifications_v1": "default.key_verifications_raw_v2", - "key_verifications_per_minute_v1": "default.key_verifications_per_minute_v2", - "key_verifications_per_hour_v1": "default.key_verifications_per_hour_v2", - "key_verifications_per_day_v1": "default.key_verifications_per_day_v2", - "key_verifications_per_month_v1": "default.key_verifications_per_month_v2", + "key_verifications_per_minute_v1": "default.key_verifications_per_minute_v3", + "key_verifications_per_hour_v1": "default.key_verifications_per_hour_v3", + "key_verifications_per_day_v1": "default.key_verifications_per_day_v3", + "key_verifications_per_month_v1": "default.key_verifications_per_month_v3", } allowedTables = []string{ "default.key_verifications_raw_v2", - "default.key_verifications_per_minute_v2", - "default.key_verifications_per_hour_v2", - "default.key_verifications_per_day_v2", - "default.key_verifications_per_month_v2", + "default.key_verifications_per_minute_v3", + "default.key_verifications_per_hour_v3", + "default.key_verifications_per_day_v3", + "default.key_verifications_per_month_v3", } ) diff --git a/go/cmd/create-clickhouse-user/main.go b/go/cmd/create-clickhouse-user/main.go index 4b918e1396..92407dd5dc 100644 --- a/go/cmd/create-clickhouse-user/main.go +++ b/go/cmd/create-clickhouse-user/main.go @@ -66,11 +66,16 @@ unkey create-clickhouse-user --workspace-id ws_123 --username custom_user --max- var allowedTables = []string{ // Key verifications + // "default.key_verifications_raw_v2", "default.key_verifications_per_minute_v2", + "default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v2", + "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v2", + "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v2", + "default.key_verifications_per_month_v3", // Not used ATM // // Ratelimits // "default.ratelimits_raw_v2", diff --git a/go/pkg/clickhouse/key_verifications_test.go b/go/pkg/clickhouse/key_verifications_test.go index 91632bef00..7d6045c684 100644 --- a/go/pkg/clickhouse/key_verifications_test.go +++ b/go/pkg/clickhouse/key_verifications_test.go @@ -119,7 +119,7 @@ func TestKeyVerifications(t *testing.T) { }, time.Minute, time.Second) t.Run("totals are correct", func(t *testing.T) { - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { t.Run(table, func(t *testing.T) { require.EventuallyWithT(t, func(c *assert.CollectT) { queried := int64(0) @@ -138,7 +138,7 @@ func TestKeyVerifications(t *testing.T) { return acc }, map[string]int{}) - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { t.Run(table, func(t *testing.T) { for outcome, count := range countByOutcome { require.EventuallyWithT(t, func(c *assert.CollectT) { @@ -164,7 +164,7 @@ func TestKeyVerifications(t *testing.T) { return acc }, map[string]int{}) - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { t.Run(table, func(t *testing.T) { t.Parallel() @@ -191,7 +191,7 @@ func TestKeyVerifications(t *testing.T) { return acc }, map[string]int{}) - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { t.Run(table, func(t *testing.T) { for outcome, count := range countByOutcome { require.EventuallyWithT(t, func(c *assert.CollectT) { @@ -219,7 +219,7 @@ func TestKeyVerifications(t *testing.T) { return acc }, map[string]int{}) - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { t.Run(table, func(t *testing.T) { for outcome, count := range countByOutcome { require.EventuallyWithT(t, func(c *assert.CollectT) { @@ -242,7 +242,7 @@ func TestKeyVerifications(t *testing.T) { p75 := percentile(latencies, 0.75) p99 := percentile(latencies, 0.99) - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { t.Run(table, func(t *testing.T) { t.Parallel() var ( @@ -266,7 +266,7 @@ func TestKeyVerifications(t *testing.T) { return acc + v.SpentCredits }, int64(0)) - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { t.Run(table, func(t *testing.T) { t.Parallel() var queried int64 @@ -287,7 +287,7 @@ func TestKeyVerifications(t *testing.T) { return acc }, int64(0)) - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { t.Run(table, func(t *testing.T) { t.Parallel() var queried int64 @@ -311,7 +311,7 @@ func TestKeyVerifications(t *testing.T) { return acc }, int64(0)) - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { t.Run(table, func(t *testing.T) { t.Parallel() var queried int64 @@ -332,7 +332,7 @@ func TestKeyVerifications(t *testing.T) { id := identityID expectedExternalID := identityToExternalID[id] - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { tbl := table t.Run(tbl, func(t *testing.T) { t.Parallel() @@ -360,7 +360,7 @@ func TestKeyVerifications(t *testing.T) { return acc }, 0) - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { tbl := table t.Run(tbl, func(t *testing.T) { t.Parallel() @@ -389,7 +389,7 @@ func TestKeyVerifications(t *testing.T) { return acc }, map[string]int{}) - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { tbl := table t.Run(tbl, func(t *testing.T) { t.Parallel() @@ -415,7 +415,7 @@ func TestKeyVerifications(t *testing.T) { id := identityID extID := identityToExternalID[id] - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { tbl := table t.Run(tbl, func(t *testing.T) { t.Parallel() @@ -455,7 +455,7 @@ func TestKeyVerifications(t *testing.T) { return acc }, int64(0)) - for _, table := range []string{"default.key_verifications_per_minute_v2", "default.key_verifications_per_hour_v2", "default.key_verifications_per_day_v2", "default.key_verifications_per_month_v2"} { + for _, table := range []string{"default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v3"} { tbl := table t.Run(tbl, func(t *testing.T) { t.Parallel() diff --git a/go/pkg/clickhouse/migrations/20251125075943.sql b/go/pkg/clickhouse/migrations/20251125075943.sql new file mode 100644 index 0000000000..9a0b4e7a6d --- /dev/null +++ b/go/pkg/clickhouse/migrations/20251125075943.sql @@ -0,0 +1,97 @@ +ALTER TABLE `default`.`key_verifications_raw_v2` MODIFY TTL toDateTime(fromUnixTimestamp64Milli(time)) + toIntervalDay(90); +-- Create "key_verifications_per_day_v3" table +CREATE TABLE `default`.`key_verifications_per_day_v3` ( + `time` Date, + `workspace_id` String, + `key_space_id` String, + `identity_id` String, + `external_id` String, + `key_id` String, + `outcome` LowCardinality(String), + `tags` Array(String), + `count` SimpleAggregateFunction(sum, Int64), + `spent_credits` SimpleAggregateFunction(sum, Int64), + `latency_avg` AggregateFunction(avg, Float64), + `latency_p75` AggregateFunction(quantilesTDigest(0.75), Float64), + `latency_p99` AggregateFunction(quantilesTDigest(0.99), Float64), + INDEX `idx_identity_id` ((identity_id)) TYPE bloom_filter GRANULARITY 4, + INDEX `idx_key_id` ((key_id)) TYPE bloom_filter GRANULARITY 4, + INDEX `idx_tags` ((tags)) TYPE bloom_filter GRANULARITY 4 +) ENGINE = AggregatingMergeTree +PRIMARY KEY (`workspace_id`, `time`, `key_space_id`, `identity_id`, `external_id`, `key_id`, `outcome`, `tags`) ORDER BY (`workspace_id`, `time`, `key_space_id`, `identity_id`, `external_id`, `key_id`, `outcome`, `tags`) PARTITION BY (toStartOfMonth(time)) TTL time + toIntervalDay(356) SETTINGS index_granularity = 8192; +-- Create "key_verifications_per_hour_v3" table +CREATE TABLE `default`.`key_verifications_per_hour_v3` ( + `time` DateTime, + `workspace_id` String, + `key_space_id` String, + `identity_id` String, + `external_id` String, + `key_id` String, + `outcome` LowCardinality(String), + `tags` Array(String), + `count` SimpleAggregateFunction(sum, Int64), + `spent_credits` SimpleAggregateFunction(sum, Int64), + `latency_avg` AggregateFunction(avg, Float64), + `latency_p75` AggregateFunction(quantilesTDigest(0.75), Float64), + `latency_p99` AggregateFunction(quantilesTDigest(0.99), Float64), + INDEX `idx_identity_id` ((identity_id)) TYPE bloom_filter GRANULARITY 4, + INDEX `idx_key_id` ((key_id)) TYPE bloom_filter GRANULARITY 4, + INDEX `idx_tags` ((tags)) TYPE bloom_filter GRANULARITY 4 +) ENGINE = AggregatingMergeTree +PRIMARY KEY (`workspace_id`, `time`, `key_space_id`, `identity_id`, `external_id`, `key_id`, `outcome`, `tags`) ORDER BY (`workspace_id`, `time`, `key_space_id`, `identity_id`, `external_id`, `key_id`, `outcome`, `tags`) PARTITION BY (toStartOfDay(time)) TTL time + toIntervalDay(90) SETTINGS index_granularity = 8192; +-- Create "key_verifications_per_minute_v3" table +CREATE TABLE `default`.`key_verifications_per_minute_v3` ( + `time` DateTime, + `workspace_id` String, + `key_space_id` String, + `identity_id` String, + `external_id` String, + `key_id` String, + `outcome` LowCardinality(String), + `tags` Array(String), + `count` SimpleAggregateFunction(sum, Int64), + `spent_credits` SimpleAggregateFunction(sum, Int64), + `latency_avg` AggregateFunction(avg, Float64), + `latency_p75` AggregateFunction(quantilesTDigest(0.75), Float64), + `latency_p99` AggregateFunction(quantilesTDigest(0.99), Float64), + INDEX `idx_identity_id` ((identity_id)) TYPE bloom_filter GRANULARITY 4, + INDEX `idx_key_id` ((key_id)) TYPE bloom_filter GRANULARITY 4, + INDEX `idx_tags` ((tags)) TYPE bloom_filter GRANULARITY 4 +) ENGINE = AggregatingMergeTree +PRIMARY KEY (`workspace_id`, `time`, `key_space_id`, `identity_id`, `external_id`, `key_id`, `outcome`, `tags`) ORDER BY (`workspace_id`, `time`, `key_space_id`, `identity_id`, `external_id`, `key_id`, `outcome`, `tags`) PARTITION BY (toStartOfDay(time)) TTL time + toIntervalDay(90) SETTINGS index_granularity = 8192; +-- Create "key_verifications_per_month_v3" table +CREATE TABLE `default`.`key_verifications_per_month_v3` ( + `time` Date, + `workspace_id` String, + `key_space_id` String, + `identity_id` String, + `external_id` String, + `key_id` String, + `outcome` LowCardinality(String), + `tags` Array(String), + `count` SimpleAggregateFunction(sum, Int64), + `spent_credits` SimpleAggregateFunction(sum, Int64), + `latency_avg` AggregateFunction(avg, Float64), + `latency_p75` AggregateFunction(quantilesTDigest(0.75), Float64), + `latency_p99` AggregateFunction(quantilesTDigest(0.99), Float64), + INDEX `idx_identity_id` ((identity_id)) TYPE bloom_filter GRANULARITY 4, + INDEX `idx_key_id` ((key_id)) TYPE bloom_filter GRANULARITY 4, + INDEX `idx_tags` ((tags)) TYPE bloom_filter GRANULARITY 4 +) ENGINE = AggregatingMergeTree +PRIMARY KEY (`workspace_id`, `time`, `key_space_id`, `identity_id`, `external_id`, `key_id`, `outcome`, `tags`) ORDER BY (`workspace_id`, `time`, `key_space_id`, `identity_id`, `external_id`, `key_id`, `outcome`, `tags`) PARTITION BY (toStartOfYear(time)) TTL time + toIntervalYear(3) SETTINGS index_granularity = 8192; +-- Drop "active_workspaces_keys_per_month_mv_v2" view +DROP VIEW `default`.`active_workspaces_keys_per_month_mv_v2`; +-- Create "active_workspaces_keys_per_month_mv_v2" view +CREATE MATERIALIZED VIEW `default`.`active_workspaces_keys_per_month_mv_v2` TO `default`.`active_workspaces_per_month_v2` AS SELECT workspace_id, toDate(time) AS time FROM default.key_verifications_per_month_v3; +-- Drop "billable_verifications_per_month_mv_v2" view +DROP VIEW `default`.`billable_verifications_per_month_mv_v2`; +-- Create "billable_verifications_per_month_mv_v2" view +CREATE MATERIALIZED VIEW `default`.`billable_verifications_per_month_mv_v2` TO `default`.`billable_verifications_per_month_v2` AS SELECT workspace_id, sum(count) AS count, toYear(time) AS year, toMonth(time) AS month FROM default.key_verifications_per_month_v3 WHERE outcome = 'VALID' GROUP BY workspace_id, year, month; +-- Create "key_verifications_per_day_mv_v3" view +CREATE MATERIALIZED VIEW `default`.`key_verifications_per_day_mv_v3` TO `default`.`key_verifications_per_day_v3` AS SELECT workspace_id, key_space_id, identity_id, external_id, key_id, outcome, tags, sum(count) AS count, sum(spent_credits) AS spent_credits, avgMergeState(latency_avg) AS latency_avg, quantilesTDigestMergeState(0.75)(latency_p75) AS latency_p75, quantilesTDigestMergeState(0.99)(latency_p99) AS latency_p99, toDate(toStartOfDay(time)) AS time FROM default.key_verifications_per_hour_v3 GROUP BY workspace_id, time, key_space_id, identity_id, external_id, key_id, outcome, tags; +-- Create "key_verifications_per_hour_mv_v3" view +CREATE MATERIALIZED VIEW `default`.`key_verifications_per_hour_mv_v3` TO `default`.`key_verifications_per_hour_v3` AS SELECT workspace_id, key_space_id, identity_id, external_id, key_id, outcome, tags, sum(count) AS count, sum(spent_credits) AS spent_credits, avgMergeState(latency_avg) AS latency_avg, quantilesTDigestMergeState(0.75)(latency_p75) AS latency_p75, quantilesTDigestMergeState(0.99)(latency_p99) AS latency_p99, toStartOfHour(time) AS time FROM default.key_verifications_per_minute_v3 GROUP BY workspace_id, time, key_space_id, identity_id, external_id, key_id, outcome, tags; +-- Create "key_verifications_per_minute_mv_v3" view +CREATE MATERIALIZED VIEW `default`.`key_verifications_per_minute_mv_v3` TO `default`.`key_verifications_per_minute_v3` AS SELECT workspace_id, key_space_id, identity_id, external_id, key_id, outcome, tags, count(*) AS count, sum(spent_credits) AS spent_credits, avgState(latency) AS latency_avg, quantilesTDigestState(0.75)(latency) AS latency_p75, quantilesTDigestState(0.99)(latency) AS latency_p99, toStartOfMinute(fromUnixTimestamp64Milli(time)) AS time FROM default.key_verifications_raw_v2 GROUP BY workspace_id, time, key_space_id, identity_id, external_id, key_id, outcome, tags; +-- Create "key_verifications_per_month_mv_v3" view +CREATE MATERIALIZED VIEW `default`.`key_verifications_per_month_mv_v3` TO `default`.`key_verifications_per_month_v3` AS SELECT workspace_id, key_space_id, identity_id, external_id, key_id, outcome, tags, sum(count) AS count, sum(spent_credits) AS spent_credits, avgMergeState(latency_avg) AS latency_avg, quantilesTDigestMergeState(0.75)(latency_p75) AS latency_p75, quantilesTDigestMergeState(0.99)(latency_p99) AS latency_p99, toDate(toStartOfMonth(time)) AS time FROM default.key_verifications_per_day_v3 GROUP BY workspace_id, time, key_space_id, identity_id, external_id, key_id, outcome, tags; diff --git a/go/pkg/clickhouse/migrations/20251125163818.sql b/go/pkg/clickhouse/migrations/20251125163818.sql new file mode 100644 index 0000000000..38ff170d6a --- /dev/null +++ b/go/pkg/clickhouse/migrations/20251125163818.sql @@ -0,0 +1 @@ +ALTER TABLE `default`.`key_verifications_per_day_v3` MODIFY TTL time + toIntervalDay(365); diff --git a/go/pkg/clickhouse/migrations/atlas.sum b/go/pkg/clickhouse/migrations/atlas.sum index 1b79cd2434..bf0291444e 100644 --- a/go/pkg/clickhouse/migrations/atlas.sum +++ b/go/pkg/clickhouse/migrations/atlas.sum @@ -1,6 +1,8 @@ -h1:Ukjp4GU8zIOFJ/rY85ohkRrb5lbvGv9Wwp4eWHxDbhM= +h1:nq2n+imY5qqr2NKncHLZ92vvC1+PbBUUj54xjru+CwU= 20250903085516_init.sql h1:Id61mpzn/VxahUVr4XYj7LFcOF/VrefEbefdL57k3Sw= 20250911070454.sql h1:Fwr5vMWtnDvRISOG1Ul9si+VFbzMrUlHb3lxW4yKGB4= 20250925091254.sql h1:zHWbGFg//sSLbARpVmSCnfaSwQH8eU9Tsf7hdF/+TRY= 20251010160229.sql h1:XYT+uL8ZZsqXasXs26XyqH6RPK6RCD4g63A0JIqsFGo= -20251107152509.sql h1:yRjVW/wZfU1ssXSmt2kBKQBWOvLylMVi0zyOM+6jxfE= \ No newline at end of file +20251107152509.sql h1:yRjVW/wZfU1ssXSmt2kBKQBWOvLylMVi0zyOM+6jxfE= +20251125075943.sql h1:7CmEQYeqN7BJDhmndE9XooNYRDlT8bc8VDxjDtS9OlY= +20251125163818.sql h1:hOgn0PcIXLmWQD/toFiZo30nmPtEA6sO8whICsLJntE= diff --git a/go/pkg/clickhouse/schema/001_key_verifications_raw_v2.sql b/go/pkg/clickhouse/schema/001_key_verifications_raw_v2.sql index 444a1c8703..145ca97599 100644 --- a/go/pkg/clickhouse/schema/001_key_verifications_raw_v2.sql +++ b/go/pkg/clickhouse/schema/001_key_verifications_raw_v2.sql @@ -41,7 +41,7 @@ CREATE TABLE key_verifications_raw_v2 ) ENGINE = MergeTree() ORDER BY (workspace_id, time, key_space_id, outcome) -TTL toDateTime(fromUnixTimestamp64Milli(time)) + INTERVAL 1 MONTH DELETE +TTL toDateTime(fromUnixTimestamp64Milli(time)) + INTERVAL 90 DAY DELETE SETTINGS non_replicated_deduplication_window = 10000 ; diff --git a/go/pkg/clickhouse/schema/002_key_verifications_per_minute_v3.sql b/go/pkg/clickhouse/schema/002_key_verifications_per_minute_v3.sql new file mode 100644 index 0000000000..2d22a81188 --- /dev/null +++ b/go/pkg/clickhouse/schema/002_key_verifications_per_minute_v3.sql @@ -0,0 +1,60 @@ +CREATE TABLE key_verifications_per_minute_v3 ( + time DateTime, + workspace_id String, + key_space_id String, + identity_id String, + external_id String, + key_id String, + outcome LowCardinality (String), + tags Array(String), + count SimpleAggregateFunction(sum, Int64), + spent_credits SimpleAggregateFunction(sum, Int64), + latency_avg AggregateFunction (avg, Float64), + latency_p75 AggregateFunction (quantilesTDigest (0.75), Float64), + latency_p99 AggregateFunction (quantilesTDigest (0.99), Float64), + INDEX idx_identity_id (identity_id) TYPE bloom_filter GRANULARITY 4, + INDEX idx_key_id (key_id) TYPE bloom_filter GRANULARITY 4, + INDEX idx_tags (tags) TYPE bloom_filter GRANULARITY 4 +) ENGINE = AggregatingMergeTree () +ORDER BY + ( + workspace_id, + time, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags + ) +PARTITION BY toStartOfDay(time) +TTL time + INTERVAL 90 DAY DELETE +; + +CREATE MATERIALIZED VIEW key_verifications_per_minute_mv_v3 TO key_verifications_per_minute_v3 AS +SELECT + workspace_id, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags, + count(*) as count, + sum(spent_credits) as spent_credits, + avgState (latency) as latency_avg, + quantilesTDigestState (0.75) (latency) as latency_p75, + quantilesTDigestState (0.99) (latency) as latency_p99, + toStartOfMinute (fromUnixTimestamp64Milli (time)) AS time +FROM + key_verifications_raw_v2 +GROUP BY + workspace_id, + time, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags +; diff --git a/go/pkg/clickhouse/schema/003_key_verifications_per_hour_v3.sql b/go/pkg/clickhouse/schema/003_key_verifications_per_hour_v3.sql new file mode 100644 index 0000000000..78ac739063 --- /dev/null +++ b/go/pkg/clickhouse/schema/003_key_verifications_per_hour_v3.sql @@ -0,0 +1,59 @@ +CREATE TABLE key_verifications_per_hour_v3 ( + time DateTime, + workspace_id String, + key_space_id String, + identity_id String, + external_id String, + key_id String, + outcome LowCardinality (String), + tags Array(String), + count SimpleAggregateFunction(sum, Int64), + spent_credits SimpleAggregateFunction(sum, Int64), + latency_avg AggregateFunction (avg, Float64), + latency_p75 AggregateFunction (quantilesTDigest (0.75), Float64), + latency_p99 AggregateFunction (quantilesTDigest (0.99), Float64), + INDEX idx_identity_id (identity_id) TYPE bloom_filter GRANULARITY 4, + INDEX idx_key_id (key_id) TYPE bloom_filter GRANULARITY 4, + INDEX idx_tags (tags) TYPE bloom_filter GRANULARITY 4 +) ENGINE = AggregatingMergeTree () +ORDER BY + ( + workspace_id, + time, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags + ) +PARTITION BY toStartOfDay(time) +TTL time + INTERVAL 90 DAY DELETE; + +CREATE MATERIALIZED VIEW key_verifications_per_hour_mv_v3 TO key_verifications_per_hour_v3 AS +SELECT + workspace_id, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags, + sum(count) as count, + sum(spent_credits) as spent_credits, + avgMergeState(latency_avg) as latency_avg, + quantilesTDigestMergeState(0.75)(latency_p75) as latency_p75, + quantilesTDigestMergeState(0.99)(latency_p99) as latency_p99, + toStartOfHour(time) AS time +FROM + key_verifications_per_minute_v3 +GROUP BY + workspace_id, + time, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags +; diff --git a/go/pkg/clickhouse/schema/004_key_verifications_per_day_v3.sql b/go/pkg/clickhouse/schema/004_key_verifications_per_day_v3.sql new file mode 100644 index 0000000000..29c4c82e26 --- /dev/null +++ b/go/pkg/clickhouse/schema/004_key_verifications_per_day_v3.sql @@ -0,0 +1,60 @@ +CREATE TABLE key_verifications_per_day_v3 ( + time Date, + workspace_id String, + key_space_id String, + identity_id String, + external_id String, + key_id String, + outcome LowCardinality (String), + tags Array(String), + count SimpleAggregateFunction(sum, Int64), + spent_credits SimpleAggregateFunction(sum, Int64), + latency_avg AggregateFunction (avg, Float64), + latency_p75 AggregateFunction (quantilesTDigest (0.75), Float64), + latency_p99 AggregateFunction (quantilesTDigest (0.99), Float64), + INDEX idx_identity_id (identity_id) TYPE bloom_filter GRANULARITY 4, + INDEX idx_key_id (key_id) TYPE bloom_filter GRANULARITY 4, + INDEX idx_tags (tags) TYPE bloom_filter GRANULARITY 4 +) ENGINE = AggregatingMergeTree () +ORDER BY + ( + workspace_id, + time, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags + ) +PARTITION BY toStartOfMonth(time) +TTL time + INTERVAL 365 DAY DELETE +; + +CREATE MATERIALIZED VIEW key_verifications_per_day_mv_v3 TO key_verifications_per_day_v3 AS +SELECT + workspace_id, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags, + sum(count) as count, + sum(spent_credits) as spent_credits, + avgMergeState(latency_avg) as latency_avg, + quantilesTDigestMergeState(0.75)(latency_p75) as latency_p75, + quantilesTDigestMergeState(0.99)(latency_p99) as latency_p99, + toDate(toStartOfDay(time)) AS time +FROM + key_verifications_per_hour_v3 +GROUP BY + workspace_id, + time, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags +; diff --git a/go/pkg/clickhouse/schema/005_key_verifications_per_month_v3.sql b/go/pkg/clickhouse/schema/005_key_verifications_per_month_v3.sql new file mode 100644 index 0000000000..1ba6dad46f --- /dev/null +++ b/go/pkg/clickhouse/schema/005_key_verifications_per_month_v3.sql @@ -0,0 +1,60 @@ +CREATE TABLE key_verifications_per_month_v3 ( + time Date, + workspace_id String, + key_space_id String, + identity_id String, + external_id String, + key_id String, + outcome LowCardinality (String), + tags Array(String), + count SimpleAggregateFunction(sum, Int64), + spent_credits SimpleAggregateFunction(sum, Int64), + latency_avg AggregateFunction (avg, Float64), + latency_p75 AggregateFunction (quantilesTDigest (0.75), Float64), + latency_p99 AggregateFunction (quantilesTDigest (0.99), Float64), + INDEX idx_identity_id (identity_id) TYPE bloom_filter GRANULARITY 4, + INDEX idx_key_id (key_id) TYPE bloom_filter GRANULARITY 4, + INDEX idx_tags (tags) TYPE bloom_filter GRANULARITY 4 +) ENGINE = AggregatingMergeTree () +ORDER BY + ( + workspace_id, + time, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags + ) +PARTITION BY toStartOfYear(time) +TTL time + INTERVAL 3 YEAR DELETE +; + +CREATE MATERIALIZED VIEW key_verifications_per_month_mv_v3 TO key_verifications_per_month_v3 AS +SELECT + workspace_id, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags, + sum(count) as count, + sum(spent_credits) as spent_credits, + avgMergeState(latency_avg) as latency_avg, + quantilesTDigestMergeState(0.75)(latency_p75) as latency_p75, + quantilesTDigestMergeState(0.99)(latency_p99) as latency_p99, + toDate(toStartOfMonth(time)) AS time +FROM + key_verifications_per_day_v3 +GROUP BY + workspace_id, + time, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags +; diff --git a/go/pkg/clickhouse/schema/017_active_workspaces_per_month_v2.sql b/go/pkg/clickhouse/schema/017_active_workspaces_per_month_v2.sql index 221f6781e1..5159b08ab6 100644 --- a/go/pkg/clickhouse/schema/017_active_workspaces_per_month_v2.sql +++ b/go/pkg/clickhouse/schema/017_active_workspaces_per_month_v2.sql @@ -10,7 +10,7 @@ SELECT workspace_id, toDate (time) as time FROM - key_verifications_per_month_v2; + key_verifications_per_month_v3; CREATE MATERIALIZED VIEW active_workspaces_ratelimits_per_month_mv_v2 TO active_workspaces_per_month_v2 AS SELECT diff --git a/go/pkg/clickhouse/schema/019_billable_verifications_per_month_v2.sql b/go/pkg/clickhouse/schema/019_billable_verifications_per_month_v2.sql index 9b826954ca..3cd4ec7591 100644 --- a/go/pkg/clickhouse/schema/019_billable_verifications_per_month_v2.sql +++ b/go/pkg/clickhouse/schema/019_billable_verifications_per_month_v2.sql @@ -15,7 +15,7 @@ SELECT toYear (time) AS year, toMonth (time) AS month FROM - key_verifications_per_month_v2 + key_verifications_per_month_v3 WHERE outcome = 'VALID' GROUP BY diff --git a/go/pkg/clickhouse/user.go b/go/pkg/clickhouse/user.go index afdd682401..7aee16d6be 100644 --- a/go/pkg/clickhouse/user.go +++ b/go/pkg/clickhouse/user.go @@ -176,8 +176,12 @@ func DefaultAllowedTables() []string { // Key verifications "default.key_verifications_raw_v2", "default.key_verifications_per_minute_v2", + "default.key_verifications_per_minute_v3", "default.key_verifications_per_hour_v2", + "default.key_verifications_per_hour_v3", "default.key_verifications_per_day_v2", + "default.key_verifications_per_day_v3", "default.key_verifications_per_month_v2", + "default.key_verifications_per_month_v3", } } diff --git a/internal/clickhouse/src/keys/active_keys.ts b/internal/clickhouse/src/keys/active_keys.ts index 7f15d70f8d..0e9814eea2 100644 --- a/internal/clickhouse/src/keys/active_keys.ts +++ b/internal/clickhouse/src/keys/active_keys.ts @@ -62,80 +62,80 @@ type TimeInterval = { const ACTIVE_KEYS_INTERVALS: Record = { // Minute-based intervals minute: { - table: "default.key_verifications_per_minute_v2", + table: "default.key_verifications_per_minute_v3", step: "MINUTE", stepSize: 1, }, fiveMinutes: { - table: "default.key_verifications_per_minute_v2", + table: "default.key_verifications_per_minute_v3", step: "MINUTE", stepSize: 5, }, fifteenMinutes: { - table: "default.key_verifications_per_minute_v2", + table: "default.key_verifications_per_minute_v3", step: "MINUTE", stepSize: 15, }, thirtyMinutes: { - table: "default.key_verifications_per_minute_v2", + table: "default.key_verifications_per_minute_v3", step: "MINUTE", stepSize: 30, }, // Hour-based intervals hour: { - table: "default.key_verifications_per_hour_v2", + table: "default.key_verifications_per_hour_v3", step: "HOUR", stepSize: 1, }, twoHours: { - table: "default.key_verifications_per_hour_v2", + table: "default.key_verifications_per_hour_v3", step: "HOUR", stepSize: 2, }, fourHours: { - table: "default.key_verifications_per_hour_v2", + table: "default.key_verifications_per_hour_v3", step: "HOUR", stepSize: 4, }, sixHours: { - table: "default.key_verifications_per_hour_v2", + table: "default.key_verifications_per_hour_v3", step: "HOUR", stepSize: 6, }, twelveHours: { - table: "default.key_verifications_per_hour_v2", + table: "default.key_verifications_per_hour_v3", step: "HOUR", stepSize: 12, }, // Day-based intervals day: { - table: "default.key_verifications_per_day_v2", + table: "default.key_verifications_per_day_v3", step: "DAY", stepSize: 1, }, threeDays: { - table: "default.key_verifications_per_day_v2", + table: "default.key_verifications_per_day_v3", step: "DAY", stepSize: 3, }, week: { - table: "default.key_verifications_per_day_v2", + table: "default.key_verifications_per_day_v3", step: "DAY", stepSize: 7, }, twoWeeks: { - table: "default.key_verifications_per_day_v2", + table: "default.key_verifications_per_day_v3", step: "DAY", stepSize: 14, }, // Monthly-based intervals month: { - table: "default.key_verifications_per_month_v2", + table: "default.key_verifications_per_month_v3", step: "MONTH", stepSize: 1, }, quarter: { - table: "default.key_verifications_per_month_v2", + table: "default.key_verifications_per_month_v3", step: "MONTH", stepSize: 3, }, diff --git a/internal/clickhouse/src/verification_outcomes_propagate_correctly.test.ts b/internal/clickhouse/src/verification_outcomes_propagate_correctly.test.ts index 28ebd15420..8dae085b98 100644 --- a/internal/clickhouse/src/verification_outcomes_propagate_correctly.test.ts +++ b/internal/clickhouse/src/verification_outcomes_propagate_correctly.test.ts @@ -109,7 +109,7 @@ describe.each([10, 100, 1_000, 10_000])("with %i verifications", (n) => { } await ch.querier.query({ - query: "OPTIMIZE TABLE default.key_verifications_per_day_v2 FINAL", + query: "OPTIMIZE TABLE default.key_verifications_per_day_v3 FINAL", schema: z.any(), })({}); @@ -119,7 +119,7 @@ describe.each([10, 100, 1_000, 10_000])("with %i verifications", (n) => { SELECT outcome, SUM(count) as total - FROM default.key_verifications_per_day_v2 + FROM default.key_verifications_per_day_v3 WHERE workspace_id = '${workspaceId}' AND key_space_id = '${keySpaceId}' AND diff --git a/internal/clickhouse/src/verifications.ts b/internal/clickhouse/src/verifications.ts index 3fe011baaa..24744bef4d 100644 --- a/internal/clickhouse/src/verifications.ts +++ b/internal/clickhouse/src/verifications.ts @@ -261,80 +261,80 @@ type TimeInterval = { const INTERVALS: Record = { // Minute-based intervals minute: { - table: "default.key_verifications_per_minute_v2", + table: "default.key_verifications_per_minute_v3", step: "MINUTE", stepSize: 1, }, fiveMinutes: { - table: "default.key_verifications_per_minute_v2", + table: "default.key_verifications_per_minute_v3", step: "MINUTE", stepSize: 5, }, fifteenMinutes: { - table: "default.key_verifications_per_minute_v2", + table: "default.key_verifications_per_minute_v3", step: "MINUTE", stepSize: 15, }, thirtyMinutes: { - table: "default.key_verifications_per_minute_v2", + table: "default.key_verifications_per_minute_v3", step: "MINUTE", stepSize: 30, }, // Hour-based intervals hour: { - table: "default.key_verifications_per_hour_v2", + table: "default.key_verifications_per_hour_v3", step: "HOUR", stepSize: 1, }, twoHours: { - table: "default.key_verifications_per_hour_v2", + table: "default.key_verifications_per_hour_v3", step: "HOUR", stepSize: 2, }, fourHours: { - table: "default.key_verifications_per_hour_v2", + table: "default.key_verifications_per_hour_v3", step: "HOUR", stepSize: 4, }, sixHours: { - table: "default.key_verifications_per_hour_v2", + table: "default.key_verifications_per_hour_v3", step: "HOUR", stepSize: 6, }, twelveHours: { - table: "default.key_verifications_per_hour_v2", + table: "default.key_verifications_per_hour_v3", step: "HOUR", stepSize: 12, }, // Day-based intervals day: { - table: "default.key_verifications_per_day_v2", + table: "default.key_verifications_per_day_v3", step: "DAY", stepSize: 1, }, threeDays: { - table: "default.key_verifications_per_day_v2", + table: "default.key_verifications_per_day_v3", step: "DAY", stepSize: 3, }, week: { - table: "default.key_verifications_per_day_v2", + table: "default.key_verifications_per_day_v3", step: "DAY", stepSize: 7, }, twoWeeks: { - table: "default.key_verifications_per_day_v2", + table: "default.key_verifications_per_day_v3", step: "DAY", stepSize: 14, }, // Monthly-based intervals month: { - table: "default.key_verifications_per_month_v2", + table: "default.key_verifications_per_month_v3", step: "MONTH", stepSize: 1, }, quarter: { - table: "default.key_verifications_per_month_v2", + table: "default.key_verifications_per_month_v3", step: "MONTH", stepSize: 3, }, diff --git a/tools/migrate/ch_copy.ts b/tools/migrate/ch_copy.ts new file mode 100644 index 0000000000..9e133e9dc4 --- /dev/null +++ b/tools/migrate/ch_copy.ts @@ -0,0 +1,131 @@ +import { createClient } from "@clickhouse/client-web"; + +const now = Date.now() + 6 * 60 * 60 * 1000; + +const tables = [ + //{ + // name: "default.key_verifications_per_minute", + // dt: 60 * 60 * 1000, + // start: now - 32 * 24 * 60 * 60 * 1000, + // end: now, + //}, + //{ + // name: "default.key_verifications_per_hour", + // dt: 7 * 24 * 60 * 60 * 1000, + // start: 1763581056759, + // end: now, + //}, + //{ + // name: "default.key_verifications_per_day", + // dt: 7 * 24 * 60 * 60 * 1000, + // start: now - 100 * 24 * 60 * 60 * 1000, + // end: now, + //}, + { + name: "default.key_verifications_per_month", + dt: 1 * 30 * 24 * 60 * 60 * 1000, + start: now - 4 * 356 * 24 * 60 * 60 * 1000, + end: now, + }, +]; + +if (!process.env.CLICKHOUSE_URL) { + throw new Error("CLICKHOUSE_URL is not set"); +} + +const rawCH = createClient({ + url: process.env.CLICKHOUSE_URL, + + clickhouse_settings: { + output_format_json_quote_64bit_integers: 0, + output_format_json_quote_64bit_floats: 0, + http_send_timeout: 60000, + async_insert: 1, + async_insert_deduplicate: 1, + wait_for_async_insert: 1, + }, +}); + +const semaphore = new Map>(); +let concurrency = 1; + +for (const { name, dt, start, end } of tables) { + const v2 = `${name}_v2`; + const v3 = `${name}_v3`; + + console.info("start", start, "end", end); + + for (let t = start; t < end; t += dt) { + console.log( + `${name}: ${new Date(t).toLocaleString("de")} - ${new Date(t + dt).toLocaleString("de")}`, + ); + + const res = await rawCH.query({ + query: ` + SELECT DISTINCT key_id + FROM ${v2} + WHERE NOT startsWith(workspace_id, 'test_') + AND time >= fromUnixTimestamp64Milli(${t}) + AND time < fromUnixTimestamp64Milli(${t + dt}) + AND not startsWith(key_id, 'test_') + `, + }); + const json = (await res.json()) as { + data: Array<{ key_id: string }>; + }; + console.log(json); + + const keyIds = json.data.map(({ key_id }) => key_id); + for (let i = 0; i < keyIds.length; i++) { + const keyId = keyIds[i]; + const semKey = `${name}-${t}-${keyId}`; + + while (semaphore.size >= concurrency) { + await Promise.race(semaphore.values()); + } + + console.log(semKey, `${i}/${keyIds.length} - ${semaphore.size} / ${Math.floor(concurrency)}`); + + semaphore.set( + semKey, + rawCH + .query({ + query: ` + INSERT INTO ${v3} + SELECT + time, + workspace_id, + key_space_id, + identity_id, + external_id, + key_id, + outcome, + tags, + count, + spent_credits, + latency_avg, + latency_p75, + latency_p99 + FROM ${v2} + WHERE time >= fromUnixTimestamp64Milli(${t}) + AND time < fromUnixTimestamp64Milli(${t + dt}) + AND key_id = '${keyId}' + `, + }) + .then(() => { + concurrency = Math.min(400, concurrency + 10 / concurrency); + }) + .catch(async (err) => { + console.error(err.message); + concurrency = Math.max(1, concurrency / 2); + await new Promise((resolve) => setTimeout(resolve, 10000)); + }) + .finally(() => { + semaphore.delete(semKey); + }), + ); + } + } +} + +await Promise.all(semaphore.values()); diff --git a/tools/migrate/ch_logs.ts b/tools/migrate/ch_logs.ts index df62862b3a..c7e3929086 100644 --- a/tools/migrate/ch_logs.ts +++ b/tools/migrate/ch_logs.ts @@ -1,99 +1,232 @@ +import { createClient } from "@clickhouse/client-web"; import { ClickHouse } from "@unkey/clickhouse"; -import { mysqlDrizzle, schema } from "@unkey/db"; +import { type Identity, mysqlDrizzle, schema } from "@unkey/db"; import mysql from "mysql2/promise"; import { z } from "zod"; -async function main() { - const ch = new ClickHouse({ - url: process.env.CLICKHOUSE_URL, - }); - const conn = await mysql.createConnection( - `mysql://${process.env.DATABASE_USERNAME}:${process.env.DATABASE_PASSWORD}@${process.env.DATABASE_HOST}:3306/unkey?ssl={}`, - ); +const tables = [ + // { + // name: "default.key_verifications_per_minute_v2", + // dt: 7 * 24 * 60 * 60 * 1000, + // retention: 40 * 24 * 60 * 60 * 1000, + // }, + // + // { + // name: "default.key_verifications_per_hour_v2", + // dt: 24 * 60 * 60 * 1000, + // retention: 40 * 24 * 60 * 60 * 1000, + // }, + // { + // name: "default.key_verifications_per_day_v2", + // dt: 7 * 24 * 60 * 60 * 1000, + // retention: 7 * 30 * 24 * 60 * 60 * 1000, + // }, + { + name: "default.key_verifications_per_month_v2", + dt: 30 * 24 * 60 * 60 * 1000, + retention: 4 * 356 * 24 * 60 * 60 * 1000, + }, +]; - await conn.ping(); - const db = mysqlDrizzle(conn, { schema, mode: "default" }); +if (!process.env.CLICKHOUSE_URL) { + throw new Error("CLICKHOUSE_URL is not set"); +} - const keySpaceCache = new Map(); +const ch = new ClickHouse({ + url: process.env.CLICKHOUSE_URL, +}); - const start = 1724930749353; - const end = 1738212041696; - const workspaceId = "ws_wB4SmWrYkhSbWE2rH61S6gMseWw"; +const rawCH = createClient({ + url: process.env.CLICKHOUSE_URL, - const query = ch.querier.query({ - query: ` - SELECT * FROM default.api_requests_raw_v2 - WHERE workspace_id = '${workspaceId}' - AND time > ${start} - AND time < ${end} - AND path = '/v1/keys/verify' - AND response_status = 200 - `, - schema: z - .object({ - request_id: z.string(), - time: z.number(), - workspace_id: z.string(), - response_body: z.string(), - }) - .passthrough(), - }); - const res = await query({}); - - const logs = res.val!; - - let i = 1; - const inserts = []; - for (const log of logs) { - console.infi(i++, logs.length); - - const body = z - .object({ - keyId: z.string().optional(), - valid: z.boolean(), - ownerId: z.string().optional(), - remaining: z.number().optional(), - code: z.string().optional(), - }) - .safeParse(JSON.parse(log.response_body)); - if (!body.success) { - console.error(body.error); - console.error(log.response_body); - continue; - } + clickhouse_settings: { + output_format_json_quote_64bit_integers: 0, + output_format_json_quote_64bit_floats: 0, + http_send_timeout: 60000, + mutations_sync: "2", + }, +}); + +const conn = await mysql.createConnection( + `mysql://${process.env.DATABASE_USERNAME}:${process.env.DATABASE_PASSWORD}@${process.env.DATABASE_HOST}:3306/unkey?ssl={}`, +); + +await conn.ping(); +const db = mysqlDrizzle(conn, { schema, mode: "default" }); + +const CACHE_FILE = "identity_cache.json"; + +// Load cache from file if it exists +let deletedIdentityCache = new Map(); +const migratedIdentities = new Map(); +try { + const file = Bun.file(CACHE_FILE); + if (await file.exists()) { + const cacheData = await file.json(); + deletedIdentityCache = new Map(Object.entries(cacheData)); + console.info(`Loaded ${deletedIdentityCache.size} cached identities from ${CACHE_FILE}`); + } +} catch (err) { + console.warn("Failed to load cache file:", err); +} + +// Function to save cache to file +async function saveCache() { + try { + const cacheData = Object.fromEntries(deletedIdentityCache); + await Bun.write(CACHE_FILE, JSON.stringify(cacheData, null, 2)); + console.info(`Saved ${deletedIdentityCache.size} identities to cache file`); + } catch (err) { + console.error("Failed to save cache:", err); + } +} + +// Save cache on exit +process.on("SIGINT", async () => { + console.info("\nSaving cache before exit..."); + await saveCache(); + process.exit(0); +}); + +process.on("SIGTERM", async () => { + console.info("\nSaving cache before exit..."); + await saveCache(); + process.exit(0); +}); + +const aggregatedSchema = z.object({ + workspace_id: z.string(), + key_space_id: z.string(), + identity_id: z.string(), + external_id: z.string(), +}); + +let concurrency = 1; - if (!body.data.keyId) { +for (const table of tables) { + const end = Date.now(); + const start = end - table.retention; + + console.info("start", start, "end", end); + const semaphore = new Map>(); + + for (let t = start; t < end; t += table.dt) { + console.log( + `${table.name}: ${new Date(t).toLocaleString("de")} - ${new Date(t + table.dt).toLocaleString( + "de", + )}``${table.name}: ${new Date( + t, + ).toLocaleString()} - ${new Date(t + table.dt).toLocaleString()}`, + ); + const query = ch.querier.query({ + query: ` + SELECT + workspace_id, + key_space_id, + identity_id, + external_id + + FROM + ${table.name} + FINAL + --WHERE workspace_id != 'ws_2vUFz88G6TuzMQHZaUhXADNyZWMy' + WHERE time >= fromUnixTimestamp64Milli(${t}) + AND time < fromUnixTimestamp64Milli(${t + table.dt}) + AND identity_id != '' + AND ( external_id = '' OR external_id = 'undefined' ) + GROUP BY workspace_id, key_space_id, identity_id, external_id + `, + schema: aggregatedSchema, + }); + const res = await query({}); + + if (res.err) { + console.error("query error", res.err); continue; } + const rows = res.val; + + for (let i = 0; i < rows.length; i++) { + console.info( + `${table.name}:`, + i + 1, + "/", + rows.length, + `Concurrency: ${semaphore.size} / ${Math.floor(concurrency)}`, + ); + const row = rows[i]; - let keySpaceId = keySpaceCache.get(body.data.keyId); - if (!keySpaceId) { - const key = await db.query.keys.findFirst({ - where: (table, { eq }) => eq(table.id, body.data.keyId), - }); - if (!key) { - console.error("Key not found", body.data.keyId); + if (migratedIdentities.has(row.identity_id)) { + console.log("Identity already migrated"); continue; } - keySpaceId = key.keyAuthId; - keySpaceCache.set(body.data.keyId, keySpaceId); + + while (semaphore.size >= concurrency) { + await Promise.race(semaphore.values()); + } + + const key = `${t}-${i}`; + semaphore.set( + key, + handleRow(table.name, row) + .then(() => { + concurrency = Math.min(100, concurrency + 0.1); + }) + .catch(async (err) => { + console.error(err.message); + concurrency = Math.max(1, concurrency / 2); + await new Promise((resolve) => setTimeout(resolve, 10000)); + }) + .finally(() => { + semaphore.delete(key); + }), + ); } + } + for (const p of semaphore.values()) { + await p; + } +} + +// Save cache after processing all tables +await saveCache(); - const insert = { - workspace_id: workspaceId, - request_id: log.request_id, - time: log.time, - key_space_id: keySpaceId, - key_id: body.data.keyId, - region: "", - tags: [], - outcome: body.data.code ?? "VALID", - }; - - console.info(insert); - inserts.push(insert); +async function handleRow(table: string, row: z.infer): Promise { + let externalId = deletedIdentityCache.get(row.identity_id); + if (externalId === null) { + return; + } + let identity: Identity | undefined = undefined; + if (!externalId) { + identity = await db.query.identities.findFirst({ + where: (table, { eq }) => eq(table.id, row.identity_id), + }); + if (!identity) { + console.error("identity not found", row.identity_id); + deletedIdentityCache.set(row.identity_id, null); + await saveCache(); + return; + } + externalId = identity.externalId; + deletedIdentityCache.set(identity.id, identity.externalId); + } + if (!externalId || !identity) { + console.log({ identity }); + return; } - await ch.verifications.insert(inserts); + migratedIdentities.set(identity.id, true); + + await rawCH.exec({ + query: ` + UPDATE ${table} + SET external_id = '${externalId}' + WHERE + workspace_id = '${row.workspace_id}' + AND key_space_id = '${row.key_space_id}' + AND identity_id = '${row.identity_id}' + AND ( external_id = '' OR external_id = 'undefined' ) + `, + }); } -main(); +process.exit(0);