Remove parallel session_id allocations in Redis (#711)
Swatinem committed Sep 17, 2024
1 parent 4ec7500 commit e3cb698
Showing 4 changed files with 3 additions and 101 deletions.
services/redis.py (23 changes: 0 additions & 23 deletions)
@@ -7,23 +7,6 @@

 log = logging.getLogger(__name__)
 
-# The purpose of this key is to synchronize allocation of session IDs
-# when processing reports in parallel. If multiple uploads are processed
-# at a time, they all need to know what session id to use when generating
-# their chunks.txt. This is because in UploadFinisher, they will all be
-# combined together and need to all have unique session IDs.
-
-# This key is a counter that will hold an integer, representing the smallest
-# session ID that is available to be claimed. It can be claimed by incrementing
-# the key.
-PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_REDIS_KEY = (
-    "parallel_session_counter:{repoid}:{commitid}"
-)
-
-PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL = (
-    10800  # 3 hours, chosen somewhat arbitrarily
-)
-
 
 def get_redis_url() -> str:
     url = get_config("services", "redis_url")
@@ -53,9 +36,3 @@ def download_archive_from_redis(
     if raw_uploaded_report is not None:
         return raw_uploaded_report.decode()
     return None
-
-
-def get_parallel_upload_processing_session_counter_redis_key(repoid, commitid):
-    return PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_REDIS_KEY.format(
-        repoid=repoid, commitid=commitid
-    )
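
For reference, the removed constants and key helper backed a per-commit counter from which parallel upload processors claimed session IDs (the same pattern appears in the commented-out code removed from tasks/upload.py further down). A minimal sketch of that claiming pattern, assuming a redis-py style client; claim_session_ids and its signature are illustrative, not part of the worker codebase:

import redis

# Mirrors the removed constants above.
PARALLEL_SESSION_COUNTER_KEY = "parallel_session_counter:{repoid}:{commitid}"
COUNTER_TTL = 10800  # 3 hours


def claim_session_ids(
    client: redis.Redis, repoid: int, commitid: str, num_sessions: int
) -> list[int]:
    """Atomically claim `num_sessions` consecutive session IDs for one upload."""
    key = PARALLEL_SESSION_COUNTER_KEY.format(repoid=repoid, commitid=commitid)
    # INCRBY returns the counter value *after* the increment, so the first ID
    # this upload owns is that value minus the amount just added.
    first_id = client.incrby(key, num_sessions) - num_sessions
    client.expire(key, COUNTER_TTL)
    return list(range(first_id, first_id + num_sessions))

Because INCRBY is atomic, two uploads processed at the same time can never be handed overlapping ID ranges, which is the guarantee the removed comment block describes.
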
services/report/__init__.py (23 changes: 1 addition & 22 deletions)
@@ -50,11 +50,6 @@
     PARALLEL_UPLOAD_PROCESSING_BY_REPO,
 )
 from services.archive import ArchiveService
-from services.redis import (
-    PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
-    get_parallel_upload_processing_session_counter_redis_key,
-    get_redis_connection,
-)
 from services.report.parser import get_proper_parser
 from services.report.parser.types import ParsedRawReport
 from services.report.parser.version_one import VersionOneReportParser
@@ -288,27 +283,11 @@ def initialize_and_save_report(
         self.save_full_report(commit, report, report_code)
 
         # Behind parallel processing flag, save the CFF report to GCS so the parallel variant of
-        # finisher can build off of it later. Makes the assumption that the CFFs occupy the first
-        # j to i session ids where i is the max id of the CFFs and j is some integer less than i.
+        # finisher can build off of it later.
         if PARALLEL_UPLOAD_PROCESSING_BY_REPO.check_value(
             identifier=commit.repository.repoid
         ):
             self.save_parallel_report_to_archive(commit, report, report_code)
-            highest_session_id = max(
-                report.sessions.keys()
-            )  # the largest id among the CFFs
-            get_redis_connection().incrby(
-                name=get_parallel_upload_processing_session_counter_redis_key(
-                    commit.repository.repoid, commit.commitid
-                ),
-                amount=highest_session_id + 1,
-            )
-            get_redis_connection().expire(
-                name=get_parallel_upload_processing_session_counter_redis_key(
-                    commit.repository.repoid, commit.commitid
-                ),
-                time=PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
-            )
 
         return current_report_row
 
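
The branch removed above seeded that counter after the carried-forward (CFF) sessions were saved: since the CFFs occupy session IDs 0 through highest_session_id, bumping the counter by highest_session_id + 1 kept later parallel uploads from ever reusing a CFF ID. A hypothetical condensed form of that step, reusing the illustrative names from the sketch above:

def reserve_cff_session_ids(client, repoid: int, commitid: str, report) -> None:
    """Advance the counter past every carried-forward session's ID."""
    key = PARALLEL_SESSION_COUNTER_KEY.format(repoid=repoid, commitid=commitid)
    # Assumes at least one CFF session exists, as the removed code did.
    highest_session_id = max(report.sessions.keys())
    # IDs 0..highest_session_id are taken, so the next free ID is highest + 1.
    client.incrby(key, highest_session_id + 1)
    client.expire(key, COUNTER_TTL)
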
tasks/flush_repo.py (23 changes: 1 addition & 22 deletions)
@@ -30,11 +30,6 @@
     uploadflagmembership,
 )
 from services.archive import ArchiveService
-from services.redis import (
-    PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
-    get_parallel_upload_processing_session_counter_redis_key,
-    get_redis_connection,
-)
 from tasks.base import BaseCodecovTask
 
 log = logging.getLogger(__name__)
@@ -156,25 +151,9 @@ def _delete_label_analysis(self, db_session: Session, commit_ids, repoid: int):

     @sentry_sdk.trace
     def _delete_commits(self, db_session: Session, repoid: int) -> int:
-        commits_to_delete = (
-            db_session.query(Commit.commitid).filter_by(repoid=repoid).all()
-        )
-        commit_ids_to_delete = [commit.commitid for commit in commits_to_delete]
-
-        pipeline = get_redis_connection().pipeline()
-        for id in commit_ids_to_delete:
-            pipeline.set(
-                get_parallel_upload_processing_session_counter_redis_key(
-                    repoid=repoid, commitid=id
-                ),
-                0,
-                ex=PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
-            )
-        pipeline.execute()
-
         delete_count = (
             db_session.query(Commit)
-            .filter(Commit.commitid.in_(commit_ids_to_delete))
+            .filter_by(repoid=repoid)
             .delete(synchronize_session=False)
         )
         db_session.commit()
tasks/upload.py (35 changes: 1 addition & 34 deletions)
@@ -37,11 +37,7 @@
 from rollouts import PARALLEL_UPLOAD_PROCESSING_BY_REPO
 from services.archive import ArchiveService
 from services.bundle_analysis.report import BundleAnalysisReportService
-from services.redis import (
-    download_archive_from_redis,
-    get_parallel_upload_processing_session_counter_redis_key,
-    get_redis_connection,
-)
+from services.redis import download_archive_from_redis, get_redis_connection
 from services.report import NotReadyToBuildReportYetError, ReportService
 from services.repository import (
     create_webhook_on_provider,
@@ -636,35 +632,6 @@ def _schedule_coverage_processing_task(
         report_service = ReportService(commit_yaml)
         sessions = report_service.build_sessions(commit=commit)
 
-        # if session count expired due to TTL (which is unlikely for most cases), recalculate the
-        # session ids used and set it in redis.
-        redis_key = get_parallel_upload_processing_session_counter_redis_key(
-            repoid=commit.repository.repoid, commitid=commit.commitid
-        )
-        if not upload_context.redis_connection.exists(redis_key):
-            upload_context.redis_connection.set(
-                redis_key,
-                max(sessions.keys()) + 1 if sessions.keys() else 0,
-            )
-
-        # https://github.com/codecov/worker/commit/7d9c1984b8bc075c9fa002ee15cab3419684f2d6
-        # try to scrap the redis counter idea to fully mimic how session ids are allocated in the
-        # serial flow. This change is technically less performant, and would not allow for concurrent
-        # chords to be running at the same time. For now this is just a temporary change, just for
-        # verifying correctness.
-        #
-        # # increment redis to claim session ids
-        # parallel_session_id = (
-        #     upload_context.redis_connection.incrby(
-        #         name=redis_key,
-        #         amount=num_sessions,
-        #     )
-        #     - num_sessions
-        # )
-        # upload_context.redis_connection.expire(
-        #     name=redis_key,
-        #     time=PARALLEL_UPLOAD_PROCESSING_SESSION_COUNTER_TTL,
-        # )
         original_session_ids = list(sessions.keys())
         parallel_session_ids = get_parallel_session_ids(
             sessions,
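
The comment in the removed block points at the real change: instead of claiming IDs from a shared Redis counter, the parallel flow now derives session IDs from the sessions already present in the report via get_parallel_session_ids, mimicking how the serial flow allocates them. A rough, hypothetical illustration of that idea; the actual get_parallel_session_ids signature and logic are not shown in this diff, and next_free_session_ids is an invented name:

def next_free_session_ids(existing_session_ids: set[int], num_uploads: int) -> list[int]:
    """Hand out the smallest unused session IDs, the way serial processing would."""
    assigned: list[int] = []
    candidate = 0
    while len(assigned) < num_uploads:
        if candidate not in existing_session_ids:
            assigned.append(candidate)
        candidate += 1
    return assigned

As the removed comment notes, deriving IDs from the current report state is deterministic but assumes no other chord is allocating IDs for the same commit concurrently, which is the trade-off this commit accepts in exchange for dropping the Redis counter entirely.
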
