Skip to content

Commit 66fdc8c

Browse files
authored
Merge pull request #392 from HSF/dev
Dev
2 parents 1d84ade + 2916b53 commit 66fdc8c

File tree

2 files changed

+13
-2
lines changed

2 files changed

+13
-2
lines changed

doma/lib/idds/doma/workflowv2/domapandawork.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,8 @@ def depend_on(self, work):
383383

384384
for input_d in inputs_dependency:
385385
task_name = input_d['task']
386-
dependency_tasks.add(task_name)
386+
if task_name not in dependency_tasks:
387+
dependency_tasks.add(task_name)
387388
self.dependency_tasks = list(dependency_tasks)
388389

389390
if work.task_name in self.dependency_tasks:
@@ -881,7 +882,7 @@ def submit_panda_task(self, processing):
881882
task_param['processingType'] = p_type
882883
self.logger.warn(msg)
883884
else:
884-
if task_param['site']:
885+
if 'site' in task_param and task_param['site']:
885886
for q in queue_processing_type:
886887
if task_param['site'] in q or q in task_param['site']:
887888
p_type = queue_processing_type[q]

main/lib/idds/agents/common/eventbus/msgeventbusbackend.py

+10
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,9 @@ def send(self, event):
423423
if self.debug:
424424
self.logger.debug("MsgEventBusBackend send event: %s" % req)
425425

426+
if not self.manager_socket or self.manager_socket.closed:
427+
self.init_msg_channel()
428+
426429
self.manager_socket.send_string(req)
427430
if self.manager_socket.poll(self.socket_timeout * 1000):
428431
reply = self.manager_socket.recv_string()
@@ -465,6 +468,9 @@ def send_bulk(self, events):
465468
if self.debug:
466469
self.logger.debug("MsgEventBusBackend send bulk event: %s" % req)
467470

471+
if not self.manager_socket or self.manager_socket.closed:
472+
self.init_msg_channel()
473+
468474
self.manager_socket.send_string(req)
469475
if self.manager_socket.poll(self.socket_timeout * 1000):
470476
reply = self.manager_socket.recv_string()
@@ -510,6 +516,10 @@ def get(self, event_type, num_events=1, wait=0):
510516

511517
if self.debug:
512518
self.logger.debug("MsgEventBusBackend get event: %s" % req)
519+
520+
if not self.manager_socket or self.manager_socket.closed:
521+
self.init_msg_channel()
522+
513523
self.manager_socket.send_string(req)
514524

515525
if self.manager_socket.poll(10 * 1000):

0 commit comments

Comments
 (0)