diff --git a/services/redis.py b/services/redis.py index f26b6385a..97f09dec1 100644 --- a/services/redis.py +++ b/services/redis.py @@ -7,23 +7,6 @@ log = logging.getLogger(__name__) -# The purpose of this key is to synchronize allocation of session IDs -# when processing reports in parallel. If multiple uploads are processed -# at a time, they all need to know what session id to use when generating -# their chunks.txt. This is because in UploadFinisher, they will all be -# combined together and need to all have unique session IDs. - -# This key is a counter that will hold an integer, representing the smallest -# session ID that is available to be claimed. It can be claimed by incrementing -# the key. -PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_REDIS_KEY = ( - "parallel_session_counter:{repoid}:{commitid}" -) - -PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL = ( - 10800 # 3 hours, chosen somewhat arbitrarily -) - def get_redis_url() -> str: url = get_config("services", "redis_url") @@ -53,9 +36,3 @@ def download_archive_from_redis( if raw_uploaded_report is not None: return raw_uploaded_report.decode() return None - - -def get_parallel_upload_processing_session_counter_redis_key(repoid, commitid): - return PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_REDIS_KEY.format( - repoid=repoid, commitid=commitid - ) diff --git a/services/report/__init__.py b/services/report/__init__.py index 732de9906..951d4eeb3 100644 --- a/services/report/__init__.py +++ b/services/report/__init__.py @@ -50,11 +50,6 @@ PARALLEL_UPLOAD_PROCESSING_BY_REPO, ) from services.archive import ArchiveService -from services.redis import ( - PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL, - get_parallel_upload_processing_session_counter_redis_key, - get_redis_connection, -) from services.report.parser import get_proper_parser from services.report.parser.types import ParsedRawReport from services.report.parser.version_one import VersionOneReportParser @@ -288,27 +283,11 @@ def initialize_and_save_report( self.save_full_report(commit, report, report_code) # Behind parallel processing flag, save the CFF report to GCS so the parallel variant of - # finisher can build off of it later. Makes the assumption that the CFFs occupy the first - # j to i session ids where i is the max id of the CFFs and j is some integer less than i. + # finisher can build off of it later. if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value( identifier=commit.repository.repoid ): self.save_parallel_report_to_archive(commit, report, report_code) - highest_session_id = max( - report.sessions.keys() - ) # the largest id among the CFFs - get_redis_connection().incrby( - name=get_parallel_upload_processing_session_counter_redis_key( - commit.repository.repoid, commit.commitid - ), - amount=highest_session_id + 1, - ) - get_redis_connection().expire( - name=get_parallel_upload_processing_session_counter_redis_key( - commit.repository.repoid, commit.commitid - ), - time=PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL, - ) return current_report_row diff --git a/tasks/flush_repo.py b/tasks/flush_repo.py index 1198f09cd..7f901ee25 100644 --- a/tasks/flush_repo.py +++ b/tasks/flush_repo.py @@ -30,11 +30,6 @@ uploadflagmembership, ) from services.archive import ArchiveService -from services.redis import ( - PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL, - get_parallel_upload_processing_session_counter_redis_key, - get_redis_connection, -) from tasks.base import BaseCodecovTask log = logging.getLogger(__name__) @@ -156,25 +151,9 @@ def _delete_label_analysis(self, db_session: Session, commit_ids, repoid: int): @sentry_sdk.trace def _delete_commits(self, db_session: Session, repoid: int) -> int: - commits_to_delete = ( - db_session.query(Commit.commitid).filter_by(repoid=repoid).all() - ) - commit_ids_to_delete = [commit.commitid for commit in commits_to_delete] - - pipeline = get_redis_connection().pipeline() - for id in commit_ids_to_delete: - pipeline.set( - get_parallel_upload_processing_session_counter_redis_key( - repoid=repoid, commitid=id - ), - 0, - ex=PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL, - ) - pipeline.execute() - delete_count = ( db_session.query(Commit) - .filter(Commit.commitid.in_(commit_ids_to_delete)) + .filter_by(repoid=repoid) .delete(synchronize_session=False) ) db_session.commit() diff --git a/tasks/upload.py b/tasks/upload.py index 48d20822a..3b7c99392 100644 --- a/tasks/upload.py +++ b/tasks/upload.py @@ -37,11 +37,7 @@ from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO from services.archive import ArchiveService from services.bundle_analysis.report import BundleAnalysisReportService -from services.redis import ( - download_archive_from_redis, - get_parallel_upload_processing_session_counter_redis_key, - get_redis_connection, -) +from services.redis import download_archive_from_redis, get_redis_connection from services.report import NotReadyToBuildReportYetError, ReportService from services.repository import ( create_webhook_on_provider, @@ -636,35 +632,6 @@ def _schedule_coverage_processing_task( report_service = ReportService(commit_yaml) sessions = report_service.build_sessions(commit=commit) - # if session count expired due to TTL (which is unlikely for most cases), recalculate the - # session ids used and set it in redis. - redis_key = get_parallel_upload_processing_session_counter_redis_key( - repoid=commit.repository.repoid, commitid=commit.commitid - ) - if not upload_context.redis_connection.exists(redis_key): - upload_context.redis_connection.set( - redis_key, - max(sessions.keys()) + 1 if sessions.keys() else 0, - ) - - # https://github.com/codecov/worker/commit/7d9c1984b8bc075c9fa002ee15cab3419684f2d6 - # try to scrap the redis counter idea to fully mimic how session ids are allocated in the - # serial flow. This change is technically less performant, and would not allow for concurrent - # chords to be running at the same time. For now this is just a temporary change, just for - # verifying correctness. - # - # # increment redis to claim session ids - # parallel_session_id = ( - # upload_context.redis_connection.incrby( - # name=redis_key, - # amount=num_sessions, - # ) - # - num_sessions - # ) - # upload_context.redis_connection.expire( - # name=redis_key, - # time=PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL, - # ) original_session_ids = list(sessions.keys()) parallel_session_ids = get_parallel_session_ids( sessions,