fix(replays): Move code requiring exception-prone variables into try block (#64658)

Fixes INC-630. Variables referenced after the try block are not defined if an exception is raised, causing an unhandled crash loop.
cmanallen authored Feb 8, 2024
1 parent ffd6b4f commit 087e99c
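
For context, a minimal sketch of the failure mode, with hypothetical helper names (parse, extract_actions, buffer_actions) standing in for the consumer's real code:

import logging

def process_message_sketch(message: bytes) -> None:
    try:
        decoded = parse(message)                    # may raise
        replay_actions = extract_actions(decoded)   # may raise
    except Exception:
        logging.exception("Failed to parse recording")

    # Before the fix, this ran unconditionally. When parsing failed,
    # replay_actions (and decoded) were never assigned, so this line raised
    # NameError and the consumer crashed and restarted in a loop. Moving the
    # block into the try, as the diff below does, means a bad message is
    # logged and skipped instead.
    if replay_actions is not None:
        buffer_actions(replay_actions)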
Showing 2 changed files with 59 additions and 17 deletions.
35 changes: 18 additions & 17 deletions src/sentry/replays/consumers/recording_buffered.py
@@ -35,6 +35,7 @@
than if throughput is low. The number of messages being operated on in parallel is material only
insofar as it impacts the throughput of the consumer.
"""

from __future__ import annotations

import logging
@@ -238,6 +239,23 @@ def process_message(buffer: RecordingBuffer, message: bytes) -> None:
decoded_message["retention_days"],
parsed_recording_data,
)

if replay_actions is not None:
buffer.replay_action_events.append(replay_actions)

# Useful for computing the average cost of a replay.
metrics.distribution(
"replays.usecases.ingest.size_compressed",
len(recording_data),
unit="byte",
)

# Useful for computing the compression ratio.
metrics.distribution(
"replays.usecases.ingest.size_uncompressed",
len(decompressed_segment),
unit="byte",
)
except Exception:
logging.exception(
"Failed to parse recording org=%s, project=%s, replay=%s, segment=%s",
@@ -247,23 +265,6 @@ def process_message(buffer: RecordingBuffer, message: bytes) -> None:
headers["segment_id"],
)

if replay_actions is not None:
buffer.replay_action_events.append(replay_actions)

# Useful for computing the average cost of a replay.
metrics.distribution(
"replays.usecases.ingest.size_compressed",
len(recording_data),
unit="byte",
)

# Useful for computing the compression ratio.
metrics.distribution(
"replays.usecases.ingest.size_uncompressed",
len(decompressed_segment),
unit="byte",
)


# Commit.

41 changes: 41 additions & 0 deletions tests/sentry/replays/consumers/test_recording.py
@@ -172,6 +172,47 @@ def test_uncompressed_segment_ingestion(self, mock_record, mock_onboarding_task)
user_id=self.organization.default_owner_id,
)

@patch("sentry.models.OrganizationOnboardingTask.objects.record")
@patch("sentry.analytics.record")
@patch("sentry.replays.usecases.ingest.dom_index.emit_replay_actions")
def test_invalid_json(self, emit_replay_actions, mock_record, mock_onboarding_task):
"""Assert invalid JSON does not break ingestion.
In production, we'll never received invalid JSON. Its validated in Relay. However, we
may still encounter issues when deserializing JSON that are not encountered in Relay
(e.g. max depth). These issues should not break ingestion.
"""
segment_id = 0
self.submit(
self.nonchunked_messages(segment_id=segment_id, compressed=True, message=b"[{]")
)

# Data was persisted even though an error was encountered.
bytes = self.get_recording_data(segment_id)
assert bytes == zlib.compress(b"[{]")

# Onboarding and billing tasks were called.
self.project.refresh_from_db()
assert self.project.flags.has_replays

mock_onboarding_task.assert_called_with(
organization_id=self.project.organization_id,
task=OnboardingTask.SESSION_REPLAY,
status=OnboardingTaskStatus.COMPLETE,
date_completed=ANY,
)

mock_record.assert_called_with(
"first_replay.sent",
organization_id=self.organization.id,
project_id=self.project.id,
platform=self.project.platform,
user_id=self.organization.default_owner_id,
)

# No replay actions were emitted because JSON deserialization failed.
assert not emit_replay_actions.called


class ThreadedRecordingTestCase(RecordingTestCase):
force_synchronous = False
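
The new test's docstring points at the gap the fix covers: Relay validates JSON, but the consumer's own deserialization can still fail, for example on nesting depth. A minimal illustration using the standard-library json module (the consumer may use a different parser, so treat this as an assumption about the failure class, not the exact code path):

import json

# Well-formed JSON, but nested far deeper than CPython's default recursion
# limit, so json.loads raises RecursionError instead of returning a value.
deep = "[" * 10_000 + "]" * 10_000

try:
    json.loads(deep)
except Exception:
    # RecursionError subclasses Exception, so the broad handler added in the
    # diff above logs the failure and skips replay-action processing for the
    # segment rather than crashing the consumer.
    print("deserialization failed even though the payload is well-formed JSON")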
