Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(rpc): Update tags list rpc #6301

Merged
merged 3 commits into from
Sep 13, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ sqlparse==0.4.2
google-api-python-client==2.88.0
sentry-usage-accountant==0.0.10
freezegun==1.2.2
sentry-protos==0.1.20
sentry-protos==0.1.21
58 changes: 24 additions & 34 deletions snuba/web/rpc/tags_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from typing import List, Optional

from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import (
TagsListRequest,
TagsListResponse,
TraceItemAttributesRequest,
TraceItemAttributesResponse,
)
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey

from snuba.clickhouse.formatter.nodes import FormattedQuery, StringNode
from snuba.datasets.schemas.tables import TableSource
Expand All @@ -15,15 +16,17 @@


def tags_list_query(
request: TagsListRequest, _timer: Optional[Timer] = None
) -> TagsListResponse:
str_storage = get_storage(StorageKey("spans_str_attrs"))
num_storage = get_storage(StorageKey("spans_num_attrs"))

str_data_source = str_storage.get_schema().get_data_source()
assert isinstance(str_data_source, TableSource)
num_data_source = num_storage.get_schema().get_data_source()
assert isinstance(num_data_source, TableSource)
request: TraceItemAttributesRequest, _timer: Optional[Timer] = None
) -> TraceItemAttributesResponse:
if request.type == AttributeKey.Type.TYPE_STRING:
storage = get_storage(StorageKey("spans_str_attrs"))
elif request.type == AttributeKey.Type.TYPE_FLOAT:
storage = get_storage(StorageKey("spans_num_attrs"))
else:
return TraceItemAttributesResponse(tags=[])

data_source = storage.get_schema().get_data_source()
assert isinstance(data_source, TableSource)

if request.limit > 1000:
raise BadSnubaRPCRequestException("Limit can be at most 1000")
Expand All @@ -35,39 +38,26 @@ def tags_list_query(
)

query = f"""
SELECT * FROM (
SELECT DISTINCT attr_key, 'str' as type, timestamp
FROM {str_data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})

UNION ALL

SELECT DISTINCT attr_key, 'num' as type, timestamp
FROM {num_data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})
)
SELECT DISTINCT attr_key, timestamp
FROM {data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})
ORDER BY attr_key
LIMIT {request.limit} OFFSET {request.offset}
"""

cluster = str_storage.get_cluster()
cluster = storage.get_cluster()
reader = cluster.get_reader()
result = reader.execute(FormattedQuery([StringNode(query)]))

tags: List[TagsListResponse.Tag] = []
tags: List[TraceItemAttributesResponse.Tag] = []
for row in result.get("data", []):
tags.append(
TagsListResponse.Tag(
TraceItemAttributesResponse.Tag(
name=row["attr_key"],
type={
"str": TagsListResponse.TYPE_STRING,
"num": TagsListResponse.TYPE_NUMBER,
}[row["type"]],
type=request.type,
)
)

return TagsListResponse(tags=tags)
return TraceItemAttributesResponse(tags=tags)
6 changes: 4 additions & 2 deletions snuba/web/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
AggregateBucketRequest,
)
from sentry_protos.snuba.v1alpha.endpoint_span_samples_pb2 import SpanSamplesRequest
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import TagsListRequest
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import (
TraceItemAttributesRequest,
)
from werkzeug import Response as WerkzeugResponse
from werkzeug.exceptions import InternalServerError

Expand Down Expand Up @@ -288,7 +290,7 @@ def rpc(*, name: str, timer: Timer) -> Response:
] = {
"AggregateBucketRequest": (timeseries_query, AggregateBucketRequest),
"SpanSamplesRequest": (span_samples_query, SpanSamplesRequest),
"TagsListRequest": (tags_list_query, TagsListRequest),
"TraceItemAttributesRequest": (tags_list_query, TraceItemAttributesRequest),
}
try:
endpoint, req_class = rpcs[name]
Expand Down
85 changes: 65 additions & 20 deletions tests/web/rpc/test_tags_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import (
TagsListRequest,
TagsListResponse,
TraceItemAttributesRequest,
TraceItemAttributesResponse,
)
from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
Expand Down Expand Up @@ -63,11 +64,11 @@ def setup_teardown(clickhouse_db: None, redis_db: None) -> None:

