Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add bundle analysis processor task #215

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion database/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,4 @@ class TrialStatus(Enum):
class ReportType(Enum):
    """Kinds of reports that can be attached to a commit."""

    COVERAGE = "coverage"
    TEST_RESULTS = "test_results"
    # NOTE: an earlier revision misspelled this as BUNDLE_ANALYIS; only the
    # corrected member is kept (both had the same value, which would have
    # made one a silent alias of the other).
    BUNDLE_ANALYSIS = "bundle_analysis"
16 changes: 16 additions & 0 deletions database/models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,22 @@
.first()
)

def commit_report(self, report_type: ReportType):
    """Return the ``CommitReport`` of the given type for this commit.

    Coverage reports are exposed through the pre-existing ``report``
    relationship; any other report type is looked up explicitly by
    (commit, no code, report type).
    """
    db_session = self.get_db_session()
    CommitReport = database.models.reports.CommitReport
    if report_type == ReportType.COVERAGE:
        return self.report
    # Non-coverage report: find the code-less report row of this type.
    # `== None` is intentional — SQLAlchemy translates it to `IS NULL`.
    query = db_session.query(CommitReport).filter(
        (CommitReport.commit_id == self.id_)
        & (CommitReport.code == None)
        & (CommitReport.report_type == report_type.value)
    )
    return query.first()

@property
def id(self):
return self.id_
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
# This file is autogenerated by pip-compile with Python 3.10
# by the following command:
#
# pip-compile requirements.in
# pip-compile
#
amqp==5.1.0
# via kombu
Expand Down
142 changes: 142 additions & 0 deletions services/bundle_analysis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import logging
import os
import tempfile
from dataclasses import dataclass
from typing import Any, Dict, Optional

import sentry_sdk
from shared.bundle_analysis import BundleAnalysisReport, BundleAnalysisReportLoader
from shared.bundle_analysis.storage import BUCKET_NAME
from shared.reports.enums import UploadState
from shared.storage import get_appropriate_storage_service
from shared.storage.exceptions import FileNotInStorageError

from database.enums import ReportType
from database.models import Commit, CommitReport, Upload, UploadError
from services.archive import ArchiveService
from services.report import BaseReportService
from services.storage import get_storage_client

log = logging.getLogger(__name__)


@dataclass
class ProcessingError:
    """A structured error produced while processing a bundle upload."""

    code: str
    params: Dict[str, Any]
    is_retryable: bool = False  # whether the task may safely be re-attempted

    def as_dict(self):
        """Serialize to the shape stored on the upload error record."""
        return dict(code=self.code, params=self.params)


@dataclass
class ProcessingResult:
    """Outcome of processing a single bundle-analysis upload."""

    upload: Upload
    bundle_report: Optional[BundleAnalysisReport] = None
    session_id: Optional[int] = None
    error: Optional[ProcessingError] = None

    def as_dict(self):
        """Serialize for task results / logging."""
        serialized_error = None if self.error is None else self.error.as_dict()
        return {
            "upload_id": self.upload.id_,
            "session_id": self.session_id,
            "error": serialized_error,
        }

    def update_upload(self):
        """
        Updates this result's `Upload` record with information from
        this result.
        """
        db_session = self.upload.get_db_session()

        if self.error is None:
            # Success path: mark processed and record the merge session.
            assert self.bundle_report is not None
            self.upload.state = "processed"
            self.upload.state_id = UploadState.PROCESSED.db_id
            self.upload.order_number = self.session_id
        else:
            self.upload.state = "error"
            self.upload.state_id = UploadState.ERROR.db_id
            db_session.add(
                UploadError(
                    upload_id=self.upload.id_,
                    error_code=self.error.code,
                    error_params=self.error.params,
                )
            )

        db_session.flush()


class BundleAnalysisReportService(BaseReportService):
    """Creates/fetches bundle-analysis `CommitReport`s and processes uploads."""

    async def initialize_and_save_report(
        self, commit: Commit, report_code: str = None
    ) -> CommitReport:
        """Get or create the bundle-analysis ``CommitReport`` for ``commit``.

        Returns the existing report matching (commit, code, bundle-analysis
        type) if one exists, otherwise a freshly created and flushed one.
        """
        db_session = commit.get_db_session()

        commit_report = (
            db_session.query(CommitReport)
            .filter_by(
                commit_id=commit.id_,
                code=report_code,
                report_type=ReportType.BUNDLE_ANALYSIS.value,
            )
            .first()
        )
        if not commit_report:
            commit_report = CommitReport(
                commit_id=commit.id_,
                code=report_code,
                report_type=ReportType.BUNDLE_ANALYSIS.value,
            )
            db_session.add(commit_report)
            db_session.flush()
        return commit_report

    @sentry_sdk.trace
    def process_upload(self, upload: Upload) -> ProcessingResult:
        """
        Download and parse the data associated with the given upload and
        merge the results into a bundle report.

        A missing raw-upload file yields a retryable `file_not_in_storage`
        error in the result rather than raising.
        """
        commit_report: CommitReport = upload.report
        repo_hash = ArchiveService.get_archive_hash(commit_report.commit.repository)
        storage_service = get_storage_client()
        bundle_loader = BundleAnalysisReportLoader(storage_service, repo_hash)

        # fetch existing bundle report from storage
        bundle_report = bundle_loader.load(commit_report.external_id)

        if bundle_report is None:
            bundle_report = BundleAnalysisReport()

        # download raw upload data to local tempfile
        fd, local_path = tempfile.mkstemp()
        # mkstemp leaves an open OS-level fd behind; close it immediately
        # (we re-open the path below) to avoid leaking a file descriptor
        os.close(fd)
        try:
            with open(local_path, "wb") as f:
                storage_service.read_file(BUCKET_NAME, upload.storage_path, file_obj=f)

            # load the downloaded data into the bundle report
            session_id = bundle_report.ingest(local_path)

            # save the bundle report back to storage
            bundle_loader.save(bundle_report, commit_report.external_id)
        except FileNotInStorageError:
            return ProcessingResult(
                upload=upload,
                error=ProcessingError(
                    code="file_not_in_storage",
                    params={"location": upload.storage_path},
                    is_retryable=True,
                ),
            )
        finally:
            # the tempfile is ours to clean up, on success or failure
            os.remove(local_path)

        return ProcessingResult(
            upload=upload,
            bundle_report=bundle_report,
            session_id=session_id,
        )
