Skip to content

Commit

Permalink
Merge pull request #3837 from fstagni/v6r20-fixes37
Browse files Browse the repository at this point in the history
[v6r20] Bug fixes for pilot 3 files synchronization
  • Loading branch information
Andrei Tsaregorodtsev authored Sep 21, 2018
2 parents cff8ddd + d7d9cf2 commit 2551367
Show file tree
Hide file tree
Showing 7 changed files with 468 additions and 532 deletions.
230 changes: 115 additions & 115 deletions ConfigurationSystem/Agent/Bdii2CSAgent.py

Large diffs are not rendered by default.

168 changes: 84 additions & 84 deletions ConfigurationSystem/Agent/GOCDB2CSAgent.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
The agent is used to synchronize information between GOCDB and DIRAC configuration System (CS)
"""

__RCSID__ = "$Id$"

from DIRAC import S_OK, S_ERROR
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.LCG.GOCDBClient import GOCDBClient
Expand All @@ -12,113 +14,112 @@
from DIRAC.ConfigurationSystem.Client.CSAPI import CSAPI
from DIRAC.ConfigurationSystem.Client.Config import gConfig

__RCSID__ = "$Id: $"

class GOCDB2CSAgent ( AgentModule ):
class GOCDB2CSAgent (AgentModule):
""" Class to retrieve information about service endpoints
from GOCDB and update configuration stored by CS
"""

def __init__( self, *args, **kwargs ):
def __init__(self, *args, **kwargs):
""" c'tor
"""
super(GOCDB2CSAgent, self).__init__( *args, **kwargs )
super(GOCDB2CSAgent, self).__init__(*args, **kwargs)
self.GOCDBClient = None
self.csAPI = None
self.dryRun = False

def initialize( self ):
def initialize(self):
""" Run at the agent initialization (normally every 500 cycles)
"""
# client to connect to GOCDB
self.GOCDBClient = GOCDBClient()
self.dryRun = self.am_getOption( 'DryRun', self.dryRun )
self.dryRun = self.am_getOption('DryRun', self.dryRun)

# API needed to update configuration stored by CS
self.csAPI = CSAPI()
return self.csAPI.initialize()

def execute( self ):
def execute(self):
"""
Execute GOCDB queries according to the function map
and user request (options in configuration).
"""

# __functionMap is at the end of the class definition
for option, functionCall in GOCDB2CSAgent.__functionMap.iteritems():
optionValue = self.am_getOption( option, True )
optionValue = self.am_getOption(option, True)
if optionValue:
result = functionCall( self )
result = functionCall(self)
if not result['OK']:
self.log.error( "%s() failed with message: %s" % ( functionCall.__name__, result['Message'] ) )
self.log.error("%s() failed with message: %s" % (functionCall.__name__, result['Message']))
else:
self.log.info( "Successfully executed %s" % functionCall.__name__ )
self.log.info("Successfully executed %s" % functionCall.__name__)

return S_OK()

def updatePerfSONARConfiguration( self ):
def updatePerfSONARConfiguration(self):
"""
Get current status of perfSONAR endpoints from GOCDB
and update CS configuration accordingly.
"""
log = self.log.getSubLogger( 'updatePerfSONAREndpoints' )
log.debug( 'Begin function ...' )
log = self.log.getSubLogger('updatePerfSONAREndpoints')
log.debug('Begin function ...')

# get endpoints
result = self.__getPerfSONAREndpoints()
if not result['OK']:
log.error( "__getPerfSONAREndpoints() failed with message: %s" % result['Message'] )
return S_ERROR( 'Unable to fetch perfSONAR endpoints from GOCDB.' )
log.error("__getPerfSONAREndpoints() failed with message: %s" % result['Message'])
return S_ERROR('Unable to fetch perfSONAR endpoints from GOCDB.')
endpointList = result['Value']

# add DIRAC site name
result = self.__addDIRACSiteName( endpointList )
result = self.__addDIRACSiteName(endpointList)
if not result['OK']:
log.error( "__addDIRACSiteName() failed with message: %s" % result['Message'] )
return S_ERROR( 'Unable to extend the list with DIRAC site names.' )
log.error("__addDIRACSiteName() failed with message: %s" % result['Message'])
return S_ERROR('Unable to extend the list with DIRAC site names.')
extendedEndpointList = result['Value']

# prepare dictionary with new configuration
result = self.__preparePerfSONARConfiguration( extendedEndpointList )
result = self.__preparePerfSONARConfiguration(extendedEndpointList)
if not result['OK']:
log.error( "__preparePerfSONARConfiguration() failed with message: %s" % result['Message'] )
return S_ERROR( 'Unable to prepare a new perfSONAR configuration.' )
log.error("__preparePerfSONARConfiguration() failed with message: %s" % result['Message'])
return S_ERROR('Unable to prepare a new perfSONAR configuration.')
finalConfiguration = result['Value']

# update configuration according to the final status of endpoints
self.__updateConfiguration( finalConfiguration )
log.debug( "Configuration updated succesfully" )
self.__updateConfiguration(finalConfiguration)
log.debug("Configuration updated succesfully")

log.debug( 'End function.' )
log.debug('End function.')
return S_OK()

def __getPerfSONAREndpoints( self ):
def __getPerfSONAREndpoints(self):
"""
Retrieve perfSONAR endpoint information directly form GOCDB.
Retrieve perfSONAR endpoint information directly from GOCDB.
:return: List of perfSONAR endpoints (dictionaries) as stored by GOCDB.
"""

log = self.log.getSubLogger( '__getPerfSONAREndpoints' )
log.debug( 'Begin function ...' )
log = self.log.getSubLogger('__getPerfSONAREndpoints')
log.debug('Begin function ...')

# get perfSONAR endpoints (latency and bandwidth) form GOCDB
endpointList = []
for endpointType in ['Latency', 'Bandwidth']:
result = self.GOCDBClient.getServiceEndpointInfo( 'service_type', 'net.perfSONAR.%s' % endpointType )
result = self.GOCDBClient.getServiceEndpointInfo('service_type', 'net.perfSONAR.%s' % endpointType)

if not result['OK']:
log.error( "getServiceEndpointInfo() failed with message: %s" % result['Message'] )
return S_ERROR( 'Could not fetch %s endpoints from GOCDB' % endpointType.lower() )
log.error("getServiceEndpointInfo() failed with message: %s" % result['Message'])
return S_ERROR('Could not fetch %s endpoints from GOCDB' % endpointType.lower())

log.debug( 'Number of %s endpoints: %s' % ( endpointType.lower(), len( result['Value'] ) ) )
endpointList.extend( result['Value'] )
log.debug('Number of %s endpoints: %s' % (endpointType.lower(), len(result['Value'])))
endpointList.extend(result['Value'])

log.debug( 'Number of perfSONAR endpoints: %s' % len( endpointList ) )
log.debug( 'End function.' )
return S_OK( endpointList )
log.debug('Number of perfSONAR endpoints: %s' % len(endpointList))
log.debug('End function.')
return S_OK(endpointList)

def __preparePerfSONARConfiguration( self, endpointList ):
def __preparePerfSONARConfiguration(self, endpointList):
"""
Prepare a dictionary with a new CS configuration of perfSONAR endpoints.
Expand All @@ -127,8 +128,8 @@ def __preparePerfSONARConfiguration( self, endpointList ):
or None in case of a path pointing to a section.
"""

log = self.log.getSubLogger( '__preparePerfSONARConfiguration' )
log.debug( 'Begin function ...' )
log = self.log.getSubLogger('__preparePerfSONARConfiguration')
log.debug('Begin function ...')

# static elements of a path
rootPath = '/Resources/Sites'
Expand All @@ -142,25 +143,25 @@ def __preparePerfSONARConfiguration( self, endpointList ):
if endpoint['DIRACSITENAME'] is None:
continue

split = endpoint['DIRACSITENAME'].split( '.' )
path = cfgPath( rootPath, split[0], endpoint['DIRACSITENAME'], extPath, endpoint['HOSTNAME'] )
split = endpoint['DIRACSITENAME'].split('.')
path = cfgPath(rootPath, split[0], endpoint['DIRACSITENAME'], extPath, endpoint['HOSTNAME'])
for name, defaultValue in options.iteritems():
newConfiguration[cfgPath(path, name)] = defaultValue

# get current configuration
currentConfiguration = {}
for option in options.iterkeys():
result = gConfig.getConfigurationTree( rootPath, extPath + '/', '/' + option )
result = gConfig.getConfigurationTree(rootPath, extPath + '/', '/' + option)
if not result['OK']:
log.error( "getConfigurationTree() failed with message: %s" % result['Message'] )
return S_ERROR( 'Unable to fetch perfSONAR endpoints from CS.' )
log.error("getConfigurationTree() failed with message: %s" % result['Message'])
return S_ERROR('Unable to fetch perfSONAR endpoints from CS.')
currentConfiguration.update(result['Value'])

# disable endpoints that disappeared in GOCDB
removedElements = set( currentConfiguration ) - set( newConfiguration )
newElements = set( newConfiguration ) - set( currentConfiguration )
removedElements = set(currentConfiguration) - set(newConfiguration)
newElements = set(newConfiguration) - set(currentConfiguration)

addedEndpoints = len( newElements )/len( options )
addedEndpoints = len(newElements) / len(options)
disabledEndpoints = 0
for path in removedElements:
if baseOptionName in path:
Expand All @@ -170,18 +171,18 @@ def __preparePerfSONARConfiguration( self, endpointList ):

# inform what will be changed
if addedEndpoints > 0:
self.log.info( "%s new perfSONAR endpoints will be added to the configuration" % addedEndpoints )
self.log.info("%s new perfSONAR endpoints will be added to the configuration" % addedEndpoints)

if disabledEndpoints > 0:
self.log.info( "%s old perfSONAR endpoints will be disable in the configuration" % disabledEndpoints )
self.log.info("%s old perfSONAR endpoints will be disable in the configuration" % disabledEndpoints)

if addedEndpoints == 0 and disabledEndpoints == 0:
self.log.info( "perfSONAR configuration is up-to-date" )
self.log.info("perfSONAR configuration is up-to-date")

log.debug( 'End function.' )
return S_OK( newConfiguration )
log.debug('End function.')
return S_OK(newConfiguration)

def __addDIRACSiteName( self, inputList ):
def __addDIRACSiteName(self, inputList):
"""
Extend given list of GOCDB endpoints with DIRAC site name, i.e.
add an entry "DIRACSITENAME" in dictionaries that describe endpoints.
Expand All @@ -190,33 +191,33 @@ def __addDIRACSiteName( self, inputList ):
:return: List of perfSONAR endpoints (dictionaries).
"""

log = self.log.getSubLogger( '__addDIRACSiteName' )
log.debug( 'Begin function ...' )
log = self.log.getSubLogger('__addDIRACSiteName')
log.debug('Begin function ...')

# get site name dictionary
result = getDIRACGOCDictionary()
if not result['OK']:
log.error( "getDIRACGOCDictionary() failed with message: %s" % result['Message'] )
return S_ERROR( 'Could not get site name dictionary' )
log.error("getDIRACGOCDictionary() failed with message: %s" % result['Message'])
return S_ERROR('Could not get site name dictionary')

# reverse the dictionary (assume 1 to 1 relation)
DIRACGOCDict = result['Value']
GOCDIRACDict = dict( zip( DIRACGOCDict.values(), DIRACGOCDict.keys() ) )
GOCDIRACDict = dict(zip(DIRACGOCDict.values(), DIRACGOCDict.keys()))

# add DIRAC site names
outputList = []
for entry in inputList:
try:
entry['DIRACSITENAME'] = GOCDIRACDict[entry['SITENAME']]
except KeyError:
self.log.warn( "No dictionary entry for %s. " % entry['SITENAME'] )
self.log.warn("No dictionary entry for %s. " % entry['SITENAME'])
entry['DIRACSITENAME'] = None
outputList.append( entry )
outputList.append(entry)

log.debug( 'End function.' )
return S_OK( outputList )
log.debug('End function.')
return S_OK(outputList)

def __updateConfiguration( self, setElements = None, delElements = None ):
def __updateConfiguration(self, setElements=None, delElements=None):
"""
Update configuration stored by CS.
"""
Expand All @@ -225,59 +226,58 @@ def __updateConfiguration( self, setElements = None, delElements = None ):
if delElements is None:
delElements = []

log = self.log.getSubLogger( '__updateConfiguration' )
log.debug( 'Begin function ...' )
log = self.log.getSubLogger('__updateConfiguration')
log.debug('Begin function ...')

# assure existence and proper value of a section or an option
for path, value in setElements.iteritems():

if value is None:
section = path
else:
split = path.rsplit( '/', 1 )
split = path.rsplit('/', 1)
section = split[0]

try:
result = self.csAPI.createSection( section )
result = self.csAPI.createSection(section)
if not result['OK']:
log.error( "createSection() failed with message: %s" % result['Message'] )
log.error("createSection() failed with message: %s" % result['Message'])
except Exception as e:
log.error( "Exception in createSection(): %s" % repr( e ).replace( ',)', ')' ) )
log.error("Exception in createSection(): %s" % repr(e).replace(',)', ')'))

if value is not None:
try:
result = self.csAPI.setOption( path, value )
result = self.csAPI.setOption(path, value)
if not result['OK']:
log.error( "setOption() failed with message: %s" % result['Message'] )
log.error("setOption() failed with message: %s" % result['Message'])
except Exception as e:
log.error( "Exception in setOption(): %s" % repr( e ).replace( ',)', ')' ) )
log.error("Exception in setOption(): %s" % repr(e).replace(',)', ')'))

# delete elements in the configuration
for path in delElements:
result = self.csAPI.delOption( path )
result = self.csAPI.delOption(path)
if not result['OK']:
log.warn( "delOption() failed with message: %s" % result['Message'] )
log.warn("delOption() failed with message: %s" % result['Message'])

result = self.csAPI.delSection( path )
result = self.csAPI.delSection(path)
if not result['OK']:
log.warn( "delSection() failed with message: %s" % result['Message'] )
log.warn("delSection() failed with message: %s" % result['Message'])

if self.dryRun:
log.info( "Dry Run: CS won't be updated" )
log.info("Dry Run: CS won't be updated")
self.csAPI.showDiff()
else:
# update configuration stored by CS
result = self.csAPI.commit()
if not result['OK']:
log.error( "commit() failed with message: %s" % result['Message'] )
return S_ERROR( "Could not commit changes to CS." )
log.error("commit() failed with message: %s" % result['Message'])
return S_ERROR("Could not commit changes to CS.")
else:
log.info("Committed changes to CS")

log.debug( 'End function.' )
log.debug('End function.')
return S_OK()


# define mapping between an agent option in the configuration and a function call
__functionMap = { 'UpdatePerfSONARS': updatePerfSONARConfiguration,
}
__functionMap = {'UpdatePerfSONARS': updatePerfSONARConfiguration,
}
4 changes: 3 additions & 1 deletion ConfigurationSystem/Service/ConfigurationHandler.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
gServiceInterface = None
gPilotSynchronizer = None


def initializeConfigurationHandler(serviceInfo):
global gServiceInterface
gServiceInterface = ServiceInterface(serviceInfo['URL'])
Expand Down Expand Up @@ -59,7 +60,8 @@ def export_commitNewData(self, sData):
return res

# Check the flag for updating the pilot 3 JSON file
if self.srv_getCSOption('UpdatePilotCStoJSONFile', False) and gServiceInterface.isMaster():
updatePilotCStoJSONFileFlag = self.srv_getCSOption('UpdatePilotCStoJSONFile', False)
if updatePilotCStoJSONFileFlag and gServiceInterface.isMaster():
if gPilotSynchronizer is None:
try:
# This import is only needed for the Master CS service, making it conditional avoids
Expand Down
Loading

0 comments on commit 2551367

Please sign in to comment.