Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(replays): Add additional parsing safety to integer and datetime column types #3352

Merged
merged 8 commits into from
Nov 10, 2022
78 changes: 54 additions & 24 deletions snuba/datasets/processors/replays_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import dataclasses
import logging
import uuid
from datetime import datetime
from datetime import datetime, timezone
from hashlib import md5
from typing import Any, Callable, Mapping, MutableMapping, Optional, TypeVar

Expand All @@ -16,7 +16,8 @@
from snuba.processor import (
InsertBatch,
ProcessedMessage,
_ensure_valid_date,
_collapse_uint16,
_collapse_uint32,
_ensure_valid_ip,
)
from snuba.util import force_bytes
Expand All @@ -36,22 +37,6 @@


class ReplaysProcessor(DatasetMessageProcessor):
def __extract_timestamp(self, field: int) -> datetime:
timestamp = _ensure_valid_date(datetime.utcfromtimestamp(field))
if timestamp is None:
timestamp = datetime.utcnow()
return timestamp

def __extract_started_at(self, started_at: Optional[int]) -> Optional[datetime]:
if started_at is None:
return None

timestamp = _ensure_valid_date(datetime.utcfromtimestamp(started_at))
if timestamp:
return timestamp
else:
raise TypeError("Missing data for replay_start_timestamp column.")

def __extract_urls(self, replay_event: ReplayEventDict) -> list[str]:
if "url" in replay_event:
# Backwards compat for non-public, pre-alpha javascript SDK.
Expand Down Expand Up @@ -100,14 +85,16 @@ def _process_base_replay_event_values(
self, processed: MutableMapping[str, Any], replay_event: ReplayEventDict
) -> None:
processed["replay_id"] = str(uuid.UUID(replay_event["replay_id"]))
processed["segment_id"] = replay_event["segment_id"]
processed["segment_id"] = maybe(
_coerce_segment_id, replay_event.get("segment_id")
)
processed["trace_ids"] = self.__process_trace_ids(replay_event.get("trace_ids"))

processed["timestamp"] = self.__extract_timestamp(
replay_event["timestamp"],
processed["timestamp"] = default(
utcnow, maybe(datetimeify, replay_event.get("timestamp"))
)
processed["replay_start_timestamp"] = self.__extract_started_at(
replay_event.get("replay_start_timestamp"),
processed["replay_start_timestamp"] = maybe(
datetimeify, replay_event.get("replay_start_timestamp")
)
processed["urls"] = self.__extract_urls(replay_event)
processed["release"] = maybe(stringify, replay_event.get("release"))
Expand All @@ -118,7 +105,9 @@ def _process_base_replay_event_values(
processed["error_ids"] = self.__process_error_ids(replay_event.get("error_ids"))

# Archived can only be 1 or null.
processed["is_archived"] = 1 if replay_event.get("is_archived") else None
processed["is_archived"] = (
1 if replay_event.get("is_archived") is True else None
)

def _process_tags(
self, processed: MutableMapping[str, Any], replay_event: ReplayEventDict
Expand Down Expand Up @@ -249,6 +238,14 @@ def process_message(
U = TypeVar("U")


def default(default: Callable[[], T], value: T | None) -> T:
    """Return *value*, falling back to calling *default* only when it is None.

    Falsey-but-present values (0, "", False, [], {}) are returned unchanged;
    only None triggers the fallback factory.
    """
    if value is None:
        return default()
    return value


def maybe(into: Callable[[T], U], value: T | None) -> U | None:
    """Apply *into* to *value*, propagating None untouched."""
    if value is None:
        return None
    return into(value)
Expand All @@ -274,6 +271,39 @@ def _encode_utf8(value: str) -> str:
return value.encode("utf8", errors="backslashreplace").decode("utf8")


def datetimeify(value: Any) -> datetime:
    """Coerce *value* to a timezone-aware UTC datetime or raise.

    Datetimes for the replays schema standardize on 32 bit dates, so any
    timestamp that does not fit in a uint32 is rejected with a ValueError.
    """
    seconds = _collapse_or_err(_collapse_uint32, int(value))
    return _timestamp_to_datetime(seconds)


def _collapse_or_err(callable: Callable[[int], int | None], value: int) -> int:
"""Return the integer or error if it overflows."""
if callable(value) is None:
# This exception can only be triggered through abuse. We choose not to suppress these
# exceptions in favor of identifying the origin.
raise ValueError(f'Integer "{value}" overflowed.')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this cause the whole replay processing pipeline to stop if one org is causing the abuse?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This particular error should trigger the replays DLQ (let me know if this assumption is incorrect). An exception raised after the processing step (e.g. the write step) caused INC-250. I don't believe this change introduces any pipeline risk.

else:
return value


def _timestamp_to_datetime(timestamp: int) -> datetime:
"""Convert an integer timestamp to a timezone-aware utc datetime instance."""
return datetime.fromtimestamp(timestamp, tz=timezone.utc)


def _coerce_segment_id(value: Any) -> int:
    """Coerce *value* to an integer that fits in 16 bits, or raise."""
    as_int = int(value)
    return _collapse_or_err(_collapse_uint16, as_int)


def utcnow() -> datetime:
    """Return the current moment as a timezone-aware UTC datetime."""
    return datetime.now(tz=timezone.utc)


# Tags processor.


Expand Down
Loading