Skip to content

Commit

Permalink
Merge pull request #5208 from fstagni/v7r3-fixes11
Browse files Browse the repository at this point in the history
[v7r3] Hackathon fixes
  • Loading branch information
fstagni authored Jun 17, 2021
2 parents e0b4c5e + 757f751 commit cf55bc4
Show file tree
Hide file tree
Showing 11 changed files with 58 additions and 39 deletions.
4 changes: 2 additions & 2 deletions environment-py3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ dependencies:
- elasticsearch-dsl
- future
- gitpython >=2.1.0
- m2crypto >=0.36
- m2crypto >=0.36,!=0.38.0
- matplotlib
- numpy
- pexpect >=4.0.1
Expand Down Expand Up @@ -93,5 +93,5 @@ dependencies:
- git+https://gitlab.cern.ch/cburr/fts-rest.git@python3
# Doesn't support Python 3.9
# - rucio-clients >=1.25.6
- git+https://github.com/rucio/rucio.git@master
- git+https://github.com/rucio/rucio.git@eafde85bfbfe51fb22c7774e2af8deda3fb05e81
- -e .
11 changes: 6 additions & 5 deletions src/DIRAC/Core/Utilities/Subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,11 @@ def __readFromFile(self, fd, baseLength):
nB = fd.read(1)
if not nB:
break
dataString += nB.decode()
try:
dataString += nB.decode()
except UnicodeDecodeError as e:
self.log.warn("Unicode decode error in readFromFile", "(%r): %r" % (e, dataString))
dataString += nB.decode("utf-8", "replace")
# break out of potential infinite loop, indicated by dataString growing beyond reason
if len(dataString) + baseLength > self.bufferLimit:
self.log.error("DataString is getting too long (%s): %s " % (len(dataString), dataString[-10000:]))
Expand Down Expand Up @@ -426,10 +430,7 @@ def systemCall(self, cmdSeq, callbackFunction=None, shell=False, env=None):

