Commit 3a04d73

Merge pull request #410 from HSF/dev
Dev
2 parents 5fc8a80 + 04fa1a6 commit 3a04d73

18 files changed: +635 -70 lines changed

doma/lib/idds/doma/workflowv2/domapandawork.py (+54 -3)
@@ -165,16 +165,58 @@ def get_site(self):
             return self.queue
         return self.task_cloud

+    @property
+    def num_inputs(self):
+        num = self.get_metadata_item('num_inputs', None)
+        return num
+
+    @num_inputs.setter
+    def num_inputs(self, value):
+        self.add_metadata_item('num_inputs', value)
+
+    @property
+    def num_dependencies(self):
+        num = self.get_metadata_item('num_dependencies', None)
+        return num
+
+    @num_dependencies.setter
+    def num_dependencies(self, value):
+        self.add_metadata_item('num_dependencies', value)
+
+    def count_dependencies(self, data):
+        if self.num_dependencies is not None and self.num_inputs is not None:
+            return self.num_inputs, self.num_dependencies
+
+        num_inputs = 0
+        num_dependencies = 0
+        try:
+            for item in data:
+                # item_name = item['name']
+                inputs_dependency = item["dependencies"]
+                num_dependencies += len(inputs_dependency)
+                num_inputs += 1
+        except Exception as ex:
+            self.logger.warn(f"Failed to count dependencies: {ex}")
+        return num_inputs, num_dependencies
+
     @property
     def dependency_map(self):
         if self.should_unzip('_dependency_map'):
             data = self.unzip_data(self._dependency_map)
+            num_inputs, num_dependencies = self.count_dependencies(data)
+            self.num_inputs = num_inputs
+            self.num_dependencies = num_dependencies
             return data

+        num_inputs, num_dependencies = self.count_dependencies(self._dependency_map)
+        self.num_inputs = num_inputs
+        self.num_dependencies = num_dependencies
         return self._dependency_map

     @dependency_map.setter
     def dependency_map(self, value):
+        num_dependencies = 0
+        num_inputs = 0
         if value:
             if type(value) not in [list, tuple]:
                 raise exceptions.IDDSException("dependency_map should be a list or tuple")
@@ -184,6 +226,8 @@ def dependency_map(self, value):
                 if len(item_name) > self.max_name_length:
                     raise exceptions.IDDSException("The file name is long (%s), which is bigger than the maximum name length (%s)" % (len(item_name), self.max_name_length))
                 inputs_dependency = item["dependencies"]
+                num_inputs += 1
+                num_dependencies += len(inputs_dependency)
                 if item_name not in item_names:
                     item_names[item_name] = item
                 else:
@@ -201,6 +245,9 @@ def dependency_map(self, value):

         self._dependency_map = value

+        self.num_inputs = num_inputs
+        self.num_dependencies = num_dependencies
+
         if self.es:
             self.construct_es_files()

@@ -1458,7 +1505,7 @@ def get_event_job(self, sub_map_id, panda_jobs, job_set_events):
             ret_job = item.get('job', None)
         return ret_event, ret_job

-    def get_update_contents(self, unterminated_jobs_status, input_output_maps, contents_ext, job_info_maps, abort=False, log_prefix=''):
+    def get_update_contents(self, unterminated_jobs_status, input_output_maps, contents_ext, job_info_maps, abort=False, terminated_status=False, log_prefix=''):
         inputname_to_map_id_outputs = {}
         for map_id in input_output_maps:
             inputs = input_output_maps[map_id]['inputs']
@@ -1781,7 +1828,7 @@ def get_update_contents(self, unterminated_jobs_status, input_output_maps, conte
                 update_content_ext['exe_exit_diag'] = event_error_diag
                 update_contents_ext.append(update_content_ext)

-        if abort:
+        if abort or terminated_status:
             for map_id in input_output_maps:
                 outputs = input_output_maps[map_id]['outputs']
                 for content in outputs:
@@ -1847,7 +1894,11 @@ def poll_panda_task(self, processing=None, input_output_maps=None, contents_ext=
         abort_status = False
         if processing_status in [ProcessingStatus.Cancelled]:
             abort_status = True
-        ret_contents = self.get_update_contents(unterminated_jobs_status, input_output_maps, contents_ext, job_info_maps, abort=abort_status, log_prefix=log_prefix)
+        terminated_status = False
+        if processing_status in [ProcessingStatus.Cancelled, ProcessingStatus.Failed, ProcessingStatus.Broken]:
+            terminated_status = True
+        ret_contents = self.get_update_contents(unterminated_jobs_status, input_output_maps, contents_ext, job_info_maps,
+                                                abort=abort_status, terminated_status=terminated_status, log_prefix=log_prefix)
         updated_contents, update_contents_full, new_contents_ext, update_contents_ext = ret_contents

         return processing_status, updated_contents, update_contents_full, new_contents_ext, update_contents_ext
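
Note: the new num_inputs / num_dependencies properties cache the counts in the work's metadata, so the (potentially large) dependency map only has to be walked once; later calls, such as the page-size calculation in the carrier further down, can then read the numbers cheaply. A minimal sketch of the pattern, assuming get_metadata_item / add_metadata_item are simple accessors over a dict persisted with the work object (the dict backing and the WorkSketch class are illustrative, not the real iDDS base class):

    # Illustrative sketch of the metadata-backed caching pattern above.
    class WorkSketch:
        def __init__(self):
            self.metadata = {}  # assumed: persisted together with the work object

        def get_metadata_item(self, key, default=None):
            return self.metadata.get(key, default)

        def add_metadata_item(self, key, value):
            self.metadata[key] = value

        @property
        def num_inputs(self):
            return self.get_metadata_item('num_inputs', None)

        @num_inputs.setter
        def num_inputs(self, value):
            self.add_metadata_item('num_inputs', value)

        @property
        def num_dependencies(self):
            return self.get_metadata_item('num_dependencies', None)

        @num_dependencies.setter
        def num_dependencies(self, value):
            self.add_metadata_item('num_dependencies', value)

        def count_dependencies(self, data):
            # Return the cached counts when present; otherwise walk the map once.
            if self.num_inputs is not None and self.num_dependencies is not None:
                return self.num_inputs, self.num_dependencies
            num_inputs = len(data)
            num_dependencies = sum(len(item["dependencies"]) for item in data)
            return num_inputs, num_dependencies

    work = WorkSketch()
    dependency_map = [{"name": "job_0", "dependencies": ["a", "b"]},
                      {"name": "job_1", "dependencies": ["c"]}]
    work.num_inputs, work.num_dependencies = work.count_dependencies(dependency_map)
    print(work.num_inputs, work.num_dependencies)  # -> 2 3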

main/lib/idds/agents/carrier/poller.py (+3 -1)
@@ -603,7 +603,9 @@ def process_update_processing(self, event):

     def clean_locks(self):
         self.logger.info("clean locking")
-        core_processings.clean_locking(time_period=self.locking_period)
+        health_items = self.get_health_items()
+        min_request_id = BaseAgent.min_request_id
+        core_processings.clean_locking(health_items=health_items, min_request_id=min_request_id, time_period=None)

     def init_event_function_map(self):
         self.event_func_map = {
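
Note: lock cleaning is now driven by which agents are actually alive rather than by a fixed lock age. The call-site change, annotated (the comments are editorial; clean_locking's implementation is in the core packages and not part of this diff):

    # Before: release any processing locks older than a fixed period.
    # core_processings.clean_locking(time_period=self.locking_period)

    # After: pass the list of live agents plus the minimum request id still in
    # play, and disable the age-based cut-off (time_period=None). The core
    # layer is then expected to release only locks held by agents that are no
    # longer healthy.
    health_items = self.get_health_items()
    min_request_id = BaseAgent.min_request_id
    core_processings.clean_locking(health_items=health_items,
                                   min_request_id=min_request_id,
                                   time_period=None)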

main/lib/idds/agents/carrier/utils.py (+61 -14)
@@ -184,7 +184,7 @@ def get_collection_ids(collections):
     return coll_ids


-def get_input_output_maps(transform_id, work):
+def get_input_output_maps(transform_id, work, with_deps=True):
     # link collections
     input_collections = work.get_input_collections()
     output_collections = work.get_output_collections()
@@ -202,7 +202,8 @@ def get_input_output_maps(transform_id, work):
                                                          input_coll_ids=input_coll_ids,
                                                          output_coll_ids=output_coll_ids,
                                                          log_coll_ids=log_coll_ids,
-                                                         with_sub_map_id=work.with_sub_map_id())
+                                                         with_sub_map_id=work.with_sub_map_id(),
+                                                         with_deps=with_deps)

     # work_name_to_coll_map = core_transforms.get_work_name_to_coll_map(request_id=transform['request_id'])
     # work.set_work_name_to_coll_map(work_name_to_coll_map)
@@ -603,7 +604,7 @@ def handle_new_processing(processing, agent_attributes, func_site_to_cloud=None,
             update_collections.append(u_coll)

     if proc.submitted_at:
-        input_output_maps = get_input_output_maps(transform_id, work)
+        input_output_maps = get_input_output_maps(transform_id, work, with_deps=False)
         new_input_output_maps = work.get_new_input_output_maps(input_output_maps)
         request_id = processing['request_id']
         transform_id = processing['transform_id']
@@ -700,7 +701,7 @@ def get_input_output_sub_maps(inputs, outputs, inputs_dependency, logs=[]):
     return input_output_sub_maps


-def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated=False, max_updates_per_round=2000, logger=None, log_prefix=''):
+def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated=False, max_updates_per_round=2000, with_deps=False, logger=None, log_prefix=''):
     updated_contents, updated_contents_full_input, updated_contents_full_output = [], [], []
     updated_contents_full_input_deps = []
     new_update_contents = []
@@ -762,10 +763,12 @@ def get_updated_contents_by_input_output_maps(input_output_maps=None, terminated
             inputs_dependency_sub = input_output_sub_maps[sub_map_id]['inputs_dependency']

             input_content_update_status = None
-            if is_all_contents_available(inputs_dependency_sub):
-                input_content_update_status = ContentStatus.Available
-            elif is_all_contents_terminated(inputs_dependency_sub, terminated):
-                input_content_update_status = ContentStatus.Missing
+            if with_deps:
+                # If deps are not loaded, this part must not be executed; otherwise it would release all jobs.
+                if is_all_contents_available(inputs_dependency_sub):
+                    input_content_update_status = ContentStatus.Available
+                elif is_all_contents_terminated(inputs_dependency_sub, terminated):
+                    input_content_update_status = ContentStatus.Missing
             if input_content_update_status:
                 for content in inputs_sub:
                     if content['substatus'] != input_content_update_status:
@@ -1187,7 +1190,7 @@ def handle_update_processing(processing, agent_attributes, max_updates_per_round
     work = proc.work
     work.set_agent_attributes(agent_attributes, processing)

-    input_output_maps = get_input_output_maps(transform_id, work)
+    input_output_maps = get_input_output_maps(transform_id, work, with_deps=False)
     logger.debug(log_prefix + "get_input_output_maps: len: %s" % len(input_output_maps))
     logger.debug(log_prefix + "get_input_output_maps.keys[:3]: %s" % str(list(input_output_maps.keys())[:3]))

@@ -1414,11 +1417,35 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=
     request_id = processing['request_id']
     transform_id = processing['transform_id']
     workload_id = processing['workload_id']
+    processing_id = processing['processing_id']

     proc = processing['processing_metadata']['processing']
     work = proc.work
     work.set_agent_attributes(agent_attributes, processing)

+    num_dependencies = None
+    num_inputs = None
+    default_input_dep_page_size = 500
+    min_input_dep_page_size = 100
+    max_dependencies = 5000
+    try:
+        num_inputs = work.num_inputs
+        num_dependencies = work.num_dependencies
+        if num_inputs is not None and num_dependencies is not None and num_dependencies > 0:
+            input_dep_page_size = int(max_dependencies * num_inputs / num_dependencies)
+            if input_dep_page_size < default_input_dep_page_size:
+                default_input_dep_page_size = input_dep_page_size
+                log_info = f"input_dep_page_size ({input_dep_page_size}) is smaller than default_input_dep_page_size ({default_input_dep_page_size}), "
+                log_info += "update default_input_dep_page_size from input_dep_page_size"
+                logger.info(log_info)
+            if default_input_dep_page_size < min_input_dep_page_size:
+                log_info = f"default_input_dep_page_size ({default_input_dep_page_size}) is smaller than min_input_dep_page_size ({min_input_dep_page_size}), "
+                log_info += "update default_input_dep_page_size from min_input_dep_page_size"
+                logger.info(log_info)
+                default_input_dep_page_size = min_input_dep_page_size
+    except Exception as ex:
+        logger.warn(f"request_id ({request_id}) transform_id ({transform_id}) processing_id ({processing_id}) fails to get num_dependencies: {ex}")
+
     if (not work.use_dependency_to_release_jobs()) or workload_id is None:
         return processing['substatus'], [], [], {}, {}, {}, [], [], has_updates
     else:
@@ -1467,6 +1494,7 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=
         core_catalog.delete_contents_update(request_id=request_id, transform_id=transform_id, fetch=True)
         logger.debug(log_prefix + "sync contents_update to contents done")

+        """
         logger.debug(log_prefix + "update_contents_from_others_by_dep_id")
         # core_catalog.update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
         to_triggered_contents = core_catalog.get_update_contents_from_others_by_dep_id(request_id=request_id, transform_id=transform_id)
@@ -1490,19 +1518,38 @@ def handle_trigger_processing(processing, agent_attributes, trigger_new_updates=
             wait_futures_finish(ret_futures, "update_contents_from_others_by_dep_id", logger, log_prefix)

         logger.debug(log_prefix + "update_contents_from_others_by_dep_id done")
+        """

-        input_output_maps = get_input_output_maps(transform_id, work)
-        logger.debug(log_prefix + "input_output_maps.keys[:2]: %s" % str(list(input_output_maps.keys())[:2]))
+        logger.debug(log_prefix + "update_contents_from_others_by_dep_id_pages")
+        status_not_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable,
+                               ContentStatus.FinalFailed, ContentStatus.Missing]
+        core_catalog.update_contents_from_others_by_dep_id_pages(request_id=request_id, transform_id=transform_id,
+                                                                 page_size=1000, status_not_to_check=status_not_to_check)
+        logger.debug(log_prefix + "update_contents_from_others_by_dep_id_pages done")

         terminated_processing = False
         terminated_status = [ProcessingStatus.Finished, ProcessingStatus.Failed, ProcessingStatus.SubFinished,
                              ProcessingStatus.Terminating, ProcessingStatus.Cancelled]
         if processing['status'] in terminated_status or processing['substatus'] in terminated_status:
             terminated_processing = True

+        logger.debug(log_prefix + "update_input_contents_by_dependency_pages")
+        status_not_to_check = [ContentStatus.Available, ContentStatus.FakeAvailable,
+                               ContentStatus.FinalFailed, ContentStatus.Missing]
+        core_catalog.update_input_contents_by_dependency_pages(request_id=request_id, transform_id=transform_id,
+                                                               page_size=default_input_dep_page_size,
+                                                               terminated=terminated_processing,
+                                                               batch_size=1000, status_not_to_check=status_not_to_check)
+        logger.debug(log_prefix + "update_input_contents_by_dependency_pages done")
+
+        with_deps = False
+        input_output_maps = get_input_output_maps(transform_id, work, with_deps=with_deps)
+        logger.debug(log_prefix + "input_output_maps.keys[:2]: %s" % str(list(input_output_maps.keys())[:2]))

         updated_contents_ret_chunks = get_updated_contents_by_input_output_maps(input_output_maps=input_output_maps,
                                                                                 terminated=terminated_processing,
                                                                                 max_updates_per_round=max_updates_per_round,
+                                                                                with_deps=with_deps,
                                                                                 logger=logger,
                                                                                 log_prefix=log_prefix)

@@ -1913,7 +1960,7 @@ def handle_messages_processing(messages, logger=None, log_prefix='', update_proc
 def sync_collection_status(request_id, transform_id, workload_id, work, input_output_maps=None,
                            close_collection=False, force_close_collection=False, abort=False, terminate=False):
     if input_output_maps is None:
-        input_output_maps = get_input_output_maps(transform_id, work)
+        input_output_maps = get_input_output_maps(transform_id, work, with_deps=False)

     all_updates_flushed = True
     coll_status = {}
@@ -2116,7 +2163,7 @@ def sync_processing(processing, agent_attributes, terminate=False, abort=False,
     work.set_agent_attributes(agent_attributes, processing)

     messages = []
-    input_output_maps = get_input_output_maps(transform_id, work)
+    input_output_maps = get_input_output_maps(transform_id, work, with_deps=False)
     if processing['substatus'] in terminated_status or processing['substatus'] in terminated_status:
         terminate = True
     update_collections, all_updates_flushed, msgs = sync_collection_status(request_id, transform_id, workload_id, work,
@@ -2225,7 +2272,7 @@ def handle_resume_processing(processing, agent_attributes, logger=None, log_pref
                          'substatus': CollectionStatus.Open}
         update_collections.append(u_collection)

-    input_output_maps = get_input_output_maps(transform_id, work)
+    input_output_maps = get_input_output_maps(transform_id, work, with_deps=False)
     update_contents = reactive_contents(request_id, transform_id, workload_id, work, input_output_maps)

     processing['status'] = ProcessingStatus.Running
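
Note: the paging constants in handle_trigger_processing implement a simple heuristic: one page of input-dependency updates should touch at most roughly max_dependencies (5000) dependency rows, so the page size shrinks as the average number of dependencies per input grows, but never drops below min_input_dep_page_size (100). In the same spirit, get_input_output_maps and get_updated_contents_by_input_output_maps now thread a with_deps flag through, so callers that skip loading dependency rows cannot trip the release logic that would otherwise mark every input Available or Missing. A standalone sketch of the page-size arithmetic (the function wrapper and the example numbers are illustrative; the constants are copied from the diff):

    def compute_input_dep_page_size(num_inputs, num_dependencies,
                                    default_page_size=500,
                                    min_page_size=100,
                                    max_dependencies=5000):
        # Keep page_size * (num_dependencies / num_inputs) <= max_dependencies,
        # i.e. page_size <= max_dependencies * num_inputs / num_dependencies.
        page_size = default_page_size
        if num_inputs and num_dependencies:
            by_deps = int(max_dependencies * num_inputs / num_dependencies)
            page_size = min(page_size, by_deps)
        return max(page_size, min_page_size)

    # 1000 inputs with 50000 dependencies (50 per input on average):
    # 5000 * 1000 / 50000 = 100, so a page of 100 inputs covers ~5000 dependency rows.
    assert compute_input_dep_page_size(1000, 50000) == 100
    # Few dependencies per input: the default page size is kept.
    assert compute_input_dep_page_size(1000, 1000) == 500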

main/lib/idds/agents/clerk/clerk.py (+4 -2)
@@ -1890,7 +1890,9 @@ def process_resume_request(self, event):

     def clean_locks(self):
         self.logger.info("clean locking")
-        core_requests.clean_locking()
+        health_items = self.get_health_items()
+        min_request_id = BaseAgent.min_request_id
+        core_requests.clean_locking(health_items=health_items, min_request_id=min_request_id, time_period=None)

     def init_event_function_map(self):
         self.event_func_map = {
@@ -1942,7 +1944,7 @@ def run(self):
         self.add_task(task)
         task = self.create_task(task_func=self.clean_min_request_id, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=3600, priority=1)
         self.add_task(task)
-        task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=1800, priority=1)
+        task = self.create_task(task_func=self.clean_locks, task_output_queue=None, task_args=tuple(), task_kwargs={}, delay_time=60, priority=1)
         self.add_task(task)

         self.execute()
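
Note: with lock cleaning scoped by health_items and min_request_id (the same change as in the poller above) instead of a blanket age-based sweep, the clerk's clean_locks task is presumably cheap enough to reschedule every 60 seconds instead of every 1800.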