100 changes: 100 additions & 0 deletions services/lock_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import logging
import random
from contextlib import contextmanager
from enum import Enum
from typing import Optional

from redis import Redis
from redis.exceptions import LockError

from database.enums import ReportType
from services.redis import get_redis_connection

log = logging.getLogger(__name__)


class LockType(Enum):
    """Named distributed locks managed by `LockManager`."""

    BUNDLE_ANALYSIS_PROCESSING = "bundle_analysis_processing"
    # TODO: port existing task locking to use `LockManager`


class LockRetry(Exception):
    """Raised when a lock cannot be acquired; retry after `countdown` seconds."""

    def __init__(self, countdown: int):
        # Pass the countdown to Exception.__init__ so that str(exc) is
        # informative and the exception survives pickling (e.g. by Celery).
        super().__init__(countdown)
        self.countdown = countdown


class LockManager:
    """Acquire and inspect per-(repo, commit, report-type) Redis locks.

    Coverage locks keep the legacy key name (no report-type suffix) for
    backward compatibility with pre-existing task locking.
    """

    def __init__(
        self,
        repoid: int,
        commitid: str,
        report_type=ReportType.COVERAGE,
        lock_timeout=300,  # 5 min
        redis_connection: Optional[Redis] = None,
    ):
        self.repoid = repoid
        self.commitid = commitid
        self.report_type = report_type
        self.lock_timeout = lock_timeout
        self.redis_connection = redis_connection or get_redis_connection()

    def lock_name(self, lock_type: LockType):
        """Build the Redis key name for this lock."""
        if self.report_type == ReportType.COVERAGE:
            # for backward compat this does not include the report type
            return f"{lock_type.value}_lock_{self.repoid}_{self.commitid}"
        else:
            return f"{lock_type.value}_lock_{self.repoid}_{self.commitid}_{self.report_type.value}"

    def is_locked(self, lock_type: LockType) -> bool:
        """Return True if the lock is currently held (non-blocking check)."""
        lock_name = self.lock_name(lock_type)
        if self.redis_connection.get(lock_name):
            return True
        return False

    @contextmanager
    def locked(self, lock_type: LockType, retry_num=0):
        """Hold the given lock for the duration of the ``with`` body.

        Raises ``LockRetry`` with an exponentially growing, jittered
        countdown (capped at 5 hours) when the lock cannot be acquired
        within the 5-second blocking timeout.
        """
        lock_name = self.lock_name(lock_type)
        try:
            log.info(
                "Acquiring lock",
                extra=dict(
                    repoid=self.repoid,
                    commitid=self.commitid,
                    lock_name=lock_name,
                ),
            )
            with self.redis_connection.lock(
                lock_name, timeout=self.lock_timeout, blocking_timeout=5
            ):
                log.info(
                    "Acquired lock",
                    extra=dict(
                        repoid=self.repoid,
                        commitid=self.commitid,
                        lock_name=lock_name,
                    ),
                )
                yield
                log.info(
                    "Releasing lock",
                    extra=dict(
                        repoid=self.repoid,
                        commitid=self.commitid,
                        lock_name=lock_name,
                    ),
                )
        except LockError:
            max_retry = 200 * 3**retry_num
            # random.randint requires integer bounds: `max_retry / 2` is a
            # float, which is deprecated in 3.10 and an error in 3.12 —
            # use floor division for the lower bound instead.
            countdown = min(random.randint(max_retry // 2, max_retry), 60 * 60 * 5)

            log.warning(
                "Unable to acquire lock",
                extra=dict(
                    repoid=self.repoid,
                    commitid=self.commitid,
                    lock_name=lock_name,
                    countdown=countdown,
                    retry_num=retry_num,
                ),
            )
            raise LockRetry(countdown)
Loading
Loading