-
Notifications
You must be signed in to change notification settings - Fork 16.6k
fix: Log table retention policy #32572
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,109 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| import logging | ||
| import time | ||
| from datetime import datetime, timedelta | ||
|
|
||
| import sqlalchemy as sa | ||
|
|
||
| from superset import db | ||
| from superset.commands.base import BaseCommand | ||
| from superset.models.core import Log | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| # pylint: disable=consider-using-transaction | ||
| class LogPruneCommand(BaseCommand): | ||
| """ | ||
| Command to prune the logs table by deleting rows older than the specified retention period. | ||
|
|
||
| This command deletes records from the `Log` table that have not been changed within the | ||
| specified number of days. It helps in maintaining the database by removing outdated entries | ||
| and freeing up space. | ||
|
|
||
| Attributes: | ||
| retention_period_days (int): The number of days for which records should be retained. | ||
| Records older than this period will be deleted. | ||
| """ # noqa: E501 | ||
|
|
||
| def __init__(self, retention_period_days: int): | ||
| """ | ||
| Store the retention window on the instance; no validation is performed here. | ||
| :param retention_period_days: Number of days of rows to keep in the logs table; | ||
| `run` deletes rows whose `dttm` is older than this many days | ||
| """ | ||
| self.retention_period_days = retention_period_days | ||
|
|
||
| def run(self) -> None: | ||
| """ | ||
| Executes the prune command | ||
| """ | ||
| batch_size = 999 # SQLite has a IN clause limit of 999 | ||
| total_deleted = 0 | ||
| start_time = time.time() | ||
|
|
||
| # Select all IDs that need to be deleted | ||
| ids_to_delete = ( | ||
| db.session.execute( | ||
| sa.select(Log.id).where( | ||
| Log.dttm | ||
| < datetime.now() - timedelta(days=self.retention_period_days) | ||
| ) | ||
| ) | ||
| .scalars() | ||
| .all() | ||
| ) | ||
|
Comment on lines
+59
to
+68
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Memory-intensive ID loading
Tell me more
What is the issue?
Loading all IDs into memory at once could cause memory issues with large log tables.
Why this matters
For tables with millions of records to delete, this approach could exhaust available memory and crash the application.
Suggested change ∙ Feature Preview
def run(self) -> None:
batch_size = 999
total_deleted = 0
start_time = time.time()
while True:
# Select only the next batch of IDs
ids_to_delete = (
db.session.execute(
sa.select(Log.id)
.where(Log.dttm < datetime.now() - timedelta(days=self.retention_period_days))
.limit(batch_size)
)
.scalars()
.all()
)
if not ids_to_delete:
break
result = db.session.execute(sa.delete(Log).where(Log.id.in_(ids_to_delete)))
total_deleted += result.rowcount
db.session.commit()
logger.info(
"Deleted %s rows from the logs table older than %s days",
total_deleted,
self.retention_period_days,
)
💬 Looking for more details? Reply to this comment to chat with Korbit. |
||
|
|
||
| total_rows = len(ids_to_delete) | ||
|
|
||
| logger.info("Total rows to be deleted: %s", total_rows) | ||
|
|
||
| next_logging_threshold = 1 | ||
|
|
||
| # Iterate over the IDs in batches | ||
| for i in range(0, total_rows, batch_size): | ||
| batch_ids = ids_to_delete[i : i + batch_size] | ||
|
|
||
| # Delete the selected batch using IN clause | ||
| result = db.session.execute(sa.delete(Log).where(Log.id.in_(batch_ids))) | ||
|
|
||
| # Update the total number of deleted records | ||
| total_deleted += result.rowcount | ||
|
|
||
| # Explicitly commit the transaction given that if an error occurs, we want to ensure that the # noqa: E501 | ||
| # records that have been deleted so far are committed | ||
| db.session.commit() | ||
|
|
||
| # Log the number of deleted records every 1% increase in progress | ||
| percentage_complete = (total_deleted / total_rows) * 100 | ||
| if percentage_complete >= next_logging_threshold: | ||
| logger.info( | ||
| "Deleted %s rows from the logs table older than %s days (%d%% complete)", # noqa: E501 | ||
| total_deleted, | ||
| self.retention_period_days, | ||
| percentage_complete, | ||
| ) | ||
| next_logging_threshold += 1 | ||
|
|
||
| elapsed_time = time.time() - start_time | ||
| minutes, seconds = divmod(elapsed_time, 60) | ||
| formatted_time = f"{int(minutes):02}:{int(seconds):02}" | ||
| logger.info( | ||
| "Pruning complete: %s rows deleted in %s", total_deleted, formatted_time | ||
| ) | ||
|
|
||
| def validate(self) -> None: | ||
| """No-op: this command defines no validation checks.""" | ||
| pass | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ | |
|
|
||
| from superset import app, is_feature_enabled | ||
| from superset.commands.exceptions import CommandException | ||
| from superset.commands.logs.prune import LogPruneCommand | ||
| from superset.commands.report.exceptions import ReportScheduleUnexpectedError | ||
| from superset.commands.report.execute import AsyncExecuteReportScheduleCommand | ||
| from superset.commands.report.log_prune import AsyncPruneReportScheduleLogCommand | ||
|
|
@@ -142,3 +143,24 @@ | |
| QueryPruneCommand(retention_period_days).run() | ||
| except CommandException as ex: | ||
| logger.exception("An error occurred while pruning queries: %s", ex) | ||
|
|
||
|
|
||
| @celery_app.task(name="prune_logs") | ||
| def prune_logs(retention_period_days: Optional[int] = None) -> None: | ||
|
Comment on lines
+148
to
+149
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing Celery Task Performance Guards
Tell me more
What is the issue?
The prune_logs task lacks performance-related task options that could help manage resource consumption during log pruning operations.
Why this matters
Without proper task options like rate limiting or soft/hard time limits, large log pruning operations could consume excessive system resources or run indefinitely, potentially impacting other operations.
Suggested change ∙ Feature Preview
Add appropriate Celery task options to manage resource consumption:
@celery_app.task(
    name="prune_logs",
    soft_time_limit=3600, # 1 hour soft timeout
    time_limit=3900, # 1 hour + 5 min hard timeout
    rate_limit="1/hour" # Limit to one execution per hour
)
💬 Looking for more details? Reply to this comment to chat with Korbit. |
||
| stats_logger: BaseStatsLogger = app.config["STATS_LOGGER"] | ||
| stats_logger.incr("prune_logs") | ||
|
|
||
| # TODO: Deprecated: Remove support for passing retention period via options in 6.0 | ||
| if retention_period_days is None: | ||
| retention_period_days = prune_logs.request.properties.get( | ||
| "retention_period_days" | ||
| ) | ||
| logger.warning( | ||
| "Your `prune_logs` beat schedule uses `options` to pass the retention " | ||
| "period, please use `kwargs` instead." | ||
| ) | ||
|
|
||
| try: | ||
| LogPruneCommand(retention_period_days).run() | ||
| except CommandException as ex: | ||
| logger.exception("An error occurred while pruning logs: %s", ex) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generic exception log missing retention period context
Tell me more
What is the issue?
The exception log message is too generic and lacks context about the retention period being used.
Why this matters
During troubleshooting, it would be difficult to determine which retention period was active when the pruning failed, making debugging more time-consuming.
Suggested change ∙ Feature Preview
logger.exception(
    "An error occurred while pruning logs with retention period of %s days: %s",
    retention_period_days,
    ex
)
💬 Looking for more details? Reply to this comment to chat with Korbit. |
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Magic Number Should Be Named Constant
Tell me more
What is the issue?
The magic number 999 should be defined as a named constant at the module or class level.
Why this matters
Magic numbers make code harder to maintain and understand their purpose without the comment. A named constant makes the intent clear and provides a single point of change.
Suggested change ∙ Feature Preview
💬 Looking for more details? Reply to this comment to chat with Korbit.