Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow efficient trimming of history #77

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 41 additions & 13 deletions pycrdt_websocket/ystore.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from inspect import isawaitable
from logging import Logger, getLogger
from pathlib import Path
from typing import AsyncIterator, Awaitable, Callable, cast
from typing import AsyncIterator, Awaitable, Callable, Literal, cast

import anyio
from anyio import TASK_STATUS_IGNORED, Event, Lock, create_task_group
Expand Down Expand Up @@ -323,6 +323,10 @@ class MySQLiteYStore(SQLiteYStore):
# latest update of a document must be before purging document history.
# Defaults to never purging document history (None).
document_ttl: int | None = None
# The maximum length of the history of the documents in seconds that is kept.
history_length: int | None = None
# The minimum interval in seconds between history cleanup operations.
min_cleanup_interval: int = 60
path: str
lock: Lock
db_initialized: Event | None
Expand Down Expand Up @@ -478,36 +482,60 @@ async def write(self, data: bytes) -> None:
async with self._db:
# first, determine time elapsed since last update
cursor = await self._db.cursor()
await cursor.execute(
"SELECT timestamp FROM yupdates WHERE path = ? "
"ORDER BY timestamp DESC LIMIT 1",
(self.path,),
)
row = await cursor.fetchone()
diff = (time.time() - row[0]) if row else 0

if self.document_ttl is not None and diff > self.document_ttl:
newest_diff = await self._get_time_differential_to_entry(cursor, direction="DESC")
oldest_diff = await self._get_time_differential_to_entry(cursor, direction="ASC")

squashed = False
if (self.document_ttl is not None and newest_diff > self.document_ttl) or (
self.history_length is not None
and oldest_diff > self.min_cleanup_interval + self.history_length
):
# squash updates
ydoc = Doc()
older_than = time.time() - (
self.history_length if self.history_length is not None else 0
)
await cursor.execute(
"SELECT yupdate FROM yupdates WHERE path = ?",
(self.path,),
"SELECT yupdate FROM yupdates WHERE path = ? AND timestamp < ?",
(self.path, older_than),
)
for (update,) in await cursor.fetchall():
ydoc.apply_update(update)
# delete history
await cursor.execute("DELETE FROM yupdates WHERE path = ?", (self.path,))
# delete older history
await cursor.execute(
"DELETE FROM yupdates WHERE path = ? AND timestamp < ?",
(self.path, older_than),
)
# insert squashed updates
squashed_update = ydoc.get_update()
metadata = await self.get_metadata()
await cursor.execute(
"INSERT INTO yupdates VALUES (?, ?, ?, ?)",
(self.path, squashed_update, metadata, time.time()),
)
squashed = True

# finally, write this update to the DB
metadata = await self.get_metadata()
await cursor.execute(
"INSERT INTO yupdates VALUES (?, ?, ?, ?)",
(self.path, data, metadata, time.time()),
)

if squashed:
# Vacuuming database
await self._db.commit()
await cursor.execute("VACUUM")

async def _get_time_differential_to_entry(
self, cursor, direction: Literal["ASC", "DESC"] = "DESC"
) -> float:
"""Get the time differential to the newest (DESC) or oldest (ASC) entry in the database."""
await cursor.execute(
"SELECT timestamp FROM yupdates WHERE path = ? "
f"ORDER BY timestamp {direction} LIMIT 1",
(self.path,),
)
row = await cursor.fetchone()
return (time.time() - row[0]) if row else 0
Loading