Skip to content

Commit

Permalink
Merge pull request #5395 from chaen/v7r3_FEAT_pluginRequestTask
Browse files Browse the repository at this point in the history
[V7r3] Refactorise TaskManager
  • Loading branch information
Andrei Tsaregorodtsev authored Sep 19, 2021
2 parents 6d04d6a + 8432ddc commit 19b7af6
Show file tree
Hide file tree
Showing 9 changed files with 1,045 additions and 1,021 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ The complete list can be found in the `DIRAC project GitHub repository <https://

* **Clients**

* TaskManager: it contains WorkflowTasks and RequestTasks modules, for managing jobs and requests tasks, i.e. it contains classes wrapping the logic of how to 'transform' a Task in a job/request. WorkflowTaskAgent uses WorkflowTasks, RequestTaskAgent uses RequestTasks.
* TaskManager: it contains TaskBase, which is inherited by WorkflowTasks and RequestTasks modules, for managing jobs and requests tasks, i.e. it contains classes wrapping the logic of how to 'transform' a Task in a job/request. WorkflowTaskAgent uses WorkflowTasks, RequestTaskAgent uses RequestTasks.

* TransformationClient: class that contains client access to the transformation DB handler (main client to the service/DB). It exposes the functionalities available in the DIRAC/TransformationHandler. This inherits the DIRAC base Client for direct execution of server functionality

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ The complete list can be found in the `DIRAC project GitHub repository <https://

* **Clients**

* TaskManager: it contains WorkflowsTasks and RequestTasks modules, for managing jobs and requests tasks, i.e. it contains classes wrapping the logic of how to 'transform' a Task in a job/request. WorkflowTaskAgent uses WorkflowTasks, RequestTaskAgent uses RequestTasks.
* TaskManager: it contains TaskBase, inherited by WorkflowTasks and RequestTasks modules, for managing jobs and requests tasks, i.e. it contains classes wrapping the logic of how to 'transform' a Task in a job/request. WorkflowTaskAgent uses WorkflowTasks, RequestTaskAgent uses RequestTasks.

* TransformationClient: class that contains client access to the transformation DB handler (main client to the service/DB). It exposes the functionalities available in the DIRAC/TransformationHandler. This inherits the DIRAC base Client for direct execution of server functionality

Expand Down
15 changes: 12 additions & 3 deletions src/DIRAC/TransformationSystem/Agent/RequestTaskAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
from DIRAC import S_OK

from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.TransformationSystem.Agent.TaskManagerAgentBase import TaskManagerAgentBase
from DIRAC.TransformationSystem.Client.TaskManager import RequestTasks

__RCSID__ = "$Id$"

Expand All @@ -55,8 +55,17 @@ def initialize(self):
if not res['OK']:
return res

objLoader = ObjectLoader()
_class = objLoader.loadObject(
'TransformationSystem.Client.RequestTasks', 'RequestTasks')

if not _class['OK']:
raise Exception(_class['Message'])

self.requestTasksCls = _class['Value']

# clients
self.taskManager = RequestTasks(transClient=self.transClient)
self.taskManager = self.requestTasksCls(transClient=self.transClient)

