Skip to content

Commit

Permalink
feat(rpc): Update tags list rpc (#6301)
Browse files Browse the repository at this point in the history
sentry-protos 0.1.21 changes the definition a bit, updating the endpoint
to accept it.
  • Loading branch information
Zylphrex authored Sep 13, 2024
1 parent 08928b9 commit b43bf24
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 60 deletions.
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ sqlparse==0.4.2
google-api-python-client==2.88.0
sentry-usage-accountant==0.0.10
freezegun==1.2.2
sentry-protos==0.1.20
sentry-protos==0.1.21
2 changes: 1 addition & 1 deletion snuba/web/rpc/span_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def _build_query(request: SpanSamplesRequest) -> Query:
limit=request.limit,
)
treeify_or_and_conditions(res)
apply_virtual_columns(res, request.virtual_column_context)
apply_virtual_columns(res, request.virtual_column_contexts)
return res


Expand Down
58 changes: 24 additions & 34 deletions snuba/web/rpc/tags_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@
from typing import List, Optional

from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import (
TagsListRequest,
TagsListResponse,
TraceItemAttributesRequest,
TraceItemAttributesResponse,
)
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey

from snuba.clickhouse.formatter.nodes import FormattedQuery, StringNode
from snuba.datasets.schemas.tables import TableSource
Expand All @@ -15,15 +16,17 @@


def tags_list_query(
request: TagsListRequest, _timer: Optional[Timer] = None
) -> TagsListResponse:
str_storage = get_storage(StorageKey("spans_str_attrs"))
num_storage = get_storage(StorageKey("spans_num_attrs"))

str_data_source = str_storage.get_schema().get_data_source()
assert isinstance(str_data_source, TableSource)
num_data_source = num_storage.get_schema().get_data_source()
assert isinstance(num_data_source, TableSource)
request: TraceItemAttributesRequest, _timer: Optional[Timer] = None
) -> TraceItemAttributesResponse:
if request.type == AttributeKey.Type.TYPE_STRING:
storage = get_storage(StorageKey("spans_str_attrs"))
elif request.type == AttributeKey.Type.TYPE_FLOAT:
storage = get_storage(StorageKey("spans_num_attrs"))
else:
return TraceItemAttributesResponse(tags=[])

data_source = storage.get_schema().get_data_source()
assert isinstance(data_source, TableSource)

if request.limit > 1000:
raise BadSnubaRPCRequestException("Limit can be at most 1000")
Expand All @@ -35,39 +38,26 @@ def tags_list_query(
)

query = f"""
SELECT * FROM (
SELECT DISTINCT attr_key, 'str' as type, timestamp
FROM {str_data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})
UNION ALL
SELECT DISTINCT attr_key, 'num' as type, timestamp
FROM {num_data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})
)
SELECT DISTINCT attr_key, timestamp
FROM {data_source.get_table_name()}
WHERE organization_id={request.meta.organization_id}
AND project_id IN ({', '.join(str(pid) for pid in request.meta.project_ids)})
AND timestamp BETWEEN fromUnixTimestamp({request.meta.start_timestamp.seconds}) AND fromUnixTimestamp({request.meta.end_timestamp.seconds})
ORDER BY attr_key
LIMIT {request.limit} OFFSET {request.offset}
"""

cluster = str_storage.get_cluster()
cluster = storage.get_cluster()
reader = cluster.get_reader()
result = reader.execute(FormattedQuery([StringNode(query)]))

tags: List[TagsListResponse.Tag] = []
tags: List[TraceItemAttributesResponse.Tag] = []
for row in result.get("data", []):
tags.append(
TagsListResponse.Tag(
TraceItemAttributesResponse.Tag(
name=row["attr_key"],
type={
"str": TagsListResponse.TYPE_STRING,
"num": TagsListResponse.TYPE_NUMBER,
}[row["type"]],
type=request.type,
)
)

return TagsListResponse(tags=tags)
return TraceItemAttributesResponse(tags=tags)
6 changes: 4 additions & 2 deletions snuba/web/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@
AggregateBucketRequest,
)
from sentry_protos.snuba.v1alpha.endpoint_span_samples_pb2 import SpanSamplesRequest
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import TagsListRequest
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import (
TraceItemAttributesRequest,
)
from werkzeug import Response as WerkzeugResponse
from werkzeug.exceptions import InternalServerError

