Skip to content

Commit b1ac52a

Browse files
authored
Merge pull request #394 from HSF/dev
fix core to processing type map
2 parents 66fdc8c + beecada commit b1ac52a

File tree

3 files changed

+21
-7
lines changed

3 files changed

+21
-7
lines changed

doma/lib/idds/doma/workflowv2/domapandawork.py

+11-4
Original file line numberDiff line numberDiff line change
@@ -845,7 +845,7 @@ def submit_panda_task(self, processing):
845845
if self.additional_task_parameters_per_site:
846846
try:
847847
for site in self.additional_task_parameters_per_site:
848-
if (task_param['PandaSite'] and site in task_param['PandaSite']) or (task_param['site'] and site in task_param['site']):
848+
if ('PandaSite' in task_param and task_param['PandaSite'] and site in task_param['PandaSite']) or ('site' in task_param and task_param['site'] and site in task_param['site']):
849849
for key, value in self.additional_task_parameters_per_site[site].items():
850850
if key not in task_param:
851851
task_param[key] = value
@@ -857,7 +857,7 @@ def submit_panda_task(self, processing):
857857
# core_to_queues = {"1": {"queues": ["Rubin", "Rubin_Extra_Himem"], "processing_type": ""},
858858
# "Rubin_Multi": {"queues": ["Rubin_Multi"], "processing_type": "Rubin_Multi"},
859859
# "Rubin_Merge": {"queues": ["Rubin_Merge"], "processing_type": "Rubin_Merge"},
860-
# "any": "Rubin_Multi"}
860+
# "any": {"queues": ["Rubin_Multi"], "processing_type": "Rubin_Multi"}}
861861

862862
if task_param['processingType']:
863863
msg = f"processingType {task_param['processingType']} is already set, do nothing"
@@ -867,8 +867,7 @@ def submit_panda_task(self, processing):
867867
queue_processing_type = {}
868868
for k in self.core_to_queues:
869869
key = str(k)
870-
if not key.isdigit():
871-
num_cores.append(key)
870+
num_cores.append(key)
872871
if key not in ['any']:
873872
queues = self.core_to_queues[k].get('queues', [])
874873
processing_type = self.core_to_queues[k].get('processing_type', '')
@@ -881,6 +880,14 @@ def submit_panda_task(self, processing):
881880
msg = f"processingType is not defined, set it to {p_type} based on coreCount {task_param['coreCount']}"
882881
task_param['processingType'] = p_type
883882
self.logger.warn(msg)
883+
if 'site' in task_param and task_param['site']:
884+
for q in queue_processing_type:
885+
if task_param['site'] in q or q in task_param['site']:
886+
p_type = queue_processing_type[q]
887+
if p_type:
888+
msg = f"processingType is not defined, set it to {p_type} based on site {task_param['site']}"
889+
task_param['processingType'] = p_type
890+
self.logger.debug(msg)
884891
else:
885892
if 'site' in task_param and task_param['site']:
886893
for q in queue_processing_type:

main/lib/idds/agents/transformer/transformer.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -1175,7 +1175,11 @@ def process_update_transform(self, event):
11751175
ret, is_terminated, ret_processing_id = self.handle_update_transform(tf, event)
11761176
new_pr_ids, update_pr_ids = self.update_transform(ret)
11771177

1178-
if is_terminated or (event._content and 'event' in event._content and event._content['event'] == 'submitted'):
1178+
has_update_workload_id = False
1179+
new_workload_id = ret.get('transform_parameters', {}).get('workload_id', None)
1180+
if new_workload_id and tf['workload_id'] != new_workload_id:
1181+
has_update_workload_id = True
1182+
if has_update_workload_id or is_terminated or (event._content and 'event' in event._content and event._content['event'] == 'submitted'):
11791183
self.logger.info(log_pre + "UpdateRequestEvent(request_id: %s)" % tf['request_id'])
11801184
event = UpdateRequestEvent(publisher_id=self.id, request_id=tf['request_id'], content=event._content)
11811185
self.event_bus.send(event)

main/lib/idds/tests/panda_test.py

+5-2
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414
# os.environ['PANDA_URL_SSL'] = 'https://panda-doma-k8s-panda.cern.ch/server/panda'
1515
# os.environ['PANDA_URL'] = 'http://panda-doma-k8s-panda.cern.ch:25080/server/panda'
1616

17-
os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
18-
os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
17+
# os.environ['PANDA_URL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
18+
# os.environ['PANDA_URL_SSL'] = 'https://usdf-panda-server.slac.stanford.edu:8443/server/panda'
1919

2020
# os.environ['PANDA_URL_SSL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
2121
# os.environ['PANDA_URL'] = 'https://pandaserver01.sdcc.bnl.gov:25443/server/panda'
@@ -87,6 +87,9 @@
8787
task_ids = [124, 619]
8888
task_ids = [22707, 22708, 22709, 22710, 23211, 23212, 22155, 22158]
8989
task_ids = [24483, 24484, 25895, 26126, 26450, 26451, 26452, 26454, 26994, 27025, 27029]
90+
task_ids = [161489, 161496, 161502, 161508, 161514, 161520, 161526, 161532, 161538, 161544, 161550, 161556, 161562]
91+
task_ids = [3174, 3198, 3209, 3230, 3252, 3266, 3284, 3292, 3300, 3312, 3350, 3379, 3387]
92+
task_ids = [1548, 1555]
9093
for task_id in task_ids:
9194
print("Killing %s" % task_id)
9295
ret = Client.killTask(task_id, verbose=True)

0 commit comments

Comments
 (0)