Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: remove og lts recording storage #25945

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 8 additions & 107 deletions ee/session_recordings/session_recording_extensions.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,12 @@
# EE extended functions for SessionRecording model
import gzip
import json
from datetime import timedelta, datetime
from typing import Optional, cast
from datetime import timedelta

import structlog
from django.utils import timezone
from prometheus_client import Histogram, Counter
from sentry_sdk import capture_exception, capture_message

from posthog import settings
from posthog.session_recordings.models.metadata import PersistedRecordingV1
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.session_recording_helpers import decompress
from posthog.storage import object_storage

logger = structlog.get_logger(__name__)
Expand All @@ -37,38 +31,12 @@
"Count of session recordings that were too young to be persisted",
)

MINIMUM_AGE_FOR_RECORDING = timedelta(hours=24)


# TODO rename this...
def save_recording_with_new_content(recording: SessionRecording, content: str) -> str:
if not settings.OBJECT_STORAGE_ENABLED:
return ""

logger.info(
"re-saving recording file into 2023-08-01 LTS storage format",
recording_id=recording.session_id,
team_id=recording.team_id,
)

target_prefix = recording.build_object_storage_path("2023-08-01")

start = int(cast(datetime, recording.start_time).timestamp() * 1000)
end = int(cast(datetime, recording.end_time).timestamp() * 1000)
new_path = f"{target_prefix}/{start}-{end}"

zipped_content = gzip.compress(content.encode("utf-8"))
object_storage.write(
new_path,
zipped_content,
extras={"ContentType": "application/json", "ContentEncoding": "gzip"},
)

recording.storage_version = "2023-08-01"
recording.object_storage_path = target_prefix
recording.save()
RECORDING_PERSIST_START_COUNTER = Counter(
"recording_persist_started",
"Count of session recordings that were persisted",
)

return new_path
MINIMUM_AGE_FOR_RECORDING = timedelta(hours=24)


class InvalidRecordingForPersisting(Exception):
Expand All @@ -78,8 +46,6 @@ class InvalidRecordingForPersisting(Exception):
def persist_recording(recording_id: str, team_id: int) -> None:
"""Persist a recording to the S3"""

logger.info("Persisting recording: init", recording_id=recording_id, team_id=team_id)

if not settings.OBJECT_STORAGE_ENABLED:
return

Expand All @@ -96,27 +62,18 @@ def persist_recording(recording_id: str, team_id: int) -> None:
)
return

logger.info(
"Persisting recording: loading metadata...",
recording_id=recording_id,
team_id=team_id,
)
RECORDING_PERSIST_START_COUNTER.inc()

recording.load_metadata()

if not recording.start_time or timezone.now() < recording.start_time + MINIMUM_AGE_FOR_RECORDING:
# Recording is too recent to be persisted.
# We can save the metadata as it is still useful for querying, but we can't move to S3 yet.
logger.info(
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I haven't looked at these logs in a year; let's remove them.

"Persisting recording: skipping as recording start time is less than MINIMUM_AGE_FOR_RECORDING",
recording_id=recording_id,
team_id=team_id,
)
SNAPSHOT_PERSIST_TOO_YOUNG_COUNTER.inc()
recording.save()
return

target_prefix = recording.build_object_storage_path("2023-08-01")
target_prefix = recording.build_blob_lts_storage_path("2023-08-01")
source_prefix = recording.build_blob_ingestion_storage_path()
# if snapshots are already in blob storage, then we can just copy the files between buckets
with SNAPSHOT_PERSIST_TIME_HISTOGRAM.time():
Expand All @@ -127,12 +84,6 @@ def persist_recording(recording_id: str, team_id: int) -> None:
recording.object_storage_path = target_prefix
recording.save()
SNAPSHOT_PERSIST_SUCCESS_COUNTER.inc()
logger.info(
"Persisting recording: done!",
recording_id=recording_id,
team_id=team_id,
source="s3",
)
return
else:
SNAPSHOT_PERSIST_FAILURE_COUNTER.inc()
Expand All @@ -144,53 +95,3 @@ def persist_recording(recording_id: str, team_id: int) -> None:
source_prefix=source_prefix,
)
raise InvalidRecordingForPersisting("Could not persist recording: " + recording_id)