@pytest.mark.clickhouse_db
@pytest.mark.redis_db
class TestTagsList(BaseApiTest):
class TestTraceItemAttributes(BaseApiTest):
def test_basic(self) -> None:
ts = Timestamp()
ts.GetCurrentTime()
message = TagsListRequest(
message = TraceItemAttributesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
Expand Down Expand Up @@ -98,12 +99,12 @@ def test_basic(self) -> None:
offset=20,
)
response = self.app.post(
"/rpc/TagsListRequest", data=message.SerializeToString()
"/rpc/TraceItemAttributesRequest", data=message.SerializeToString()
)
assert response.status_code == 200

def test_simple_case(self, setup_teardown: Any) -> None:
message = TagsListRequest(
def test_simple_case_str(self, setup_teardown: Any) -> None:
message = TraceItemAttributesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
Expand Down Expand Up @@ -132,17 +133,58 @@ def test_simple_case(self, setup_teardown: Any) -> None:
),
limit=10,
offset=0,
type=AttributeKey.Type.TYPE_STRING,
)
response = tags_list_query(message)
assert response.tags == [
TagsListResponse.Tag(
name=f"a_tag_{i:03}", type=TagsListResponse.TYPE_STRING
TraceItemAttributesResponse.Tag(
name=f"a_tag_{i:03}", type=AttributeKey.Type.TYPE_STRING
)
for i in range(0, 10)
]

def test_simple_case_float(self, setup_teardown: Any) -> None:
message = TraceItemAttributesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(
seconds=int(
datetime(
year=BASE_TIME.year,
month=BASE_TIME.month,
day=BASE_TIME.day - 1,
tzinfo=UTC,
).timestamp()
)
),
end_timestamp=Timestamp(
seconds=int(
datetime(
year=BASE_TIME.year,
month=BASE_TIME.month,
day=BASE_TIME.day + 1,
tzinfo=UTC,
).timestamp()
)
),
),
limit=10,
offset=0,
type=AttributeKey.Type.TYPE_FLOAT,
)
response = tags_list_query(message)
assert response.tags == [
TraceItemAttributesResponse.Tag(
name=f"b_measurement_{i:03}", type=AttributeKey.Type.TYPE_FLOAT
)
for i in range(0, 10)
]

def test_with_offset(self, setup_teardown: Any) -> None:
message = TagsListRequest(
message = TraceItemAttributesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
Expand Down Expand Up @@ -170,21 +212,24 @@ def test_with_offset(self, setup_teardown: Any) -> None:
),
),
limit=5,
offset=29,
offset=10,
type=AttributeKey.Type.TYPE_FLOAT,
)
response = tags_list_query(message)
assert response.tags == [
TagsListResponse.Tag(name="a_tag_029", type=TagsListResponse.TYPE_STRING),
TagsListResponse.Tag(
name="b_measurement_000", type=TagsListResponse.TYPE_NUMBER
TraceItemAttributesResponse.Tag(
name="b_measurement_010", type=AttributeKey.Type.TYPE_FLOAT
),
TraceItemAttributesResponse.Tag(
name="b_measurement_011", type=AttributeKey.Type.TYPE_FLOAT
),
TagsListResponse.Tag(
name="b_measurement_001", type=TagsListResponse.TYPE_NUMBER
TraceItemAttributesResponse.Tag(
name="b_measurement_012", type=AttributeKey.Type.TYPE_FLOAT
),
TagsListResponse.Tag(
name="b_measurement_002", type=TagsListResponse.TYPE_NUMBER
TraceItemAttributesResponse.Tag(
name="b_measurement_013", type=AttributeKey.Type.TYPE_FLOAT
),
TagsListResponse.Tag(
name="b_measurement_003", type=TagsListResponse.TYPE_NUMBER
TraceItemAttributesResponse.Tag(
name="b_measurement_014", type=AttributeKey.Type.TYPE_FLOAT
),
]
Loading