Skip to content

Commit

Permalink
ref(attribution): Split RequestSettings into QuerySettings and attribu…
Browse files Browse the repository at this point in the history
…tion (#2808)

This PR is step one of an effort to decouple attribution of queries from the settings passed to the query pipeline.

* decouple attribution no rename (#2807)

This commit contains the logic changes

* attribution: mass rename of files
  • Loading branch information
volokluev authored Jun 15, 2022
1 parent facd39c commit fb036da
Show file tree
Hide file tree
Showing 126 changed files with 700 additions and 640 deletions.
18 changes: 18 additions & 0 deletions snuba/attribution/attribution_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations

from dataclasses import dataclass

from snuba.attribution import AppID


@dataclass(frozen=True)
class AttributionInfo:
    """Attribution and quota-enforcement settings for a single query.

    Frozen (immutable) so an instance can be shared safely across the
    query pipeline without risk of mutation after construction.
    """

    # Application identifier of the caller issuing the query.
    app_id: AppID
    # Referrer string for the query; used for attribution/quota bookkeeping.
    referrer: str
    # Optional attribution metadata — presumably identify the owning team,
    # product feature, and originating parent API; TODO confirm with callers.
    team: str | None
    feature: str | None
    parent_api: str | None
6 changes: 3 additions & 3 deletions snuba/clickhouse/processors.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from snuba.clickhouse.query import Query
from snuba.query.composite import CompositeQuery
from snuba.query.data_source.simple import Table
from snuba.request.request_settings import RequestSettings
from snuba.query.query_settings import QuerySettings


class QueryProcessor(ABC):
Expand All @@ -22,7 +22,7 @@ class QueryProcessor(ABC):
"""

@abstractmethod
def process_query(self, query: Query, request_settings: RequestSettings) -> None:
def process_query(self, query: Query, query_settings: QuerySettings) -> None:
# TODO: Consider making the Query immutable.
raise NotImplementedError

Expand All @@ -40,6 +40,6 @@ class CompositeQueryProcessor(ABC):

@abstractmethod
def process_query(
self, query: CompositeQuery[Table], request_settings: RequestSettings
self, query: CompositeQuery[Table], query_settings: QuerySettings
) -> None:
raise NotImplementedError
4 changes: 3 additions & 1 deletion snuba/datasets/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,6 @@ def build_execution_pipeline(
request, runner
)
else:
return CompositeExecutionPipeline(request.query, request.settings, runner)
return CompositeExecutionPipeline(
request.query, request.query_settings, runner
)
4 changes: 2 additions & 2 deletions snuba/datasets/entities/clickhouse_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@

from snuba import environment, settings
from snuba.query.logical import Query
from snuba.query.query_settings import QuerySettings
from snuba.reader import Row
from snuba.request.request_settings import RequestSettings
from snuba.state import get_config
from snuba.utils.metrics.wrapper import MetricsWrapper
from snuba.utils.threaded_function_delegator import Result
Expand Down Expand Up @@ -119,7 +119,7 @@ def choose(self, referrer: str) -> Choice:

def comparison_callback(
_query: Query,
_settings: RequestSettings,
_settings: QuerySettings,
referrer: str,
primary_result: Optional[Result[QueryResult]],
results: List[Result[QueryResult]],
Expand Down
10 changes: 5 additions & 5 deletions snuba/datasets/entities/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@
from snuba.query.processors.quota_processor import ResourceQuotaProcessor
from snuba.query.processors.tags_expander import TagsExpanderProcessor
from snuba.query.processors.timeseries_processor import TimeSeriesProcessor
from snuba.query.query_settings import QuerySettings
from snuba.query.validation.validators import EntityRequiredColumnValidator
from snuba.request.request_settings import RequestSettings

event_translator = TranslationMappers(
columns=[
Expand Down Expand Up @@ -103,11 +103,11 @@ def __init__(self, mappers: TranslationMappers) -> None:
self.__mappers = mappers

def select_storage(
self, query: Query, request_settings: RequestSettings
self, query: Query, query_settings: QuerySettings
) -> StorageAndMappers:
use_readonly_storage = (
state.get_config("enable_events_readonly_table", False)
and not request_settings.get_consistent()
and not query_settings.get_consistent()
)

storage = (
Expand All @@ -123,9 +123,9 @@ def __init__(self, mappers: TranslationMappers) -> None:
self.__mappers = mappers

def select_storage(
self, query: Query, request_settings: RequestSettings
self, query: Query, query_settings: QuerySettings
) -> StorageAndMappers:
use_readonly_storage = not request_settings.get_consistent()
use_readonly_storage = not query_settings.get_consistent()

storage = (
self.__errors_ro_table if use_readonly_storage else self.__errors_table
Expand Down
4 changes: 2 additions & 2 deletions snuba/datasets/entities/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@
)
from snuba.query.processors.quota_processor import ResourceQuotaProcessor
from snuba.query.processors.timeseries_processor import TimeSeriesProcessor
from snuba.query.query_settings import QuerySettings
from snuba.query.validation.validators import (
EntityRequiredColumnValidator,
GranularityValidator,
QueryValidator,
)
from snuba.request.request_settings import RequestSettings


class TagsTypeTransformer(QueryProcessor):
def process_query(self, query: Query, request_settings: RequestSettings) -> None:
def process_query(self, query: Query, query_settings: QuerySettings) -> None:
def transform_expression(exp: Expression) -> Expression:
if not isinstance(exp, SubscriptableReference):
return exp
Expand Down
8 changes: 4 additions & 4 deletions snuba/datasets/entities/sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@
TimeSeriesProcessor,
extract_granularity_from_query,
)
from snuba.query.query_settings import QuerySettings, SubscriptionQuerySettings
from snuba.query.validation.validators import (
ColumnValidationMode,
EntityRequiredColumnValidator,
)
from snuba.request.request_settings import RequestSettings, SubscriptionRequestSettings
from snuba.utils.metrics.wrapper import MetricsWrapper

metrics = MetricsWrapper(environment.metrics, "api.sessions")
Expand Down Expand Up @@ -221,18 +221,18 @@ def __init__(self) -> None:
self.raw_storage = get_storage(StorageKey.SESSIONS_RAW)

def select_storage(
self, query: Query, request_settings: RequestSettings
self, query: Query, query_settings: QuerySettings
) -> StorageAndMappers:

# If the passed in `request_settings` arg is an instance of `SubscriptionRequestSettings`,
# If the passed in `query_settings` arg is an instance of `SubscriptionQuerySettings`,
# then it is a crash rate alert subscription, and hence we decide on whether to use the
# materialized storage or the raw storage by examining the time_window.
# If the `time_window` <=1h, then select the raw storage otherwise select materialized
# storage
# NOTE: If we were to support other types of subscriptions over the sessions dataset that
# do not follow this method used to identify which storage to use, we would need to
# find a different way to distinguish them.
if isinstance(request_settings, SubscriptionRequestSettings):
if isinstance(query_settings, SubscriptionQuerySettings):
from_date, to_date = get_time_range(query, "started")
if from_date and to_date:
use_materialized_storage = to_date - from_date > timedelta(hours=1)
Expand Down
6 changes: 3 additions & 3 deletions snuba/datasets/entities/transactions.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
from snuba.query.processors.quota_processor import ResourceQuotaProcessor
from snuba.query.processors.tags_expander import TagsExpanderProcessor
from snuba.query.processors.timeseries_processor import TimeSeriesProcessor
from snuba.query.query_settings import QuerySettings
from snuba.query.validation.validators import EntityRequiredColumnValidator
from snuba.request.request_settings import RequestSettings

transaction_translator = TranslationMappers(
columns=[
Expand Down Expand Up @@ -110,10 +110,10 @@ def __init__(self, mappers: TranslationMappers) -> None:
self.__mappers = mappers

def select_storage(
self, query: Query, request_settings: RequestSettings
self, query: Query, query_settings: QuerySettings
) -> StorageAndMappers:
readonly_referrer = (
request_settings.referrer
query_settings.referrer
in settings.TRANSACTIONS_DIRECT_TO_READONLY_REFERRERS
)
use_readonly_storage = readonly_referrer or state.get_config(
Expand Down
12 changes: 6 additions & 6 deletions snuba/datasets/plans/query_plan.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
from snuba.query.composite import CompositeQuery
from snuba.query.data_source.simple import Table
from snuba.query.logical import Query as LogicalQuery
from snuba.query.query_settings import QuerySettings
from snuba.reader import Reader
from snuba.request.request_settings import RequestSettings
from snuba.web import QueryResult

QueryRunner = Callable[
[Union[Query, CompositeQuery[Table]], RequestSettings, Reader], QueryResult
[Union[Query, CompositeQuery[Table]], QuerySettings, Reader], QueryResult
]

TQuery = TypeVar("TQuery", bound=AbstractQuery)
Expand Down Expand Up @@ -160,7 +160,7 @@ class QueryPlanExecutionStrategy(ABC, Generic[TQuery]):
def execute(
self,
query: TQuery,
request_settings: RequestSettings,
query_settings: QuerySettings,
runner: QueryRunner,
) -> QueryResult:
"""
Expand All @@ -182,7 +182,7 @@ class ClickhouseQueryPlanBuilder(ABC):

@abstractmethod
def build_and_rank_plans(
self, query: LogicalQuery, request_settings: RequestSettings
self, query: LogicalQuery, query_settings: QuerySettings
) -> Sequence[ClickhouseQueryPlan]:
"""
Returns all the valid plans for this query sorted in ranking
Expand All @@ -191,8 +191,8 @@ def build_and_rank_plans(
raise NotImplementedError

def build_best_plan(
self, query: LogicalQuery, request_settings: RequestSettings
self, query: LogicalQuery, query_settings: QuerySettings
) -> ClickhouseQueryPlan:
plans = self.build_and_rank_plans(query, request_settings)
plans = self.build_and_rank_plans(query, query_settings)
assert plans, "Query planner did not produce a plan"
return plans[0]
18 changes: 9 additions & 9 deletions snuba/datasets/plans/single_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from snuba.query.logical import Query as LogicalQuery
from snuba.query.processors.conditions_enforcer import MandatoryConditionEnforcer
from snuba.query.processors.mandatory_condition_applier import MandatoryConditionApplier
from snuba.request.request_settings import RequestSettings
from snuba.query.query_settings import QuerySettings

# TODO: Importing snuba.web here is just wrong. What's need to be done to avoid this
# dependency is a refactoring of the methods that return RawQueryResult to make them
Expand All @@ -47,18 +47,18 @@ def __init__(
def execute(
self,
query: Query,
request_settings: RequestSettings,
query_settings: QuerySettings,
runner: QueryRunner,
) -> QueryResult:
def process_and_run_query(
query: Query, request_settings: RequestSettings
query: Query, query_settings: QuerySettings
) -> QueryResult:
for processor in self.__query_processors:
with sentry_sdk.start_span(
description=type(processor).__name__, op="processor"
):
processor.process_query(query, request_settings)
return runner(query, request_settings, self.__cluster.get_reader())
processor.process_query(query, query_settings)
return runner(query, query_settings, self.__cluster.get_reader())

use_split = state.get_config("use_split", 1)
if use_split:
Expand All @@ -67,12 +67,12 @@ def process_and_run_query(
description=type(splitter).__name__, op="splitter"
):
result = splitter.execute(
query, request_settings, process_and_run_query
query, query_settings, process_and_run_query
)
if result is not None:
return result

return process_and_run_query(query, request_settings)
return process_and_run_query(query, query_settings)


def get_query_data_source(
Expand Down Expand Up @@ -117,7 +117,7 @@ def __init__(

@with_span()
def build_and_rank_plans(
self, query: LogicalQuery, settings: RequestSettings
self, query: LogicalQuery, settings: QuerySettings
) -> Sequence[ClickhouseQueryPlan]:
with sentry_sdk.start_span(
op="build_plan.single_storage", description="translate"
Expand Down Expand Up @@ -178,7 +178,7 @@ def __init__(

@with_span()
def build_and_rank_plans(
self, query: LogicalQuery, settings: RequestSettings
self, query: LogicalQuery, settings: QuerySettings
) -> Sequence[ClickhouseQueryPlan]:
with sentry_sdk.start_span(
op="build_plan.selected_storage", description="select_storage"
Expand Down
8 changes: 4 additions & 4 deletions snuba/datasets/plans/split_strategy.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@
from typing import Callable, Optional

from snuba.clickhouse.query import Query
from snuba.request.request_settings import RequestSettings
from snuba.query.query_settings import QuerySettings
from snuba.web import QueryResult

SplitQueryRunner = Callable[[Query, RequestSettings], QueryResult]
SplitQueryRunner = Callable[[Query, QuerySettings], QueryResult]


class QuerySplitStrategy(ABC):
"""
Implements a query split algorithm. It works in a similar way as a
QueryExecutionStrategy, it takes a query, request settings and a query runner
QueryExecutionStrategy: it takes a query, query settings and a query runner
and decides if it should split the query into more efficient parts.
If it can split the query, it uses the SplitQueryRunner to execute every chunk,
otherwise it returns None immediately.
Expand All @@ -31,7 +31,7 @@ class QuerySplitStrategy(ABC):
def execute(
self,
query: Query,
request_settings: RequestSettings,
query_settings: QuerySettings,
runner: SplitQueryRunner,
) -> Optional[QueryResult]:
"""
Expand Down
4 changes: 2 additions & 2 deletions snuba/datasets/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
from snuba.datasets.table_storage import KafkaStreamLoader, TableWriter
from snuba.query.expressions import Expression
from snuba.query.logical import Query
from snuba.query.query_settings import QuerySettings
from snuba.replacers.replacer_processor import ReplacerProcessor
from snuba.request.request_settings import RequestSettings


class Storage(ABC):
Expand Down Expand Up @@ -211,6 +211,6 @@ class QueryStorageSelector(ABC):

@abstractmethod
def select_storage(
self, query: Query, request_settings: RequestSettings
self, query: Query, query_settings: QuerySettings
) -> StorageAndMappers:
raise NotImplementedError
6 changes: 3 additions & 3 deletions snuba/datasets/storages/events_bool_contexts.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from snuba.query.expressions import FunctionCall as FunctionCallExpr
from snuba.query.expressions import Literal as LiteralExpr
from snuba.query.matchers import Column, FunctionCall, Literal, Or, String
from snuba.request.request_settings import RequestSettings
from snuba.query.query_settings import QuerySettings


class EventsPromotedBooleanContextsProcessor(QueryProcessor):
Expand All @@ -30,7 +30,7 @@ class EventsPromotedBooleanContextsProcessor(QueryProcessor):
patch to the events storage for as long as it exists.
"""

def process_query(self, query: Query, request_settings: RequestSettings) -> None:
def process_query(self, query: Query, query_settings: QuerySettings) -> None:
# We care only of promoted contexts, so we do not need to match
# the original nested expression.
matcher = FunctionCall(
Expand Down Expand Up @@ -80,7 +80,7 @@ class EventsBooleanContextsProcessor(QueryProcessor):
from the errors and events storages.
"""

def process_query(self, query: Query, request_settings: RequestSettings) -> None:
def process_query(self, query: Query, query_settings: QuerySettings) -> None:
matcher = FunctionCall(
String("arrayElement"),
(
Expand Down
4 changes: 2 additions & 2 deletions snuba/datasets/storages/group_id_column_processor.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from snuba.clickhouse.processors import QueryProcessor
from snuba.clickhouse.query import Query
from snuba.query.expressions import Column, Expression, FunctionCall, Literal
from snuba.request.request_settings import RequestSettings
from snuba.query.query_settings import QuerySettings


class GroupIdColumnProcessor(QueryProcessor):
def process_query(self, query: Query, request_settings: RequestSettings) -> None:
def process_query(self, query: Query, query_settings: QuerySettings) -> None:
def process_column(exp: Expression) -> Expression:
if isinstance(exp, Column):
if exp.column_name == "group_id":
Expand Down
4 changes: 2 additions & 2 deletions snuba/datasets/storages/processors/consistency_enforcer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from snuba.clickhouse.processors import QueryProcessor
from snuba.clickhouse.query import Query
from snuba.request.request_settings import RequestSettings
from snuba.query.query_settings import QuerySettings


class ConsistencyEnforcerProcessor(QueryProcessor):
Expand All @@ -14,5 +14,5 @@ class ConsistencyEnforcerProcessor(QueryProcessor):
like the CDC tables.
"""

def process_query(self, query: Query, request_settings: RequestSettings) -> None:
def process_query(self, query: Query, query_settings: QuerySettings) -> None:
query.set_from_clause(replace(query.get_from_clause(), final=True))
Loading

0 comments on commit fb036da

Please sign in to comment.