agentTSTypes = self.am_getOption('TransType', [])
if agentTSTypes:
Expand All @@ -74,6 +83,6 @@ def _getClients(self, ownerDN=None, ownerGroup=None):
See :func:`DIRAC.TransformationSystem.TaskManagerAgentBase._getClients`.
"""
res = super(RequestTaskAgent, self)._getClients(ownerDN=ownerDN, ownerGroup=ownerGroup)
threadTaskManager = RequestTasks(ownerDN=ownerDN, ownerGroup=ownerGroup)
threadTaskManager = self.requestTasksCls(ownerDN=ownerDN, ownerGroup=ownerGroup)
res.update({'TaskManager': threadTaskManager})
return res
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getUsernameForDN
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
from DIRAC.TransformationSystem.Client.FileReport import FileReport
from DIRAC.TransformationSystem.Client.TaskManager import WorkflowTasks
from DIRAC.TransformationSystem.Client.WorkflowTasks import WorkflowTasks
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
from DIRAC.TransformationSystem.Agent.TransformationAgentsUtilities import TransformationAgentsUtilities
from DIRAC.WorkloadManagementSystem.Client import JobStatus
Expand Down
330 changes: 330 additions & 0 deletions src/DIRAC/TransformationSystem/Client/RequestTasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
import six
import time
import json

from DIRAC import S_OK, S_ERROR, gLogger

from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.Core.Security.ProxyInfo import getProxyInfo

from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator
from DIRAC.TransformationSystem.Client import TransformationFilesStatus
from DIRAC.TransformationSystem.Client.TaskManager import TaskBase


class RequestTasks(TaskBase):
    """
    Class for handling tasks for the RMS
    """

    def __init__(self, transClient=None, logger=None, requestClient=None,
                 requestClass=None, requestValidator=None,
                 ownerDN=None, ownerGroup=None):
        """ c'tor

        The requestClass is by default Request.
        If extensions want to use an extended type, they can pass it as a parameter.
        This is the same behavior as WorkflowTasks and jobClass.

        :param transClient: transformation client, forwarded to the TaskBase constructor
        :param logger: logger to use; defaults to the 'RequestTasks' sub-logger of gLogger
        :param requestClient: client used to submit and query requests;
                              a ReqClient is created when none is given
        :param requestClass: class used to build (and type-check) the requests; defaults to Request
        :param requestValidator: object whose ``validate`` method is applied to each request;
                                 defaults to a RequestValidator instance
        :param str ownerDN: DN passed to the default ReqClient as delegatedDN
        :param str ownerGroup: group passed to the default ReqClient as delegatedGroup
        """

        if not logger:
            logger = gLogger.getSubLogger('RequestTasks')

        super(RequestTasks, self).__init__(transClient, logger)

        # Certificates are only used when both the DN and the group to delegate to are known
        useCertificates = bool(ownerDN) and bool(ownerGroup)

        if not requestClient:
            requestClient = ReqClient(useCertificates=useCertificates,
                                      delegatedDN=ownerDN,
                                      delegatedGroup=ownerGroup)
        self.requestClient = requestClient

        self.requestClass = requestClass if requestClass else Request
        self.requestValidator = requestValidator if requestValidator else RequestValidator()

    def prepareTransformationTasks(self, transBody, taskDict, owner='', ownerGroup='', ownerDN='',
                                   bulkSubmissionFlag=False):
        """ Prepare tasks, given a taskDict, that is created (with some manipulation) by the DB

        A request object is built for every task and stored under its 'TaskObject' key.
        Tasks for which no valid request could be built are removed from taskDict.

        :param transBody: transformation body: either a JSON encoded list of operations
                          (see :meth:`_multiOperationsBody`) or the legacy single-operation
                          string (see :meth:`_singleOperationsBody`); can be empty
        :param dict taskDict: dictionary of tasks, modified in place
        :param str owner: user name; when it or ownerGroup is missing, both are taken from the proxy
        :param str ownerGroup: dirac group used for the requests
        :param str ownerDN: certificate DN used for the requests; looked up from owner when missing
        :param bool bulkSubmissionFlag: not used by this implementation
        :returns: S_OK(taskDict), S_OK({}) when there are no tasks,
                  or the S_ERROR of the failing proxy/DN lookup
        """
        if not taskDict:
            return S_OK({})

        if (not owner) or (not ownerGroup):
            res = getProxyInfo(False, False)
            if not res['OK']:
                return res
            proxyInfo = res['Value']
            owner = proxyInfo['username']
            ownerGroup = proxyInfo['group']

        if not ownerDN:
            res = getDNForUsername(owner)
            if not res['OK']:
                return res
            ownerDN = res['Value'][0]

        # Only the JSON decoding decides which kind of body we have: keeping the
        # request construction out of the try block avoids mistaking an error raised
        # while building the operations for a "not JSON" body.
        try:
            transJson = json.loads(transBody)
        except (ValueError, TypeError):  # not JSON (or not a string at all): legacy body
            self._singleOperationsBody(transBody, taskDict, ownerDN, ownerGroup)
        else:
            self._multiOperationsBody(transJson, taskDict, ownerDN, ownerGroup)

        return S_OK(taskDict)

    def _multiOperationsBody(self, transJson, taskDict, ownerDN, ownerGroup):
        """ Deal with a Request that has multiple operations

        :param transJson: list of lists of string and dictionaries, e.g.:

          .. code :: python

            body = [ ( "ReplicateAndRegister", { "SourceSE":"FOO-SRM", "TargetSE":"BAR-SRM" }),
                     ( "RemoveReplica", { "TargetSE":"FOO-SRM" } ),
                   ]

        :param dict taskDict: dictionary of tasks, modified in this function
        :param str ownerDN: certificate DN used for the requests
        :param str ownerGroup: dirac group used for the requests
        :returns: None
        """
        failedTasks = []
        # Iterate over a snapshot: tasks without input data are popped inside the loop
        for taskID, task in list(taskDict.items()):
            transID = task['TransformationID']
            if not task.get('InputData'):
                self._logError("Error creating request for task", "%s, No input data" % taskID, transID=transID)
                taskDict.pop(taskID)
                continue

            # InputData is either a list of LFNs or a ';' separated string of LFNs
            files = []
            if isinstance(task['InputData'], list):
                files = task['InputData']
            elif isinstance(task['InputData'], six.string_types):
                files = task['InputData'].split(';')

            # Build the request with the configured class, so that it passes the
            # isinstance check performed in submitTaskToExternal
            oRequest = self.requestClass()

            # create the operations from the json structure
            for operationTuple in transJson:
                op = Operation()
                op.Type = operationTuple[0]
                for parameter, value in operationTuple[1].items():
                    setattr(op, parameter, value)

                # every operation acts on all the files of the task
                for lfn in files:
                    opFile = File()
                    opFile.LFN = lfn
                    op.addFile(opFile)

                oRequest.addOperation(op)

            result = self._assignRequestToTask(oRequest, taskDict, transID, taskID, ownerDN, ownerGroup)
            if not result['OK']:
                failedTasks.append(taskID)

        # Remove failed tasks
        for taskID in failedTasks:
            taskDict.pop(taskID)

    def _singleOperationsBody(self, transBody, taskDict, ownerDN, ownerGroup):
        """ Deal with a Request that has just one operation, as it was so far

        :param transBody: string of the form "<requestType>;<requestOperation>", can be an
                          empty string, in which case the operation is 'ReplicateAndRegister'
        :param dict taskDict: dictionary of tasks, modified in this function
        :param str ownerDN: certificate DN used for the requests
        :param str ownerGroup: dirac group used for the requests
        :returns: None
        """

        requestOperation = 'ReplicateAndRegister'
        if transBody:
            try:
                _requestType, requestOperation = transBody.split(';')
            except AttributeError:
                # body is not a string: keep the default operation
                pass

        failedTasks = []
        # Iterate over a snapshot, as in _multiOperationsBody; failed tasks are
        # only removed once the loop is over
        for taskID, task in list(taskDict.items()):

            transID = task['TransformationID']

            # Build the request with the configured class, so that it passes the
            # isinstance check performed in submitTaskToExternal
            oRequest = self.requestClass()
            transfer = Operation()
            transfer.Type = requestOperation
            transfer.TargetSE = task['TargetSE']

            # Reset for every task: otherwise an InputData of an unexpected type
            # would silently reuse the files of the previous task
            files = []
            # If there are input files
            if task.get('InputData'):
                if isinstance(task['InputData'], list):
                    files = task['InputData']
                elif isinstance(task['InputData'], six.string_types):
                    files = task['InputData'].split(';')

            for lfn in files:
                trFile = File()
                trFile.LFN = lfn
                transfer.addFile(trFile)

            oRequest.addOperation(transfer)
            result = self._assignRequestToTask(oRequest, taskDict, transID, taskID, ownerDN, ownerGroup)
            if not result['OK']:
                failedTasks.append(taskID)

        # Remove failed tasks
        for taskID in failedTasks:
            taskDict.pop(taskID)

    def _assignRequestToTask(self, oRequest, taskDict, transID, taskID, ownerDN, ownerGroup):
        """ Set name, ownerDN and group to the request, and add the request to taskDict
        (under the 'TaskObject' key of the task) if it is valid.

        An invalid request is only logged and reported through the return value:
        removing the task from taskDict is left to the caller.

        :param oRequest: Request
        :param dict taskDict: dictionary of tasks, modified in this function
        :param int transID: Transformation ID
        :param int taskID: Task ID
        :param str ownerDN: certificate DN used for the requests
        :param str ownerGroup: dirac group used for the requests
        :returns: S_OK() if the request is valid, S_ERROR otherwise
        """

        oRequest.RequestName = self._transTaskName(transID, taskID)
        oRequest.OwnerDN = ownerDN
        oRequest.OwnerGroup = ownerGroup

        isValid = self.requestValidator.validate(oRequest)
        if not isValid['OK']:
            self._logError("Error creating request for task", "%s %s" % (taskID, isValid),
                           transID=transID)
            return S_ERROR('Error creating request')
        taskDict[taskID]['TaskObject'] = oRequest
        return S_OK()

    def submitTransformationTasks(self, taskDict):
        """ Submit requests one by one

        Every task gets a boolean 'Success' key, and an 'ExternalID' key holding the
        value returned by the submission when it worked.

        :param dict taskDict: dictionary of tasks, each holding its request under 'TaskObject'
        :returns: S_OK(taskDict)
        """
        submitted = 0
        failed = 0
        startTime = time.time()
        method = 'submitTransformationTasks'
        for task in taskDict.values():
            # transID is the same for all tasks, so pick it up every time here
            transID = task['TransformationID']
            if not task['TaskObject']:
                task['Success'] = False
                failed += 1
                continue
            res = self.submitTaskToExternal(task['TaskObject'])
            if res['OK']:
                task['ExternalID'] = res['Value']
                task['Success'] = True
                submitted += 1
            else:
                self._logError("Failed to submit task to RMS", res['Message'], transID=transID)
                task['Success'] = False
                failed += 1
        # The counters are only non zero if the loop ran, so transID is defined below
        if submitted:
            self._logInfo('Submitted %d tasks to RMS in %.1f seconds' % (submitted, time.time() - startTime),
                          transID=transID, method=method)
        if failed:
            self._logWarn('Failed to submit %d tasks to RMS.' % (failed),
                          transID=transID, method=method)
        return S_OK(taskDict)

    def submitTaskToExternal(self, oRequest):
        """
        Submits a request to RMS

        :param oRequest: request to submit, must be an instance of the configured requestClass
        :returns: the result of requestClient.putRequest, or S_ERROR if oRequest has the wrong type
        """
        if isinstance(oRequest, self.requestClass):
            return self.requestClient.putRequest(oRequest, useFailoverProxy=False, retryMainService=2)
        return S_ERROR("Request should be a Request object")

    def updateTransformationReservedTasks(self, taskDicts):
        """ Sort the given tasks according to whether they have a non zero ExternalID

        :param list taskDicts: task dictionaries with 'TransformationID', 'TaskID' and 'ExternalID' keys
        :returns: S_OK with a dictionary holding under 'TaskNameIDs' the mapping
                  request name -> ExternalID, and under 'NoTasks' the list of
                  request names without a usable ExternalID
        """
        requestNameIDs = {}
        noTasks = []
        for taskDict in taskDicts:
            requestName = self._transTaskName(taskDict['TransformationID'], taskDict['TaskID'])
            reqID = taskDict['ExternalID']
            # an empty or zero identifier means there is no request for this task
            if reqID and int(reqID):
                requestNameIDs[requestName] = reqID
            else:
                noTasks.append(requestName)
        return S_OK({'NoTasks': noTasks, 'TaskNameIDs': requestNameIDs})

    def getSubmittedTaskStatus(self, taskDicts):
        """
        Check if tasks changed status, and return a list of tasks per new status

        :param list taskDicts: task dictionaries with 'TransformationID', 'TaskID',
                               'ExternalID' and 'ExternalStatus' keys
        :returns: S_OK with a dictionary new status -> list of TaskIDs
        """
        updateDict = {}
        badRequestID = 0
        for taskDict in taskDicts:
            oldStatus = taskDict['ExternalStatus']
            # ExternalID is normally a string
            if taskDict['ExternalID'] and int(taskDict['ExternalID']):
                newStatus = self.requestClient.getRequestStatus(taskDict['ExternalID'])
                if not newStatus['OK']:
                    # requests that do not exist are only reported at verbose level
                    log = self._logVerbose if 'not exist' in newStatus['Message'] else self._logWarn
                    log("getSubmittedTaskStatus: Failed to get requestID for request", newStatus['Message'],
                        transID=taskDict['TransformationID'])
                else:
                    newStatus = newStatus['Value']
                    # We don't care updating the tasks to Assigned while the request is being processed
                    if newStatus != oldStatus and newStatus != 'Assigned':
                        updateDict.setdefault(newStatus, []).append(taskDict['TaskID'])
            else:
                badRequestID += 1
        if badRequestID:
            self._logWarn("%d requests have identifier 0" % badRequestID)
        return S_OK(updateDict)

    def getSubmittedFileStatus(self, fileDicts):
        """
        Check if transformation files changed status, and return the new status per LFN

        :param list fileDicts: file dictionaries with 'TransformationID', 'TaskID' and 'LFN' keys
        :returns: S_OK with a dictionary LFN -> new transformation file status
                  (only for files that are 'Done' or 'Failed' in their request),
                  or the S_ERROR of the failing task lookup
        """
        # Don't try and get status of not submitted tasks!
        transID = None
        taskFiles = {}
        for fileDict in fileDicts:
            # There is only one transformation involved, get however the transID in the loop
            transID = fileDict['TransformationID']
            taskID = int(fileDict['TaskID'])
            taskFiles.setdefault(taskID, []).append(fileDict['LFN'])
        # Should not happen, but just in case there are no files, return
        if transID is None:
            return S_OK({})

        res = self.transClient.getTransformationTasks({'TransformationID': transID, 'TaskID': list(taskFiles)})
        if not res['OK']:
            return res
        requestFiles = {}
        for taskDict in res['Value']:
            taskID = taskDict['TaskID']
            externalID = taskDict['ExternalID']
            # Only consider tasks that are submitted, ExternalID is a string
            if taskDict['ExternalStatus'] != 'Created' and externalID and int(externalID):
                requestFiles[externalID] = taskFiles[taskID]

        updateDict = {}
        for requestID, lfnList in requestFiles.items():
            statusDict = self.requestClient.getRequestFileStatus(requestID, lfnList)
            if not statusDict['OK']:
                # requests that do not exist are only reported at verbose level
                log = self._logVerbose if 'not exist' in statusDict['Message'] else self._logWarn
                log("Failed to get files status for request", statusDict['Message'],
                    transID=transID, method='getSubmittedFileStatus')
            else:
                for lfn, newStatus in statusDict['Value'].items():
                    if newStatus == 'Done':
                        updateDict[lfn] = TransformationFilesStatus.PROCESSED
                    elif newStatus == 'Failed':
                        updateDict[lfn] = TransformationFilesStatus.PROBLEMATIC
        return S_OK(updateDict)
Loading

0 comments on commit 19b7af6

Please sign in to comment.