Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[V7r3] Refactorise TaskManager #5395

Merged
merged 2 commits into from
Sep 19, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ The complete list can be found in the `DIRAC project GitHub repository <https://

* **Clients**

* TaskManager: it contains WorkflowTasks and RequestTasks modules, for managing jobs and requests tasks, i.e. it contains classes wrapping the logic of how to 'transform' a Task in a job/request. WorkflowTaskAgent uses WorkflowTasks, RequestTaskAgent uses RequestTasks.
* TaskManager: it contains TaskBase, the base class inherited by the classes of the WorkflowTasks and RequestTasks modules, which manage job and request tasks, i.e. they contain the classes wrapping the logic of how to 'transform' a Task into a job/request. WorkflowTaskAgent uses WorkflowTasks, RequestTaskAgent uses RequestTasks.

* TransformationClient: class that contains client access to the transformation DB handler (main client to the service/DB). It exposes the functionalities available in the DIRAC/TransformationHandler. This inherits the DIRAC base Client for direct execution of server functionality

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ The complete list can be found in the `DIRAC project GitHub repository <https://

* **Clients**

* TaskManager: it contains WorkflowsTasks and RequestTasks modules, for managing jobs and requests tasks, i.e. it contains classes wrapping the logic of how to 'transform' a Task in a job/request. WorkflowTaskAgent uses WorkflowTasks, RequestTaskAgent uses RequestTasks.
* TaskManager: it contains TaskBase, the base class inherited by the classes of the WorkflowTasks and RequestTasks modules, which manage job and request tasks, i.e. they contain the classes wrapping the logic of how to 'transform' a Task into a job/request. WorkflowTaskAgent uses WorkflowTasks, RequestTaskAgent uses RequestTasks.

* TransformationClient: class that contains client access to the transformation DB handler (main client to the service/DB). It exposes the functionalities available in the DIRAC/TransformationHandler. This inherits the DIRAC base Client for direct execution of server functionality

Expand Down
15 changes: 12 additions & 3 deletions src/DIRAC/TransformationSystem/Agent/RequestTaskAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
from DIRAC import S_OK

from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.TransformationSystem.Agent.TaskManagerAgentBase import TaskManagerAgentBase
from DIRAC.TransformationSystem.Client.TaskManager import RequestTasks

__RCSID__ = "$Id$"

Expand All @@ -55,8 +55,17 @@ def initialize(self):
if not res['OK']:
return res

objLoader = ObjectLoader()
_class = objLoader.loadObject(
'TransformationSystem.Client.RequestTasks', 'RequestTasks')

if not _class['OK']:
raise Exception(_class['Message'])

self.requestTasksCls = _class['Value']

# clients
self.taskManager = RequestTasks(transClient=self.transClient)
self.taskManager = self.requestTasksCls(transClient=self.transClient)