def load_persisted_recording(recording: SessionRecording) -> Optional[PersistedRecordingV1]:
    """Load a persisted recording from S3"""
    # Only handles the original storage format, i.e. rows where `storage_version`
    # was never written to the model. Returns the decompressed payload, or None
    # when the recording cannot (or should not) be loaded by this mechanism.

    logger.info(
        "Persisting recording load: reading from S3...",
        recording_id=recording.session_id,
        storage_version=recording.storage_version,
        path=recording.object_storage_path,
    )

    # originally storage version was written to the stored content
    # some stored content is stored over multiple files, so we can't rely on that
    # future recordings will have the storage version on the model
    # and will not be loaded here
    if not recording.storage_version:
        try:
            # read the single-object legacy payload; falsy content yields None below
            content = object_storage.read(str(recording.object_storage_path))
            decompressed = json.loads(decompress(content)) if content else None
            logger.info(
                "Persisting recording load: loaded!",
                recording_id=recording.session_id,
                path=recording.object_storage_path,
            )

            return decompressed
        except object_storage.ObjectStorageError as ose:
            # report the failure, then fall through to the capture_message below
            # (this function deliberately returns None rather than raising)
            capture_exception(ose)
            logger.error(
                "session_recording.object-storage-load-error",
                recording_id=recording.session_id,
                path=recording.object_storage_path,
                version="2022-12-22",
                exception=ose,
                exc_info=True,
            )

    # reached for any recording with a storage_version set, or after a failed
    # legacy read; surface the unexpected state and return None to the caller
    capture_message(
        "session_recording.load_persisted_recording.unexpected_recording_storage_version",
        extras={
            "recording_id": recording.session_id,
            "storage_version": recording.storage_version,
            "path": recording.object_storage_path,
        },
        tags={
            "team_id": recording.team_id,
        },
    )
    return None
72 changes: 17 additions & 55 deletions ee/session_recordings/test/test_session_recording_extensions.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,14 @@
import gzip
from datetime import timedelta, datetime, UTC
from secrets import token_urlsafe
from unittest.mock import patch, MagicMock
from uuid import uuid4

from boto3 import resource
from botocore.config import Config
from freezegun import freeze_time

from ee.session_recordings.session_recording_extensions import (
load_persisted_recording,
persist_recording,
save_recording_with_new_content,
)
from posthog.models.signals import mute_selected_signals
from posthog.session_recordings.models.session_recording import SessionRecording
from posthog.session_recordings.queries.test.session_replay_sql import (
produce_replay_summary,
Expand All @@ -24,7 +19,7 @@
OBJECT_STORAGE_SECRET_ACCESS_KEY,
OBJECT_STORAGE_BUCKET,
)
from posthog.storage.object_storage import write, list_objects
from posthog.storage.object_storage import write, list_objects, object_storage_client
from posthog.test.base import APIBaseTest, ClickhouseTestMixin

long_url = f"https://app.posthog.com/my-url?token={token_urlsafe(600)}"
Expand Down Expand Up @@ -64,21 +59,19 @@ def test_does_not_persist_too_recent_recording(self):

assert not recording.object_storage_path

def test_can_build_different_object_storage_paths(self) -> None:
def test_can_build_object_storage_paths(self) -> None:
produce_replay_summary(
session_id="test_can_build_different_object_storage_paths-s1",
team_id=self.team.pk,
)

recording: SessionRecording = SessionRecording.objects.create(
team=self.team,
session_id="test_can_build_different_object_storage_paths-s1",
)