Expand Down Expand Up @@ -288,7 +290,7 @@ def rpc(*, name: str, timer: Timer) -> Response:
] = {
"AggregateBucketRequest": (timeseries_query, AggregateBucketRequest),
"SpanSamplesRequest": (span_samples_query, SpanSamplesRequest),
"TagsListRequest": (tags_list_query, TagsListRequest),
"TraceItemAttributesRequest": (tags_list_query, TraceItemAttributesRequest),
}
try:
endpoint, req_class = rpcs[name]
Expand Down
4 changes: 2 additions & 2 deletions tests/web/rpc/test_span_samples.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ def test_with_virtual_columns(self, setup_teardown: Any) -> None:
)
],
limit=61,
virtual_column_context=[
virtual_column_contexts=[
VirtualColumnContext(
from_column_name="project_id",
to_column_name="project_name",
Expand Down Expand Up @@ -317,7 +317,7 @@ def test_order_by_virtual_columns(self, setup_teardown: Any) -> None:
)
],
limit=61,
virtual_column_context=[
virtual_column_contexts=[
VirtualColumnContext(
from_column_name="color",
to_column_name="special_color",
Expand Down
85 changes: 65 additions & 20 deletions tests/web/rpc/test_tags_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
import pytest
from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1alpha.endpoint_tags_list_pb2 import (
TagsListRequest,
TagsListResponse,
TraceItemAttributesRequest,
TraceItemAttributesResponse,
)
from sentry_protos.snuba.v1alpha.request_common_pb2 import RequestMeta
from sentry_protos.snuba.v1alpha.trace_item_attribute_pb2 import AttributeKey

from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
Expand Down Expand Up @@ -63,11 +64,11 @@ def setup_teardown(clickhouse_db: None, redis_db: None) -> None:

@pytest.mark.clickhouse_db
@pytest.mark.redis_db
class TestTagsList(BaseApiTest):
class TestTraceItemAttributes(BaseApiTest):
def test_basic(self) -> None:
ts = Timestamp()
ts.GetCurrentTime()
message = TagsListRequest(
message = TraceItemAttributesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
Expand Down Expand Up @@ -98,12 +99,12 @@ def test_basic(self) -> None:
offset=20,
)
response = self.app.post(
"/rpc/TagsListRequest", data=message.SerializeToString()
"/rpc/TraceItemAttributesRequest", data=message.SerializeToString()
)
assert response.status_code == 200

def test_simple_case(self, setup_teardown: Any) -> None:
message = TagsListRequest(
def test_simple_case_str(self, setup_teardown: Any) -> None:
message = TraceItemAttributesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
Expand Down Expand Up @@ -132,17 +133,58 @@ def test_simple_case(self, setup_teardown: Any) -> None:
),
limit=10,
offset=0,
type=AttributeKey.Type.TYPE_STRING,
)
response = tags_list_query(message)
assert response.tags == [
TagsListResponse.Tag(
name=f"a_tag_{i:03}", type=TagsListResponse.TYPE_STRING
TraceItemAttributesResponse.Tag(
name=f"a_tag_{i:03}", type=AttributeKey.Type.TYPE_STRING
)
for i in range(0, 10)
]

def test_simple_case_float(self, setup_teardown: Any) -> None:
message = TraceItemAttributesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(
seconds=int(
datetime(
year=BASE_TIME.year,
month=BASE_TIME.month,
day=BASE_TIME.day - 1,
tzinfo=UTC,
).timestamp()
)
),
end_timestamp=Timestamp(
seconds=int(
datetime(
year=BASE_TIME.year,
month=BASE_TIME.month,
day=BASE_TIME.day + 1,
tzinfo=UTC,
).timestamp()
)
),
),
limit=10,
offset=0,
type=AttributeKey.Type.TYPE_FLOAT,
)
response = tags_list_query(message)
assert response.tags == [
TraceItemAttributesResponse.Tag(
name=f"b_measurement_{i:03}", type=AttributeKey.Type.TYPE_FLOAT
)
for i in range(0, 10)
]

def test_with_offset(self, setup_teardown: Any) -> None:
message = TagsListRequest(
message = TraceItemAttributesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
Expand Down Expand Up @@ -170,21 +212,24 @@ def test_with_offset(self, setup_teardown: Any) -> None:
),
),
limit=5,
offset=29,
offset=10,
type=AttributeKey.Type.TYPE_FLOAT,
)
response = tags_list_query(message)
assert response.tags == [
TagsListResponse.Tag(name="a_tag_029", type=TagsListResponse.TYPE_STRING),
TagsListResponse.Tag(
name="b_measurement_000", type=TagsListResponse.TYPE_NUMBER
TraceItemAttributesResponse.Tag(
name="b_measurement_010", type=AttributeKey.Type.TYPE_FLOAT
),
TraceItemAttributesResponse.Tag(
name="b_measurement_011", type=AttributeKey.Type.TYPE_FLOAT
),
TagsListResponse.Tag(
name="b_measurement_001", type=TagsListResponse.TYPE_NUMBER
TraceItemAttributesResponse.Tag(
name="b_measurement_012", type=AttributeKey.Type.TYPE_FLOAT
),
TagsListResponse.Tag(
name="b_measurement_002", type=TagsListResponse.TYPE_NUMBER
TraceItemAttributesResponse.Tag(
name="b_measurement_013", type=AttributeKey.Type.TYPE_FLOAT
),
TagsListResponse.Tag(
name="b_measurement_003", type=TagsListResponse.TYPE_NUMBER
TraceItemAttributesResponse.Tag(
name="b_measurement_014", type=AttributeKey.Type.TYPE_FLOAT
),
]

0 comments on commit b43bf24

Please sign in to comment.