agentTSTypes = self.am_getOption('TransType', [])
if agentTSTypes:
Expand All @@ -74,6 +83,6 @@ def _getClients(self, ownerDN=None, ownerGroup=None):
See :func:`DIRAC.TransformationSystem.TaskManagerAgentBase._getClients`.
"""
res = super(RequestTaskAgent, self)._getClients(ownerDN=ownerDN, ownerGroup=ownerGroup)
threadTaskManager = RequestTasks(ownerDN=ownerDN, ownerGroup=ownerGroup)
threadTaskManager = self.requestTasksCls(ownerDN=ownerDN, ownerGroup=ownerGroup)
res.update({'TaskManager': threadTaskManager})
return res
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername, getUsernameForDN
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
from DIRAC.TransformationSystem.Client.FileReport import FileReport
from DIRAC.TransformationSystem.Client.TaskManager import WorkflowTasks
from DIRAC.TransformationSystem.Client.WorkflowTasks import WorkflowTasks
from DIRAC.TransformationSystem.Client.TransformationClient import TransformationClient
from DIRAC.TransformationSystem.Agent.TransformationAgentsUtilities import TransformationAgentsUtilities
from DIRAC.WorkloadManagementSystem.Client import JobStatus
Expand Down
330 changes: 330 additions & 0 deletions src/DIRAC/TransformationSystem/Client/RequestTasks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,330 @@
import six
import time
import json

from DIRAC import S_OK, S_ERROR, gLogger

from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getDNForUsername
from DIRAC.Core.Security.ProxyInfo import getProxyInfo

from DIRAC.RequestManagementSystem.Client.ReqClient import ReqClient
from DIRAC.RequestManagementSystem.Client.Request import Request
from DIRAC.RequestManagementSystem.Client.Operation import Operation
from DIRAC.RequestManagementSystem.Client.File import File
from DIRAC.RequestManagementSystem.private.RequestValidator import RequestValidator
from DIRAC.TransformationSystem.Client import TransformationFilesStatus
from DIRAC.TransformationSystem.Client.TaskManager import TaskBase


class RequestTasks(TaskBase):
  """
  Class for handling tasks for the RMS.

  Each transformation task is turned into a request object holding one or
  more operations; the request is validated and then submitted through the
  request client.
  """

  def __init__(self, transClient=None, logger=None, requestClient=None,
               requestClass=None, requestValidator=None,
               ownerDN=None, ownerGroup=None):
    """ c'tor

        The requestClass is by default Request.
        If extensions want to use an extended type, they can pass it as a parameter.
        This is the same behavior as WorkflowTasks and jobClass.

        :param transClient: transformation client, handed over to TaskBase
        :param logger: logger, handed over to TaskBase (default: 'RequestTasks' sub-logger of gLogger)
        :param requestClient: client used to submit and query requests (default: a new ReqClient)
        :param requestClass: class used to build the requests and checked at submission (default: Request)
        :param requestValidator: object whose validate() is applied to each request (default: RequestValidator())
        :param str ownerDN: DN passed to the default ReqClient as delegatedDN
        :param str ownerGroup: group passed to the default ReqClient as delegatedGroup
    """

    if not logger:
      logger = gLogger.getSubLogger('RequestTasks')

    super(RequestTasks, self).__init__(transClient, logger)

    # The default ReqClient uses certificates only when both the DN and the group are given
    useCertificates = bool(ownerDN) and bool(ownerGroup)

    if not requestClient:
      self.requestClient = ReqClient(useCertificates=useCertificates,
                                     delegatedDN=ownerDN,
                                     delegatedGroup=ownerGroup)
    else:
      self.requestClient = requestClient

    self.requestClass = requestClass if requestClass else Request
    self.requestValidator = requestValidator if requestValidator else RequestValidator()

  @staticmethod
  def _inputDataToLFNs(task):
    """ Return the list of LFNs found in task['InputData'].

        :param dict task: task dictionary; 'InputData' may be missing, a list of LFNs
                          or a string of ';'-separated LFNs

        :returns: list of LFNs, empty if there is no input data or its type is not supported
    """
    inputData = task.get('InputData')
    if not inputData:
      return []
    if isinstance(inputData, list):
      return inputData
    if isinstance(inputData, six.string_types):
      return inputData.split(';')
    # Unsupported type: do not guess, and never reuse the files of another task
    return []

  def prepareTransformationTasks(self, transBody, taskDict, owner='', ownerGroup='', ownerDN='',
                                 bulkSubmissionFlag=False):
    """ Prepare tasks, given a taskDict, that is created (with some manipulation) by the DB

        :param str transBody: transformation body, either a JSON list of operations
                              or the single-operation "<type>;<operation>" string (may be empty)
        :param dict taskDict: dictionary of tasks, modified in place: a 'TaskObject' is added to
                              the valid tasks, the tasks for which no valid request could be built are removed
        :param str owner: user name; taken from the proxy if owner or ownerGroup is missing
        :param str ownerGroup: dirac group; taken from the proxy if owner or ownerGroup is missing
        :param str ownerDN: certificate DN; looked up from the owner if not given
        :param bool bulkSubmissionFlag: not used in this method

        :returns: S_OK(taskDict), or the S_ERROR returned by the proxy/DN lookup
    """
    if not taskDict:
      return S_OK({})

    if (not owner) or (not ownerGroup):
      res = getProxyInfo(False, False)
      if not res['OK']:
        return res
      proxyInfo = res['Value']
      owner = proxyInfo['username']
      ownerGroup = proxyInfo['group']

    if not ownerDN:
      res = getDNForUsername(owner)
      if not res['OK']:
        return res
      ownerDN = res['Value'][0]

    # Only the JSON decoding is guarded: a ValueError raised while building the
    # operations must not be mistaken for "the body is not JSON"
    try:
      transJson = json.loads(transBody)
    except ValueError:  # json couldn't load: single operation body
      self._singleOperationsBody(transBody, taskDict, ownerDN, ownerGroup)
    else:
      self._multiOperationsBody(transJson, taskDict, ownerDN, ownerGroup)

    return S_OK(taskDict)

  def _multiOperationsBody(self, transJson, taskDict, ownerDN, ownerGroup):
    """ deal with a Request that has multiple operations

    :param transJson: list of lists of string and dictionaries, e.g.:

      .. code :: python

        body = [ ( "ReplicateAndRegister", { "SourceSE":"FOO-SRM", "TargetSE":"BAR-SRM" }),
                 ( "RemoveReplica", { "TargetSE":"FOO-SRM" } ),
               ]

    :param dict taskDict: dictionary of tasks, modified in this function
    :param str ownerDN: certificate DN used for the requests
    :param str ownerGroup: dirac group used for the requests

    :returns: None
    """
    failedTasks = []
    # Iterate over a copy: tasks without input data are popped inside the loop
    for taskID, task in list(taskDict.items()):
      transID = task['TransformationID']
      if not task.get('InputData'):
        self._logError("Error creating request for task", "%s, No input data" % taskID, transID=transID)
        taskDict.pop(taskID)
        continue

      files = self._inputDataToLFNs(task)

      # Build the request with the configured class, so that it passes the
      # isinstance check done in submitTaskToExternal
      oRequest = self.requestClass()

      # create the operations from the json structure
      for operationTuple in transJson:
        op = Operation()
        op.Type = operationTuple[0]
        for parameter, value in operationTuple[1].items():
          setattr(op, parameter, value)

        for lfn in files:
          opFile = File()
          opFile.LFN = lfn
          op.addFile(opFile)

        oRequest.addOperation(op)

      result = self._assignRequestToTask(oRequest, taskDict, transID, taskID, ownerDN, ownerGroup)
      if not result['OK']:
        failedTasks.append(taskID)

    # Remove failed tasks
    for taskID in failedTasks:
      taskDict.pop(taskID)

  def _singleOperationsBody(self, transBody, taskDict, ownerDN, ownerGroup):
    """ deal with a Request that has just one operation, as it was so far

    :param transBody: string, can be an empty string
    :param dict taskDict: dictionary of tasks, modified in this function
    :param str ownerDN: certificate DN used for the requests
    :param str ownerGroup: dirac group used for the requests

    :returns: None
    """

    requestOperation = 'ReplicateAndRegister'
    if transBody:
      try:
        _requestType, requestOperation = transBody.split(';')
      except AttributeError:
        pass

    failedTasks = []
    # Failed tasks are only collected here and removed after the loop,
    # so the dictionary is never modified while it is being iterated
    for taskID, task in taskDict.items():

      transID = task['TransformationID']

      # Build the request with the configured class, so that it passes the
      # isinstance check done in submitTaskToExternal
      oRequest = self.requestClass()
      transfer = Operation()
      transfer.Type = requestOperation
      transfer.TargetSE = task['TargetSE']

      # If there are input files
      for lfn in self._inputDataToLFNs(task):
        trFile = File()
        trFile.LFN = lfn
        transfer.addFile(trFile)

      oRequest.addOperation(transfer)
      result = self._assignRequestToTask(oRequest, taskDict, transID, taskID, ownerDN, ownerGroup)
      if not result['OK']:
        failedTasks.append(taskID)

    # Remove failed tasks
    for taskID in failedTasks:
      taskDict.pop(taskID)

  def _assignRequestToTask(self, oRequest, taskDict, transID, taskID, ownerDN, ownerGroup):
    """set ownerDN and group to request, and add the request to taskDict if it is
    valid. An invalid request is only reported: removing the task from the
    taskDict is left to the caller.

    :param oRequest: Request
    :param dict taskDict: dictionary of tasks, modified in this function
    :param int transID: Transformation ID
    :param int taskID: Task ID
    :param str ownerDN: certificate DN used for the requests
    :param str ownerGroup: dirac group used for the requests

    :returns: S_OK() if the request is valid and was assigned, S_ERROR otherwise
    """

    oRequest.RequestName = self._transTaskName(transID, taskID)
    oRequest.OwnerDN = ownerDN
    oRequest.OwnerGroup = ownerGroup

    isValid = self.requestValidator.validate(oRequest)
    if not isValid['OK']:
      self._logError("Error creating request for task", "%s %s" % (taskID, isValid),
                     transID=transID)
      return S_ERROR('Error creating request')
    taskDict[taskID]['TaskObject'] = oRequest
    return S_OK()

  def submitTransformationTasks(self, taskDict):
    """ Submit requests one by one

        :param dict taskDict: dictionary of tasks; each task gets a 'Success' flag and,
                              when submitted, an 'ExternalID'

        :returns: S_OK(taskDict)
    """
    submitted = 0
    failed = 0
    startTime = time.time()
    method = 'submitTransformationTasks'
    transID = None
    for task in taskDict.values():
      # transID is the same for all tasks, so pick it up every time here
      transID = task['TransformationID']
      # A task without a request object cannot be submitted: count it as failed
      if not task.get('TaskObject'):
        task['Success'] = False
        failed += 1
        continue
      res = self.submitTaskToExternal(task['TaskObject'])
      if res['OK']:
        task['ExternalID'] = res['Value']
        task['Success'] = True
        submitted += 1
      else:
        self._logError("Failed to submit task to RMS", res['Message'], transID=transID)
        task['Success'] = False
        failed += 1
    if submitted:
      self._logInfo('Submitted %d tasks to RMS in %.1f seconds' % (submitted, time.time() - startTime),
                    transID=transID, method=method)
    if failed:
      self._logWarn('Failed to submit %d tasks to RMS.' % (failed),
                    transID=transID, method=method)
    return S_OK(taskDict)

  def submitTaskToExternal(self, oRequest):
    """
    Submits a request to RMS

    :param oRequest: request to submit, must be an instance of self.requestClass

    :returns: the result of requestClient.putRequest, or S_ERROR if oRequest has the wrong type
    """
    if isinstance(oRequest, self.requestClass):
      return self.requestClient.putRequest(oRequest, useFailoverProxy=False, retryMainService=2)
    return S_ERROR("Request should be a Request object")

  def updateTransformationReservedTasks(self, taskDicts):
    """
    Sort the given tasks according to whether they have a non-zero ExternalID

    :param list taskDicts: task dictionaries with 'TransformationID', 'TaskID' and 'ExternalID'

    :returns: S_OK with a dictionary holding 'NoTasks' (list of the request names without
              a usable ExternalID) and 'TaskNameIDs' (dictionary request name -> ExternalID)
    """
    requestNameIDs = {}
    noTasks = []
    for taskDict in taskDicts:
      requestName = self._transTaskName(taskDict['TransformationID'], taskDict['TaskID'])
      reqID = taskDict['ExternalID']
      if reqID and int(reqID):
        requestNameIDs[requestName] = reqID
      else:
        noTasks.append(requestName)
    return S_OK({'NoTasks': noTasks, 'TaskNameIDs': requestNameIDs})

  def getSubmittedTaskStatus(self, taskDicts):
    """
    Check if tasks changed status, and return a list of tasks per new status

    :param list taskDicts: task dictionaries with 'TransformationID', 'TaskID',
                           'ExternalID' and 'ExternalStatus'

    :returns: S_OK with a dictionary new status -> list of TaskIDs
    """
    updateDict = {}
    badRequestID = 0
    for taskDict in taskDicts:
      oldStatus = taskDict['ExternalStatus']
      # ExternalID is normally a string
      if taskDict['ExternalID'] and int(taskDict['ExternalID']):
        newStatus = self.requestClient.getRequestStatus(taskDict['ExternalID'])
        if not newStatus['OK']:
          # A request reported as not existing is only logged at verbose level
          log = self._logVerbose if 'not exist' in newStatus['Message'] else self._logWarn
          log("getSubmittedTaskStatus: Failed to get requestID for request", newStatus['Message'],
              transID=taskDict['TransformationID'])
        else:
          newStatus = newStatus['Value']
          # We don't care updating the tasks to Assigned while the request is being processed
          if newStatus != oldStatus and newStatus != 'Assigned':
            updateDict.setdefault(newStatus, []).append(taskDict['TaskID'])
      else:
        badRequestID += 1
    if badRequestID:
      self._logWarn("%d requests have identifier 0" % badRequestID)
    return S_OK(updateDict)

  def getSubmittedFileStatus(self, fileDicts):
    """
    Check if transformation files changed status, and return the new status per LFN

    :param list fileDicts: file dictionaries with 'TransformationID', 'TaskID' and 'LFN'

    :returns: S_OK with a dictionary LFN -> new transformation file status,
              or the S_ERROR returned by getTransformationTasks
    """
    # Don't try and get status of not submitted tasks!
    transID = None
    taskFiles = {}
    for fileDict in fileDicts:
      # There is only one transformation involved, get however the transID in the loop
      transID = fileDict['TransformationID']
      taskID = int(fileDict['TaskID'])
      taskFiles.setdefault(taskID, []).append(fileDict['LFN'])
    # Should not happen, but just in case there are no files, return
    if transID is None:
      return S_OK({})

    res = self.transClient.getTransformationTasks({'TransformationID': transID, 'TaskID': list(taskFiles)})
    if not res['OK']:
      return res
    requestFiles = {}
    for taskDict in res['Value']:
      taskID = taskDict['TaskID']
      externalID = taskDict['ExternalID']
      # Only consider tasks that are submitted, ExternalID is a string
      if taskDict['ExternalStatus'] != 'Created' and externalID and int(externalID):
        requestFiles[externalID] = taskFiles[taskID]

    updateDict = {}
    for requestID, lfnList in requestFiles.items():
      statusDict = self.requestClient.getRequestFileStatus(requestID, lfnList)
      if not statusDict['OK']:
        # A request reported as not existing is only logged at verbose level
        log = self._logVerbose if 'not exist' in statusDict['Message'] else self._logWarn
        log("Failed to get files status for request", statusDict['Message'],
            transID=transID, method='getSubmittedFileStatus')
      else:
        # Only the final request file statuses are propagated to the transformation files
        for lfn, newStatus in statusDict['Value'].items():
          if newStatus == 'Done':
            updateDict[lfn] = TransformationFilesStatus.PROCESSED
          elif newStatus == 'Failed':
            updateDict[lfn] = TransformationFilesStatus.PROBLEMATIC
    return S_OK(updateDict)
Loading