assert (
recording.build_object_storage_path("2022-12-22")
== f"session_recordings_lts/team-{self.team.pk}/session-test_can_build_different_object_storage_paths-s1"
)
assert (
recording.build_object_storage_path("2023-08-01")
recording.build_blob_lts_storage_path("2023-08-01")
== f"session_recordings_lts/team_id/{self.team.pk}/session_id/test_can_build_different_object_storage_paths-s1/data"
)

Expand All @@ -100,14 +93,21 @@ def test_persists_recording_from_blob_ingested_storage(self):

# this recording already has several files stored from Mr. Blobby
# these need to be written before creating the recording object
blob_path = f"{TEST_BUCKET}/team_id/{self.team.pk}/session_id/{session_id}/data"
for file in ["a", "b", "c"]:
blob_path = f"{TEST_BUCKET}/team_id/{self.team.pk}/session_id/{session_id}/data"
file_name = f"{blob_path}/{file}"
write(file_name, f"my content-{file}".encode())

assert object_storage_client().list_objects(OBJECT_STORAGE_BUCKET, blob_path) == [
f"{blob_path}/a",
f"{blob_path}/b",
f"{blob_path}/c",
]

Comment on lines +101 to +105
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My change initially broke this test, but the failure surfaced elsewhere, so I added this assertion to get a meaningful failure message.

recording: SessionRecording = SessionRecording.objects.create(team=self.team, session_id=session_id)

assert recording.created_at == two_minutes_ago
assert recording.storage_version is None

persist_recording(recording.session_id, recording.team_id)
recording.refresh_from_db()
Expand All @@ -126,47 +126,9 @@ def test_persists_recording_from_blob_ingested_storage(self):
assert recording.keypress_count == 0
assert recording.start_url == "https://app.posthog.com/my-url"

# recordings which were blob ingested can not be loaded with this mechanism
assert load_persisted_recording(recording) is None

stored_objects = list_objects(recording.build_object_storage_path("2023-08-01"))
stored_objects = list_objects(recording.build_blob_lts_storage_path("2023-08-01"))
assert stored_objects == [
f"{recording.build_object_storage_path('2023-08-01')}/a",
f"{recording.build_object_storage_path('2023-08-01')}/b",
f"{recording.build_object_storage_path('2023-08-01')}/c",
f"{recording.build_blob_lts_storage_path('2023-08-01')}/a",
f"{recording.build_blob_lts_storage_path('2023-08-01')}/b",
f"{recording.build_blob_lts_storage_path('2023-08-01')}/c",
]

@patch("ee.session_recordings.session_recording_extensions.object_storage.write")
def test_can_save_content_to_new_location(self, mock_write: MagicMock):
# mute selected signals so the post create signal does not try to persist the recording
with self.settings(OBJECT_STORAGE_SESSION_RECORDING_BLOB_INGESTION_FOLDER=TEST_BUCKET), mute_selected_signals():
session_id = f"{uuid4()}"

recording = SessionRecording.objects.create(
team=self.team,
session_id=session_id,
start_time=datetime.fromtimestamp(12345),
end_time=datetime.fromtimestamp(12346),
object_storage_path="some_starting_value",
# None, but that would trigger the persistence behavior, and we don't want that
storage_version="None",
)

new_key = save_recording_with_new_content(recording, "the new content")

recording.refresh_from_db()

expected_path = f"session_recordings_lts/team_id/{self.team.pk}/session_id/{recording.session_id}/data"
assert new_key == f"{expected_path}/12345000-12346000"

assert recording.object_storage_path == expected_path
assert recording.storage_version == "2023-08-01"

