Skip to content

Commit

Permalink
Merge pull request #5214 from fstagni/v7r3-fixes12
Browse files Browse the repository at this point in the history
[v7r3] tornado-based https JobStateUpdate handler
  • Loading branch information
fstagni authored Aug 4, 2021
2 parents c658c80 + e556c52 commit a7cd23f
Show file tree
Hide file tree
Showing 22 changed files with 259 additions and 118 deletions.
3 changes: 0 additions & 3 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@ jobs:
- TEST_NAME: "Elasticsearch 6.6.0"
ARGS: ES_VER=6.6.0
###### Thread pool
- TEST_NAME: "New thread pool"
ARGS: DIRAC_USE_NEWTHREADPOOL=No
###### Thread pool
- TEST_NAME: "Fewer cfg locks"
ARGS: DIRAC_FEWER_CFG_LOCKS=Yes
###### HTTPS tests
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ WorkloadManagement services are:

JobManager/index
JobMonitoring/index
JobStateUpdate/index
Matcher/index
SandboxStore/index
WMSAdministrator/index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Show setup command allows administrators to know which components, Services and

mardirac1.in2p3.fr >show setup
{'Agents': {'Configuration': ['CE2CSAgent'],
'Framework': ['CAUpdateAgent'],
'Framework': ['CAUpdateAgent'],
'WorkloadManagement': ['JobHistoryAgent',
'InputDataAgent',
'StalledJobAgent',
Expand All @@ -100,7 +100,7 @@ Show setup command allows administrators to know which components, Services and
'Notification',
'UserProfileManager',
'SystemAdministrator',
'ProxyManager'],
'ProxyManager'],
'RequestManagement': ['RequestManager'],
'WorkloadManagement': ['JobMonitoring',
'WMSAdministrator',
Expand Down
2 changes: 1 addition & 1 deletion docs/source/DeveloperGuide/TornadoServices/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ The interface of the DISET request handler was preserved, in particular:
How to start server
*******************

The easy way, use ``DIRAC/Core/Tornado/script/tornado-start-all.py`` it will start all services registered in configuration ! To register a service you just have to add the service in the CS and ``Protocol = https``. It may look like this::
The easy way is to use command ``tornado-start-all`` which will start all services registered in configuration. To register a service you just have to add the service in the CS and ``Protocol = https``. It may look like this::

DIRAC
{
Expand Down
1 change: 0 additions & 1 deletion integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"DIRACOS_TARBALL_PATH": None,
"TEST_HTTPS": "No",
"DIRAC_FEWER_CFG_LOCKS": None,
"DIRAC_USE_NEWTHREADPOOL": None,
"USE_PYTHON3": None,
}
DEFAULT_MODULES = {
Expand Down
27 changes: 12 additions & 15 deletions src/DIRAC/Core/DISET/private/Service.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,21 @@

import DIRAC
from DIRAC import gConfig, gLogger, S_OK, S_ERROR
from DIRAC.Core.Utilities.DErrno import ENOAUTH
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
from DIRAC.Core.Utilities import Time, MemStat, Network
from DIRAC.ConfigurationSystem.Client import PathFinder
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.DISET.private.LockManager import LockManager
from DIRAC.FrameworkSystem.Client.MonitoringClient import MonitoringClient
from DIRAC.Core.DISET.private.ServiceConfiguration import ServiceConfiguration
from DIRAC.Core.DISET.private.TransportPool import getGlobalTransportPool
from DIRAC.Core.DISET.private.MessageBroker import MessageBroker, MessageSender
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
from DIRAC.Core.Utilities.ReturnValues import isReturnStructure
from DIRAC.Core.DISET.AuthManager import AuthManager
from DIRAC.FrameworkSystem.Client.SecurityLogClient import SecurityLogClient
from DIRAC.ConfigurationSystem.Client import PathFinder
from DIRAC.ConfigurationSystem.Client.Config import gConfig
from DIRAC.Core.DISET.RequestHandler import getServiceOption
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.Core.Utilities import Time, MemStat, Network
from DIRAC.Core.Utilities.DErrno import ENOAUTH
from DIRAC.Core.Utilities.ReturnValues import isReturnStructure
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
from DIRAC.FrameworkSystem.Client.MonitoringClient import gMonitor
from DIRAC.FrameworkSystem.Client.MonitoringClient import MonitoringClient
from DIRAC.FrameworkSystem.Client.SecurityLogClient import SecurityLogClient

__RCSID__ = "$Id$"

Expand Down Expand Up @@ -114,10 +113,9 @@ def initialize(self):
}
# Initialize Monitoring
# This is a flag used to check whether "EnableActivityMonitoring" is enabled or not from the config file.
self.activityMonitoring = (
Operations().getValue("EnableActivityMonitoring", False) or
getServiceOption(self._serviceInfoDict, "EnableActivityMonitoring", False)
)
self.activityMonitoring = Operations().getValue(
"EnableActivityMonitoring", False
) or getServiceOption(self._serviceInfoDict, "EnableActivityMonitoring", False)
if self.activityMonitoring:
# The import needs to be here because of the CS must be initialized before importing
# this class (see https://github.com/DIRACGrid/DIRAC/issues/4793)
Expand Down Expand Up @@ -664,7 +662,6 @@ def __startReportToMonitoring(self):
if now - self.__monitorLastStatsUpdate < 0:
return (now, cpuTime)
# Send CPU consumption mark
wallClock = now - self.__monitorLastStatsUpdate
self.__monitorLastStatsUpdate = now
# Send Memory consumption mark
membytes = MemStat.VmB('VmRSS:')
Expand Down
12 changes: 8 additions & 4 deletions src/DIRAC/Core/Tornado/scripts/tornado_start_all.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
@Script()
def main():

if os.environ.get('DIRAC_USE_TORNADO_IOLOOP', 'false').lower() not in ('yes', 'true'):
if os.environ.get("DIRAC_USE_TORNADO_IOLOOP", "false").lower() not in ("yes", "true"):
raise RuntimeError(
"DIRAC_USE_TORNADO_IOLOOP is not defined in the environment." + "\n" +
"It is necessary to run with Tornado." + "\n" +
"https://dirac.readthedocs.io/en/latest/DeveloperGuide/TornadoServices/index.html"
"DIRAC_USE_TORNADO_IOLOOP is not defined in the environment."
+ "\n"
+ "It is necessary to run with Tornado."
+ "\n"
+ "https://dirac.readthedocs.io/en/latest/DeveloperGuide/TornadoServices/index.html"
)

from DIRAC import gConfig
Expand All @@ -34,6 +36,8 @@ def main():
from DIRAC.Core.Utilities.DErrno import includeExtensionErrors
from DIRAC.FrameworkSystem.Client.Logger import gLogger

Script.parseCommandLine()

# We check if there is no configuration server started as master
# If you want to start a master CS you should use Configuration_Server.cfg and
# use tornado-start-CS.py
Expand Down
5 changes: 4 additions & 1 deletion src/DIRAC/Core/Utilities/test/Test_ObjectLoader.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Tornado.Server.TornadoService import TornadoService


def _check(result):
Expand All @@ -17,4 +18,6 @@ def test_load():
assert _check(ObjectLoader().loadObject("Core.Utilities.ObjectLoader")) is ObjectLoader
dataFilter = _check(ObjectLoader().getObjects("WorkloadManagementSystem.Service", ".*Handler"))
dataClass = _check(ObjectLoader().getObjects("WorkloadManagementSystem.Service", parentClass=RequestHandler))
assert sorted(dataFilter) == sorted(dataClass)
tornadoDataClass = _check(ObjectLoader().getObjects("WorkloadManagementSystem.Service", parentClass=TornadoService))

assert sorted(dataFilter) == sorted(set(dataClass).union(set(tornadoDataClass)))
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ def main():

system = args[0]
component = args[1]
compOrMod = module or component

result = gComponentInstaller.addDefaultOptionsToCS(gConfig, 'service', system, component,
extensionsByPriority(),
Expand Down
47 changes: 47 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/Agent/TaskQueuesAgent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
""" The TaskQueuesAgent will update TQs
.. literalinclude:: ../ConfigTemplate.cfg
:start-after: ##BEGIN TaskQueuesAgent
:end-before: ##END
:dedent: 2
:caption: TaskQueuesAgent options
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

__RCSID__ = "$Id$"

from DIRAC import S_OK
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.WorkloadManagementSystem.DB.TaskQueueDB import TaskQueueDB


class TaskQueuesAgent(AgentModule):
"""Agent for recalculating TQ shares
"""

def __init__(self, *args, **kwargs):
""" c'tor
"""
AgentModule.__init__(self, *args, **kwargs)

# clients
self.tqDB = None

def initialize(self):
""" just initialize TQDB
"""
self.tqDB = TaskQueueDB()
return S_OK()

def execute(self):
""" calls TQDB.recalculateTQSharesForAll
"""
res = self.tqDB.recalculateTQSharesForAll()
if not res['OK']:
self.log.error("Error recalculating TQ shares", res['Message'])

return S_OK()
Original file line number Diff line number Diff line change
Expand Up @@ -75,23 +75,23 @@ def test__getJDLParameters(mocker):
assert result['Value']['Tags'] == ['16Processors', 'MultiProcessor']


@pytest.mark.parametrize("mockJMInput, expected", [({'OK': True}, {'OK': True, 'Value': 'Job Rescheduled'}), ({
'OK': False, 'Message': "Test"}, {'OK': True, 'Value': 'Problem Rescheduling Job'})])
@pytest.mark.parametrize(
"mockJMInput, expected",
[
({"OK": True}, {"OK": True, "Value": "Problem Rescheduling Job"}),
(
{"OK": False, "Message": "Test"},
{"OK": True, "Value": "Problem Rescheduling Job"},
),
],
)
def test__rescheduleFailedJob(mocker, mockJMInput, expected):
""" Testing JobAgent()._rescheduleFailedJob()
"""

mockJM.return_value = mockJMInput

mocker.patch("DIRAC.WorkloadManagementSystem.Agent.JobAgent.AgentModule.__init__")
mocker.patch(
"DIRAC.WorkloadManagementSystem.Client.JobStateUpdateClient.JobStateUpdateClient.setJobStatusBulk",
side_effect=mockJM,
)
mocker.patch(
"DIRAC.WorkloadManagementSystem.Client.JobManagerClient.JobManagerClient.rescheduleJob",
side_effect=mockJM,
)

jobAgent = JobAgent('Test', 'Test1')

Expand All @@ -107,16 +107,26 @@ def test__rescheduleFailedJob(mocker, mockJMInput, expected):


@pytest.mark.parametrize(
"mockGCReplyInput, mockPMReplyInput, expected", [
(True, {
'OK': True, 'Value': 'Test'}, {
'OK': True, 'Value': 'Test'}), (True, {
'OK': False, 'Message': 'Test'}, {
'OK': False, 'Message': 'Failed to setup proxy: Error retrieving proxy'}), (False, {
'OK': True, 'Value': 'Test'}, {
'OK': False, 'Message': 'Invalid Proxy'}), (False, {
'OK': False, 'Message': 'Test'}, {
'OK': False, 'Message': 'Invalid Proxy'})])
"mockGCReplyInput, mockPMReplyInput, expected",
[
(True, {"OK": True, "Value": "Test"}, {"OK": True, "Value": "Test"}),
(
True,
{"OK": False, "Message": "Test"},
{"OK": False, "Message": "Failed to setup proxy: Error retrieving proxy"},
),
(
False,
{"OK": True, "Value": "Test"},
{"OK": False, "Message": "Invalid Proxy"},
),
(
False,
{"OK": False, "Message": "Test"},
{"OK": False, "Message": "Invalid Proxy"},
),
],
)
def test__setupProxy(mocker, mockGCReplyInput, mockPMReplyInput, expected):
""" Testing JobAgent()._setupProxy()
"""
Expand Down Expand Up @@ -166,16 +176,22 @@ def test__getCPUWorkLeft(mocker):


@pytest.mark.parametrize(
"mockGCReplyInput, mockPMReplyInput, expected", [
(True, {
'OK': True, 'Value': 'Test'}, {
'OK': True, 'Value': 'Test'}), (True, {
'OK': False, 'Message': 'Test'}, {
'OK': False, 'Message': 'Error retrieving proxy'}), (False, {
'OK': True, 'Value': 'Test'}, {
'OK': True, 'Value': 'Test'}), (False, {
'OK': False, 'Message': 'Test'}, {
'OK': False, 'Message': 'Error retrieving proxy'})])
"mockGCReplyInput, mockPMReplyInput, expected",
[
(True, {"OK": True, "Value": "Test"}, {"OK": True, "Value": "Test"}),
(
True,
{"OK": False, "Message": "Test"},
{"OK": False, "Message": "Error retrieving proxy"},
),
(False, {"OK": True, "Value": "Test"}, {"OK": True, "Value": "Test"}),
(
False,
{"OK": False, "Message": "Test"},
{"OK": False, "Message": "Error retrieving proxy"},
),
],
)
def test__requestProxyFromProxyManager(mocker, mockGCReplyInput, mockPMReplyInput, expected):
""" Testing JobAgent()._requestProxyFromProxyManager()
"""
Expand Down Expand Up @@ -224,8 +240,10 @@ def test__checkInstallSoftware(mocker):
assert result['Value'] == 'Job has no software installation requirement'


@pytest.mark.parametrize("mockJWInput, expected", [(
{'OK': False, 'Message': 'Test'}, {'OK': False, 'Message': 'Test'})])
@pytest.mark.parametrize(
"mockJWInput, expected",
[({'OK': False, 'Message': 'Test'}, {'OK': False, 'Message': 'Test'})]
)
def test_submitJob(mocker, mockJWInput, expected):
""" Testing JobAgent()._submitJob()
"""
Expand Down
Loading

0 comments on commit a7cd23f

Please sign in to comment.