Commit 16370ed
attempt to fix cff duplicates
adrian-codecov committed Jun 6, 2024
1 parent 2d39bca commit 16370ed
Showing 3 changed files with 144 additions and 8 deletions.
Empty file added services/delete/__init__.py
41 changes: 41 additions & 0 deletions services/delete/delete.py
@@ -0,0 +1,41 @@
import logging
from itertools import islice
from typing import List

import sentry_sdk

from database.engine import Session
from database.models import (
Commit,
Upload,
UploadError,
UploadLevelTotals,
uploadflagmembership,
)

log = logging.getLogger(__name__)


@sentry_sdk.trace
def delete_upload_by_ids(db_session: Session, upload_ids: List[int], commit: Commit):
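# Delete rows that reference the uploads (errors, totals, flag memberships)
# first, so foreign-key constraints are not violated when the uploads
# themselves are deleted below.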
db_session.query(UploadError).filter(UploadError.upload_id.in_(upload_ids)).delete(
synchronize_session=False
)
db_session.query(UploadLevelTotals).filter(
UploadLevelTotals.upload_id.in_(upload_ids)
).delete(synchronize_session=False)
db_session.query(uploadflagmembership).filter(
uploadflagmembership.c.upload_id.in_(upload_ids)
).delete(synchronize_session=False)
db_session.query(Upload).filter(Upload.id.in_(upload_ids)).delete(
synchronize_session=False
)
db_session.commit()
log.info(
"Deleted uploads",
extra=dict(
commit=commit.commitid,
repo=commit.repoid,
number_uploads=len(upload_ids),
upload_ids=list(islice(upload_ids, 20)),
),
)
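
For context, a minimal sketch of how this helper might be invoked. The session handling and the ids below are illustrative placeholders, not part of this commit:

# Hypothetical caller -- assumes a SQLAlchemy session and a loaded Commit;
# the upload ids stand in for stale cff duplicates found elsewhere.
from database.engine import Session
from database.models import Commit
from services.delete.delete import delete_upload_by_ids

def purge_stale_cff_uploads(db_session: Session, commit: Commit) -> None:
    stale_upload_ids = [101, 102, 205]  # placeholder ids
    delete_upload_by_ids(
        db_session=db_session, upload_ids=stale_upload_ids, commit=commit
    )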
111 changes: 103 additions & 8 deletions services/report/__init__.py
@@ -4,10 +4,11 @@
import sys
import typing
import uuid
from collections import defaultdict
from itertools import islice
from dataclasses import dataclass
from json import loads
from time import time
from typing import Any, Dict, Mapping, Optional, Sequence
from typing import Any, Dict, List, Mapping, Optional, Sequence, Tuple

import sentry_sdk
from celery.exceptions import SoftTimeLimitExceeded
@@ -49,6 +50,7 @@
PARALLEL_UPLOAD_PROCESSING_BY_REPO,
)
from services.archive import ArchiveService
from services.delete.delete import delete_upload_by_ids
from services.redis import (
PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
get_parallel_upload_processing_session_counter_redis_key,
@@ -60,6 +62,8 @@
from services.repository import get_repo_provider_service
from services.yaml.reader import get_paths_from_flags, read_yaml_field

MINIMUM_PARENT_COMMIT_DEPTH = 1


@dataclass
class ProcessingError(object):
@@ -646,7 +650,9 @@ def get_appropriate_commit_to_carryforward_from(
) -> Tuple[Optional[Commit], Optional[int]]:
parent_commit = commit.get_parent_commit()
parent_commit_tracking = []
count = 1 # `parent_commit` is already the first parent
count = (
MINIMUM_PARENT_COMMIT_DEPTH # `parent_commit` is already the first parent
)
while (
parent_commit is not None
and parent_commit.state not in ("complete", "skipped")
@@ -688,7 +694,7 @@ def get_appropriate_commit_to_carryforward_from(
parent_tracing=parent_commit_tracking,
),
)
return None
return (None, None)
if parent_commit.state not in ("complete", "skipped"):
log.warning(
"None of the parent commits were in a complete state to be used as CFing base",
@@ -700,8 +706,8 @@
would_be_parent=parent_commit.commitid,
),
)
return None
return parent_commit
return (None, None)
return (parent_commit, count)

async def _possibly_shift_carryforward_report(
self, carryforward_report: Report, base_commit: Commit, head_commit: Commit
@@ -778,8 +784,10 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report:
)
)

parent_commit = self.get_appropriate_commit_to_carryforward_from(
commit, max_parenthood_deepness
parent_commit, parent_depth = (
self.get_appropriate_commit_to_carryforward_from(
commit, max_parenthood_deepness
)
)
if parent_commit is None:
log.warning(
@@ -807,6 +815,14 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report:
if self.current_yaml.flag_has_carryfoward(flag_name):
flags_to_carryforward.append(flag_name)
if not flags_to_carryforward:
log.warning(
"There weren't flags to carry forward in the parent commit",
extra=dict(
commit=commit.commitid,
repoid=commit.repoid,
parent_commit=parent_commit.commitid,
),
)
return Report()
paths_to_carryforward = get_paths_from_flags(
self.current_yaml, flags_to_carryforward
@@ -826,7 +842,10 @@ async def create_new_report_for_commit(self, commit: Commit) -> Report:
parent_report,
flags_to_carryforward,
paths_to_carryforward,
session_extras=dict(carriedforward_from=parent_commit.commitid),
session_extras=dict(
carriedforward_from=parent_commit.commitid,
parent_depth=parent_depth,
),
)
# If the parent report has labels we also need to carryforward the label index
# Considerations:
@@ -1173,6 +1192,71 @@ def save_report(self, commit: Commit, report: Report, report_code=None):
)
return {"url": url}

def _determine_existing_cffs_and_depths(
self, db_session, commit: Commit
) -> Dict[int, List[int]]:
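# Maps parent depth -> ids of the cff uploads carried forward from that
# depth, e.g. {1: [101, 102], 2: [205]} (illustrative values).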
depths_with_cff_upload_ids = defaultdict(list)
existing_cff_uploads = db_session.query(Upload).filter(
Upload.report_id == commit.report.id_,
Upload.upload_type == UploadType.CARRIEDFORWARD.db_id,
)

for upload in existing_cff_uploads:
parent_depth = upload.upload_extras.get("parent_depth")
if parent_depth:
depths_with_cff_upload_ids[parent_depth].append(upload.id)

log.info(
"Existing cffs and their respective depths",
extra=dict(
commit=commit.commitid,
repo=commit.repoid,
existing_uploads_and_depths=dict(
islice(depths_with_cff_upload_ids.items(), 20)
),
),
)
return depths_with_cff_upload_ids

def _possibly_delete_existing_cffs(
self,
db_session,
commit: Commit,
cff_report: Report,
):
existing_cffs_and_depths = self._determine_existing_cffs_and_depths(
db_session=db_session, commit=commit
)

if not existing_cffs_and_depths:
return

# Use the first 'parent_depth' found, as all cffs within the same report
# should be carried forward from the same parent
cff_report_parent_depth = MINIMUM_PARENT_COMMIT_DEPTH
for session in cff_report.sessions.values():
depth = session.upload_extras.get("parent_depth")
if depth:
cff_report_parent_depth = depth
break

# Delete all cff uploads carried forward from a more distant parent
# (higher depth) than the one used in the latest report
for parent_depth, upload_ids in existing_cffs_and_depths.items():
if parent_depth > cff_report_parent_depth:
log.info(
"Deleting stale cff uploads from DB",
extra=dict(
repoid=commit.repoid,
commit=commit.commitid,
number_uploads=len(upload_ids),
),
)
# Potentially move this into its own task; deleting inline here could make the preprocess task time out
delete_upload_by_ids(
db_session=db_session, upload_ids=upload_ids, commit=commit
)

def save_full_report(self, commit: Commit, report: Report, report_code=None):
"""
Saves the report (into database and storage) AND takes care of backfilling its sessions
@@ -1193,6 +1277,17 @@ def save_full_report(self, commit: Commit, report: Report, report_code=None):
)
res = self.save_report(commit, report, report_code)
db_session = commit.get_db_session()

# We've seen instances where a commit report carries forward
# uploads from multiple parents, a side effect of choosing a different parent
# commit when the direct parent is still pending. This function deletes
# out-of-date uploads that weren't carried forward by the closest parent commit
self._possibly_delete_existing_cffs(
db_session=db_session,
commit=commit,
cff_report=report,
)

for sess_id, session in report.sessions.items():
upload = Upload(
build_code=session.build,
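To make the depth rule concrete, here is a small self-contained sketch of the dedup logic _possibly_delete_existing_cffs applies, using plain dicts in place of the ORM models; the function name and the ids are illustrative only:

from typing import Dict, List

MINIMUM_PARENT_COMMIT_DEPTH = 1

def stale_cff_upload_ids(
    existing: Dict[int, List[int]], report_depth: int
) -> List[int]:
    # `existing` maps parent depth -> cff upload ids already in the DB.
    # Uploads carried forward by the closest parent are kept; anything
    # from a more distant parent (higher depth) is a stale duplicate.
    stale: List[int] = []
    for depth, upload_ids in existing.items():
        if depth > report_depth:
            stale.extend(upload_ids)
    return stale

# A report carried forward from the direct parent (depth 1) after an
# earlier run had used the grandparent (depth 2):
existing = {1: [101, 102], 2: [205, 206]}
assert stale_cff_upload_ids(existing, report_depth=1) == [205, 206]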