mock_write.assert_called_with(
f"{expected_path}/12345000-12346000",
gzip.compress(b"the new content"),
extras={
"ContentEncoding": "gzip",
"ContentType": "application/json",
},
)
Original file line number Diff line number Diff line change
Expand Up @@ -222,8 +222,22 @@ export const parseEncodedSnapshots = async (
return []
}
try {
const snapshotLine = typeof l === 'string' ? (JSON.parse(l) as EncodedRecordingSnapshot) : l
const snapshotData = isRecordingSnapshot(snapshotLine) ? [snapshotLine] : snapshotLine['data']
let snapshotLine: { windowId: string } | EncodedRecordingSnapshot
if (typeof l === 'string') {
// is loaded from blob or realtime storage
snapshotLine = JSON.parse(l) as EncodedRecordingSnapshot
} else {
// is loaded from file export
snapshotLine = l
}
let snapshotData: ({ windowId: string } | EncodedRecordingSnapshot)[]
if (isRecordingSnapshot(snapshotLine)) {
// is loaded from file export
snapshotData = [snapshotLine]
} else {
// is loaded from blob or realtime storage
snapshotData = snapshotLine['data']
}

if (!isMobileSnapshots) {
isMobileSnapshots = hasAnyWireframes(snapshotData)
Expand Down
6 changes: 0 additions & 6 deletions posthog/session_recordings/models/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,3 @@ class RecordingMetadata(TypedDict):

class RecordingMatchingEvents(TypedDict):
events: list[MatchingSessionRecordingEvent]


class PersistedRecordingV1(TypedDict):
    # Shape of a recording payload persisted under the original LTS storage format.
    version: str  # "2022-12-22"
    # snapshot events grouped by the browser window that produced them
    snapshot_data_by_window_id: dict[WindowId, list[Union[SnapshotData, SessionRecordingEventSummary]]]
    # the distinct_id of the person the recording belongs to
    distinct_id: str
19 changes: 5 additions & 14 deletions posthog/session_recordings/models/session_recording.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,14 @@ def check_viewed_for_user(self, user: Any, save_viewed=False) -> None:
SessionRecordingViewed.objects.get_or_create(team=self.team, user=user, session_id=self.session_id)
self.viewed = True

def build_object_storage_path(self, version: Literal["2023-08-01", "2022-12-22"]) -> str:
if version == "2022-12-22":
path_parts: list[str] = [
settings.OBJECT_STORAGE_SESSION_RECORDING_LTS_FOLDER,
f"team-{self.team_id}",
f"session-{self.session_id}",
]
return "/".join(path_parts)
elif version == "2023-08-01":
return self._build_session_blob_path(settings.OBJECT_STORAGE_SESSION_RECORDING_LTS_FOLDER)
def build_blob_lts_storage_path(self, version: Literal["2023-08-01"]) -> str:
    """Return the long-term-storage (LTS) object path for this recording.

    Only the "2023-08-01" blob storage layout is supported; any other
    version raises NotImplementedError.
    """
    # Guard clause: reject unknown versions up front.
    if version != "2023-08-01":
        raise NotImplementedError(f"Unknown session replay object storage version {version}")
    # The LTS layout mirrors the ingestion layout, rooted at the LTS folder.
    return self.build_blob_ingestion_storage_path(settings.OBJECT_STORAGE_SESSION_RECORDING_LTS_FOLDER)

def build_blob_ingestion_storage_path(self) -> str:
return self._build_session_blob_path(settings.OBJECT_STORAGE_SESSION_RECORDING_BLOB_INGESTION_FOLDER)

def _build_session_blob_path(self, root_prefix: str) -> str:
def build_blob_ingestion_storage_path(self, root_prefix: Optional[str] = None) -> str:
    """Return the object-storage key prefix holding this recording's blob files.

    When no (or an empty) root_prefix is supplied, the configured blob
    ingestion folder is used as the root.
    """
    prefix = root_prefix if root_prefix else settings.OBJECT_STORAGE_SESSION_RECORDING_BLOB_INGESTION_FOLDER
    segments = [prefix, "team_id", str(self.team_id), "session_id", str(self.session_id), "data"]
    return "/".join(segments)

@staticmethod
Expand Down
Loading
Loading