self.cmdSeq = cmdSeq
self.callback = callbackFunction
if sys.platform.find("win") == 0:
closefd = False
else:
closefd = True
closefd = sys.platform.find("win") != 0
try:
self.child = subprocess.Popen(self.cmdSeq,
shell=shell,
Expand Down
17 changes: 16 additions & 1 deletion src/DIRAC/Core/Utilities/test/Test_Subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from subprocess import Popen

# SUT
from DIRAC.Core.Utilities.Subprocess import systemCall, shellCall, pythonCall, getChildrenPIDs
from DIRAC.Core.Utilities.Subprocess import systemCall, shellCall, pythonCall, getChildrenPIDs, Subprocess

# Mark this entire module as slow
pytestmark = pytest.mark.slow
Expand Down Expand Up @@ -66,3 +66,18 @@ def test_getChildrenPIDs():
assert isinstance(p, int)

mainProcess.wait()


def test_decodingCommandOutput():
    """Check that output which cannot be decoded is replaced rather than fatal.

    Each command below is asked to emit the single byte ``\\xdf``. The test
    expects the call to succeed and the matching element of the returned
    tuple to hold U+FFFD (the Unicode replacement character) in its place.
    """
    # Byte on the second tuple element, trailing newline suppressed by "-n".
    expectedNoNewline = (0, u"\ufffd", "")
    result = systemCall(10, ["echo", "-e", "-n", r"\xdf"])
    assert result["OK"]
    assert result["Value"] == expectedNoNewline

    # Same command without "-n": the newline must survive next to the
    # replacement character.
    expectedWithNewline = (0, u"\ufffd\n", "")
    result = systemCall(10, ["echo", "-e", r"\xdf"])
    assert result["OK"]
    assert result["Value"] == expectedWithNewline

    # The byte is written to file descriptor 2 through a shell command, using
    # a Subprocess instance directly; it is expected in the third element.
    expectedOnFd2 = (0, "", u"\ufffd")
    runner = Subprocess()
    result = runner.systemCall(
        r"""python -c 'import os; os.fdopen(2, "wb").write(b"\xdf")'""",
        shell=True,
    )
    assert result["OK"]
    assert result["Value"] == expectedOnFd2
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ def execute(self):
try:
future.result()
except Exception as exc:
self.log.error('%d generated an exception: %s' % (transID, exc))
self.log.exception('%s generated an exception: %s' % (transID, exc))
else:
self.log.info('Processed', transID)

Expand Down
32 changes: 15 additions & 17 deletions src/DIRAC/ResourceStatusSystem/Agent/SiteInspectorAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,27 +51,27 @@ def __init__(self, *args, **kwargs):

AgentModule.__init__(self, *args, **kwargs)

self.siteClient = None
self.rsClient = None
self.clients = {}

def initialize(self):
""" Standard initialize.
"""

res = ObjectLoader().loadObject('DIRAC.ResourceStatusSystem.Client.SiteStatus')
if not res['OK']:
self.log.error('Failed to load SiteStatus class: %s' % res['Message'])
return res
siteStatusClass = res['Value']

res = ObjectLoader().loadObject('DIRAC.ResourceStatusSystem.Client.ResourceManagementClient')
if not res['OK']:
self.log.error('Failed to load ResourceManagementClient class: %s' % res['Message'])
return res
rmClass = res['Value']

self.siteClient = siteStatusClass()
self.clients['SiteStatus'] = siteStatusClass()
res = ObjectLoader().loadObject('DIRAC.ResourceStatusSystem.Client.ResourceStatusClient')
if not res['OK']:
self.log.error('Failed to load ResourceStatusClient class: %s' % res['Message'])
return res
rsClass = res['Value']

self.rsClient = rsClass()
self.clients['ResourceStatusClient'] = rsClass()
self.clients['ResourceManagementClient'] = rmClass()

maxNumberOfThreads = self.am_getOption('maxNumberOfThreads', 15)
Expand All @@ -85,15 +85,11 @@ def execute(self):
It gets the sites from the Database which are eligible to be re-checked.
"""

res = self.siteClient.getSites('All')
if not res['OK']:
return res

utcnow = datetime.datetime.utcnow().replace(microsecond=0)
future_to_element = {}

# get the current status
res = self.siteClient.getSiteStatuses(res['Value'])
res = self.rsClient.selectStatusElement('Site', 'Status')
if not res['OK']:
return res

Expand All @@ -119,7 +115,8 @@ def execute(self):
self.log.verbose('"%s" # %s # %s' % (siteDict['Name'],
siteDict['Status'],
siteDict['LastCheckTime']))
lowerElementDict = {}

lowerElementDict = {'element': 'Site'}
for key, value in siteDict.items():
if len(key) >= 2: # VO !
lowerElementDict[key[0].lower() + key[1:]] = value
Expand All @@ -132,7 +129,7 @@ def execute(self):
try:
future.result()
except Exception as exc:
self.log.error('%d generated an exception: %s' % (transID, exc))
self.log.exception('%s generated an exception: %s' % (transID, exc))
else:
self.log.info('Processed', transID)

Expand All @@ -147,8 +144,9 @@ def _execute(self, site):
pep = PEP(clients=self.clients)

self.log.verbose(
'%s ( status=%s / statusType=%s ) being processed' % (
'%s ( VO=%s / status=%s / statusType=%s ) being processed' % (
site['name'],
site['vO'],
site['status'],
site['statusType']))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
site = {
'status': 'status',
'name': 'site',
'vO': 'some_vo',
'site': 'site',
'element': 'Site',
'statusType': 'all',
Expand Down
1 change: 1 addition & 0 deletions src/DIRAC/ResourceStatusSystem/Client/SiteStatus.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class SiteStatus(object):
* getUsableSites
* getSites
"""

def __init__(self):
"""
Constructor, initializes the rssClient.
Expand Down
4 changes: 2 additions & 2 deletions src/DIRAC/ResourceStatusSystem/Command/PropagationCommand.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ def doNew(self, masterParams=None):

def doCache(self):

if not self.args['site']:
if not self.args['name']:
return S_ERROR('site was not found in args')

site = self.args['site']
site = self.args['name']

elements = CSHelpers.getSiteElements(site)

Expand Down
21 changes: 12 additions & 9 deletions src/DIRAC/WorkloadManagementSystem/Agent/JobAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,18 @@ def execute(self):
return self._rescheduleFailedJob(
jobID, errorMsg, self.stopOnApplicationFailure)

gridCE = gConfig.getValue('/LocalSite/GridCE', '')
if gridCE:
jobReport.setJobParameter(par_name='GridCE',
par_value=gridCE,
sendFlag=False)

queue = gConfig.getValue('/LocalSite/CEQueue', '')
if queue:
jobReport.setJobParameter(par_name='CEQueue',
par_value=queue,
sendFlag=False)

self.log.debug('Before self._submitJob() (%sCE)' % (self.ceName))
result_submitJob = self._submitJob(
jobID=jobID,
Expand Down Expand Up @@ -573,15 +585,6 @@ def _submitJob(self, jobID, jobParams, resourceParams, optimizerParams,
jobReport.setJobStatus(status=JobStatus.MATCHED,
minorStatus='Submitting To CE')

gridCE = gConfig.getValue('/LocalSite/GridCE', '')
queue = gConfig.getValue('/LocalSite/CEQueue', '')
jobReport.setJobParameter(par_name='GridCE',
par_value=gridCE,
sendFlag=False)
jobReport.setJobParameter(par_name='CEQueue',
par_value=queue,
sendFlag=False)

self.log.info('Submitting JobWrapper',
'%s to %sCE' % (os.path.basename(wrapperFile), self.ceName))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ def test_JobStateUpdateAndJobMonitoringMultuple(self):
resJG_olderThanOneYear)
res = jobMonitor.getStates()
self.assertTrue(res['OK'], res.get('Message'))
self.assertTrue(sorted(res['Value']) in [[JobStatus.RECEIVED], sorted([JobStatus.RECEIVED, JobStatus.WAITING])],
self.assertTrue(sorted(res['Value']) in [[JobStatus.RECEIVED], sorted([JobStatus.RECEIVED, JobStatus.KILLED])],
res['Value'])
res = jobMonitor.getMinorStates()
self.assertTrue(res['OK'], res.get('Message'))
Expand Down
2 changes: 1 addition & 1 deletion tests/Jenkins/dirac_ci.sh
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ fullInstallDIRAC() {
dirac-admin-allow-se SE-1 SE-2 S3-DIRECT S3-INDIRECT --All

#agents
findAgents
findAgents 'exclude' 'Rucio'
if ! diracAgents; then
echo "ERROR: diracAgents failed"
exit 1
Expand Down

0 comments on commit cf55bc4

Please sign in to comment.