diff --git a/ConfigurationSystem/Agent/Bdii2CSAgent.py b/ConfigurationSystem/Agent/Bdii2CSAgent.py index 163ea854cc9..54475342723 100644 --- a/ConfigurationSystem/Agent/Bdii2CSAgent.py +++ b/ConfigurationSystem/Agent/Bdii2CSAgent.py @@ -5,26 +5,27 @@ if necessary settings which were changed in the BDII recently """ -from DIRAC import S_OK, S_ERROR, gConfig -from DIRAC.Core.Base.AgentModule import AgentModule -from DIRAC.Core.Utilities.Grid import getBdiiCEInfo, getBdiiSEInfo -from DIRAC.FrameworkSystem.Client.NotificationClient import NotificationClient -from DIRAC.ConfigurationSystem.Client.CSAPI import CSAPI -from DIRAC.ConfigurationSystem.Client.Helpers.Path import cfgPath -from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs, getVOOption -from DIRAC.ConfigurationSystem.Client.Utilities import getGridCEs, getSiteUpdates, getSRMUpdates, \ - getCEsFromCS, getSEsFromCS, getGridSRMs +__RCSID__ = "$Id$" + +from DIRAC import S_OK, S_ERROR, gConfig +from DIRAC.Core.Base.AgentModule import AgentModule +from DIRAC.Core.Utilities.Grid import getBdiiCEInfo, getBdiiSEInfo +from DIRAC.FrameworkSystem.Client.NotificationClient import NotificationClient +from DIRAC.ConfigurationSystem.Client.CSAPI import CSAPI +from DIRAC.ConfigurationSystem.Client.Helpers.Path import cfgPath +from DIRAC.ConfigurationSystem.Client.Helpers.Registry import getVOs, getVOOption +from DIRAC.ConfigurationSystem.Client.Utilities import getGridCEs, getSiteUpdates, getSRMUpdates, \ + getCEsFromCS, getSEsFromCS, getGridSRMs from DIRAC.Core.Utilities.SiteCEMapping import getSiteForCE -__RCSID__ = "$Id$" -class Bdii2CSAgent( AgentModule ): +class Bdii2CSAgent(AgentModule): - def __init__( self, *args, **kwargs ): + def __init__(self, *args, **kwargs): """ Defines default parameters """ - super(Bdii2CSAgent, self).__init__( *args, **kwargs ) + super(Bdii2CSAgent, self).__init__(*args, **kwargs) self.addressTo = '' self.addressFrom = '' @@ -47,57 +48,57 @@ def __init__( self, *args, **kwargs ): # Update the CS or not? self.dryRun = False - def initialize( self ): + def initialize(self): """ Gets run paramaters from the configuration """ - self.addressTo = self.am_getOption( 'MailTo', self.addressTo ) - self.addressFrom = self.am_getOption( 'MailFrom', self.addressFrom ) + self.addressTo = self.am_getOption('MailTo', self.addressTo) + self.addressFrom = self.am_getOption('MailFrom', self.addressFrom) # Create a list of alternative bdii urls - self.alternativeBDIIs = self.am_getOption( 'AlternativeBDIIs', self.alternativeBDIIs ) + self.alternativeBDIIs = self.am_getOption('AlternativeBDIIs', self.alternativeBDIIs) self.host = self.am_getOption('Host', self.host) self.glue2URLs = self.am_getOption('GLUE2URLs', self.glue2URLs) self.glue2Only = self.am_getOption('GLUE2Only', self.glue2Only) # Check if the bdii url is appended by a port number, if not append the default 2170 - for index, url in enumerate( self.alternativeBDIIs ): - if not url.split( ':' )[-1].isdigit(): + for index, url in enumerate(self.alternativeBDIIs): + if not url.split(':')[-1].isdigit(): self.alternativeBDIIs[index] += ':2170' if self.addressTo and self.addressFrom: - self.log.info( "MailTo", self.addressTo ) - self.log.info( "MailFrom", self.addressFrom ) - if self.alternativeBDIIs : - self.log.info( "AlternativeBDII URLs:", self.alternativeBDIIs ) + self.log.info("MailTo", self.addressTo) + self.log.info("MailFrom", self.addressFrom) + if self.alternativeBDIIs: + self.log.info("AlternativeBDII URLs:", self.alternativeBDIIs) - self.processCEs = self.am_getOption( 'ProcessCEs', self.processCEs ) - self.processSEs = self.am_getOption( 'ProcessSEs', self.processSEs ) + self.processCEs = self.am_getOption('ProcessCEs', self.processCEs) + self.processSEs = self.am_getOption('ProcessSEs', self.processSEs) self.selectedSites = self.am_getOption('SelectedSites', []) - self.dryRun = self.am_getOption( 'DryRun', self.dryRun ) + self.dryRun = self.am_getOption('DryRun', self.dryRun) - self.voName = self.am_getOption( 'VirtualOrganization', self.voName ) + self.voName = self.am_getOption('VirtualOrganization', self.voName) if not self.voName: - self.voName = self.am_getOption( 'VO', [] ) - if not self.voName or ( len( self.voName ) == 1 and self.voName[0].lower() == 'all' ): + self.voName = self.am_getOption('VO', []) + if not self.voName or (len(self.voName) == 1 and self.voName[0].lower() == 'all'): # Get all VOs defined in the configuration self.voName = [] result = getVOs() if result['OK']: vos = result['Value'] for vo in vos: - vomsVO = getVOOption( vo, "VOMSName" ) + vomsVO = getVOOption(vo, "VOMSName") if vomsVO: - self.voName.append( vomsVO ) + self.voName.append(vomsVO) if self.voName: - self.log.info( "Agent will manage VO(s) %s" % self.voName ) + self.log.info("Agent will manage VO(s) %s" % self.voName) else: - self.log.fatal( "VirtualOrganization option not defined for agent" ) + self.log.fatal("VirtualOrganization option not defined for agent") return S_ERROR() self.csAPI = CSAPI() return self.csAPI.initialize() - def execute( self ): + def execute(self): """ General agent execution method """ self.voBdiiCEDict = {} @@ -106,10 +107,10 @@ def execute( self ): # Get a "fresh" copy of the CS data result = self.csAPI.downloadCSData() if not result['OK']: - self.log.warn( "Could not download a fresh copy of the CS data", result[ 'Message' ] ) + self.log.warn("Could not download a fresh copy of the CS data", result['Message']) # Refresh the configuration from the master server - gConfig.forceRefresh( fromMaster = True ) + gConfig.forceRefresh(fromMaster=True) if self.processCEs: self.__lookForNewCEs() @@ -119,25 +120,25 @@ def execute( self ): self.__updateSEs() return S_OK() - def __lookForNewCEs( self ): + def __lookForNewCEs(self): """ Look up BDII for CEs not yet present in the DIRAC CS """ - bannedCEs = self.am_getOption( 'BannedCEs', [] ) + bannedCEs = self.am_getOption('BannedCEs', []) result = getCEsFromCS() if not result['OK']: return result - knownCEs = set( result['Value'] ) - knownCEs = knownCEs.union( set( bannedCEs ) ) + knownCEs = set(result['Value']) + knownCEs = knownCEs.union(set(bannedCEs)) for vo in self.voName: - result = self.__getBdiiCEInfo( vo ) + result = self.__getBdiiCEInfo(vo) if not result['OK']: continue bdiiInfo = result['Value'] - result = getGridCEs( vo, bdiiInfo = bdiiInfo, ceBlackList = knownCEs ) + result = getGridCEs(vo, bdiiInfo=bdiiInfo, ceBlackList=knownCEs) if not result['OK']: - self.log.error( 'Failed to get unused CEs', result['Message'] ) + self.log.error('Failed to get unused CEs', result['Message']) siteDict = result['Value'] body = '' for site in siteDict: @@ -149,15 +150,15 @@ def __lookForNewCEs( self ): for ce in newCEs: queueString = '' ceInfo = bdiiInfo[site]['CEs'][ce] - newCEString = "CE: %s, GOCDB Site Name: %s" % ( ce, site ) + newCEString = "CE: %s, GOCDB Site Name: %s" % (ce, site) systemTuple = siteDict[site][ce]['System'] - osString = "%s_%s_%s" % ( systemTuple ) - newCEString = "\n%s\n%s\n" % ( newCEString, osString ) + osString = "%s_%s_%s" % (systemTuple) + newCEString = "\n%s\n%s\n" % (newCEString, osString) for queue in ceInfo['Queues']: - queueStatus = ceInfo['Queues'][queue].get( 'GlueCEStateStatus', 'UnknownStatus' ) + queueStatus = ceInfo['Queues'][queue].get('GlueCEStateStatus', 'UnknownStatus') if 'production' in queueStatus.lower(): - ceType = ceInfo['Queues'][queue].get( 'GlueCEImplementationName', '' ) - queueString += " %s %s %s\n" % ( queue, queueStatus, ceType ) + ceType = ceInfo['Queues'][queue].get('GlueCEImplementationName', '') + queueString += " %s %s %s\n" % (queue, queueStatus, ceType) if queueString: ceString += newCEString ceString += "Queues:\n" @@ -171,35 +172,36 @@ def __lookForNewCEs( self ): body += "\n\nTo suppress information about CE add its name to BannedCEs list.\n" body += "Add new Sites/CEs for vo %s with the command:\n" % vo body += "dirac-admin-add-resources --vo %s --ce\n" % vo - self.log.info( body ) + self.log.info(body) if self.addressTo and self.addressFrom: notification = NotificationClient() - result = notification.sendMail( self.addressTo, self.subject, body, self.addressFrom, localAttempt = False ) + result = notification.sendMail(self.addressTo, self.subject, body, self.addressFrom, + localAttempt=False, avoidSpam=True) if not result['OK']: - self.log.error( 'Can not send new site notification mail', result['Message'] ) + self.log.error('Can not send new site notification mail', result['Message']) return S_OK() - def __getBdiiCEInfo( self, vo ): + def __getBdiiCEInfo(self, vo): if vo in self.voBdiiCEDict: - return S_OK( self.voBdiiCEDict[vo] ) - self.log.info( "Check for available CEs for VO", vo ) - totalResult = S_OK( {} ) + return S_OK(self.voBdiiCEDict[vo]) + self.log.info("Check for available CEs for VO", vo) + totalResult = S_OK({}) message = '' mainResult = getBdiiCEInfo(vo, host=self.host, glue2=self.glue2Only) if not mainResult['OK']: - self.log.error( "Failed getting information from default bdii", mainResult['Message'] ) + self.log.error("Failed getting information from default bdii", mainResult['Message']) message = mainResult['Message'] - for bdii in reversed( self.alternativeBDIIs ): + for bdii in reversed(self.alternativeBDIIs): resultAlt = getBdiiCEInfo(vo, host=bdii, glue2=self.glue2Only) if resultAlt['OK']: - totalResult['Value'].update( resultAlt['Value'] ) + totalResult['Value'].update(resultAlt['Value']) else: - self.log.error( "Failed getting information from %s " % bdii, resultAlt['Message'] ) - message = ( message + "\n" + resultAlt['Message'] ).strip() + self.log.error("Failed getting information from %s " % bdii, resultAlt['Message']) + message = (message + "\n" + resultAlt['Message']).strip() for glue2URL in self.glue2URLs: if self.glue2Only: @@ -213,56 +215,56 @@ def __getBdiiCEInfo( self, vo ): message = (message + "\n" + resultGlue2['Message']).strip() if mainResult['OK']: - totalResult['Value'].update( mainResult['Value'] ) + totalResult['Value'].update(mainResult['Value']) - if not totalResult['Value'] and message: ## Dict is empty and we have an error message - self.log.error( "Error during BDII request", message ) - totalResult = S_ERROR( message ) + if not totalResult['Value'] and message: # Dict is empty and we have an error message + self.log.error("Error during BDII request", message) + totalResult = S_ERROR(message) else: self.voBdiiCEDict[vo] = totalResult['Value'] return totalResult - def __getBdiiSEInfo( self, vo ): + def __getBdiiSEInfo(self, vo): if vo in self.voBdiiSEDict: - return S_OK( self.voBdiiSEDict[vo] ) - self.log.info( "Check for available SEs for VO", vo ) - result = getBdiiSEInfo( vo ) + return S_OK(self.voBdiiSEDict[vo]) + self.log.info("Check for available SEs for VO", vo) + result = getBdiiSEInfo(vo) message = '' if not result['OK']: message = result['Message'] - for bdii in self.alternativeBDIIs : - result = getBdiiSEInfo( vo, host = bdii ) + for bdii in self.alternativeBDIIs: + result = getBdiiSEInfo(vo, host=bdii) if result['OK']: break if not result['OK']: if message: - self.log.error( "Error during BDII request", message ) + self.log.error("Error during BDII request", message) else: - self.log.error( "Error during BDII request", result['Message'] ) + self.log.error("Error during BDII request", result['Message']) else: self.voBdiiSEDict[vo] = result['Value'] return result - def __updateCEs( self ): + def __updateCEs(self): """ Update the Site/CE/queue settings in the CS if they were changed in the BDII """ bdiiChangeSet = set() for vo in self.voName: - result = self.__getBdiiCEInfo( vo ) + result = self.__getBdiiCEInfo(vo) if not result['OK']: continue ceBdiiDict = result['Value'] self.__purgeSites(ceBdiiDict) - result = getSiteUpdates( vo, bdiiInfo = ceBdiiDict, log = self.log ) + result = getSiteUpdates(vo, bdiiInfo=ceBdiiDict, log=self.log) if not result['OK']: continue - bdiiChangeSet = bdiiChangeSet.union( result['Value'] ) + bdiiChangeSet = bdiiChangeSet.union(result['Value']) # We have collected all the changes, consolidate VO settings - result = self.__updateCS( bdiiChangeSet ) + result = self.__updateCS(bdiiChangeSet) return result def __purgeSites(self, ceBdiiDict): @@ -288,70 +290,68 @@ def __purgeSites(self, ceBdiiDict): ceBdiiDict.pop(site) return - - def __updateCS( self, bdiiChangeSet ): + def __updateCS(self, bdiiChangeSet): queueVODict = {} changeSet = set() for entry in bdiiChangeSet: - section, option , _value, new_value = entry + section, option, _value, new_value = entry if option == "VO": - queueVODict.setdefault( section, set() ) - queueVODict[section] = queueVODict[section].union( set( new_value.split( ',' ) ) ) + queueVODict.setdefault(section, set()) + queueVODict[section] = queueVODict[section].union(set(new_value.split(','))) else: - changeSet.add( entry ) + changeSet.add(entry) for section, VOs in queueVODict.items(): - changeSet.add( ( section, 'VO', '', ','.join( VOs ) ) ) + changeSet.add((section, 'VO', '', ','.join(VOs))) if changeSet: - changeList = list( changeSet ) - changeList.sort() - body = '\n'.join( [ "%s/%s %s -> %s" % entry for entry in changeList ] ) + changeList = sorted(changeSet) + body = '\n'.join(["%s/%s %s -> %s" % entry for entry in changeList]) if body and self.addressTo and self.addressFrom: notification = NotificationClient() - result = notification.sendMail( self.addressTo, self.subject, body, self.addressFrom, localAttempt = False ) + result = notification.sendMail(self.addressTo, self.subject, body, self.addressFrom, localAttempt=False) if body: - self.log.info( 'The following configuration changes were detected:' ) - self.log.info( body ) + self.log.info('The following configuration changes were detected:') + self.log.info(body) for section, option, value, new_value in changeSet: if value == 'Unknown' or not value: - self.csAPI.setOption( cfgPath( section, option ), new_value ) + self.csAPI.setOption(cfgPath(section, option), new_value) else: - self.csAPI.modifyValue( cfgPath( section, option ), new_value ) + self.csAPI.modifyValue(cfgPath(section, option), new_value) if self.dryRun: - self.log.info( "Dry Run: CS won't be updated" ) + self.log.info("Dry Run: CS won't be updated") self.csAPI.showDiff() else: result = self.csAPI.commit() if not result['OK']: - self.log.error( "Error while committing to CS", result['Message'] ) + self.log.error("Error while committing to CS", result['Message']) else: - self.log.info( "Successfully committed %d changes to CS" % len( changeList ) ) + self.log.info("Successfully committed %d changes to CS" % len(changeList)) return result else: - self.log.info( "No changes found" ) + self.log.info("No changes found") return S_OK() - def __lookForNewSEs( self ): + def __lookForNewSEs(self): """ Look up BDII for SEs not yet present in the DIRAC CS """ - bannedSEs = self.am_getOption( 'BannedSEs', [] ) + bannedSEs = self.am_getOption('BannedSEs', []) result = getSEsFromCS() if not result['OK']: return result - knownSEs = set( result['Value'] ) - knownSEs = knownSEs.union( set( bannedSEs ) ) + knownSEs = set(result['Value']) + knownSEs = knownSEs.union(set(bannedSEs)) for vo in self.voName: - result = self.__getBdiiSEInfo( vo ) + result = self.__getBdiiSEInfo(vo) if not result['OK']: continue bdiiInfo = result['Value'] - result = getGridSRMs( vo, bdiiInfo = bdiiInfo, srmBlackList = knownSEs ) + result = getGridSRMs(vo, bdiiInfo=bdiiInfo, srmBlackList=knownSEs) if not result['OK']: continue siteDict = result['Value'] @@ -361,41 +361,41 @@ def __lookForNewSEs( self ): if not newSEs: continue for se in newSEs: - body += '\n New SE %s available at site %s:\n' % ( se, site ) - backend = siteDict[site][se]['SE'].get( 'GlueSEImplementationName', 'Unknown' ) - size = siteDict[site][se]['SE'].get( 'GlueSESizeTotal', 'Unknown' ) - body += ' Backend %s, Size %s' % ( backend, size ) + body += '\n New SE %s available at site %s:\n' % (se, site) + backend = siteDict[site][se]['SE'].get('GlueSEImplementationName', 'Unknown') + size = siteDict[site][se]['SE'].get('GlueSESizeTotal', 'Unknown') + body += ' Backend %s, Size %s' % (backend, size) if body: body = "\nWe are glad to inform You about new SE(s) possibly suitable for %s:\n" % vo + body body += "\n\nTo suppress information about an SE add its name to BannedSEs list.\n" body += "Add new SEs for vo %s with the command:\n" % vo body += "dirac-admin-add-resources --vo %s --se\n" % vo - self.log.info( body ) + self.log.info(body) if self.addressTo and self.addressFrom: notification = NotificationClient() - result = notification.sendMail( self.addressTo, self.subject, body, self.addressFrom, localAttempt = False ) + result = notification.sendMail(self.addressTo, self.subject, body, self.addressFrom, localAttempt=False) if not result['OK']: - self.log.error( 'Can not send new site notification mail', result['Message'] ) + self.log.error('Can not send new site notification mail', result['Message']) return S_OK() - def __updateSEs( self ): + def __updateSEs(self): """ Update the Storage Element settings in the CS if they were changed in the BDII """ bdiiChangeSet = set() for vo in self.voName: - result = self.__getBdiiSEInfo( vo ) + result = self.__getBdiiSEInfo(vo) if not result['OK']: continue seBdiiDict = result['Value'] - result = getSRMUpdates( vo, bdiiInfo = seBdiiDict ) + result = getSRMUpdates(vo, bdiiInfo=seBdiiDict) if not result['OK']: continue - bdiiChangeSet = bdiiChangeSet.union( result['Value'] ) + bdiiChangeSet = bdiiChangeSet.union(result['Value']) # We have collected all the changes, consolidate VO settings - result = self.__updateCS( bdiiChangeSet ) + result = self.__updateCS(bdiiChangeSet) return result diff --git a/ConfigurationSystem/Agent/GOCDB2CSAgent.py b/ConfigurationSystem/Agent/GOCDB2CSAgent.py index 0a9eb230d3c..bc079b0fbee 100644 --- a/ConfigurationSystem/Agent/GOCDB2CSAgent.py +++ b/ConfigurationSystem/Agent/GOCDB2CSAgent.py @@ -4,6 +4,8 @@ The agent is used to synchronize information between GOCDB and DIRAC configuration System (CS) """ +__RCSID__ = "$Id$" + from DIRAC import S_OK, S_ERROR from DIRAC.Core.Base.AgentModule import AgentModule from DIRAC.Core.LCG.GOCDBClient import GOCDBClient @@ -12,33 +14,32 @@ from DIRAC.ConfigurationSystem.Client.CSAPI import CSAPI from DIRAC.ConfigurationSystem.Client.Config import gConfig -__RCSID__ = "$Id: $" -class GOCDB2CSAgent ( AgentModule ): +class GOCDB2CSAgent (AgentModule): """ Class to retrieve information about service endpoints from GOCDB and update configuration stored by CS """ - def __init__( self, *args, **kwargs ): + def __init__(self, *args, **kwargs): """ c'tor """ - super(GOCDB2CSAgent, self).__init__( *args, **kwargs ) + super(GOCDB2CSAgent, self).__init__(*args, **kwargs) self.GOCDBClient = None self.csAPI = None self.dryRun = False - def initialize( self ): + def initialize(self): """ Run at the agent initialization (normally every 500 cycles) """ # client to connect to GOCDB self.GOCDBClient = GOCDBClient() - self.dryRun = self.am_getOption( 'DryRun', self.dryRun ) + self.dryRun = self.am_getOption('DryRun', self.dryRun) # API needed to update configuration stored by CS self.csAPI = CSAPI() return self.csAPI.initialize() - def execute( self ): + def execute(self): """ Execute GOCDB queries according to the function map and user request (options in configuration). @@ -46,79 +47,79 @@ def execute( self ): # __functionMap is at the end of the class definition for option, functionCall in GOCDB2CSAgent.__functionMap.iteritems(): - optionValue = self.am_getOption( option, True ) + optionValue = self.am_getOption(option, True) if optionValue: - result = functionCall( self ) + result = functionCall(self) if not result['OK']: - self.log.error( "%s() failed with message: %s" % ( functionCall.__name__, result['Message'] ) ) + self.log.error("%s() failed with message: %s" % (functionCall.__name__, result['Message'])) else: - self.log.info( "Successfully executed %s" % functionCall.__name__ ) + self.log.info("Successfully executed %s" % functionCall.__name__) return S_OK() - def updatePerfSONARConfiguration( self ): + def updatePerfSONARConfiguration(self): """ Get current status of perfSONAR endpoints from GOCDB and update CS configuration accordingly. """ - log = self.log.getSubLogger( 'updatePerfSONAREndpoints' ) - log.debug( 'Begin function ...' ) + log = self.log.getSubLogger('updatePerfSONAREndpoints') + log.debug('Begin function ...') # get endpoints result = self.__getPerfSONAREndpoints() if not result['OK']: - log.error( "__getPerfSONAREndpoints() failed with message: %s" % result['Message'] ) - return S_ERROR( 'Unable to fetch perfSONAR endpoints from GOCDB.' ) + log.error("__getPerfSONAREndpoints() failed with message: %s" % result['Message']) + return S_ERROR('Unable to fetch perfSONAR endpoints from GOCDB.') endpointList = result['Value'] # add DIRAC site name - result = self.__addDIRACSiteName( endpointList ) + result = self.__addDIRACSiteName(endpointList) if not result['OK']: - log.error( "__addDIRACSiteName() failed with message: %s" % result['Message'] ) - return S_ERROR( 'Unable to extend the list with DIRAC site names.' ) + log.error("__addDIRACSiteName() failed with message: %s" % result['Message']) + return S_ERROR('Unable to extend the list with DIRAC site names.') extendedEndpointList = result['Value'] # prepare dictionary with new configuration - result = self.__preparePerfSONARConfiguration( extendedEndpointList ) + result = self.__preparePerfSONARConfiguration(extendedEndpointList) if not result['OK']: - log.error( "__preparePerfSONARConfiguration() failed with message: %s" % result['Message'] ) - return S_ERROR( 'Unable to prepare a new perfSONAR configuration.' ) + log.error("__preparePerfSONARConfiguration() failed with message: %s" % result['Message']) + return S_ERROR('Unable to prepare a new perfSONAR configuration.') finalConfiguration = result['Value'] # update configuration according to the final status of endpoints - self.__updateConfiguration( finalConfiguration ) - log.debug( "Configuration updated succesfully" ) + self.__updateConfiguration(finalConfiguration) + log.debug("Configuration updated succesfully") - log.debug( 'End function.' ) + log.debug('End function.') return S_OK() - def __getPerfSONAREndpoints( self ): + def __getPerfSONAREndpoints(self): """ - Retrieve perfSONAR endpoint information directly form GOCDB. + Retrieve perfSONAR endpoint information directly from GOCDB. :return: List of perfSONAR endpoints (dictionaries) as stored by GOCDB. """ - log = self.log.getSubLogger( '__getPerfSONAREndpoints' ) - log.debug( 'Begin function ...' ) + log = self.log.getSubLogger('__getPerfSONAREndpoints') + log.debug('Begin function ...') # get perfSONAR endpoints (latency and bandwidth) form GOCDB endpointList = [] for endpointType in ['Latency', 'Bandwidth']: - result = self.GOCDBClient.getServiceEndpointInfo( 'service_type', 'net.perfSONAR.%s' % endpointType ) + result = self.GOCDBClient.getServiceEndpointInfo('service_type', 'net.perfSONAR.%s' % endpointType) if not result['OK']: - log.error( "getServiceEndpointInfo() failed with message: %s" % result['Message'] ) - return S_ERROR( 'Could not fetch %s endpoints from GOCDB' % endpointType.lower() ) + log.error("getServiceEndpointInfo() failed with message: %s" % result['Message']) + return S_ERROR('Could not fetch %s endpoints from GOCDB' % endpointType.lower()) - log.debug( 'Number of %s endpoints: %s' % ( endpointType.lower(), len( result['Value'] ) ) ) - endpointList.extend( result['Value'] ) + log.debug('Number of %s endpoints: %s' % (endpointType.lower(), len(result['Value']))) + endpointList.extend(result['Value']) - log.debug( 'Number of perfSONAR endpoints: %s' % len( endpointList ) ) - log.debug( 'End function.' ) - return S_OK( endpointList ) + log.debug('Number of perfSONAR endpoints: %s' % len(endpointList)) + log.debug('End function.') + return S_OK(endpointList) - def __preparePerfSONARConfiguration( self, endpointList ): + def __preparePerfSONARConfiguration(self, endpointList): """ Prepare a dictionary with a new CS configuration of perfSONAR endpoints. @@ -127,8 +128,8 @@ def __preparePerfSONARConfiguration( self, endpointList ): or None in case of a path pointing to a section. """ - log = self.log.getSubLogger( '__preparePerfSONARConfiguration' ) - log.debug( 'Begin function ...' ) + log = self.log.getSubLogger('__preparePerfSONARConfiguration') + log.debug('Begin function ...') # static elements of a path rootPath = '/Resources/Sites' @@ -142,25 +143,25 @@ def __preparePerfSONARConfiguration( self, endpointList ): if endpoint['DIRACSITENAME'] is None: continue - split = endpoint['DIRACSITENAME'].split( '.' ) - path = cfgPath( rootPath, split[0], endpoint['DIRACSITENAME'], extPath, endpoint['HOSTNAME'] ) + split = endpoint['DIRACSITENAME'].split('.') + path = cfgPath(rootPath, split[0], endpoint['DIRACSITENAME'], extPath, endpoint['HOSTNAME']) for name, defaultValue in options.iteritems(): newConfiguration[cfgPath(path, name)] = defaultValue # get current configuration currentConfiguration = {} for option in options.iterkeys(): - result = gConfig.getConfigurationTree( rootPath, extPath + '/', '/' + option ) + result = gConfig.getConfigurationTree(rootPath, extPath + '/', '/' + option) if not result['OK']: - log.error( "getConfigurationTree() failed with message: %s" % result['Message'] ) - return S_ERROR( 'Unable to fetch perfSONAR endpoints from CS.' ) + log.error("getConfigurationTree() failed with message: %s" % result['Message']) + return S_ERROR('Unable to fetch perfSONAR endpoints from CS.') currentConfiguration.update(result['Value']) # disable endpoints that disappeared in GOCDB - removedElements = set( currentConfiguration ) - set( newConfiguration ) - newElements = set( newConfiguration ) - set( currentConfiguration ) + removedElements = set(currentConfiguration) - set(newConfiguration) + newElements = set(newConfiguration) - set(currentConfiguration) - addedEndpoints = len( newElements )/len( options ) + addedEndpoints = len(newElements) / len(options) disabledEndpoints = 0 for path in removedElements: if baseOptionName in path: @@ -170,18 +171,18 @@ def __preparePerfSONARConfiguration( self, endpointList ): # inform what will be changed if addedEndpoints > 0: - self.log.info( "%s new perfSONAR endpoints will be added to the configuration" % addedEndpoints ) + self.log.info("%s new perfSONAR endpoints will be added to the configuration" % addedEndpoints) if disabledEndpoints > 0: - self.log.info( "%s old perfSONAR endpoints will be disable in the configuration" % disabledEndpoints ) + self.log.info("%s old perfSONAR endpoints will be disable in the configuration" % disabledEndpoints) if addedEndpoints == 0 and disabledEndpoints == 0: - self.log.info( "perfSONAR configuration is up-to-date" ) + self.log.info("perfSONAR configuration is up-to-date") - log.debug( 'End function.' ) - return S_OK( newConfiguration ) + log.debug('End function.') + return S_OK(newConfiguration) - def __addDIRACSiteName( self, inputList ): + def __addDIRACSiteName(self, inputList): """ Extend given list of GOCDB endpoints with DIRAC site name, i.e. add an entry "DIRACSITENAME" in dictionaries that describe endpoints. @@ -190,18 +191,18 @@ def __addDIRACSiteName( self, inputList ): :return: List of perfSONAR endpoints (dictionaries). """ - log = self.log.getSubLogger( '__addDIRACSiteName' ) - log.debug( 'Begin function ...' ) + log = self.log.getSubLogger('__addDIRACSiteName') + log.debug('Begin function ...') # get site name dictionary result = getDIRACGOCDictionary() if not result['OK']: - log.error( "getDIRACGOCDictionary() failed with message: %s" % result['Message'] ) - return S_ERROR( 'Could not get site name dictionary' ) + log.error("getDIRACGOCDictionary() failed with message: %s" % result['Message']) + return S_ERROR('Could not get site name dictionary') # reverse the dictionary (assume 1 to 1 relation) DIRACGOCDict = result['Value'] - GOCDIRACDict = dict( zip( DIRACGOCDict.values(), DIRACGOCDict.keys() ) ) + GOCDIRACDict = dict(zip(DIRACGOCDict.values(), DIRACGOCDict.keys())) # add DIRAC site names outputList = [] @@ -209,14 +210,14 @@ def __addDIRACSiteName( self, inputList ): try: entry['DIRACSITENAME'] = GOCDIRACDict[entry['SITENAME']] except KeyError: - self.log.warn( "No dictionary entry for %s. " % entry['SITENAME'] ) + self.log.warn("No dictionary entry for %s. " % entry['SITENAME']) entry['DIRACSITENAME'] = None - outputList.append( entry ) + outputList.append(entry) - log.debug( 'End function.' ) - return S_OK( outputList ) + log.debug('End function.') + return S_OK(outputList) - def __updateConfiguration( self, setElements = None, delElements = None ): + def __updateConfiguration(self, setElements=None, delElements=None): """ Update configuration stored by CS. """ @@ -225,8 +226,8 @@ def __updateConfiguration( self, setElements = None, delElements = None ): if delElements is None: delElements = [] - log = self.log.getSubLogger( '__updateConfiguration' ) - log.debug( 'Begin function ...' ) + log = self.log.getSubLogger('__updateConfiguration') + log.debug('Begin function ...') # assure existence and proper value of a section or an option for path, value in setElements.iteritems(): @@ -234,50 +235,49 @@ def __updateConfiguration( self, setElements = None, delElements = None ): if value is None: section = path else: - split = path.rsplit( '/', 1 ) + split = path.rsplit('/', 1) section = split[0] try: - result = self.csAPI.createSection( section ) + result = self.csAPI.createSection(section) if not result['OK']: - log.error( "createSection() failed with message: %s" % result['Message'] ) + log.error("createSection() failed with message: %s" % result['Message']) except Exception as e: - log.error( "Exception in createSection(): %s" % repr( e ).replace( ',)', ')' ) ) + log.error("Exception in createSection(): %s" % repr(e).replace(',)', ')')) if value is not None: try: - result = self.csAPI.setOption( path, value ) + result = self.csAPI.setOption(path, value) if not result['OK']: - log.error( "setOption() failed with message: %s" % result['Message'] ) + log.error("setOption() failed with message: %s" % result['Message']) except Exception as e: - log.error( "Exception in setOption(): %s" % repr( e ).replace( ',)', ')' ) ) + log.error("Exception in setOption(): %s" % repr(e).replace(',)', ')')) # delete elements in the configuration for path in delElements: - result = self.csAPI.delOption( path ) + result = self.csAPI.delOption(path) if not result['OK']: - log.warn( "delOption() failed with message: %s" % result['Message'] ) + log.warn("delOption() failed with message: %s" % result['Message']) - result = self.csAPI.delSection( path ) + result = self.csAPI.delSection(path) if not result['OK']: - log.warn( "delSection() failed with message: %s" % result['Message'] ) + log.warn("delSection() failed with message: %s" % result['Message']) if self.dryRun: - log.info( "Dry Run: CS won't be updated" ) + log.info("Dry Run: CS won't be updated") self.csAPI.showDiff() else: # update configuration stored by CS result = self.csAPI.commit() if not result['OK']: - log.error( "commit() failed with message: %s" % result['Message'] ) - return S_ERROR( "Could not commit changes to CS." ) + log.error("commit() failed with message: %s" % result['Message']) + return S_ERROR("Could not commit changes to CS.") else: log.info("Committed changes to CS") - log.debug( 'End function.' ) + log.debug('End function.') return S_OK() - # define mapping between an agent option in the configuration and a function call - __functionMap = { 'UpdatePerfSONARS': updatePerfSONARConfiguration, - } + __functionMap = {'UpdatePerfSONARS': updatePerfSONARConfiguration, + } diff --git a/ConfigurationSystem/Service/ConfigurationHandler.py b/ConfigurationSystem/Service/ConfigurationHandler.py index 6d49dbbbfa3..2b98e9aaf67 100755 --- a/ConfigurationSystem/Service/ConfigurationHandler.py +++ b/ConfigurationSystem/Service/ConfigurationHandler.py @@ -11,6 +11,7 @@ gServiceInterface = None gPilotSynchronizer = None + def initializeConfigurationHandler(serviceInfo): global gServiceInterface gServiceInterface = ServiceInterface(serviceInfo['URL']) @@ -59,7 +60,8 @@ def export_commitNewData(self, sData): return res # Check the flag for updating the pilot 3 JSON file - if self.srv_getCSOption('UpdatePilotCStoJSONFile', False) and gServiceInterface.isMaster(): + updatePilotCStoJSONFileFlag = self.srv_getCSOption('UpdatePilotCStoJSONFile', False) + if updatePilotCStoJSONFileFlag and gServiceInterface.isMaster(): if gPilotSynchronizer is None: try: # This import is only needed for the Master CS service, making it conditional avoids diff --git a/ConfigurationSystem/private/ServiceInterface.py b/ConfigurationSystem/private/ServiceInterface.py index 37f016c2f74..78bbec8e727 100755 --- a/ConfigurationSystem/private/ServiceInterface.py +++ b/ConfigurationSystem/private/ServiceInterface.py @@ -1,6 +1,8 @@ """ Threaded implementation of services """ +__RCSID__ = "$Id$" + import os import time import re @@ -16,250 +18,247 @@ from DIRAC.Core.Utilities.ReturnValues import S_OK, S_ERROR from DIRAC.Core.DISET.RPCClient import RPCClient -__RCSID__ = "$Id$" +class ServiceInterface(threading.Thread): -class ServiceInterface( threading.Thread ): - - def __init__( self, sURL ): - threading.Thread.__init__( self ) + def __init__(self, sURL): + threading.Thread.__init__(self) self.sURL = sURL - gLogger.info( "Initializing Configuration Service", "URL is %s" % sURL ) - self.__modificationsIgnoreMask = [ '/DIRAC/Configuration/Servers', '/DIRAC/Configuration/Version' ] + gLogger.info("Initializing Configuration Service", "URL is %s" % sURL) + self.__modificationsIgnoreMask = ['/DIRAC/Configuration/Servers', '/DIRAC/Configuration/Version'] gConfigurationData.setAsService() if not gConfigurationData.isMaster(): - gLogger.info( "Starting configuration service as slave" ) - gRefresher.autoRefreshAndPublish( self.sURL ) + gLogger.info("Starting configuration service as slave") + gRefresher.autoRefreshAndPublish(self.sURL) else: - gLogger.info( "Starting configuration service as master" ) + gLogger.info("Starting configuration service as master") gRefresher.disable() self.__loadConfigurationData() self.dAliveSlaveServers = {} self.__launchCheckSlaves() - def isMaster( self ): + def isMaster(self): return gConfigurationData.isMaster() - def __launchCheckSlaves( self ): - gLogger.info( "Starting purge slaves thread" ) - self.setDaemon( 1 ) + def __launchCheckSlaves(self): + gLogger.info("Starting purge slaves thread") + self.setDaemon(1) self.start() - def __loadConfigurationData( self ): - mkDir( os.path.join( DIRAC.rootPath, "etc", "csbackup" ) ) + def __loadConfigurationData(self): + mkDir(os.path.join(DIRAC.rootPath, "etc", "csbackup")) gConfigurationData.loadConfigurationData() if gConfigurationData.isMaster(): bBuiltNewConfiguration = False if not gConfigurationData.getName(): - DIRAC.abort( 10, "Missing name for the configuration to be exported!" ) + DIRAC.abort(10, "Missing name for the configuration to be exported!") gConfigurationData.exportName() sVersion = gConfigurationData.getVersion() if sVersion == "0": - gLogger.info( "There's no version. Generating a new one" ) + gLogger.info("There's no version. Generating a new one") gConfigurationData.generateNewVersion() bBuiltNewConfiguration = True if self.sURL not in gConfigurationData.getServers(): - gConfigurationData.setServers( self.sURL ) + gConfigurationData.setServers(self.sURL) bBuiltNewConfiguration = True - gConfigurationData.setMasterServer( self.sURL ) + gConfigurationData.setMasterServer(self.sURL) if bBuiltNewConfiguration: gConfigurationData.writeRemoteConfigurationToDisk() - def __generateNewVersion( self ): + def __generateNewVersion(self): if gConfigurationData.isMaster(): gConfigurationData.generateNewVersion() gConfigurationData.writeRemoteConfigurationToDisk() - def publishSlaveServer( self, sSlaveURL ): + def publishSlaveServer(self, sSlaveURL): if not gConfigurationData.isMaster(): - return S_ERROR( "Configuration modification is not allowed in this server" ) - gLogger.info( "Pinging slave %s" % sSlaveURL ) - rpcClient = RPCClient( sSlaveURL, timeout = 10, useCertificates = True ) + return S_ERROR("Configuration modification is not allowed in this server") + gLogger.info("Pinging slave %s" % sSlaveURL) + rpcClient = RPCClient(sSlaveURL, timeout=10, useCertificates=True) retVal = rpcClient.ping() - if not retVal[ 'OK' ]: - gLogger.info( "Slave %s didn't reply" % sSlaveURL ) + if not retVal['OK']: + gLogger.info("Slave %s didn't reply" % sSlaveURL) return - if retVal[ 'Value' ][ 'name' ] != 'Configuration/Server': - gLogger.info( "Slave %s is not a CS serveR" % sSlaveURL ) + if retVal['Value']['name'] != 'Configuration/Server': + gLogger.info("Slave %s is not a CS serveR" % sSlaveURL) return bNewSlave = False - if not sSlaveURL in self.dAliveSlaveServers.keys(): + if sSlaveURL not in self.dAliveSlaveServers: bNewSlave = True - gLogger.info( "New slave registered", sSlaveURL ) - self.dAliveSlaveServers[ sSlaveURL ] = time.time() + gLogger.info("New slave registered", sSlaveURL) + self.dAliveSlaveServers[sSlaveURL] = time.time() if bNewSlave: - gConfigurationData.setServers( "%s, %s" % ( self.sURL, - ", ".join( self.dAliveSlaveServers.keys() ) ) ) + gConfigurationData.setServers("%s, %s" % (self.sURL, + ", ".join(self.dAliveSlaveServers.keys()))) self.__generateNewVersion() - def __checkSlavesStatus( self, forceWriteConfiguration = False ): - gLogger.info( "Checking status of slave servers" ) + def __checkSlavesStatus(self, forceWriteConfiguration=False): + gLogger.info("Checking status of slave servers") iGraceTime = gConfigurationData.getSlavesGraceTime() - lSlaveURLs = self.dAliveSlaveServers.keys() bModifiedSlaveServers = False - for sSlaveURL in lSlaveURLs: - if time.time() - self.dAliveSlaveServers[ sSlaveURL ] > iGraceTime: - gLogger.info( "Found dead slave", sSlaveURL ) - del self.dAliveSlaveServers[ sSlaveURL ] + for sSlaveURL in self.dAliveSlaveServers: + if time.time() - self.dAliveSlaveServers[sSlaveURL] > iGraceTime: + gLogger.info("Found dead slave", sSlaveURL) + del self.dAliveSlaveServers[sSlaveURL] bModifiedSlaveServers = True if bModifiedSlaveServers or forceWriteConfiguration: - gConfigurationData.setServers( "%s, %s" % ( self.sURL, - ", ".join( self.dAliveSlaveServers.keys() ) ) ) + gConfigurationData.setServers("%s, %s" % (self.sURL, + ", ".join(self.dAliveSlaveServers.keys()))) self.__generateNewVersion() - def getCompressedConfiguration( self ): - sData = gConfigurationData.getCompressedData() - - def updateConfiguration( self, sBuffer, commiter = "", updateVersionOption = False ): + def updateConfiguration(self, sBuffer, commiter="", updateVersionOption=False): if not gConfigurationData.isMaster(): - return S_ERROR( "Configuration modification is not allowed in this server" ) - #Load the data in a ConfigurationData object - oRemoteConfData = ConfigurationData( False ) - oRemoteConfData.loadRemoteCFGFromCompressedMem( sBuffer ) + return S_ERROR("Configuration modification is not allowed in this server") + # Load the data in a ConfigurationData object + oRemoteConfData = ConfigurationData(False) + oRemoteConfData.loadRemoteCFGFromCompressedMem(sBuffer) if updateVersionOption: - oRemoteConfData.setVersion( gConfigurationData.getVersion() ) - #Test that remote and new versions are the same + oRemoteConfData.setVersion(gConfigurationData.getVersion()) + # Test that remote and new versions are the same sRemoteVersion = oRemoteConfData.getVersion() sLocalVersion = gConfigurationData.getVersion() - gLogger.info( "Checking versions\nremote: %s\nlocal: %s" % ( sRemoteVersion, sLocalVersion ) ) + gLogger.info("Checking versions\nremote: %s\nlocal: %s" % (sRemoteVersion, sLocalVersion)) if sRemoteVersion != sLocalVersion: if not gConfigurationData.mergingEnabled(): - return S_ERROR( "Local and remote versions differ (%s vs %s). Cannot commit." % ( sLocalVersion, sRemoteVersion ) ) + return S_ERROR("Local and remote versions differ (%s vs %s). Cannot commit." % (sLocalVersion, sRemoteVersion)) else: - gLogger.info( "AutoMerging new data!" ) + gLogger.info("AutoMerging new data!") if updateVersionOption: - return S_ERROR( "Cannot AutoMerge! version was overwritten" ) - result = self.__mergeIndependentUpdates( oRemoteConfData ) - if not result[ 'OK' ]: - gLogger.warn( "Could not AutoMerge!", result[ 'Message' ] ) - return S_ERROR( "AutoMerge failed: %s" % result[ 'Message' ] ) - requestedRemoteCFG = result[ 'Value' ] - gLogger.info( "AutoMerge successful!" ) - oRemoteConfData.setRemoteCFG( requestedRemoteCFG ) - #Test that configuration names are the same + return S_ERROR("Cannot AutoMerge! version was overwritten") + result = self.__mergeIndependentUpdates(oRemoteConfData) + if not result['OK']: + gLogger.warn("Could not AutoMerge!", result['Message']) + return S_ERROR("AutoMerge failed: %s" % result['Message']) + requestedRemoteCFG = result['Value'] + gLogger.info("AutoMerge successful!") + oRemoteConfData.setRemoteCFG(requestedRemoteCFG) + # Test that configuration names are the same sRemoteName = oRemoteConfData.getName() sLocalName = gConfigurationData.getName() if sRemoteName != sLocalName: - return S_ERROR( "Names differ: Server is %s and remote is %s" % ( sLocalName, sRemoteName ) ) - #Update and generate a new version - gLogger.info( "Committing new data..." ) + return S_ERROR("Names differ: Server is %s and remote is %s" % (sLocalName, sRemoteName)) + # Update and generate a new version + gLogger.info("Committing new data...") gConfigurationData.lock() - gLogger.info( "Setting the new CFG" ) - gConfigurationData.setRemoteCFG( oRemoteConfData.getRemoteCFG() ) + gLogger.info("Setting the new CFG") + gConfigurationData.setRemoteCFG(oRemoteConfData.getRemoteCFG()) gConfigurationData.unlock() - gLogger.info( "Generating new version" ) + gLogger.info("Generating new version") gConfigurationData.generateNewVersion() - #self.__checkSlavesStatus( forceWriteConfiguration = True ) - gLogger.info( "Writing new version to disk!" ) - retVal = gConfigurationData.writeRemoteConfigurationToDisk( "%s@%s" % ( commiter, gConfigurationData.getVersion() ) ) - gLogger.info( "New version it is!" ) + # self.__checkSlavesStatus( forceWriteConfiguration = True ) + gLogger.info("Writing new version to disk!") + retVal = gConfigurationData.writeRemoteConfigurationToDisk("%s@%s" % (commiter, gConfigurationData.getVersion())) + gLogger.info("New version it is!") return retVal - def getCompressedConfigurationData( self ): + def getCompressedConfigurationData(self): return gConfigurationData.getCompressedData() - def getVersion( self ): + def getVersion(self): return gConfigurationData.getVersion() - def getCommitHistory( self ): - files = self.__getCfgBackups( gConfigurationData.getBackupDir() ) - backups = [ ".".join( fileName.split( "." )[1:-1] ).split( "@" ) for fileName in files ] + def getCommitHistory(self): + files = self.__getCfgBackups(gConfigurationData.getBackupDir()) + backups = [".".join(fileName.split(".")[1:-1]).split("@") for fileName in files] return backups - def run( self ): + def run(self): while True: iWaitTime = gConfigurationData.getSlavesGraceTime() - time.sleep( iWaitTime ) + time.sleep(iWaitTime) self.__checkSlavesStatus() - def getVersionContents( self, date ): + def getVersionContents(self, date): backupDir = gConfigurationData.getBackupDir() - files = self.__getCfgBackups( backupDir, date ) + files = self.__getCfgBackups(backupDir, date) for fileName in files: - with zipfile.ZipFile( "%s/%s" % ( backupDir, fileName ), "r" ) as zFile: + with zipfile.ZipFile("%s/%s" % (backupDir, fileName), "r") as zFile: cfgName = zFile.namelist()[0] - retVal = S_OK( zlib.compress( zFile.read( cfgName ) , 9 ) ) + retVal = S_OK(zlib.compress(zFile.read(cfgName), 9)) return retVal - return S_ERROR( "Version %s does not exist" % date ) + return S_ERROR("Version %s does not exist" % date) - def __getCfgBackups( self, basePath, date = "", subPath = "" ): - rs = re.compile( r"^%s\..*%s.*\.zip$" % ( gConfigurationData.getName(), date ) ) - fsEntries = os.listdir( "%s/%s" % ( basePath, subPath ) ) - fsEntries.sort( reverse = True ) + def __getCfgBackups(self, basePath, date="", subPath=""): + rs = re.compile(r"^%s\..*%s.*\.zip$" % (gConfigurationData.getName(), date)) + fsEntries = os.listdir("%s/%s" % (basePath, subPath)) + fsEntries.sort(reverse=True) backupsList = [] for entry in fsEntries: - entryPath = "%s/%s/%s" % ( basePath, subPath, entry ) - if os.path.isdir( entryPath ): - backupsList.extend( self.__getCfgBackups( basePath, date, "%s/%s" % ( subPath, entry ) ) ) - elif os.path.isfile( entryPath ): - if rs.search( entry ): - backupsList.append( "%s/%s" % ( subPath, entry ) ) + entryPath = "%s/%s/%s" % (basePath, subPath, entry) + if os.path.isdir(entryPath): + backupsList.extend(self.__getCfgBackups(basePath, date, "%s/%s" % (subPath, entry))) + elif os.path.isfile(entryPath): + if rs.search(entry): + backupsList.append("%s/%s" % (subPath, entry)) return backupsList - def __getPreviousCFG( self, oRemoteConfData ): - remoteExpectedVersion = oRemoteConfData.getVersion() - backupsList = self.__getCfgBackups( gConfigurationData.getBackupDir(), date = oRemoteConfData.getVersion() ) + def __getPreviousCFG(self, oRemoteConfData): + backupsList = self.__getCfgBackups(gConfigurationData.getBackupDir(), date=oRemoteConfData.getVersion()) if not backupsList: - return S_ERROR( "Could not AutoMerge. Could not retrieve original commiter's version" ) + return S_ERROR("Could not AutoMerge. Could not retrieve original commiter's version") prevRemoteConfData = ConfigurationData() backFile = backupsList[0] if backFile[0] == "/": - backFile = os.path.join( gConfigurationData.getBackupDir(), backFile[1:] ) + backFile = os.path.join(gConfigurationData.getBackupDir(), backFile[1:]) try: - prevRemoteConfData.loadConfigurationData( backFile ) + prevRemoteConfData.loadConfigurationData(backFile) except Exception as e: - return S_ERROR( "Could not load original commiter's version: %s" % str( e ) ) - gLogger.info( "Loaded client original version %s" % prevRemoteConfData.getVersion() ) - return S_OK( prevRemoteConfData.getRemoteCFG() ) + return S_ERROR("Could not load original commiter's version: %s" % str(e)) + gLogger.info("Loaded client original version %s" % prevRemoteConfData.getVersion()) + return S_OK(prevRemoteConfData.getRemoteCFG()) - def _checkConflictsInModifications( self, realModList, reqModList, parentSection = "" ): - realModifiedSections = dict( [ ( modAc[1], modAc[3] ) for modAc in realModList if modAc[0].find( 'Sec' ) == len( modAc[0] ) - 3 ] ) - reqOptionsModificationList = dict( [ ( modAc[1], modAc[3] ) for modAc in reqModList if modAc[0].find( 'Opt' ) == len( modAc[0] ) - 3 ] ) - optionModRequests = 0 + def _checkConflictsInModifications(self, realModList, reqModList, parentSection=""): + realModifiedSections = dict([(modAc[1], modAc[3]) + for modAc in realModList if modAc[0].find('Sec') == len(modAc[0]) - 3]) + reqOptionsModificationList = dict([(modAc[1], modAc[3]) + for modAc in reqModList if modAc[0].find('Opt') == len(modAc[0]) - 3]) for modAc in reqModList: action = modAc[0] objectName = modAc[1] if action == "addSec": if objectName in realModifiedSections: - return S_ERROR( "Section %s/%s already exists" % ( parentSection, objectName ) ) + return S_ERROR("Section %s/%s already exists" % (parentSection, objectName)) elif action == "delSec": if objectName in realModifiedSections: - return S_ERROR( "Section %s/%s cannot be deleted. It has been modified." % ( parentSection, objectName ) ) + return S_ERROR("Section %s/%s cannot be deleted. It has been modified." % (parentSection, objectName)) elif action == "modSec": if objectName in realModifiedSections: - result = self._checkConflictsInModifications( realModifiedSections[ objectName ], - modAc[3], "%s/%s" % ( parentSection, objectName ) ) - if not result[ 'OK' ]: + result = self._checkConflictsInModifications(realModifiedSections[objectName], + modAc[3], "%s/%s" % (parentSection, objectName)) + if not result['OK']: return result for modAc in realModList: action = modAc[0] objectName = modAc[1] - if action.find( "Opt" ) == len( action ) - 3: - return S_ERROR( "Section %s cannot be merged. Option %s/%s has been modified" % ( parentSection, parentSection, objectName ) ) + if action.find("Opt") == len(action) - 3: + return S_ERROR( + "Section %s cannot be merged. Option %s/%s has been modified" % + (parentSection, parentSection, objectName)) return S_OK() - def __mergeIndependentUpdates( self, oRemoteConfData ): - #return S_ERROR( "AutoMerge is still not finished. Meanwhile... why don't you get the newest conf and update from there?" ) - #Get all the CFGs + def __mergeIndependentUpdates(self, oRemoteConfData): + # return S_ERROR( "AutoMerge is still not finished. + # Meanwhile... why don't you get the newest conf and update from there?" ) + # Get all the CFGs curSrvCFG = gConfigurationData.getRemoteCFG().clone() curCliCFG = oRemoteConfData.getRemoteCFG().clone() - result = self.__getPreviousCFG( oRemoteConfData ) - if not result[ 'OK' ]: + result = self.__getPreviousCFG(oRemoteConfData) + if not result['OK']: return result - prevCliCFG = result[ 'Value' ] - #Try to merge curCli with curSrv. To do so we check the updates from + prevCliCFG = result['Value'] + # Try to merge curCli with curSrv. To do so we check the updates from # prevCli -> curSrv VS prevCli -> curCli - prevCliToCurCliModList = prevCliCFG.getModifications( curCliCFG ) - prevCliToCurSrvModList = prevCliCFG.getModifications( curSrvCFG ) - result = self._checkConflictsInModifications( prevCliToCurSrvModList, - prevCliToCurCliModList ) - if not result[ 'OK' ]: - return S_ERROR( "Cannot AutoMerge: %s" % result[ 'Message' ] ) - #Merge! - result = curSrvCFG.applyModifications( prevCliToCurCliModList ) - if not result[ 'OK' ]: + prevCliToCurCliModList = prevCliCFG.getModifications(curCliCFG) + prevCliToCurSrvModList = prevCliCFG.getModifications(curSrvCFG) + result = self._checkConflictsInModifications(prevCliToCurSrvModList, + prevCliToCurCliModList) + if not result['OK']: + return S_ERROR("Cannot AutoMerge: %s" % result['Message']) + # Merge! + result = curSrvCFG.applyModifications(prevCliToCurCliModList) + if not result['OK']: return result - return S_OK( curSrvCFG ) + return S_OK(curSrvCFG) diff --git a/Core/LCG/GOCDBClient.py b/Core/LCG/GOCDBClient.py index b85824708d4..6617c8e9301 100644 --- a/Core/LCG/GOCDBClient.py +++ b/Core/LCG/GOCDBClient.py @@ -5,6 +5,7 @@ __RCSID__ = "$Id$" + import time import socket @@ -15,14 +16,15 @@ from DIRAC import S_OK, S_ERROR, gLogger -def _parseSingleElement( element, attributes = None ): + +def _parseSingleElement(element, attributes=None): """ Given a DOM Element, return a dictionary of its child elements and values (as strings). """ handler = {} for child in element.childNodes: - attrName = str( child.nodeName ) + attrName = str(child.nodeName) if attributes is not None: if attrName not in attributes: continue @@ -37,7 +39,7 @@ def _parseSingleElement( element, attributes = None ): for subchild in child.childNodes: if subchild.childNodes: dct = {} - for subsubchild in subchild.childNodes: + for subsubchild in subchild.childNodes: if subsubchild.childNodes: dct[subsubchild.nodeName.encode('utf-8')] = subsubchild.childNodes[0].nodeValue.encode('utf-8') handler.setdefault('EXTENSIONS', []).append(dct) @@ -55,14 +57,14 @@ def _parseSingleElement( element, attributes = None ): ############################################################################# -class GOCDBClient( object ): +class GOCDBClient(object): """ Class for dealing with GOCDB. Class because of easier use from RSS """ ############################################################################# - def getStatus( self, granularity, name = None, startDate = None, - startingInHours = None, timeout = None ): + def getStatus(self, granularity, name=None, startDate=None, + startingInHours=None, timeout=None): """ Return actual GOCDB status of entity in `name` @@ -111,24 +113,24 @@ def getStatus( self, granularity, name = None, startDate = None, if startingInHours is not None: startDate = datetime.utcnow() - startDateMax = startDate + timedelta( hours = startingInHours ) + startDateMax = startDate + timedelta(hours=startingInHours) if startDate is not None: - if isinstance( startDate, basestring ): + if isinstance(startDate, basestring): startDate_STR = startDate - startDate = datetime( *time.strptime( startDate, "%Y-%m-%d" )[0:3] ) - elif isinstance( startDate, datetime ): - startDate_STR = startDate.isoformat( ' ' )[0:10] + startDate = datetime(*time.strptime(startDate, "%Y-%m-%d")[0:3]) + elif isinstance(startDate, datetime): + startDate_STR = startDate.isoformat(' ')[0:10] if timeout is not None: - socket.setdefaulttimeout( 10 ) + socket.setdefaulttimeout(10) if startingInHours is not None: - # make 2 queries and later merge the results + # make 2 queries and later merge the results # first call: pass the startDate argument as None, # so the curlDownload method will search for only ongoing DTs - resXML_ongoing = self._downTimeCurlDownload( name ) + resXML_ongoing = self._downTimeCurlDownload(name) if resXML_ongoing is None: resOngoing = {} else: @@ -136,7 +138,7 @@ def getStatus( self, granularity, name = None, startDate = None, resXML_ongoing, granularity, name) # second call: pass the startDate argument - resXML_startDate = self._downTimeCurlDownload( name, startDate_STR ) + resXML_startDate = self._downTimeCurlDownload(name, startDate_STR) if resXML_startDate is None: resStartDate = {} else: @@ -150,12 +152,12 @@ def getStatus( self, granularity, name = None, startDate = None, res[k] = resStartDate[k] else: - #just query for onGoing downtimes - resXML = self._downTimeCurlDownload( name, startDate_STR ) + # just query for onGoing downtimes + resXML = self._downTimeCurlDownload(name, startDate_STR) if resXML is None: - return S_OK( None ) + return S_OK(None) - res = self._downTimeXMLParsing( resXML, granularity, name, startDateMax ) + res = self._downTimeXMLParsing(resXML, granularity, name, startDateMax) # Common: build URL # if res is None or res == []: @@ -163,16 +165,12 @@ def getStatus( self, granularity, name = None, startDate = None, # # self.buildURL(res) - if res == {}: res = None - return S_OK( res ) - - -############################################################################# + return S_OK(res) - def getServiceEndpointInfo( self, granularity, entity ): + def getServiceEndpointInfo(self, granularity, entity): """ Get service endpoint info (in a dictionary) @@ -182,26 +180,23 @@ def getServiceEndpointInfo( self, granularity, entity ): :attr:`entity` : a string. Actual name of the entity. """ - assert isinstance( granularity, basestring ) and isinstance( entity, basestring ) + assert isinstance(granularity, basestring) and isinstance(entity, basestring) try: - serviceXML = self._getServiceEndpointCurlDownload( granularity, entity ) - return S_OK( self._serviceEndpointXMLParsing( serviceXML ) ) + serviceXML = self._getServiceEndpointCurlDownload(granularity, entity) + return S_OK(self._serviceEndpointXMLParsing(serviceXML)) except Exception as e: - _msg = 'Exception getting information for %s %s: %s' % ( granularity, entity, e ) - gLogger.exception( _msg ) - return S_ERROR( _msg ) - - -############################################################################# + _msg = 'Exception getting information for %s %s: %s' % (granularity, entity, e) + gLogger.exception(_msg) + return S_ERROR(_msg) def getCurrentDTLinkList(self): """ Get the list of all current DTs' links """ - gDTPage = self._downTimeCurlDownload() # xml format - gResourceDT = self._downTimeXMLParsing( gDTPage, "Resource" ) # python dictionary format - gSiteDT = self._downTimeXMLParsing( gDTPage, "Site" ) # python dictionary format + gDTPage = self._downTimeCurlDownload() # xml format + gResourceDT = self._downTimeXMLParsing(gDTPage, "Resource") # python dictionary format + gSiteDT = self._downTimeXMLParsing(gDTPage, "Site") # python dictionary format currentDTLinkList = [] for dt in gResourceDT: @@ -214,7 +209,7 @@ def getCurrentDTLinkList(self): ############################################################################# - def getHostnameDowntime( self, hostname, startDate = None, ongoing = False): + def getHostnameDowntime(self, hostname, startDate=None, ongoing=False): params = hostname @@ -261,11 +256,11 @@ def getHostnameDowntime( self, hostname, startDate = None, ongoing = False): ############################################################################# - def _downTimeCurlDownload( self, entity = None, startDate = None ): + def _downTimeCurlDownload(self, entity=None, startDate=None): """ Download ongoing downtimes for entity using the GOC DB programmatic interface """ - #GOCDB-PI url and method settings + # GOCDB-PI url and method settings # # Set the GOCDB URL gocdbpi_url = "https://goc.egi.eu/gocdbpi/public/?method=get_downtime" @@ -280,11 +275,11 @@ def _downTimeCurlDownload( self, entity = None, startDate = None ): # GOCDB-PI to query gocdb_ep = gocdbpi_url if entity is not None: - if isinstance( entity, basestring ): + if isinstance(entity, basestring): gocdb_ep = gocdb_ep + "&topentity=" + entity gocdb_ep = gocdb_ep + when + gocdbpi_startDate + "&scope=" - dtPage = requests.get( gocdb_ep ) + dtPage = requests.get(gocdb_ep) dt = dtPage.text @@ -292,7 +287,7 @@ def _downTimeCurlDownload( self, entity = None, startDate = None ): ############################################################################# - def _getServiceEndpointCurlDownload( self, granularity, entity ): + def _getServiceEndpointCurlDownload(self, granularity, entity): """ Calls method `get_service_endpoint` from the GOC DB programmatic interface. @@ -302,14 +297,14 @@ def _getServiceEndpointCurlDownload( self, granularity, entity ): :attr:`entity` : a string. Actual name of the entity. """ - if not isinstance( granularity, basestring ) or not isinstance( entity, basestring ): - raise ValueError( "Arguments must be strings." ) + if not isinstance(granularity, basestring) or not isinstance(entity, basestring): + raise ValueError("Arguments must be strings.") # GOCDB-PI query gocdb_ep = "https://goc.egi.eu/gocdbpi/public/?method=get_service_endpoint&" \ + granularity + '=' + entity - service_endpoint_page = requests.get( gocdb_ep ) + service_endpoint_page = requests.get(gocdb_ep) return service_endpoint_page.text @@ -332,46 +327,46 @@ def _getServiceEndpointCurlDownload( self, granularity, entity ): ############################################################################# - def _downTimeXMLParsing( self, dt, siteOrRes, entities = None, startDateMax = None ): + def _downTimeXMLParsing(self, dt, siteOrRes, entities=None, startDateMax=None): """ Performs xml parsing from the dt string (returns a dictionary) """ dt = dt.encode('utf-8') - doc = minidom.parseString( dt ) + doc = minidom.parseString(dt) - downtimeElements = doc.getElementsByTagName( "DOWNTIME" ) + downtimeElements = doc.getElementsByTagName("DOWNTIME") dtDict = {} for dtElement in downtimeElements: - elements = _parseSingleElement( dtElement, ['SEVERITY', 'SITENAME', 'HOSTNAME', 'ENDPOINT', - 'HOSTED_BY', 'FORMATED_START_DATE', - 'FORMATED_END_DATE', 'DESCRIPTION', - 'GOCDB_PORTAL_URL', 'SERVICE_TYPE' ] ) + elements = _parseSingleElement(dtElement, ['SEVERITY', 'SITENAME', 'HOSTNAME', 'ENDPOINT', + 'HOSTED_BY', 'FORMATED_START_DATE', + 'FORMATED_END_DATE', 'DESCRIPTION', + 'GOCDB_PORTAL_URL', 'SERVICE_TYPE']) try: - dtDict[ str( dtElement.getAttributeNode( "PRIMARY_KEY" ).nodeValue ) + ' ' + elements['ENDPOINT'] ] = elements + dtDict[str(dtElement.getAttributeNode("PRIMARY_KEY").nodeValue) + ' ' + elements['ENDPOINT']] = elements except Exception: try: - dtDict[ str( dtElement.getAttributeNode( "PRIMARY_KEY" ).nodeValue ) + ' ' + elements['HOSTNAME'] ] = elements + dtDict[str(dtElement.getAttributeNode("PRIMARY_KEY").nodeValue) + ' ' + elements['HOSTNAME']] = elements except Exception: - dtDict[ str( dtElement.getAttributeNode( "PRIMARY_KEY" ).nodeValue ) + ' ' + elements['SITENAME'] ] = elements + dtDict[str(dtElement.getAttributeNode("PRIMARY_KEY").nodeValue) + ' ' + elements['SITENAME']] = elements for dtID in dtDict.keys(): # pylint: disable=consider-iterating-dictionary - if siteOrRes in ( 'Site', 'Sites' ): + if siteOrRes in ('Site', 'Sites'): if 'SITENAME' not in dtDict[dtID]: dtDict.pop(dtID) continue if entities is not None: - if not isinstance( entities, list ): + if not isinstance(entities, list): entities = [entities] if not dtDict[dtID]['SITENAME'] in entities: dtDict.pop(dtID) - elif siteOrRes in ( 'Resource', 'Resources' ): + elif siteOrRes in ('Resource', 'Resources'): if 'HOSTNAME' not in dtDict[dtID]: dtDict.pop(dtID) continue if entities is not None: - if not isinstance( entities, list ): + if not isinstance(entities, list): entities = [entities] if dtDict[dtID]['HOSTNAME'] not in entities: dtDict.pop(dtID) @@ -379,19 +374,17 @@ def _downTimeXMLParsing( self, dt, siteOrRes, entities = None, startDateMax = No if startDateMax is not None: for dtID in dtDict.keys(): # pylint: disable=consider-iterating-dictionary startDateMaxFromKeys = datetime(*time.strptime(dtDict[dtID]['FORMATED_START_DATE'], - "%Y-%m-%d %H:%M" )[0:5] ) + "%Y-%m-%d %H:%M")[0:5]) if startDateMaxFromKeys > startDateMax: dtDict.pop(dtID) return dtDict -############################################################################# - - def _serviceEndpointXMLParsing( self, serviceXML ): + def _serviceEndpointXMLParsing(self, serviceXML): """ Performs xml parsing from the service endpoint string Returns a list. """ - doc = minidom.parseString( serviceXML ) - services = doc.getElementsByTagName( "SERVICE_ENDPOINT" ) - services = [_parseSingleElement( s ) for s in services] + doc = minidom.parseString(serviceXML) + services = doc.getElementsByTagName("SERVICE_ENDPOINT") + services = [_parseSingleElement(s) for s in services] return services diff --git a/FrameworkSystem/Client/NotificationClient.py b/FrameworkSystem/Client/NotificationClient.py index 0e7b8d072b8..b758534035a 100644 --- a/FrameworkSystem/Client/NotificationClient.py +++ b/FrameworkSystem/Client/NotificationClient.py @@ -5,33 +5,25 @@ __RCSID__ = "$Id$" from DIRAC import gLogger, S_ERROR -from DIRAC.Core.DISET.RPCClient import RPCClient +from DIRAC.Core.Base.Client import Client from DIRAC.Core.Utilities.Mail import Mail -class NotificationClient(object): - ############################################################################# - def __init__( self, rpcFunctor = False ): +class NotificationClient(Client): + + def __init__(self): """ Notification Client constructor """ - self.log = gLogger.getSubLogger( 'NotificationClient' ) - if rpcFunctor: - self.__rpcFunctor = rpcFunctor - else: - self.__rpcFunctor = RPCClient - - def __getRPCClient( self, **kwargs ): - return self.__rpcFunctor( "Framework/Notification", **kwargs ) + self.log = gLogger.getSubLogger('NotificationClient') - ############################################################################# - def sendMail( self, addresses, subject, body, - fromAddress = None, localAttempt = True, html = False, avoidSpam = False ): + def sendMail(self, addresses, subject, body, + fromAddress=None, localAttempt=True, html=False, avoidSpam=False): """ Send an e-mail with subject and body to the specified address. Try to send from local area before central service by default. """ - self.log.verbose( 'Received signal to send the following mail to %s:\nSubject = %s\n%s' % ( addresses, - subject, - body ) ) + self.log.verbose('Received signal to send the following mail to %s:\nSubject = %s\n%s' % (addresses, + subject, + body)) result = S_ERROR() addresses = [addresses] if isinstance(addresses, basestring) else list(addresses) @@ -48,36 +40,35 @@ def sendMail( self, addresses, subject, body, m._fromAddress = fromAddress result = m._send() except Exception as x: - self.log.warn( 'Sending mail failed with exception:\n%s' % ( str( x ) ) ) + self.log.warn('Sending mail failed with exception:\n%s' % (str(x))) if result['OK']: - self.log.verbose( 'Mail sent successfully from local host to %s with subject %s' % ( address, subject ) ) - self.log.debug( result['Value'] ) + self.log.verbose('Mail sent successfully from local host to %s with subject %s' % (address, subject)) + self.log.debug(result['Value']) return result - self.log.warn( 'Could not send mail with the following message:\n%s\n will attempt to send via NotificationService' % result['Message'] ) + self.log.warn( + 'Could not send mail with the following message:\n%s\n will attempt to send via NotificationService' % + result['Message']) - notify = self.__getRPCClient( timeout = 120 ) - result = notify.sendMail( address, subject, body, str( fromAddress ), avoidSpam ) + result = self._getRPC().sendMail(address, subject, body, str(fromAddress), avoidSpam) if not result['OK']: - self.log.error( 'Could not send mail via central Notification service', result['Message'] ) + self.log.error('Could not send mail via central Notification service', result['Message']) return result else: - self.log.verbose( result['Value'] ) + self.log.verbose(result['Value']) return result - ############################################################################# - def sendSMS( self, userName, body, fromAddress = None ): + def sendSMS(self, userName, body, fromAddress=None): """ Send an SMS with body to the specified DIRAC user name. """ - self.log.verbose( 'Received signal to send the following SMS to %s:\n%s' % ( userName, body ) ) - notify = RPCClient( 'Framework/Notification', timeout = 120 ) - result = notify.sendSMS( userName, body, str( fromAddress ) ) + self.log.verbose('Received signal to send the following SMS to %s:\n%s' % (userName, body)) + result = self._getRPC().sendSMS(userName, body, str(fromAddress)) if not result['OK']: - self.log.error( 'Could not send SMS via central Notification service', result['Message'] ) + self.log.error('Could not send SMS via central Notification service', result['Message']) else: - self.log.verbose( result['Value'] ) + self.log.verbose(result['Value']) return result @@ -85,94 +76,34 @@ def sendSMS( self, userName, body, fromAddress = None ): # ALARMS ########################################################################### - def newAlarm( self, subject, status, notifications, assignee, body, priority, alarmKey = "" ): - rpcClient = self.__getRPCClient() - if not isinstance( notifications, (list, tuple) ): - return S_ERROR( "Notifications parameter has to be a list or a tuple with a combination of [ 'Web', 'Mail', 'SMS' ]" ) - alarmDef = { 'subject' : subject, 'status' : status, - 'notifications' : notifications, 'assignee' : assignee, - 'priority' : priority, 'body' : body } + def newAlarm(self, subject, status, notifications, assignee, body, priority, alarmKey=""): + if not isinstance(notifications, (list, tuple)): + return S_ERROR( + "Notifications parameter has to be a list or a tuple with a combination of [ 'Web', 'Mail', 'SMS' ]") + alarmDef = {'subject': subject, 'status': status, + 'notifications': notifications, 'assignee': assignee, + 'priority': priority, 'body': body} if alarmKey: - alarmDef[ 'alarmKey' ] = alarmKey - return rpcClient.newAlarm( alarmDef ) + alarmDef['alarmKey'] = alarmKey + return self._getRPC().newAlarm(alarmDef) - def updateAlarm( self, id = -1, alarmKey = "", comment = False, modDict = {} ): - rpcClient = self.__getRPCClient() + def updateAlarm(self, id=-1, alarmKey="", comment=False, modDict={}): if id == -1 and not alarmKey: - return S_ERROR( "Need either alarm id or key to update an alarm!" ) - updateReq = { 'comment' : comment, 'modifications' : modDict } + return S_ERROR("Need either alarm id or key to update an alarm!") + updateReq = {'comment': comment, 'modifications': modDict} if id != -1: - updateReq[ 'id' ] = id + updateReq['id'] = id if alarmKey: - updateReq[ 'alarmKey' ] = alarmKey - return rpcClient.updateAlarm( updateReq ) - - def getAlarms( self, selectDict, sortList, startItem, maxItems ): - rpcClient = self.__getRPCClient() - return rpcClient.getAlarms( selectDict, sortList, startItem, maxItems ) - - def getAlarmInfo( self, alarmId ): - rpcClient = self.__getRPCClient() - return rpcClient.getAlarmInfo( alarmId ) - - def deleteAlarmsById( self, alarmIdList ): - rpcClient = self.__getRPCClient() - return rpcClient.deleteAlarmsByAlarmId( alarmIdList ) - - def deleteAlarmsByKey( self, alarmKeyList ): - rpcClient = self.__getRPCClient() - return rpcClient.deleteAlarmsByAlarmKey( alarmKeyList ) - - ########################################################################### - # MANANGE ASSIGNEE GROUPS - ########################################################################### - - def setAssigneeGroup( self, groupName, userList ): - rpcClient = self.__getRPCClient() - return rpcClient.setAssigneeGroup( groupName, userList ) - - def getUsersInAssigneeGroup( self, groupName ): - rpcClient = self.__getRPCClient() - return rpcClient.getUsersInAssigneeGroup( groupName ) - - def deleteAssigneeGroup( self, groupName ): - rpcClient = self.__getRPCClient() - return rpcClient.deleteAssigneeGroup( groupName ) - - def getAssigneeGroups( self ): - rpcClient = self.__getRPCClient() - return rpcClient.getAssigneeGroups() - - def getAssigneeGroupsForUser( self, user ): - rpcClient = self.__getRPCClient() - return rpcClient.getAssigneeGroupsForUser( user ) + updateReq['alarmKey'] = alarmKey + return self._getRPC().updateAlarm(updateReq) ########################################################################### # MANAGE NOTIFICATIONS ########################################################################### - def addNotificationForUser( self, user, message, lifetime = 604800, deferToMail = True ): - rpcClient = self.__getRPCClient() + def addNotificationForUser(self, user, message, lifetime=604800, deferToMail=True): try: - lifetime = int( lifetime ) - except: - return S_ERROR( "Message lifetime has to be a non decimal number" ) - return rpcClient.addNotificationForUser( user, message, lifetime, deferToMail ) - - def removeNotificationsForUser( self, user, notIds ): - rpcClient = self.__getRPCClient() - return rpcClient.removeNotificationsForUser( user, notIds ) - - def markNotificationsAsRead( self, user, notIds = [] ): - rpcClient = self.__getRPCClient() - return rpcClient.markNotificationsAsRead( user, notIds ) - - def markNotificationsAsNotRead( self, user, notIds = [] ): - rpcClient = self.__getRPCClient() - return rpcClient.markNotificationsAsNotRead( user, notIds ) - - def getNotifications( self, selectDict, sortList, startItem, maxItems ): - rpcClient = self.__getRPCClient() - return rpcClient.getNotifications( selectDict, sortList, startItem, maxItems ) - -#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF#EOF# + lifetime = int(lifetime) + except BaseException: + return S_ERROR("Message lifetime has to be a non decimal number") + return self._getRPC().addNotificationForUser(user, message, lifetime, deferToMail) diff --git a/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py b/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py index d553ccc6f98..bfaf0644a1b 100644 --- a/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py +++ b/WorkloadManagementSystem/Utilities/PilotCStoJSONSynchronizer.py @@ -23,20 +23,20 @@ class PilotCStoJSONSynchronizer(object): - ''' + """ 2 functions are executed: - It updates a JSON file with the values on the CS which can be used by Pilot3 pilots - It updates the pilot 3 files This synchronizer can be triggered at any time via PilotCStoJSONSynchronizer().sync(). As it is today, this is triggered every time there is a successful write on the CS. - ''' + """ def __init__(self): - ''' c'tor + """ c'tor Just setting defaults - ''' + """ self.jsonFile = 'pilot.json' # default filename of the pilot json file # domain name of the web server used to upload the pilot json file and the pilot scripts @@ -56,21 +56,23 @@ def __init__(self): self.pilotVOVersion = '' def sync(self): - ''' Main synchronizer method. - ''' + """ Main synchronizer method. + """ ops = Operations() self.pilotFileServer = ops.getValue("Pilot/pilotFileServer", self.pilotFileServer) if not self.pilotFileServer: - gLogger.warn("Pilot file server not defined, so won't sync but only display") + gLogger.warn("The /Operations//Pilot/pilotFileServer option is not defined") + gLogger.warn("Pilot 3 files won't be updated, and you won't be able to use Pilot 3") + gLogger.warn("The Synchronization steps are anyway displayed") gLogger.notice('-- Synchronizing the content of the JSON file %s with the content of the CS --' % self.jsonFile) - self.pilotRepo = ops.getValue("pilotRepo", self.pilotRepo) - self.pilotVORepo = ops.getValue("pilotVORepo", self.pilotVORepo) - self.projectDir = ops.getValue("projectDir", self.projectDir) - self.pilotScriptPath = ops.getValue("pilotScriptsPath", self.pilotScriptPath) - self.pilotVOScriptPath = ops.getValue("pilotVOScriptsPath", self.pilotVOScriptPath) + self.pilotRepo = ops.getValue("Pilot/pilotRepo", self.pilotRepo) + self.pilotVORepo = ops.getValue("Pilot/pilotVORepo", self.pilotVORepo) + self.projectDir = ops.getValue("Pilot/projectDir", self.projectDir) + self.pilotScriptPath = ops.getValue("Pilot/pilotScriptsPath", self.pilotScriptPath) + self.pilotVOScriptPath = ops.getValue("Pilot/pilotVOScriptsPath", self.pilotVOScriptPath) result = self._syncJSONFile() if not result['OK']: @@ -84,8 +86,8 @@ def sync(self): return S_OK() def _syncJSONFile(self): - ''' Creates the pilot dictionary from the CS, ready for encoding as JSON - ''' + """ Creates the pilot dictionary from the CS, ready for encoding as JSON + """ pilotDict = self._getCSDict() result = self._upload(pilotDict=pilotDict) @@ -259,10 +261,11 @@ def _syncScripts(self): scriptDir = (os.path.join('pilotVOLocalRepo', self.projectDir, self.pilotVOScriptPath, "*.py")) for fileVO in glob.glob(scriptDir): result = self._upload(filename=os.path.basename(fileVO), pilotScript=fileVO) + if not result['OK']: + gLogger.error("Error uploading the VO pilot script: %s" % result['Message']) tarFiles.append(fileVO) - if not result['OK']: - gLogger.error("Error uploading the VO pilot script: %s" % result['Message']) - return result + else: + gLogger.warn("The /Operations//Pilot/pilotVORepo option is not defined") # DIRAC repo if os.path.isdir('pilotLocalRepo'): @@ -290,12 +293,16 @@ def _syncScripts(self): for filename in glob.glob(scriptDir): result = self._upload(filename=os.path.basename(filename), pilotScript=filename) + if not result['OK']: + gLogger.error("Error uploading the pilot script: %s" % result['Message']) tarFiles.append(filename) if not os.path.isfile(os.path.join('pilotLocalRepo', self.pilotScriptPath, "dirac-install.py")): result = self._upload(filename='dirac-install.py', pilotScript=os.path.join('pilotLocalRepo', "Core/scripts/dirac-install.py")) + if not result['OK']: + gLogger.error("Error uploading dirac-install.py: %s" % result['Message']) tarFiles.append('dirac-install.py') with tarfile.TarFile(name='pilot.tar', mode='w') as tf: @@ -306,6 +313,9 @@ def _syncScripts(self): result = self._upload(filename='pilot.tar', pilotScript='pilot.tar') + if not result['OK']: + gLogger.error("Error uploading pilot.tar: %s" % result['Message']) + return result except ValueError: gLogger.error("Error uploading the pilot scripts: %s" % result['Message']) @@ -318,13 +328,14 @@ def _upload(self, pilotDict=None, filename='', pilotScript=''): if pilotDict: # this is for the pilot.json file if not self.pilotFileServer: + gLogger.warn("NOT uploading the pilot JSON file, just printing it out") print json.dumps(pilotDict, indent=4, sort_keys=True) # just print here as formatting is important return S_OK() params = urllib.urlencode({'filename': self.jsonFile, 'data': json.dumps(pilotDict)}) else: # we assume the method is asked to upload the pilots scripts if not self.pilotFileServer: - gLogger.info("NOT uploading %s" % filename) + gLogger.warn("NOT uploading %s" % filename) return S_OK() with open(pilotScript, "rb") as psf: script = psf.read()