feat(replays): Materialize replays_local table #5708

Merged Apr 18, 2024 · 37 commits

Changes from all commits

Commits
9269bcb  Add materialized view (cmanallen, Mar 29, 2024)
3fb9a85  Fix data type (cmanallen, Mar 29, 2024)
0f85114  Add array materializations (cmanallen, Apr 1, 2024)
62f977d  Merge branch 'master' into cmanallen/replays-add-materialized-view (cmanallen, Apr 1, 2024)
df4b2dd  Materialize viewed-by-ids (cmanallen, Apr 1, 2024)
62e92f5  Remove .py extension (cmanallen, Apr 1, 2024)
9a7dc10  Add target (cmanallen, Apr 1, 2024)
5f8f369  Fix lint issues (cmanallen, Apr 1, 2024)
702ce00  Fix type issues (cmanallen, Apr 1, 2024)
fafc4dd  Drop use of simple aggregation state (cmanallen, Apr 1, 2024)
2e11f3b  Remove formatting (cmanallen, Apr 1, 2024)
168c28d  Remove YAML (cmanallen, Apr 1, 2024)
c6e9ade  style(lint): Auto commit lint changes (getsantry[bot], Apr 1, 2024)
ff9c4b3  Remove formatting again (cmanallen, Apr 1, 2024)
a751e2e  Merge branch 'cmanallen/replays-add-materialized-view' of https://git… (cmanallen, Apr 1, 2024)
661fd51  Fix typing (cmanallen, Apr 1, 2024)
5db0240  Fix materialized view syntax (cmanallen, Apr 2, 2024)
79ada55  Remove low cardinality from materialization (cmanallen, Apr 2, 2024)
8b06ae9  Use backwards compatible filter (cmanallen, Apr 2, 2024)
bb6bc4a  Use replays storage set (cmanallen, Apr 2, 2024)
b72f371  Add down migration for materialized view (cmanallen, Apr 3, 2024)
17182d4  Just drop the table (cmanallen, Apr 3, 2024)
91a0a58  Merge branch 'master' into cmanallen/replays-add-materialized-view (cmanallen, Apr 4, 2024)
fa51db8  Merge branch 'master' into cmanallen/replays-add-materialized-view (cmanallen, Apr 10, 2024)
db9a833  Remove aggregated arrays (cmanallen, Apr 10, 2024)
09c13b9  Remove replay_id index (cmanallen, Apr 10, 2024)
39e57a4  Rename columns and add min_segment_id column (cmanallen, Apr 10, 2024)
3477c3e  Revert "Remove replay_id index" (cmanallen, Apr 10, 2024)
df93728  Remove release (cmanallen, Apr 10, 2024)
0da4f00  Allow null (cmanallen, Apr 11, 2024)
3c3bf38  Update ordering (cmanallen, Apr 11, 2024)
4b962d5  Wrap with modifiers (cmanallen, Apr 11, 2024)
3ec3b94  Group retention_days (cmanallen, Apr 11, 2024)
268258b  Remove index (cmanallen, Apr 15, 2024)
c48a616  Add retention_days to the order-by (cmanallen, Apr 15, 2024)
ca06bf0  Use maxIf for finished_at column. Only accumulate values if its a seg… (cmanallen, Apr 17, 2024)
2717862  Remove toHour wrapper from ordering key (cmanallen, Apr 18, 2024)
1 change: 1 addition & 0 deletions snuba/migrations/group_loader.py
@@ -179,6 +179,7 @@ def get_migrations(self) -> Sequence[str]:
"0016_materialize_new_event_counts",
"0017_add_component_name_column",
"0018_add_viewed_by_id_column",
"0019_add_materialization",
]


182 changes: 182 additions & 0 deletions snuba/snuba_migrations/replays/0019_add_materialization.py
@@ -0,0 +1,182 @@
from typing import Iterator, List, Sequence

from snuba.clickhouse.columns import (
    UUID,
    AggregateFunction,
    Column,
    DateTime,
    IPv4,
    IPv6,
    String,
    UInt,
)
from snuba.clusters.storage_sets import StorageSetKey
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers


class Migration(migration.ClickhouseNodeMigration):
    blocking = False

    def forwards_ops(self) -> Sequence[operations.SqlOperation]:
        return list(forward_iter())

    def backwards_ops(self) -> Sequence[operations.SqlOperation]:
        return list(backward_iter())


def forward_iter() -> Iterator[operations.SqlOperation]:
    yield operations.CreateTable(
        storage_set=StorageSetKey.REPLAYS,
        table_name="replays_aggregated_local",
        columns=columns,
        engine=table_engines.AggregatingMergeTree(
            storage_set=StorageSetKey.REPLAYS,
            order_by="(project_id, to_hour_timestamp, replay_id, retention_days)",
            partition_by="(retention_days, toMonday(to_hour_timestamp))",
            settings={"index_granularity": "8192"},
            ttl="to_hour_timestamp + toIntervalDay(retention_days)",
        ),
        target=operations.OperationTarget.LOCAL,
    )

    yield operations.CreateMaterializedView(
        storage_set=StorageSetKey.REPLAYS,
        view_name="replays_aggregation_mv",
        destination_table_name="replays_aggregated_local",
        columns=columns,
        query="""
        SELECT
            project_id,
            toStartOfHour(timestamp) as to_hour_timestamp,
            replay_id,
            retention_days,
            anyIfState(browser_name, browser_name != '') as browser_name,
            anyIfState(browser_version, browser_version != '') as browser_version,
            sumState(toUInt64(click_is_dead)) as count_dead_clicks,
            sumState(toUInt64(error_id != '00000000-0000-0000-0000-000000000000' OR fatal_id != '00000000-0000-0000-0000-000000000000')) as count_errors,
            sumState(toUInt64(debug_id != '00000000-0000-0000-0000-000000000000' OR info_id != '00000000-0000-0000-0000-000000000000')) as count_infos,
            sumState(toUInt64(click_is_rage)) as count_rage_clicks,
            countState(toUInt64(segment_id)) as count_segments,
            sumState(length(urls)) as count_urls,
            sumState(toUInt64(warning_id != '00000000-0000-0000-0000-000000000000')) as count_warnings,
            anyIfState(device_brand, device_brand != '') as device_brand,
            anyIfState(device_family, device_family != '') as device_family,
            anyIfState(device_model, device_model != '') as device_model,
            anyIfState(device_name, device_name != '') as device_name,
            anyIfState(dist, dist != '') as dist,
            maxIfState(timestamp, segment_id IS NOT NULL) as finished_at,
            anyIfState(environment, environment != '') as environment,
            anyState(ip_address_v4) as ip_address_v4,
            anyState(ip_address_v6) as ip_address_v6,
            sumState(toUInt64(is_archived)) as is_archived,
            anyIfState(os_name, os_name != '') as os_name,
            anyIfState(os_version, os_version != '') as os_version,
            anyIfState(platform, platform != '') as platform,
            anyIfState(sdk_name, sdk_name != '') as sdk_name,
            anyIfState(sdk_version, sdk_version != '') as sdk_version,
            minState(replay_start_timestamp) as started_at,
            anyIfState(user, user != '') as user,
            anyIfState(user_id, user_id != '') as user_id,
            anyIfState(user_name, user_name != '') as user_name,
            anyIfState(user_email, user_email != '') as user_email,
            minState(segment_id) as min_segment_id
        FROM replays_local
        GROUP BY project_id, toStartOfHour(timestamp), replay_id, retention_days
        """,
        target=operations.OperationTarget.LOCAL,
    )


def backward_iter() -> Iterator[operations.SqlOperation]:
    yield operations.DropTable(
        storage_set=StorageSetKey.REPLAYS,
        table_name="replays_aggregation_mv",
        target=operations.OperationTarget.LOCAL,
    )

    yield operations.DropTable(
        storage_set=StorageSetKey.REPLAYS,
        table_name="replays_aggregated_local",
        target=operations.OperationTarget.LOCAL,
    )


def any_if_string(
    column_name: str, nullable: bool = False, low_cardinality: bool = False
) -> Column[Modifiers]:
    return Column(
        column_name,
        AggregateFunction(
            "anyIf",
            [
                String(Modifiers(nullable=nullable, low_cardinality=low_cardinality)),
                UInt(8, Modifiers(nullable=nullable)),
            ],
        ),
    )


def any_if_nullable_string(
    column_name: str, low_cardinality: bool = False
) -> Column[Modifiers]:
    """Returns an aggregate anyIf function."""
    return any_if_string(column_name, nullable=True, low_cardinality=low_cardinality)


def sum(column_name: str) -> Column[Modifiers]:
    """Returns an aggregate sum function."""
    return Column(column_name, AggregateFunction("sum", [UInt(64)]))


def count_nullable(column_name: str) -> Column[Modifiers]:
    """Returns an aggregate count function capable of accepting nullable integer values."""
    return Column(
        column_name, AggregateFunction("count", [UInt(64, Modifiers(nullable=True))])
    )


columns: List[Column[Modifiers]] = [
    # Primary-key.
    Column("project_id", UInt(64)),
    Column("to_hour_timestamp", DateTime()),
    Column("replay_id", UUID()),
    Column("retention_days", UInt(16)),
    # Columns ordered by column-name.
    any_if_nullable_string("browser_name"),
    any_if_nullable_string("browser_version"),
    sum("count_dead_clicks"),
    sum("count_errors"),
    sum("count_infos"),
    sum("count_rage_clicks"),
    count_nullable("count_segments"),
    sum("count_urls"),
MeredithAnya (Member) commented:

@cmanallen I see that some of these columns are materialized already, so with this materialized table, are we now going to be doing materialization twice? Would we still need the materialized columns if we have this table? Maybe I don't quite know, from the product standpoint, whether both are needed.

cmanallen (Member, Author) replied:

@MeredithAnya We'll still need the materialized columns. Some queries aren't possible with the materialized view, so we have a tiered system where performance degrades as queries get more complex.

sum("count_warnings"),
any_if_nullable_string("device_brand"),
any_if_nullable_string("device_family"),
any_if_nullable_string("device_model"),
any_if_nullable_string("device_name"),
any_if_nullable_string("dist"),
any_if_nullable_string("environment"),
Column("finished_at", AggregateFunction("maxIf", [DateTime(), UInt(8)])),
Column("ip_address_v4", AggregateFunction("any", [IPv4(Modifiers(nullable=True))])),
Column("ip_address_v6", AggregateFunction("any", [IPv6(Modifiers(nullable=True))])),
Column(
"is_archived", AggregateFunction("sum", [UInt(64, Modifiers(nullable=True))])
),
Column(
"min_segment_id", AggregateFunction("min", [UInt(16, Modifiers(nullable=True))])
),
any_if_nullable_string("os_name"),
any_if_nullable_string("os_version"),
any_if_string("platform", low_cardinality=False),
any_if_nullable_string("sdk_name"),
any_if_nullable_string("sdk_version"),
Column(
"started_at", AggregateFunction("min", [DateTime(Modifiers(nullable=True))])
),
any_if_nullable_string("user"),
any_if_nullable_string("user_id"),
any_if_nullable_string("user_name"),
any_if_nullable_string("user_email"),
]
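
The review exchange above describes a tiered setup: replays_aggregated_local serves the cheaper aggregate reads, while replays_local remains the fallback for queries the view cannot answer. For illustration only, the sketch below shows how a read against the new AggregatingMergeTree table might look, finalizing the stored aggregate states with ClickHouse -Merge combinators. The table and column names come from the migration in this PR; the project filter, time window, and the query itself are hypothetical and not part of the change.

-- Hypothetical query; not part of this PR. States written by replays_aggregation_mv
-- are finalized with the matching -Merge combinators.
SELECT
    replay_id,
    anyIfMerge(browser_name) AS browser_name,
    sumMerge(count_errors) AS count_errors,
    countMerge(count_segments) AS count_segments,
    minMerge(started_at) AS started_at,
    maxIfMerge(finished_at) AS finished_at
FROM replays_aggregated_local
WHERE project_id = 1                                  -- placeholder project id
  AND to_hour_timestamp >= now() - toIntervalDay(30)  -- example time window
GROUP BY replay_id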