feat(metrics): create generic sets aggregate table + indices #2782
Conversation
This PR has a migration; here is the generated SQL:

```sql
-- start migrations
-- migration generic_metrics : 0001_sets_aggregate_table

-- Local operations:
CREATE TABLE IF NOT EXISTS generic_metric_sets_local
(
    org_id UInt64,
    project_id UInt64,
    metric_id UInt64,
    granularity UInt8,
    timestamp DateTime CODEC (DoubleDelta),
    retention_days UInt16,
    tags Nested(key UInt64, indexed_value UInt64, raw_value String),
    value AggregateFunction(uniqCombined64, UInt64),
    use_case_id LowCardinality(String)
)
ENGINE ReplicatedAggregatingMergeTree('/clickhouse/tables/generic_metrics_sets/{shard}/default/generic_metric_sets_local', '{replica}')
PRIMARY KEY (org_id, project_id, metric_id, granularity, timestamp)
ORDER BY (org_id, project_id, metric_id, granularity, timestamp, tags.key, tags.indexed_value, tags.raw_value, retention_days, use_case_id)
PARTITION BY (retention_days, toMonday(timestamp))
TTL timestamp + toIntervalDay(retention_days)
SETTINGS index_granularity=2048;

ALTER TABLE generic_metric_sets_local ADD COLUMN IF NOT EXISTS _indexed_tags_hash Array(UInt64)
    MATERIALIZED arrayMap((k, v) -> cityHash64(concat(toString(k), '=', toString(v))), tags.key, tags.indexed_value);
ALTER TABLE generic_metric_sets_local ADD COLUMN IF NOT EXISTS _raw_tags_hash Array(UInt64)
    MATERIALIZED arrayMap((k, v) -> cityHash64(concat(toString(k), '=', v)), tags.key, tags.raw_value);
ALTER TABLE generic_metric_sets_local ADD INDEX IF NOT EXISTS bf_indexed_tags_hash _indexed_tags_hash TYPE bloom_filter() GRANULARITY 1;
ALTER TABLE generic_metric_sets_local ADD INDEX IF NOT EXISTS bf_raw_tags_hash _raw_tags_hash TYPE bloom_filter() GRANULARITY 1;
ALTER TABLE generic_metric_sets_local ADD INDEX IF NOT EXISTS bf_tags_key_hash tags.key TYPE bloom_filter() GRANULARITY 1;

-- Dist operations:
CREATE TABLE IF NOT EXISTS generic_metric_sets_aggregated_dist
(
    org_id UInt64,
    project_id UInt64,
    metric_id UInt64,
    granularity UInt8,
    timestamp DateTime CODEC (DoubleDelta),
    retention_days UInt16,
    tags Nested(key UInt64, indexed_value UInt64, raw_value String),
    value AggregateFunction(uniqCombined64, UInt64),
    use_case_id LowCardinality(String)
)
ENGINE Distributed(cluster_one_sh, default, generic_metric_sets_local);

-- end migration generic_metrics : 0001_sets_aggregate_table
```
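For context, the SQL above is generated from snuba's Python migration DSL rather than written by hand. A condensed sketch of roughly what such a migration file looks like follows; the base class, operation names, and engine parameter names are assumptions based on snuba's migration framework (which has shifted across versions), not the exact contents of this PR:

```python
from typing import Sequence

from snuba.clickhouse.columns import (
    AggregateFunction,
    Column,
    DateTime,
    Nested,
    String,
    UInt,
)
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines

columns: Sequence[Column] = [
    Column("org_id", UInt(64)),
    Column("project_id", UInt(64)),
    Column("metric_id", UInt(64)),
    Column("granularity", UInt(8)),
    Column("timestamp", DateTime()),
    Column("retention_days", UInt(16)),
    Column(
        "tags",
        Nested(
            [("key", UInt(64)), ("indexed_value", UInt(64)), ("raw_value", String())]
        ),
    ),
    Column("value", AggregateFunction("uniqCombined64", [UInt(64)])),
    # In the generated SQL this is LowCardinality(String), applied via a
    # column modifier in the real migration.
    Column("use_case_id", String()),
]


class Migration(migration.ClickhouseNodeMigration):
    blocking = False

    # backwards_local / forwards_dist / backwards_dist omitted for brevity.
    def forwards_local(self) -> Sequence[operations.SqlOperation]:
        return [
            operations.CreateTable(
                storage_set=StorageSetKey.GENERIC_METRICS_SETS,
                table_name="generic_metric_sets_local",
                columns=columns,
                engine=table_engines.AggregatingMergeTree(
                    storage_set=StorageSetKey.GENERIC_METRICS_SETS,
                    primary_key="(org_id, project_id, metric_id, granularity, timestamp)",
                    order_by="(org_id, project_id, metric_id, granularity, timestamp, tags.key, tags.indexed_value, tags.raw_value, retention_days, use_case_id)",
                    partition_by="(retention_days, toMonday(timestamp))",
                    ttl="timestamp + toIntervalDay(retention_days)",
                    settings={"index_granularity": "2048"},
                ),
            ),
        ]
```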
Column("indexed_tags", Nested([("key", UInt(64)), ("value", UInt(64))])), | ||
Column("raw_tags", Nested([("key", UInt(64)), ("value", String())])), | ||
Column("value", AggregateFunction("uniqCombined64", [UInt(64)])), | ||
Column("timeseries_id", UInt(64)), |
`timeseries_id` will be a hash generated by the consumer for sharding, so we can isolate that logic from ClickHouse itself.
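Purely as illustration of that consumer-side hashing (not from this PR — the helper name, the fields hashed, and the choice of a 32-bit CRC are all assumptions):

```python
import zlib
from typing import Mapping


def build_timeseries_id(
    org_id: int, project_id: int, metric_id: int, indexed_tags: Mapping[int, int]
) -> int:
    """Stable 32-bit hash identifying a timeseries, computed in the consumer
    so the sharding logic lives in Python rather than in ClickHouse."""
    buf = bytearray()
    for part in (org_id, project_id, metric_id):
        buf += part.to_bytes(8, "little")
    # Sort tags so the same timeseries always produces the same shard key,
    # regardless of the order tags arrive in.
    for key, value in sorted(indexed_tags.items()):
        buf += key.to_bytes(8, "little") + value.to_bytes(8, "little")
    return zlib.crc32(buf)
```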
You should not need this in the aggregated merge tree, only in the raw table. Sharding happens at the raw table level:
- you write to the raw table
- the distributed raw table partitions data across shards according to the sharding key
- each storage node writes data to its local raw table
- the materialized view generates the aggregated data and stores it in the local AggregatingMergeTree (reading from the local raw table). There is no sharding happening at this step.
(Also, I think we will be able to get away with 32 bits for the time being; since the data is retained for a short period, it will not be a problem.)
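To make that last step concrete, here is a loose sketch of the materialized-view operation that would populate the aggregate table from a raw table; the raw table and view names, the column list, and the exact CreateMaterializedView signature are assumptions, not code from this PR:

```python
from typing import Sequence

from snuba.clickhouse.columns import Column
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import operations

# Hypothetical: same column list as the aggregate table above (elided here).
aggregate_columns: Sequence[Column] = []

# The view reads rows already written to this shard's raw table and folds
# them into the local AggregatingMergeTree; no cross-shard traffic occurs.
create_mv = operations.CreateMaterializedView(
    storage_set=StorageSetKey.GENERIC_METRICS_SETS,
    view_name="generic_metric_sets_aggregation_mv",
    destination_table_name="generic_metric_sets_local",
    columns=aggregate_columns,
    query="""
        SELECT
            use_case_id, org_id, project_id, metric_id, granularity, timestamp,
            retention_days, tags.key, tags.indexed_value, tags.raw_value,
            uniqCombined64State(value) AS value
        FROM generic_metric_sets_raw_local
        GROUP BY
            use_case_id, org_id, project_id, metric_id, granularity, timestamp,
            retention_days, tags.key, tags.indexed_value, tags.raw_value
    """,
)
```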
Please see comments inline
Column("indexed_tags", Nested([("key", UInt(64)), ("value", UInt(64))])), | ||
Column("raw_tags", Nested([("key", UInt(64)), ("value", String())])), | ||
Column("value", AggregateFunction("uniqCombined64", [UInt(64)])), | ||
Column("timeseries_id", UInt(64)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should not need this in the aggregated merge tree. Only in the raw table.
Sharding happens at the raw table level.
- you write on the raw table
- the distributed raw table partitions data across shards according to the sharding key
- each storage node writes data on the raw table locally
- the materialized view generates the aggregated data and stores them in the aggregatingMergeTree locally (from the local raw table). There is no sharding happening at this step.
(also I think we will be able to get away with 32 bit for the time being, since it is data retained for a short period of time it will not be a problem.)
snuba/migrations/snuba_migrations/generic_metrics/0001_sets_aggregate_table.py
Outdated
Show resolved
Hide resolved
snuba/migrations/snuba_migrations/generic_metrics/0001_sets_aggregate_table.py
Outdated
Show resolved
Hide resolved
I think I've addressed all the issues you've brought up.
Codecov Report

```
@@            Coverage Diff             @@
##           master    #2782      +/-   ##
==========================================
+ Coverage   92.83%   92.84%   +0.01%
==========================================
  Files         610      612       +2
  Lines       28680    28723      +43
==========================================
+ Hits        26626    26669      +43
  Misses       2054     2054
```

Continue to review the full report at Codecov.
LGTM
"tags", | ||
Nested( | ||
[ | ||
("key", UInt(64)), | ||
("indexed_value", UInt(64)), | ||
("raw_value", String()), | ||
] | ||
), |
You do not need to do anything right now. But watch out when you introduce the storage: I think some of the storage query processors that optimize tags may rely on the value column being called `value`, and they will likely have to be updated.
Yeah, I noticed that might be an issue in the ArrayJoinKeyValueOptimizer.
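For reference, a sketch of how that processor is registered on a storage definition (the import path is an assumption based on snuba's layout at the time):

```python
from snuba.query.processors.arrayjoin_keyvalue_optimizer import (
    ArrayJoinKeyValueOptimizer,
)

# Registered per storage with the name of the nested column to optimize.
# The processor rewrites paired arrayJoin(tags.key) / arrayJoin(tags.value)
# expressions into a single arrayJoin over (key, value) tuples, so a schema
# using indexed_value / raw_value instead of value would not match its
# patterns until the processor is updated.
query_processors = [
    ArrayJoinKeyValueOptimizer("tags"),
]
```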
create generic_metrics_sets in ClickHouse

`generic_metrics_sets` is the first aggregate table I'm creating for the metrics-enhanced performance project (internal doc: https://www.notion.so/sentry/Metrics-Enhanced-Performance-Q2-SNS-Plan-79247be0ab134aae90269de0523c35f5). The goal with this approach is to create end-to-end functionality for sets (starting from indexer ingestion in sentry, through writing to ClickHouse, to returning results via the query API) before implementing the other metric types (counters, distributions) and their processors.

Changes in this PR:
- a migration that creates the generic_metric_sets_local aggregate table with its materialized tag-hash columns and bloom filter indices, plus the generic_metric_sets_aggregated_dist distributed table (see the generated SQL above)

Next steps:
- … `generic_metrics_sets`.
- … `generic_metrics_sets`

Differences from the old sets aggregate table:
- a new `timeseries_id` column that lets us control sharding from within the Python codebase.

Testing: