Commit
move sleep to cron job call stack only
rebase on master
dbanda committed Nov 3, 2022
1 parent d8375ce commit 1ac6a7d
Showing 3 changed files with 41 additions and 39 deletions.
50 changes: 18 additions & 32 deletions snuba/clickhouse/optimize/optimize.py
@@ -16,7 +16,6 @@
from snuba.datasets.storage import ReadableTableStorage
from snuba.settings import (
OPTIMIZE_BASE_SLEEP_TIME,
OPTIMIZE_MERGE_MAX_LONG_CONCURRENT_JOBS,
OPTIMIZE_MERGE_MIN_ELAPSED_CUTTOFF_TIME,
OPTIMIZE_MERGE_SIZE_CUTOFF,
)
@@ -90,6 +89,11 @@ def run_optimize_cron_job(
database = storage.get_cluster().get_database()
optimize_scheduler = OptimizeScheduler(parallel=parallel)

# if there's a merge in progress, wait for it to finish
while is_busy_merging(clickhouse, database, table):
logger.info(f"busy merging, sleeping for {OPTIMIZE_BASE_SLEEP_TIME}s")
time.sleep(OPTIMIZE_BASE_SLEEP_TIME)

try:
partitions_to_optimize = tracker.get_partitions_to_optimize()
except NoOptimizedStateException:
@@ -262,13 +266,6 @@ def optimize_partition_runner(
raise an exception which would be propagated to the caller.
"""
remaining_partitions = partitions
# if theres a merge in progress wait for it to finish
while True:
busy_merging, _ = is_busy_merging(clickhouse, database, table)
if not busy_merging:
break
else:
time.sleep(OPTIMIZE_BASE_SLEEP_TIME)
while remaining_partitions:
schedule = scheduler.get_next_schedule(remaining_partitions)
num_threads = len(schedule.partitions)
@@ -367,32 +364,21 @@ def optimize_partitions(
)


def is_busy_merging(
clickhouse: ClickhousePool, database: str, table: str
) -> Tuple[bool, float]:
def is_busy_merging(clickhouse: ClickhousePool, database: str, table: str) -> bool:
"""
Returns true and the estimated sleep time if clickhouse is busy with merges
in progress for the table. Clickhouse is considered busy if
1. there are more than OPTIMIZE_MERGE_MAX_LONG_CONCURRENT_JOBS merges in progress
with an elapsed time greater than OPTIMIZE_MERGE_MIN_ELAPSED_CUTTOFF_TIME
2. or there is any merge of size greater than OPTIMIZE_MERGE_SIZE_CUTOFF
Returns true if clickhouse is busy with merges in progress
for the table. Clickhouse is considered busy if there is any
merge of size greater than OPTIMIZE_MERGE_SIZE_CUTOFF
"""
merge_info = get_current_large_merges(clickhouse, database, table)

if len(merge_info) > OPTIMIZE_MERGE_MAX_LONG_CONCURRENT_JOBS:
estimated_sleep_time = max(
merge_info, key=lambda x: x.estimated_time
).estimated_time
logger.info(
f"too many concurrent long merges {len(merge_info)}, sleeping for {estimated_sleep_time}s"
)
return True, estimated_sleep_time

if any(merge.size > OPTIMIZE_MERGE_SIZE_CUTOFF for merge in merge_info):
estimated_sleep_time = max(
merge_info, key=lambda x: x.estimated_time
).estimated_time
logger.info(f"large ongoing merge, sleeping for {estimated_sleep_time}s")
return True, estimated_sleep_time
for merge in merge_info:
if merge.size > OPTIMIZE_MERGE_SIZE_CUTOFF:
logger.info(
"large ongoing merge detected "
f"result part: {merge.result_part_name}, size: {merge.size}"
f"progress: {merge.progress}, elapsed: {merge.elapsed}"
)
return True

return False, 0
return False
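
For context, get_current_large_merges (defined elsewhere in this module and not changed by this commit) supplies the MergeInfo records inspected above. The sketch below only illustrates the kind of query it wraps against ClickHouse's system.merges table: the exact SQL, the MergeInfo constructor arguments, and the shape of ClickhousePool.execute()'s result are assumptions here, not this repository's code. ClickhousePool and MergeInfo are assumed to come from this module's existing imports.

    from typing import Sequence

    def get_current_merges_sketch(
        clickhouse: ClickhousePool, database: str, table: str
    ) -> Sequence[MergeInfo]:
        # Hypothetical illustration: list the merges currently in progress for
        # one table from system.merges and map them onto MergeInfo fields.
        query = (
            "SELECT result_part_name, elapsed, progress, total_size_bytes_compressed "
            "FROM system.merges "
            f"WHERE database = '{database}' AND table = '{table}'"
        )
        rows = clickhouse.execute(query).results
        return [
            MergeInfo(result_part_name=name, elapsed=elapsed, progress=progress, size=size)
            for name, elapsed, progress, size in rows
        ]

With that data source, is_busy_merging only has to scan the returned sizes against OPTIMIZE_MERGE_SIZE_CUTOFF.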
7 changes: 6 additions & 1 deletion snuba/settings/__init__.py
@@ -285,7 +285,12 @@ class RedisClusters(TypedDict):
# avoid spilling over to the next day.
OPTIMIZE_JOB_CUTOFF_TIME = timedelta(hours=23)
OPTIMIZE_QUERY_TIMEOUT = 4 * 60 * 60 # 4 hours

# sleep time to wait for a merge to complete
OPTIMIZE_BASE_SLEEP_TIME = 300 # 5 mins
# merges longer than this will be considered long running
OPTIMIZE_MERGE_MIN_ELAPSED_CUTTOFF_TIME = 10 * 60 # 10 mins
# merges larger than this will be considered large and will be waited on
OPTIMIZE_MERGE_SIZE_CUTOFF = 50_000_000_000 # 50GB
# Maximum jitter to add to the scheduling of threads of an optimize job
OPTIMIZE_PARALLEL_MAX_JITTER_MINUTES = 30
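
To make the new OPTIMIZE_MERGE_SIZE_CUTOFF concrete, here is a tiny illustrative check (not part of this commit) using the two merge sizes that appear in the test file below: the 60 GB merge is over the 50 GB cutoff and makes the cron job wait, while the 60 KB merge does not.

    from snuba.settings import OPTIMIZE_MERGE_SIZE_CUTOFF  # 50_000_000_000 (50 GB)

    assert 60_000_000_000 > OPTIMIZE_MERGE_SIZE_CUTOFF   # large merge: busy, keep sleeping
    assert not 60_000 > OPTIMIZE_MERGE_SIZE_CUTOFF       # small merge: not busy, proceed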

23 changes: 17 additions & 6 deletions tests/clickhouse/optimize/test_optimize_tracker.py
@@ -1,7 +1,7 @@
import time
from unittest.mock import patch
import uuid
from datetime import datetime, timedelta
from unittest.mock import call, patch

import pytest

@@ -219,11 +219,19 @@ def write_error_message(writable_storage: WritableTableStorage, time: int) -> None:
60_000_000_000,
),
]
small_merges = [
MergeInfo(
"90-20220613_0_1216096_1417",
10,
0.5,
60_000,
),
]
mock_merge_ids.side_effect = [
current_merges,
current_merges,
[],
] # first & second call returns ongoing merges, rest return no ongoing merges
small_merges,
] # first & second call returns large ongoing merges, third call returns small ongoing merges

with patch.object(time, "sleep") as sleep_mock:
num_optimized = run_optimize_cron_job(
@@ -238,7 +246,11 @@ def write_error_message(writable_storage: WritableTableStorage, time: int) -> None:
assert mock_merge_ids.call_count == 3

sleep_mock.assert_called_with(settings.OPTIMIZE_BASE_SLEEP_TIME)
sleep_mock.call_count = 4 # twice for first and second patitition
assert sleep_mock.call_count == 2 # slept once for each of the first two (busy) merge checks
assert sleep_mock.call_args_list == [
call(settings.OPTIMIZE_BASE_SLEEP_TIME),
call(settings.OPTIMIZE_BASE_SLEEP_TIME),
]


def test_merge_info() -> None:
@@ -281,12 +293,11 @@ def test_merge_info() -> None:
assert merge_info[0].estimated_time == 8020.61436897 / (
0.9895385071013121 + 0.0001
)
busy, sleep_time = optimize.is_busy_merging(
busy = optimize.is_busy_merging(
clickhouse=ClickhousePool(
"localhost", 9000, "user", "password", "database"
),
database="default",
table="errors_local",
)
assert busy
assert sleep_time == merge_info[0].estimated_time
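
To make the expected sleep count concrete, the following minimal standalone sketch (hypothetical, not part of this test suite) drives the same wait-loop shape with the three mocked outcomes used above (busy, busy, then not busy) and ends up with exactly two sleeps:

    import time
    from unittest.mock import patch

    busy_responses = iter([True, True, False])  # large merge, large merge, then only a small one

    with patch.object(time, "sleep") as sleep_mock:
        while next(busy_responses):
            time.sleep(300)  # stands in for OPTIMIZE_BASE_SLEEP_TIME

    assert sleep_mock.call_count == 2  # one sleep per busy check, matching the assertion above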
