Merge branch 'master' into vlad/uniq_in_select_and_having
vladkluev authored Jan 12, 2022
2 parents d8abc89 + 6f92898 commit 17c51f5
Showing 16 changed files with 663 additions and 77 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/ci.yml
@@ -22,8 +22,10 @@ jobs:
# Run pre-commit to lint and format check files that were changed (but not deleted) compared to master.
# XXX: there is a very small chance that it'll expand to exceed Linux's limits
# `getconf ARG_MAX` - max # bytes of args + environ for exec()
      # we skip the `no-commit-to-branch` hook because in CI we are in fact
      # already on master, having merged to it
run: |
pre-commit run --files $(git diff --diff-filter=d --name-only master)
SKIP=no-commit-to-branch pre-commit run --files $(git diff --diff-filter=d --name-only master)
typing:
name: "mypy typing"
93 changes: 93 additions & 0 deletions snuba/admin/clickhouse/common.py
@@ -0,0 +1,93 @@
from typing import MutableMapping

from snuba import settings
from snuba.clickhouse.native import ClickhousePool
from snuba.clusters.cluster import ClickhouseClientSettings, ClickhouseCluster
from snuba.datasets.storages import StorageKey
from snuba.datasets.storages.factory import get_storage
from snuba.utils.serializable_exception import SerializableException


class InvalidNodeError(SerializableException):
pass


class InvalidCustomQuery(SerializableException):
pass


class InvalidStorageError(SerializableException):
pass


def is_valid_node(host: str, port: int, cluster: ClickhouseCluster) -> bool:
nodes = cluster.get_local_nodes()
return any(node.host_name == host and node.port == port for node in nodes)


NODE_CONNECTIONS: MutableMapping[str, ClickhousePool] = {}


def get_ro_node_connection(
clickhouse_host: str, clickhouse_port: int, storage_name: str
) -> ClickhousePool:
storage_key = None
try:
storage_key = StorageKey(storage_name)
except ValueError:
raise InvalidStorageError(
f"storage {storage_name} is not a valid storage name",
extra_data={"storage_name": storage_name},
)

key = f"{storage_key}-{clickhouse_host}"
if key in NODE_CONNECTIONS:
return NODE_CONNECTIONS[key]

storage = get_storage(storage_key)
cluster = storage.get_cluster()

if not is_valid_node(clickhouse_host, clickhouse_port, cluster):
raise InvalidNodeError(
f"host {clickhouse_host} and port {clickhouse_port} are not valid",
extra_data={"host": clickhouse_host, "port": clickhouse_port},
)

database = cluster.get_database()
connection = ClickhousePool(
clickhouse_host,
clickhouse_port,
settings.CLICKHOUSE_READONLY_USER,
settings.CLICKHOUSE_READONLY_PASSWORD,
database,
max_pool_size=2,
# force read-only
client_settings=ClickhouseClientSettings.QUERY.value.settings,
)
NODE_CONNECTIONS[key] = connection
return connection


CLUSTER_CONNECTIONS: MutableMapping[StorageKey, ClickhousePool] = {}


def get_ro_cluster_connection(storage_name: str) -> ClickhousePool:
storage_key = None
try:
storage_key = StorageKey(storage_name)
except ValueError:
raise InvalidStorageError(
f"storage {storage_name} is not a valid storage name",
extra_data={"storage_name": storage_name},
)

if storage_key in CLUSTER_CONNECTIONS:
return CLUSTER_CONNECTIONS[storage_key]

storage = get_storage(storage_key)
cluster = storage.get_cluster()
connection = cluster.get_query_connection(ClickhouseClientSettings.QUERY)

CLUSTER_CONNECTIONS[storage_key] = connection
return connection
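
The two helpers above memoize connections: node pools are cached under a key built from the storage and host (the port is not part of the key), and cluster pools are cached per StorageKey. A minimal usage sketch, with hypothetical host, port, and storage values:

from snuba.admin.clickhouse.common import (
    get_ro_node_connection,
    get_ro_cluster_connection,
)

# Hypothetical values: an unknown storage name raises InvalidStorageError,
# and a (host, port) pair outside the cluster's local nodes raises
# InvalidNodeError.
conn = get_ro_node_connection("localhost", 9000, "errors")

# A second call with the same storage and host returns the cached pool.
assert get_ro_node_connection("localhost", 9000, "errors") is conn

# Cluster-level read-only connections are cached per StorageKey.
cluster_conn = get_ro_cluster_connection("errors")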
51 changes: 3 additions & 48 deletions snuba/admin/clickhouse/system_queries.py
@@ -4,35 +4,20 @@

from clickhouse_driver.errors import ErrorCodes

from snuba import settings
from snuba.admin.clickhouse.common import InvalidCustomQuery, get_ro_node_connection
from snuba.clickhouse.errors import ClickhouseError
from snuba.clickhouse.native import ClickhousePool, ClickhouseResult
from snuba.clusters.cluster import ClickhouseClientSettings, ClickhouseCluster
from snuba.datasets.storages import StorageKey
from snuba.datasets.storages.factory import get_storage
from snuba.clickhouse.native import ClickhouseResult
from snuba.utils.serializable_exception import SerializableException


class NonExistentSystemQuery(SerializableException):
pass


class InvalidNodeError(SerializableException):
pass


class InvalidStorageError(SerializableException):
pass


class InvalidResultError(SerializableException):
pass


class InvalidCustomQuery(SerializableException):
pass


class _QueryRegistry:
"""Keep a mapping of SystemQueries to their names"""

@@ -101,44 +86,14 @@ class ActivePartitions(SystemQuery):
"""


def _is_valid_node(host: str, port: int, cluster: ClickhouseCluster) -> bool:
nodes = cluster.get_local_nodes()
return any(node.host_name == host and node.port == port for node in nodes)


def _run_sql_query_on_host(
clickhouse_host: str, clickhouse_port: int, storage_name: str, sql: str
) -> ClickhouseResult:
"""
Run the SQL query. It should be validated before getting to this point
"""
storage_key = None
try:
storage_key = StorageKey(storage_name)
except ValueError:
raise InvalidStorageError(extra_data={"storage_name": storage_name})

storage = get_storage(storage_key)
cluster = storage.get_cluster()

if not _is_valid_node(clickhouse_host, clickhouse_port, cluster):
raise InvalidNodeError(
extra_data={"host": clickhouse_host, "port": clickhouse_port}
)

database = cluster.get_database()

connection = ClickhousePool(
clickhouse_host,
clickhouse_port,
settings.CLICKHOUSE_READONLY_USER,
settings.CLICKHOUSE_READONLY_PASSWORD,
database,
# force read-only
client_settings=ClickhouseClientSettings.QUERY.value.settings,
)
connection = get_ro_node_connection(clickhouse_host, clickhouse_port, storage_name)
query_result = connection.execute(query=sql, with_column_types=True)
connection.close()

return query_result

27 changes: 27 additions & 0 deletions snuba/admin/clickhouse/tracing.py
@@ -0,0 +1,27 @@
from snuba.admin.clickhouse.common import InvalidCustomQuery, get_ro_cluster_connection
from snuba.clickhouse.native import ClickhouseResult


def validate_trace_query(sql_query: str) -> None:
"""
Simple validation to ensure query only attempts read queries.
Raises InvalidCustomQuery if query is invalid or not allowed.
"""
sql_query = " ".join(sql_query.split())
lowered = sql_query.lower().strip()

if not lowered.startswith("select"):
raise InvalidCustomQuery("Only SELECT queries are allowed")

disallowed_keywords = ["insert", ";"]
for kw in disallowed_keywords:
if kw in lowered:
raise InvalidCustomQuery(f"{kw} is not allowed in the query")


def run_query_and_get_trace(storage_name: str, query: str) -> ClickhouseResult:
validate_trace_query(query)
connection = get_ro_cluster_connection(storage_name)
query_result = connection.execute(query=query, capture_trace=True)
return query_result
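
A quick sketch of what the validator accepts and rejects (table name illustrative); note the keyword check is a substring match over the whole lowered query:

from snuba.admin.clickhouse.common import InvalidCustomQuery
from snuba.admin.clickhouse.tracing import validate_trace_query

validate_trace_query("SELECT count() FROM errors_local")  # passes

for bad in (
    "INSERT INTO errors_local VALUES (1)",  # not a SELECT
    "SELECT 1; DROP TABLE errors_local",    # ";" is disallowed
):
    try:
        validate_trace_query(bad)
    except InvalidCustomQuery as err:
        print(err)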
65 changes: 61 additions & 4 deletions snuba/admin/views.py
@@ -1,3 +1,5 @@
from __future__ import annotations

import logging
from typing import Any, List, MutableMapping, Optional, cast

@@ -6,17 +8,20 @@

from snuba import state
from snuba.admin.auth import UnauthorizedException, authorize_request
from snuba.admin.clickhouse.nodes import get_storage_info
from snuba.admin.clickhouse.system_queries import (
from snuba.admin.clickhouse.common import (
InvalidCustomQuery,
InvalidNodeError,
InvalidResultError,
InvalidStorageError,
)
from snuba.admin.clickhouse.nodes import get_storage_info
from snuba.admin.clickhouse.system_queries import (
InvalidResultError,
NonExistentSystemQuery,
SystemQuery,
run_system_query_on_host_by_name,
run_system_query_on_host_with_sql,
)
from snuba.admin.clickhouse.tracing import run_query_and_get_trace
from snuba.admin.notifications.base import RuntimeConfigAction, RuntimeConfigAutoClient
from snuba.admin.runtime_config import (
ConfigChange,
@@ -74,7 +79,6 @@ def clickhouse_queries() -> Response:
@application.route("/run_clickhouse_system_query", methods=["POST"])
def clickhouse_system_query() -> Response:
req = request.get_json()

try:
host = req["host"]
port = req["port"]
@@ -142,6 +146,59 @@ def clickhouse_system_query() -> Response:
return make_response(jsonify({"error": "Something went wrong"}), 400)


# Sample cURL command (the endpoint reads a JSON body with "storage" and "sql"):
#
# curl -X POST \
#  -H 'Content-Type: application/json' \
#  http://localhost:1219/clickhouse_trace_query \
#  -d '{"storage": "errors", "sql": "SELECT count() FROM errors_local"}'
@application.route("/clickhouse_trace_query", methods=["POST"])
def clickhouse_trace_query() -> Response:
req = json.loads(request.data)
try:
storage = req["storage"]
raw_sql = req["sql"]
except KeyError as e:
return make_response(
jsonify(
{
"error": {
"type": "request",
"message": f"Invalid request, missing key {e.args[0]}",
}
}
),
400,
)

try:
result = run_query_and_get_trace(storage, raw_sql)
trace_output = result.trace_output
return make_response(jsonify({"trace_output": trace_output}), 200)
except InvalidCustomQuery as err:
return make_response(
jsonify(
{
"error": {
"type": "validation",
"message": err.message or "Invalid query",
}
}
),
400,
)
except ClickhouseError as err:
details = {
"type": "clickhouse",
"message": str(err),
"code": err.code,
}
return make_response(jsonify({"error": details}), 400)
except Exception as err:
return make_response(
jsonify({"error": {"type": "unknown", "message": str(err)}}), 500,
)


@application.route("/configs", methods=["GET", "POST"])
def configs() -> Response:
if request.method == "POST":
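The new endpoint can be exercised the same way from Python; a sketch assuming the admin app is listening on port 1219 as in the cURL sample, using the third-party requests client:

import requests  # assumed available; any HTTP client works

resp = requests.post(
    "http://localhost:1219/clickhouse_trace_query",
    json={"storage": "errors", "sql": "SELECT count() FROM errors_local"},
)
if resp.ok:
    print(resp.json()["trace_output"])
else:
    # {"error": {"type": "request" | "validation" | "clickhouse" | "unknown", ...}}
    print(resp.json()["error"])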
14 changes: 8 additions & 6 deletions snuba/clickhouse/native.py
@@ -120,6 +120,13 @@ def execute(
conn = self._create_conn()

try:
if capture_trace:
settings = (
{**settings, "send_logs_level": "trace"}
if settings
else {"send_logs_level": "trace"}
)

query_execute = partial(
conn.execute,
query,
@@ -134,12 +141,7 @@
trace_output = ""
if capture_trace:
with capture_logging() as buffer:
if settings:
settings = {**settings, "send_logs_level": "trace"}
else:
settings = {"send_logs_level": "trace"}
result_data = query_execute()
# In order to avoid exposing PII the results are discarded
query_execute() # In order to avoid exposing PII the results are discarded
result_data = [[], []] if with_column_types else []
trace_output = buffer.getvalue()
else:
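The net effect of capture_trace=True after this change, as a hedged sketch (assumes pool is an existing ClickhousePool and that ClickhouseResult exposes the row payload as .results):

# send_logs_level=trace is merged into the client settings before the
# query runs, so the server streams its logs back with the response.
result = pool.execute(
    query="SELECT count() FROM errors_local",
    with_column_types=True,
    capture_trace=True,
)

# The query runs, but the rows are discarded to avoid exposing PII ...
assert result.results == [[], []]
# ... and only the captured server-side log survives.
print(result.trace_output)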
