From 95dfafc2c2c73e1a4c1158d988591adbe859ebe8 Mon Sep 17 00:00:00 2001 From: "Michael S. Molina" Date: Mon, 10 Mar 2025 16:24:56 -0300 Subject: [PATCH 1/2] fix: Log table retention policy --- superset/commands/logs/prune.py | 109 ++++++++++++++++++++++++++++++++ superset/config.py | 10 ++- superset/tasks/scheduler.py | 22 +++++++ 3 files changed, 139 insertions(+), 2 deletions(-) create mode 100644 superset/commands/logs/prune.py diff --git a/superset/commands/logs/prune.py b/superset/commands/logs/prune.py new file mode 100644 index 000000000000..6a9cffb3f738 --- /dev/null +++ b/superset/commands/logs/prune.py @@ -0,0 +1,109 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +import logging +import time +from datetime import datetime, timedelta + +import sqlalchemy as sa + +from superset import db +from superset.commands.base import BaseCommand +from superset.models.core import Log + +logger = logging.getLogger(__name__) + + +# pylint: disable=consider-using-transaction +class LogPruneCommand(BaseCommand): + """ + Command to prune the logs table by deleting rows older than the specified retention period. + + This command deletes records from the `Log` table that have not been changed within the + specified number of days. It helps in maintaining the database by removing outdated entries + and freeing up space. + + Attributes: + retention_period_days (int): The number of days for which records should be retained. + Records older than this period will be deleted. + """ # noqa: E501 + + def __init__(self, retention_period_days: int): + """ + :param retention_period_days: Number of days to keep in the logs table + """ + self.retention_period_days = retention_period_days + + def run(self) -> None: + """ + Executes the prune command + """ + batch_size = 999 # SQLite has a IN clause limit of 999 + total_deleted = 0 + start_time = time.time() + + # Select all IDs that need to be deleted + ids_to_delete = ( + db.session.execute( + sa.select(Log.id).where( + Log.dttm + < datetime.now() - timedelta(days=self.retention_period_days) + ) + ) + .scalars() + .all() + ) + + total_rows = len(ids_to_delete) + + logger.info("Total rows to be deleted: %s", total_rows) + + next_logging_threshold = 1 + + # Iterate over the IDs in batches + for i in range(0, total_rows, batch_size): + batch_ids = ids_to_delete[i : i + batch_size] + + # Delete the selected batch using IN clause + result = db.session.execute(sa.delete(Log).where(Log.id.in_(batch_ids))) + + # Update the total number of deleted records + total_deleted += result.rowcount + + # Explicitly commit the transaction given that if an error occurs, we want to ensure that the # noqa: E501 + # records that have been deleted so far are committed + db.session.commit() + + # Log the number of deleted records every 1% increase in progress + percentage_complete = (total_deleted / total_rows) * 100 + if percentage_complete >= next_logging_threshold: + logger.info( + "Deleted %s rows from the logs table older than %s days (%d%% complete)", # noqa: E501 + total_deleted, + self.retention_period_days, + percentage_complete, + ) + next_logging_threshold += 1 + + elapsed_time = time.time() - start_time + minutes, seconds = divmod(elapsed_time, 60) + formatted_time = f"{int(minutes):02}:{int(seconds):02}" + logger.info( + "Pruning complete: %s rows deleted in %s", total_deleted, formatted_time + ) + + def validate(self) -> None: + pass diff --git a/superset/config.py b/superset/config.py index d7c451feb747..1df29713f113 100644 --- a/superset/config.py +++ b/superset/config.py @@ -1040,6 +1040,12 @@ class CeleryConfig: # pylint: disable=too-few-public-methods # "schedule": crontab(minute=0, hour=0, day_of_month=1), # "kwargs": {"retention_period_days": 180}, # }, + # Uncomment to enable pruning of the logs table + # "prune_logs": { + # "task": "prune_logs", + # "schedule": crontab(minute="*", hour="*"), + # "kwargs": {"retention_period_days": 180}, + # }, } @@ -1134,8 +1140,8 @@ class CeleryConfig: # pylint: disable=too-few-public-methods # else: # return f'tmp_{schema}' # Function accepts database object, user object, schema name and sql that will be run. -SQLLAB_CTAS_SCHEMA_NAME_FUNC: ( - None | (Callable[[Database, models.User, str, str], str]) +SQLLAB_CTAS_SCHEMA_NAME_FUNC: None | ( + Callable[[Database, models.User, str, str], str] ) = None # If enabled, it can be used to store the results of long-running queries diff --git a/superset/tasks/scheduler.py b/superset/tasks/scheduler.py index 25158bd04fb3..f894a6794b88 100644 --- a/superset/tasks/scheduler.py +++ b/superset/tasks/scheduler.py @@ -23,6 +23,7 @@ from superset import app, is_feature_enabled from superset.commands.exceptions import CommandException +from superset.commands.logs.prune import LogPruneCommand from superset.commands.report.exceptions import ReportScheduleUnexpectedError from superset.commands.report.execute import AsyncExecuteReportScheduleCommand from superset.commands.report.log_prune import AsyncPruneReportScheduleLogCommand @@ -142,3 +143,24 @@ def prune_query(retention_period_days: Optional[int] = None) -> None: QueryPruneCommand(retention_period_days).run() except CommandException as ex: logger.exception("An error occurred while pruning queries: %s", ex) + + +@celery_app.task(name="prune_logs") +def prune_logs(retention_period_days: Optional[int] = None) -> None: + stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"] + stats_logger.incr("prune_logs") + + # TODO: Deprecated: Remove support for passing retention period via options in 6.0 + if retention_period_days is None: + retention_period_days = prune_logs.request.properties.get( + "retention_period_days" + ) + logger.warning( + "Your `prune_logs` beat schedule uses `options` to pass the retention " + "period, please use `kwargs` instead." + ) + + try: + LogPruneCommand(retention_period_days).run() + except CommandException as ex: + logger.exception("An error occurred while pruning logs: %s", ex) From 091581ff5291f7d464e99a1909458cd76bbbd0c3 Mon Sep 17 00:00:00 2001 From: "Michael S. Molina" Date: Mon, 10 Mar 2025 16:41:25 -0300 Subject: [PATCH 2/2] Fixes ruff --- superset/config.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/superset/config.py b/superset/config.py index 1df29713f113..d36fe30f6965 100644 --- a/superset/config.py +++ b/superset/config.py @@ -475,7 +475,7 @@ class D3TimeFormat(TypedDict, total=False): "PRESTO_EXPAND_DATA": False, # Exposes API endpoint to compute thumbnails "THUMBNAILS": False, - # Enable the endpoints to cache and retrieve dashboard screenshots via webdriver. + # Enables the endpoints to cache and retrieve dashboard screenshots via webdriver. # Requires configuring Celery and a cache using THUMBNAIL_CACHE_CONFIG. "ENABLE_DASHBOARD_SCREENSHOT_ENDPOINTS": False, # Generate screenshots (PDF or JPG) of dashboards using the web driver. @@ -1140,8 +1140,8 @@ class CeleryConfig: # pylint: disable=too-few-public-methods # else: # return f'tmp_{schema}' # Function accepts database object, user object, schema name and sql that will be run. -SQLLAB_CTAS_SCHEMA_NAME_FUNC: None | ( - Callable[[Database, models.User, str, str], str] +SQLLAB_CTAS_SCHEMA_NAME_FUNC: ( + None | (Callable[[Database, models.User, str, str], str]) ) = None # If enabled, it can be used to store the results of long-running queries