diff --git a/Core/Utilities/TimeSeries.py b/Core/Utilities/TimeSeries.py index 0f839a4fc83..4087d04f5b7 100644 --- a/Core/Utilities/TimeSeries.py +++ b/Core/Utilities/TimeSeries.py @@ -6,7 +6,7 @@ import datetime -class TimeSeries: +class TimeSeries( object ): ########################################################################### def __init__( self, maxEntries = False, maxAge = False, minAge = False ): @@ -23,6 +23,12 @@ def __repr__( self ): def __str__( self ): return str( self.convertToList() ) + ########################################################################### + def __nonzero__( self ): + """ for comparisons + """ + return True + ########################################################################### def __len__( self ): return len( self.data ) diff --git a/DataManagementSystem/Client/FTSJob.py b/DataManagementSystem/Client/FTSJob.py index 8eb40c66208..07f40b9fe3e 100644 --- a/DataManagementSystem/Client/FTSJob.py +++ b/DataManagementSystem/Client/FTSJob.py @@ -409,6 +409,11 @@ def fileStatusList( self ): """ get list of files statuses """ return [ ftsFile.Status for ftsFile in self ] + def __nonzero__( self ): + """ for comparisons + """ + return True + def __len__( self ): """ nb of subFiles """ return len( self.__files__ ) diff --git a/RequestManagementSystem/Client/Operation.py b/RequestManagementSystem/Client/Operation.py index 26149e20e5d..7ab42c7c2c2 100644 --- a/RequestManagementSystem/Client/Operation.py +++ b/RequestManagementSystem/Client/Operation.py @@ -204,6 +204,11 @@ def fileStatusList( self ): """ get list of files statuses """ return [ subFile.Status for subFile in self ] + def __nonzero__( self ): + """ for comparisons + """ + return True + def __len__( self ): """ nb of subFiles """ return len( self.__files__ ) diff --git a/RequestManagementSystem/Client/Request.py b/RequestManagementSystem/Client/Request.py index cbcf65dd844..40ac223dd0e 100644 --- a/RequestManagementSystem/Client/Request.py +++ b/RequestManagementSystem/Client/Request.py @@ -270,6 +270,11 @@ def indexOf( self, subReq ): """ return index of subReq (execution order) """ return self.__operations__.index( subReq ) if subReq in self else -1 + def __nonzero__( self ): + """ for comparisons + """ + return True + def __len__( self ): """ nb of subRequests """ return len( self.__operations__ ) diff --git a/WorkloadManagementSystem/DB/JobDB.py b/WorkloadManagementSystem/DB/JobDB.py index 8096d909ad9..4c2f2db6718 100755 --- a/WorkloadManagementSystem/DB/JobDB.py +++ b/WorkloadManagementSystem/DB/JobDB.py @@ -4,7 +4,6 @@ The following methods are provided for public usage: - getJobID() getJobAttribute() getJobAttributes() getAllJobAttributes() @@ -14,7 +13,6 @@ getJobParameters() getAllJobParameters() getInputData() - getSubjobs() getJobJDL() selectJobs() @@ -136,27 +134,6 @@ def __getAttributeNames( self ): return S_OK() -############################################################################# - def getJobID( self ): - """Get the next unique JobID and prepare the new job insertion - """ - - cmd = 'INSERT INTO Jobs (SubmissionTime) VALUES (UTC_TIMESTAMP())' - err = 'JobDB.getJobID: Failed to retrieve a new Id.' 
- - res = self._update( cmd ) - if not res['OK']: - return S_ERROR( '1 %s\n%s' % ( err, res['Message'] ) ) - - if not 'lastRowId' in res['Value']: - return S_ERROR( '2 %s' % err ) - - jobID = int( res['Value']['lastRowId'] ) - - self.log.info( 'JobDB: New JobID served "%s"' % jobID ) - - return S_OK( jobID ) - ############################################################################# def getAttributesForJobList( self, jobIDList, attrList = None ): """ Get attributes for the jobs in the the jobIDList. @@ -245,9 +222,9 @@ def traceJobParameters( self, site, localIDs, paramList = None, attributeList = since = until - datetime.timedelta( hours = 24 ) else: since = None - for format in ( '%Y-%m-%d', '%Y-%m-%d %H:%M', '%Y-%m-%d %H:%M:%S' ): + for dFormat in ( '%Y-%m-%d', '%Y-%m-%d %H:%M', '%Y-%m-%d %H:%M:%S' ): try: - since = datetime.datetime.strptime( date, format ) + since = datetime.datetime.strptime( date, dFormat ) break except: exactTime = True @@ -458,58 +435,6 @@ def getJobAttributes( self, jobID, attrList = None ): return S_OK( attributes ) -############################################################################# - def getJobInfo( self, jobID, parameters = None ): - """ Get parameters for job specified by jobID. Parameters can be - either job attributes ( fields in the Jobs table ) or those - stored in the JobParameters table. - The return value is a dictionary of the structure: - Dict[Name] = Value - """ - - resultDict = {} - # Parameters are not specified, get them all - parameters + attributes - if not parameters: - result = self.getJobAttributes( jobID ) - if result['OK']: - resultDict = result['value'] - else: - return S_ERROR( 'JobDB.getJobAttributes: can not retrieve job attributes' ) - result = self.getJobParameters( jobID ) - if result['OK']: - resultDict.update( result['value'] ) - else: - return S_ERROR( 'JobDB.getJobParameters: can not retrieve job parameters' ) - return S_OK( resultDict ) - - paramList = [] - attrList = [] - for par in parameters: - if par in self.jobAttributeNames: - attrList.append( par ) - else: - paramList.append( par ) - - # Get Job Attributes first - if attrList: - result = self.getJobAttributes( jobID, attrList ) - if not result['OK']: - return result - if len( result['Value'] ) > 0: - resultDict = result['Value'] - else: - return S_ERROR( 'Job ' + str( jobID ) + ' not found' ) - - # Get Job Parameters - if paramList: - result = self.getJobParameters( jobID, paramList ) - if not result['OK']: - return result - if len( result['Value'] ) > 0: - resultDict.update( result['Value'] ) - - return S_OK( resultDict ) - ############################################################################# def getJobAttribute( self, jobID, attribute ): """ Get the given attribute of a job specified by its jobID @@ -585,40 +510,8 @@ def getJobOptParameters( self, jobID, paramList = None ): return S_ERROR( 'JobDB.getJobOptParameters: failed to retrieve parameters' ) ############################################################################# - def getTimings( self, site, period = 3600 ): - """ Get CPU and wall clock times for the jobs finished in the last hour - """ - ret = self._escapeString( site ) - if not ret['OK']: - return ret - site = ret['Value'] - - date = str( Time.dateTime() - Time.second * period ) - req = "SELECT JobID from Jobs WHERE Site=%s and EndExecTime > '%s' " % ( site, date ) - result = self._query( req ) - jobList = [ str( x[0] ) for x in result['Value'] ] - jobString = ','.join( jobList ) - - req = "SELECT SUM(Value) from JobParameters 
WHERE Name='TotalCPUTime(s)' and JobID in (%s)" % jobString - result = self._query( req ) - if not result['OK']: - return result - cpu = result['Value'][0][0] - if not cpu: - cpu = 0.0 - - req = "SELECT SUM(Value) from JobParameters WHERE Name='WallClockTime(s)' and JobID in (%s)" % jobString - result = self._query( req ) - if not result['OK']: - return result - wctime = result['Value'][0][0] - if not wctime: - wctime = 0.0 - return S_OK( {"CPUTime":int( cpu ), "WallClockTime":int( wctime )} ) - -############################################################################# - def getInputData ( self, jobID ): + def getInputData( self, jobID ): """Get input data for the given job """ ret = self._escapeString( jobID ) @@ -633,7 +526,7 @@ def getInputData ( self, jobID ): return S_OK( [ i[0] for i in res['Value'] if i[0].strip() ] ) ############################################################################# - def setInputData ( self, jobID, inputData ): + def setInputData( self, jobID, inputData ): """Inserts input data for the given job """ ret = self._escapeString( jobID ) @@ -704,13 +597,6 @@ def setNextOptimizer( self, jobID, currentOptimizer ): return S_OK( nextOptimizer ) ############################################################################ - def countJobs( self, condDict, older = None, newer = None, timeStamp = 'LastUpdateTime' ): - """ Get the number of jobs matching conditions specified by condDict and time limits - """ - self.log.debug ( 'JobDB.countJobs: counting Jobs' ) - return self.countEntries( 'Jobs', condDict, older = older, newer = newer, timeStamp = timeStamp ) - -############################################################################# def selectJobs( self, condDict, older = None, newer = None, timeStamp = 'LastUpdateTime', orderAttribute = None, limit = None ): """ Select jobs matching the following conditions: @@ -733,13 +619,6 @@ def selectJobs( self, condDict, older = None, newer = None, timeStamp = 'LastUpd return S_OK( [] ) return S_OK( [ self._to_value( i ) for i in res['Value'] ] ) -############################################################################# - def selectJobWithStatus( self, status ): - """ Get the list of jobs with a given Major Status - """ - - return self.selectJobs( {'Status':status} ) - ############################################################################# def setJobAttribute( self, jobID, attrName, attrValue, update = False, myDate = None ): """ Set an attribute value for job specified by jobID. @@ -947,7 +826,7 @@ def setJobOptParameter( self, jobID, name, value ): cmd = 'DELETE FROM OptimizerParameters WHERE JobID=%s AND Name=%s' % ( e_jobID, e_name ) if not self._update( cmd )['OK']: - result = S_ERROR( 'JobDB.setJobOptParameter: operation failed.' ) + return S_ERROR( 'JobDB.setJobOptParameter: operation failed.' 
) result = self.insertFields( 'OptimizerParameters', ['JobID', 'Name', 'Value'], [jobID, name, value] ) if not result['OK']: @@ -999,7 +878,7 @@ def setAtticJobParameter( self, jobID, key, value, rescheduleCounter ): return ret rescheduleCounter = ret['Value'] - cmd = 'INSERT INTO AtticJobParameters VALUES(%s,%s,%s,%s)' % \ + cmd = 'INSERT INTO AtticJobParameters (JobID,RescheduleCycle,Name,Value) VALUES(%s,%s,%s,%s)' % \ ( jobID, rescheduleCounter, key, value ) result = self._update( cmd ) if not result['OK']: @@ -1130,7 +1009,6 @@ def insertNewJobIntoDB( self, jdl, owner, ownerDN, ownerGroup, diracSetup ): Do initial JDL crosscheck, Set Initial job Attributes and Status """ - jobManifest = JobManifest() result = jobManifest.load( jdl ) if not result['OK']: @@ -1246,6 +1124,17 @@ def insertNewJobIntoDB( self, jdl, owner, ownerDN, ownerGroup, diracSetup ): if not result['OK']: return result + # Adding the job in the Jobs table + result = self.insertFields( 'Jobs', jobAttrNames, jobAttrValues ) + if not result['OK']: + return result + + # Setting the Job parameters + result = self.__setInitialJobParameters( classAdJob, jobID ) + if not result['OK']: + return result + + # Looking for the Input Data inputData = [] if classAdJob.lookupAttribute( 'InputData' ): inputData = classAdJob.getListFromExpression( 'InputData' ) @@ -1273,14 +1162,6 @@ def insertNewJobIntoDB( self, jdl, owner, ownerDN, ownerGroup, diracSetup ): if not result['OK']: return result - result = self.__setInitialJobParameters( classAdJob, jobID ) - if not result['OK']: - return result - - result = self.insertFields( 'Jobs', jobAttrNames, jobAttrValues ) - if not result['OK']: - return result - retVal['Status'] = 'Received' retVal['MinorStatus'] = 'Job accepted' @@ -1403,31 +1284,16 @@ def removeJobFromDB( self, jobIDs ): else: jobIDList = jobIDs - # If this is a master job delete the children first - failedSubjobList = [] - for jobID in jobIDList: - result = self.getJobAttribute( jobID, 'JobSplitType' ) - if result['OK']: - if result['Value'] == "Master": - result = self.getSubjobs( jobID ) - if result['OK']: - subjobs = result['Value'] - if subjobs: - result = self.removeJobFromDB( subjobs ) - if not result['OK']: - failedSubjobList += subjobs - self.log.error( "Failed to delete subjobs " + str( subjobs ) + " from JobDB" ) - failedTablesList = [] jobIDString = ','.join( [str( j ) for j in jobIDList] ) - for table in ( 'JobJDLs', - 'InputData', - 'JobParameters', - 'AtticJobParameters', - 'HeartBeatLoggingInfo', - 'OptimizerParameters', - 'Jobs' - ): + for table in ['InputData', + 'JobParameters', + 'AtticJobParameters', + 'HeartBeatLoggingInfo', + 'OptimizerParameters', + 'JobCommands', + 'Jobs', + 'JobJDLs']: cmd = 'DELETE FROM %s WHERE JobID in (%s)' % ( table, jobIDString ) result = self._update( cmd ) @@ -1435,33 +1301,12 @@ def removeJobFromDB( self, jobIDs ): failedTablesList.append( table ) result = S_OK() - if failedSubjobList: - result = S_ERROR( 'Errors while job removal' ) - result['FailedSubjobs'] = failedSubjobList if failedTablesList: result = S_ERROR( 'Errors while job removal' ) result['FailedTables'] = failedTablesList return result -################################################################# - def getSubjobs( self, jobID ): - """ Get subjobs of the given job - """ - ret = self._escapeString( jobID ) - if not ret['OK']: - return ret - jobID = ret['Value'] - - cmd = "SELECT SubJobID FROM SubJobs WHERE JobID=%s" % jobID - result = self._query( cmd ) - subjobs = [] - if result['OK']: - subjobs = [ int( 
x[0] ) for x in result['Value']] - return S_OK( subjobs ) - else: - return result - ################################################################# def rescheduleJobs( self, jobIDs ): """ Reschedule all the jobs in the given list @@ -1744,18 +1589,18 @@ def allowSiteInMask( self, site, authorDN = 'Unknown', comment = 'No comment' ): return result ############################################################################# - def removeSiteFromMask( self, site ): + def removeSiteFromMask( self, site = None ): """ Remove the given site from the mask """ - ret = self._escapeString( site ) - if not ret['OK']: - return ret - site = ret['Value'] - - if site == "All": + if not site: req = "DELETE FROM SiteMask" else: + ret = self._escapeString( site ) + if not ret['OK']: + return ret + site = ret['Value'] req = "DELETE FROM SiteMask WHERE Site=%s" % site + return self._update( req ) ############################################################################# @@ -1805,27 +1650,6 @@ def getSiteMaskLogging( self, siteList ): return S_OK( resultDict ) ############################################################################# - def setSandboxReady( self, jobID, stype = 'InputSandbox' ): - """ Set the sandbox status ready for the job with jobID - """ - ret = self._escapeString( jobID ) - if not ret['OK']: - return ret - jobID = ret['Value'] - - - if stype == "InputSandbox": - field = "ISandboxReadyFlag" - elif stype == "OutputSandbox": - field = "OSandboxReadyFlag" - else: - return S_ERROR( 'Illegal Sandbox type: ' + stype ) - - cmd = "UPDATE Jobs SET %s='True' WHERE JobID=%s" % ( field, jobID ) - result = self._update( cmd ) - return result - -################################################################################# def getSiteSummary( self ): """ Get the summary of jobs in a given status on all the sites """ @@ -1881,7 +1705,8 @@ def getSiteSummaryWeb( self, selectDict, sortList, startItem, maxItems ): paramNames = ['Site', 'GridType', 'Country', 'Tier', 'MaskStatus'] paramNames += JOB_STATES paramNames += ['Efficiency', 'Status'] - siteT1List = ['CERN', 'IN2P3', 'NIKHEF', 'PIC', 'CNAF', 'RAL', 'GRIDKA'] + #FIXME: hack!!! 
+ siteT1List = ['CERN', 'IN2P3', 'NIKHEF', 'SARA', 'PIC', 'CNAF', 'RAL', 'GRIDKA', 'RRCKI'] # Sort out records as requested sortItem = -1 diff --git a/WorkloadManagementSystem/DB/JobDB.sql b/WorkloadManagementSystem/DB/JobDB.sql index d5a4d07aeb7..408537cc7ce 100755 --- a/WorkloadManagementSystem/DB/JobDB.sql +++ b/WorkloadManagementSystem/DB/JobDB.sql @@ -30,174 +30,151 @@ FLUSH PRIVILEGES; USE JobDB; -- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS Jobs; -CREATE TABLE Jobs ( - JobID INTEGER NOT NULL AUTO_INCREMENT, - JobType VARCHAR(32) NOT NULL DEFAULT 'normal', - INDEX (JobType), - DIRACSetup VARCHAR(32) NOT NULL, - INDEX (DIRACSetup), - JobGroup VARCHAR(32) NOT NULL DEFAULT 'NoGroup', - INDEX (JobGroup), - JobSplitType ENUM ('Single','Master','Subjob','DAGNode') NOT NULL DEFAULT 'Single', - INDEX (JobSplitType), - MasterJobID INTEGER NOT NULL DEFAULT 0, - Site VARCHAR(100) NOT NULL DEFAULT 'ANY', - INDEX (Site), - JobName VARCHAR(128) NOT NULL DEFAULT 'Unknown', - Owner VARCHAR(32) NOT NULL DEFAULT 'Unknown', - INDEX (Owner), - OwnerDN VARCHAR(255) NOT NULL DEFAULT 'Unknown', - INDEX (OwnerDN), - OwnerGroup varchar(128) NOT NULL DEFAULT 'lhcb_user', - INDEX (OwnerGroup), - SubmissionTime DATETIME, - RescheduleTime DATETIME, - LastUpdateTime DATETIME, - StartExecTime DATETIME, - HeartBeatTime DATETIME, - EndExecTime DATETIME, - Status VARCHAR(32) NOT NULL DEFAULT 'Received', - INDEX (Status), - INDEX (Status,Site), - MinorStatus VARCHAR(128) NOT NULL DEFAULT 'Initial insertion', - INDEX (MinorStatus), - ApplicationStatus VARCHAR(255) NOT NULL DEFAULT 'Unknown', - INDEX (ApplicationStatus), - ApplicationNumStatus INTEGER NOT NULL DEFAULT 0, - CPUTime FLOAT NOT NULL DEFAULT 0.0, - UserPriority INTEGER NOT NULL DEFAULT 0, - SystemPriority INTEGER NOT NULL DEFAULT 0, - RescheduleCounter INTEGER NOT NULL DEFAULT 0, - VerifiedFlag ENUM ('True','False') NOT NULL DEFAULT 'False', - DeletedFlag ENUM ('True','False') NOT NULL DEFAULT 'False', - KilledFlag ENUM ('True','False') NOT NULL DEFAULT 'False', - FailedFlag ENUM ('True','False') NOT NULL DEFAULT 'False', - ISandboxReadyFlag ENUM ('True','False') NOT NULL DEFAULT 'False', - OSandboxReadyFlag ENUM ('True','False') NOT NULL DEFAULT 'False', - RetrievedFlag ENUM ('True','False') NOT NULL DEFAULT 'False', - AccountedFlag ENUM ('True','False','Failed') NOT NULL DEFAULT 'False', - PRIMARY KEY (JobID) -) ENGINE = InnoDB; +DROP TABLE IF EXISTS `Jobs`; +CREATE TABLE `Jobs` ( + `JobID` INT(11) UNSIGNED NOT NULL DEFAULT 0, + `JobType` VARCHAR(32) NOT NULL DEFAULT 'user', + `DIRACSetup` VARCHAR(32) NOT NULL DEFAULT 'test', + `JobGroup` VARCHAR(32) NOT NULL DEFAULT '00000000', + `JobSplitType` ENUM('Single','Master','Subjob','DAGNode') NOT NULL DEFAULT 'Single', + `MasterJobID` INT(11) UNSIGNED NOT NULL DEFAULT 0, + `Site` VARCHAR(100) NOT NULL DEFAULT 'ANY', + `JobName` VARCHAR(128) NOT NULL DEFAULT 'Unknown', + `Owner` VARCHAR(32) NOT NULL DEFAULT 'Unknown', + `OwnerDN` VARCHAR(255) NOT NULL DEFAULT 'Unknown', + `OwnerGroup` VARCHAR(128) NOT NULL DEFAULT 'Unknown', + `SubmissionTime` DATETIME DEFAULT NULL, + `RescheduleTime` DATETIME DEFAULT NULL, + `LastUpdateTime` DATETIME DEFAULT NULL, + `StartExecTime` DATETIME DEFAULT NULL, + `HeartBeatTime` DATETIME DEFAULT NULL, + `EndExecTime` DATETIME DEFAULT NULL, + `Status` VARCHAR(32) NOT NULL DEFAULT 'Received', + `MinorStatus` VARCHAR(128) NOT NULL DEFAULT 'Unknown', + `ApplicationStatus` VARCHAR(255) DEFAULT 'Unknown', + `ApplicationNumStatus` 
INT(11) NOT NULL DEFAULT 0,
+  `CPUTime` FLOAT NOT NULL DEFAULT 0.0,
+  `UserPriority` INT(11) NOT NULL DEFAULT 0,
+  `SystemPriority` INT(11) NOT NULL DEFAULT 0,
+  `RescheduleCounter` INT(11) NOT NULL DEFAULT 0,
+  `VerifiedFlag` ENUM('True','False') NOT NULL DEFAULT 'False',
+  `DeletedFlag` ENUM('True','False') NOT NULL DEFAULT 'False',
+  `KilledFlag` ENUM('True','False') NOT NULL DEFAULT 'False',
+  `FailedFlag` ENUM('True','False') NOT NULL DEFAULT 'False',
+  `ISandboxReadyFlag` ENUM('True','False') NOT NULL DEFAULT 'False',
+  `OSandboxReadyFlag` ENUM('True','False') NOT NULL DEFAULT 'False',
+  `RetrievedFlag` ENUM('True','False') NOT NULL DEFAULT 'False',
+  `AccountedFlag` ENUM('True','False','Failed') NOT NULL DEFAULT 'False',
+  PRIMARY KEY (`JobID`),
+  FOREIGN KEY (`JobID`) REFERENCES `JobJDLs`(`JobID`),
+  KEY `JobType` (`JobType`),
+  KEY `DIRACSetup` (`DIRACSetup`),
+  KEY `JobGroup` (`JobGroup`),
+  KEY `JobSplitType` (`JobSplitType`),
+  KEY `Site` (`Site`),
+  KEY `Owner` (`Owner`),
+  KEY `OwnerDN` (`OwnerDN`),
+  KEY `OwnerGroup` (`OwnerGroup`),
+  KEY `Status` (`Status`),
+  KEY `MinorStatus` (`MinorStatus`),
+  KEY `ApplicationStatus` (`ApplicationStatus`),
+  KEY `StatusSite` (`Status`,`Site`),
+  KEY `LastUpdateTime` (`LastUpdateTime`)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+-- ------------------------------------------------------------------------------
+DROP TABLE IF EXISTS `JobJDLs`;
+CREATE TABLE `JobJDLs` (
+  `JobID` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT,
+  `JDL` BLOB NOT NULL,
+  `JobRequirements` BLOB NOT NULL,
+  `OriginalJDL` BLOB NOT NULL,
+  PRIMARY KEY (`JobID`)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+-- ------------------------------------------------------------------------------
+DROP TABLE IF EXISTS `InputData`;
+CREATE TABLE `InputData` (
+  `JobID` INT(11) UNSIGNED NOT NULL,
+  `LFN` VARCHAR(255) NOT NULL DEFAULT '',
+  `Status` VARCHAR(32) NOT NULL DEFAULT 'AprioriGood',
+  PRIMARY KEY (`JobID`,`LFN`),
+  FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+-- ------------------------------------------------------------------------------
+DROP TABLE IF EXISTS `JobParameters`;
+CREATE TABLE `JobParameters` (
+  `JobID` INT(11) UNSIGNED NOT NULL,
+  `Name` VARCHAR(100) NOT NULL,
+  `Value` BLOB NOT NULL,
+  PRIMARY KEY (`JobID`,`Name`),
+  FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+-- ------------------------------------------------------------------------------
+DROP TABLE IF EXISTS `OptimizerParameters`;
+CREATE TABLE `OptimizerParameters` (
+  `JobID` INT(11) UNSIGNED NOT NULL,
+  `Name` VARCHAR(100) NOT NULL,
+  `Value` MEDIUMBLOB NOT NULL,
+  PRIMARY KEY (`JobID`,`Name`),
+  FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+-- ------------------------------------------------------------------------------
+DROP TABLE IF EXISTS `AtticJobParameters`;
+CREATE TABLE `AtticJobParameters` (
+  `JobID` INT(11) UNSIGNED NOT NULL,
+  `Name` VARCHAR(100) NOT NULL,
+  `Value` BLOB NOT NULL,
+  `RescheduleCycle` INT(11) UNSIGNED NOT NULL,
+  PRIMARY KEY (`JobID`,`Name`,`RescheduleCycle`),
+  FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`)
+) ENGINE=InnoDB DEFAULT CHARSET=latin1;
+
+-- ------------------------------------------------------------------------------
+DROP TABLE IF EXISTS `SiteMask`;
+CREATE TABLE `SiteMask` (
+  `Site` VARCHAR(64) NOT NULL,
+  `Status` VARCHAR(64) NOT NULL,
+  `LastUpdateTime` DATETIME NOT NULL,
+  `Author` VARCHAR(255) NOT
NULL, + `Comment` BLOB NOT NULL, + PRIMARY KEY (`Site`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +DROP TABLE IF EXISTS `SiteMaskLogging`; +CREATE TABLE `SiteMaskLogging` ( + `Site` VARCHAR(64) NOT NULL, + `Status` VARCHAR(64) NOT NULL, + `UpdateTime` DATETIME NOT NULL, + `Author` VARCHAR(255) NOT NULL, + `Comment` BLOB NOT NULL, + PRIMARY KEY (`Site`,`UpdateTime`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +-- ------------------------------------------------------------------------------ +DROP TABLE IF EXISTS `HeartBeatLoggingInfo`; +CREATE TABLE `HeartBeatLoggingInfo` ( + `JobID` INT(11) UNSIGNED NOT NULL, + `Name` VARCHAR(100) NOT NULL, + `Value` BLOB NOT NULL, + `HeartBeatTime` DATETIME NOT NULL, + PRIMARY KEY (`JobID`,`Name`,`HeartBeatTime`), + FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +-- ------------------------------------------------------------------------------ +DROP TABLE IF EXISTS `JobCommands`; +CREATE TABLE `JobCommands` ( + `JobID` INT(11) UNSIGNED NOT NULL, + `Command` VARCHAR(100) NOT NULL, + `Arguments` VARCHAR(100) NOT NULL, + `Status` VARCHAR(64) NOT NULL DEFAULT 'Received', + `ReceptionTime` DATETIME NOT NULL, + `ExecutionTime` DATETIME DEFAULT NULL, + PRIMARY KEY (`JobID`,`Arguments`,`ReceptionTime`), + FOREIGN KEY (`JobID`) REFERENCES `Jobs`(`JobID`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS JobJDLs; -CREATE TABLE JobJDLs ( - JobID INTEGER NOT NULL AUTO_INCREMENT, - JDL BLOB NOT NULL DEFAULT '', - JobRequirements BLOB NOT NULL DEFAULT '', - OriginalJDL BLOB NOT NULL DEFAULT '', - PRIMARY KEY (JobID) -); - --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS SubJobs; -CREATE TABLE SubJobs ( - JobID INTEGER NOT NULL, - SubJobID INTEGER NOT NULL -); - --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS PrecursorJobs; -CREATE TABLE PrecursorJobs ( - JobID INTEGER NOT NULL, - PreJobID INTEGER NOT NULL -); - --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS InputData; -CREATE TABLE InputData ( - JobID INTEGER NOT NULL, - Status VARCHAR(32) NOT NULL DEFAULT 'AprioriGood', - LFN VARCHAR(255), - PRIMARY KEY(JobID, LFN) -); - --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS JobParameters; -CREATE TABLE JobParameters ( - JobID INTEGER NOT NULL, - Name VARCHAR(100) NOT NULL, - Value BLOB NOT NULL, - PRIMARY KEY(JobID, Name) -) ENGINE = InnoDB; - --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS OptimizerParameters; -CREATE TABLE OptimizerParameters ( - JobID INTEGER NOT NULL, - Name VARCHAR(100) NOT NULL, - Value MEDIUMBLOB NOT NULL, - PRIMARY KEY(JobID, Name) -) ENGINE = InnoDB; - --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS AtticJobParameters; -CREATE TABLE AtticJobParameters ( - JobID INTEGER NOT NULL, - RescheduleCycle INTEGER NOT NULL, - Name VARCHAR(100) NOT NULL, - Value BLOB NOT NULL, - PRIMARY KEY(JobID, Name, RescheduleCycle) -); - --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS TaskQueues; -CREATE TABLE TaskQueues ( - TaskQueueID INTEGER NOT NULL AUTO_INCREMENT, - Priority INTEGER NOT NULL DEFAULT 0, - Requirements BLOB NOT NULL, - 
NumberOfJobs INTEGER NOT NULL DEFAULT 0, - PRIMARY KEY (TaskQueueID) -); - --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS TaskQueue; -CREATE TABLE TaskQueue ( - TaskQueueID INTEGER NOT NULL, - JobID INTEGER NOT NULL, - Rank INTEGER NOT NULL DEFAULT 0, - PRIMARY KEY (JobID, TaskQueueID) -); - --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS SiteMask; -CREATE TABLE SiteMask ( - Site VARCHAR(64) NOT NULL, - Status VARCHAR(64) NOT NULL, - LastUpdateTime DATETIME NOT NULL, - Author VARCHAR(255) NOT NULL, - Comment BLOB NOT NULL, - PRIMARY KEY (Site) -); - -DROP TABLE IF EXISTS SiteMaskLogging; -CREATE TABLE SiteMaskLogging ( - Site VARCHAR(64) NOT NULL, - Status VARCHAR(64) NOT NULL, - UpdateTime DATETIME NOT NULL, - Author VARCHAR(255) NOT NULL, - Comment BLOB NOT NULL -); - --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS HeartBeatLoggingInfo; -CREATE TABLE HeartBeatLoggingInfo ( - JobID INTEGER NOT NULL, - Name VARCHAR(100) NOT NULL, - Value BLOB NOT NULL, - HeartBeatTime DATETIME NOT NULL, - INDEX (JobID) -)ENGINE = InnoDB; - --- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS JobCommands; -CREATE TABLE JobCommands ( - JobID INTEGER NOT NULL, - Command VARCHAR(100) NOT NULL, - Arguments VARCHAR(100) NOT NULL, - Status VARCHAR(64) NOT NULL DEFAULT 'Received', - ReceptionTime DATETIME NOT NULL, - ExecutionTime DATETIME, - INDEX (JobID) -); diff --git a/WorkloadManagementSystem/DB/PilotAgentsDB.sql b/WorkloadManagementSystem/DB/PilotAgentsDB.sql index 73a1009df70..90e8f3762cc 100755 --- a/WorkloadManagementSystem/DB/PilotAgentsDB.sql +++ b/WorkloadManagementSystem/DB/PilotAgentsDB.sql @@ -30,54 +30,58 @@ FLUSH PRIVILEGES; USE PilotAgentsDB; -- ------------------------------------------------------------------------------ -DROP TABLE IF EXISTS PilotAgents; -CREATE TABLE PilotAgents ( - PilotID INTEGER NOT NULL AUTO_INCREMENT, - InitialJobID INTEGER NOT NULL DEFAULT 0, - CurrentJobID INTEGER NOT NULL DEFAULT 0, - TaskQueueID INTEGER NOT NULL DEFAULT '0', - PilotJobReference VARCHAR(255) NOT NULL DEFAULT 'Unknown', - PilotStamp VARCHAR(32) NOT NULL DEFAULT '', - DestinationSite VARCHAR(128) NOT NULL DEFAULT 'NotAssigned', - Queue VARCHAR(128) NOT NULL DEFAULT 'Unknown', - GridSite VARCHAR(128) NOT NULL DEFAULT 'Unknown', - Broker VARCHAR(128) NOT NULL DEFAULT 'Unknown', - OwnerDN VARCHAR(255) NOT NULL, - OwnerGroup VARCHAR(128) NOT NULL, - GridType VARCHAR(32) NOT NULL DEFAULT 'LCG', - BenchMark DOUBLE NOT NULL DEFAULT 0.0, - SubmissionTime DATETIME, - LastUpdateTime DATETIME, - Status VARCHAR(32) NOT NULL DEFAULT 'Unknown', - StatusReason VARCHAR(255) NOT NULL DEFAULT 'Unknown', - ParentID INTEGER NOT NULL DEFAULT 0, - OutputReady ENUM ('True','False') NOT NULL DEFAULT 'False', - AccountingSent ENUM ('True','False') NOT NULL DEFAULT 'False', - PRIMARY KEY (PilotID), - INDEX (PilotJobReference), - INDEX (Status) -) ENGINE = InnoDB; +DROP TABLE IF EXISTS `PilotAgents`; +CREATE TABLE `PilotAgents` ( + `PilotID` INT(11) UNSIGNED NOT NULL AUTO_INCREMENT, + `InitialJobID` INT(11) UNSIGNED NOT NULL DEFAULT 0, + `CurrentJobID` INT(11) UNSIGNED NOT NULL DEFAULT 0, + `TaskQueueID` INT(11) UNSIGNED NOT NULL DEFAULT 0, + `PilotJobReference` VARCHAR(255) NOT NULL DEFAULT 'Unknown', + `PilotStamp` VARCHAR(32) NOT NULL DEFAULT '', + `DestinationSite` VARCHAR(128) NOT NULL DEFAULT 
'NotAssigned', + `Queue` VARCHAR(128) NOT NULL DEFAULT 'Unknown', + `GridSite` VARCHAR(128) NOT NULL DEFAULT 'Unknown', + `Broker` VARCHAR(128) NOT NULL DEFAULT 'Unknown', + `OwnerDN` VARCHAR(255) NOT NULL, + `OwnerGroup` VARCHAR(128) NOT NULL, + `GridType` VARCHAR(32) NOT NULL DEFAULT 'LCG', + `GridRequirements` blob, + `BenchMark` DOUBLE NOT NULL DEFAULT 0.0, + `SubmissionTime` DATETIME DEFAULT NULL, + `LastUpdateTime` DATETIME DEFAULT NULL, + `Status` VARCHAR(32) NOT NULL DEFAULT 'Unknown', + `StatusReason` VARCHAR(255) NOT NULL DEFAULT 'Unknown', + `ParentID` INT(11) UNSIGNED NOT NULL DEFAULT 0, + `OutputReady` ENUM('True','False') NOT NULL DEFAULT 'False', + `AccountingSent` ENUM('True','False') NOT NULL DEFAULT 'False', + PRIMARY KEY (`PilotID`), + KEY `PilotJobReference` (`PilotJobReference`), + KEY `Status` (`Status`), + KEY `Statuskey` (`GridSite`,`DestinationSite`,`Status`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; -DROP TABLE IF EXISTS JobToPilotMapping; -CREATE TABLE JobToPilotMapping ( - PilotID INTEGER NOT NULL, - JobID INTEGER NOT NULL, - StartTime DATETIME NOT NULL, - INDEX (PilotID), - INDEX (JobID) -) ENGINE = InnoDB; -DROP TABLE IF EXISTS PilotOutput; -CREATE TABLE PilotOutput ( - PilotID INTEGER NOT NULL, - StdOutput MEDIUMBLOB, - StdError MEDIUMBLOB, - PRIMARY KEY (PilotID) -) ENGINE = InnoDB; +DROP TABLE IF EXISTS `JobToPilotMapping`; +CREATE TABLE `JobToPilotMapping` ( + `PilotID` INT(11) UNSIGNED NOT NULL, + `JobID` INT(11) UNSIGNED NOT NULL, + `StartTime` DATETIME NOT NULL, + KEY `JobID` (`JobID`), + KEY `PilotID` (`PilotID`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +DROP TABLE IF EXISTS `PilotOutput`; +CREATE TABLE `PilotOutput` ( + `PilotID` INT(11) UNSIGNED NOT NULL, + `StdOutput` MEDIUMBLOB, + `StdError` MEDIUMBLOB, + PRIMARY KEY (`PilotID`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +DROP TABLE IF EXISTS `PilotRequirements`; +CREATE TABLE `PilotRequirements` ( + `PilotID` INT(11) UNSIGNED NOT NULL, + `Requirements` BLOB, + PRIMARY KEY (`PilotID`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; -DROP TABLE IF EXISTS PilotRequirements; -CREATE TABLE PilotRequirements ( - PilotID INTEGER NOT NULL, - Requirements BLOB, - PRIMARY KEY (PilotID) -); diff --git a/WorkloadManagementSystem/DB/SandboxMetadataDB.py b/WorkloadManagementSystem/DB/SandboxMetadataDB.py index c582721645f..55b562e97a3 100644 --- a/WorkloadManagementSystem/DB/SandboxMetadataDB.py +++ b/WorkloadManagementSystem/DB/SandboxMetadataDB.py @@ -1,14 +1,10 @@ -######################################################################## -# $HeadURL$ -######################################################################## """ SandboxMetadataDB class is a front-end to the metadata for sandboxes """ __RCSID__ = "$Id$" -import time import types -from DIRAC import gConfig, gLogger, S_OK, S_ERROR +from DIRAC import gLogger, S_OK, S_ERROR from DIRAC.Core.Base.DB import DB from DIRAC.Core.Utilities import List from DIRAC.Core.Security import Properties, CS @@ -35,7 +31,7 @@ def __initializeDB( self ): tablesToCreate = {} self.__tablesDesc = {} - self.__tablesDesc[ 'sb_Owners' ] = { 'Fields' : { 'OwnerId' : 'INTEGER UNSIGNED AUTO_INCREMENT NOT NULL', + self.__tablesDesc[ 'sb_Owners' ] = { 'Fields' : { 'OwnerId' : 'INTEGER(10) UNSIGNED AUTO_INCREMENT NOT NULL', 'Owner' : 'VARCHAR(32) NOT NULL', 'OwnerDN' : 'VARCHAR(255) NOT NULL', 'OwnerGroup' : 'VARCHAR(32) NOT NULL', @@ -43,11 +39,11 @@ def __initializeDB( self ): 'PrimaryKey' : 'OwnerId', } - self.__tablesDesc[ 'sb_SandBoxes' ] = { 'Fields' : { 'SBId' : 'INTEGER 
UNSIGNED AUTO_INCREMENT NOT NULL', - 'OwnerId' : 'INTEGER UNSIGNED NOT NULL', + self.__tablesDesc[ 'sb_SandBoxes' ] = { 'Fields' : { 'SBId' : 'INTEGER(10) UNSIGNED AUTO_INCREMENT NOT NULL', + 'OwnerId' : 'INTEGER(10) UNSIGNED NOT NULL', 'SEName' : 'VARCHAR(64) NOT NULL', 'SEPFN' : 'VARCHAR(512) NOT NULL', - 'Bytes' : 'BIGINT NOT NULL DEFAULT 0', + 'Bytes' : 'BIGINT(20) NOT NULL DEFAULT 0', 'RegistrationTime' : 'DATETIME NOT NULL', 'LastAccessTime' : 'DATETIME NOT NULL', 'Assigned' : 'TINYINT NOT NULL DEFAULT 0', @@ -59,7 +55,7 @@ def __initializeDB( self ): } - self.__tablesDesc[ 'sb_EntityMapping' ] = { 'Fields' : { 'SBId' : 'INTEGER UNSIGNED NOT NULL', + self.__tablesDesc[ 'sb_EntityMapping' ] = { 'Fields' : { 'SBId' : 'INTEGER(10) UNSIGNED NOT NULL', 'EntitySetup' : 'VARCHAR(64) NOT NULL', 'EntityId' : 'VARCHAR(128) NOT NULL', 'Type' : 'VARCHAR(64) NOT NULL', diff --git a/WorkloadManagementSystem/DB/TaskQueueDB.py b/WorkloadManagementSystem/DB/TaskQueueDB.py index b07ab53e219..a8f4715d675 100755 --- a/WorkloadManagementSystem/DB/TaskQueueDB.py +++ b/WorkloadManagementSystem/DB/TaskQueueDB.py @@ -91,11 +91,11 @@ def __initializeDB( self ): tablesToCreate = {} self.__tablesDesc = {} - self.__tablesDesc[ 'tq_TaskQueues' ] = { 'Fields' : { 'TQId' : 'INTEGER UNSIGNED AUTO_INCREMENT NOT NULL', + self.__tablesDesc[ 'tq_TaskQueues' ] = { 'Fields' : { 'TQId' : 'INTEGER(10) UNSIGNED AUTO_INCREMENT NOT NULL', 'OwnerDN' : 'VARCHAR(255) NOT NULL', 'OwnerGroup' : 'VARCHAR(32) NOT NULL', 'Setup' : 'VARCHAR(32) NOT NULL', - 'CPUTime' : 'BIGINT UNSIGNED NOT NULL', + 'CPUTime' : 'BIGINT(20) UNSIGNED NOT NULL', 'Priority' : 'FLOAT NOT NULL', 'Enabled' : 'TINYINT(1) NOT NULL DEFAULT 0' }, @@ -105,8 +105,8 @@ def __initializeDB( self ): } } - self.__tablesDesc[ 'tq_Jobs' ] = { 'Fields' : { 'TQId' : 'INTEGER UNSIGNED NOT NULL', - 'JobId' : 'INTEGER UNSIGNED NOT NULL', + self.__tablesDesc[ 'tq_Jobs' ] = { 'Fields' : { 'TQId' : 'INTEGER(10) UNSIGNED NOT NULL', + 'JobId' : 'INTEGER(11) UNSIGNED NOT NULL', 'Priority' : 'INTEGER UNSIGNED NOT NULL', 'RealPriority' : 'FLOAT NOT NULL' }, @@ -906,8 +906,8 @@ def deleteTaskQueue( self, tqId, tqOwnerDN = False, tqOwnerGroup = False, connOb retVal = self._update( sqlCmd, conn = connObj ) if not retVal[ 'OK' ]: return S_ERROR( "Could not delete task queue %s: %s" % ( tqId, retVal[ 'Message' ] ) ) - for _ in self.__multiValueDefFields: - retVal = self._update( "DELETE FROM `tq_TQTo%s` WHERE TQId = %s" % tqId, conn = connObj ) + for field in self.__multiValueDefFields: + retVal = self._update( "DELETE FROM `tq_TQTo%s` WHERE TQId = %s" % ( field, tqId ), conn = connObj ) if not retVal[ 'OK' ]: return retVal if delTQ > 0: diff --git a/WorkloadManagementSystem/DB/test/TestJobDB.py b/WorkloadManagementSystem/DB/test/TestJobDB.py deleted file mode 100755 index cf64af1f43f..00000000000 --- a/WorkloadManagementSystem/DB/test/TestJobDB.py +++ /dev/null @@ -1,182 +0,0 @@ -import unittest,types -from DIRAC.WorkloadManagementSystem.DB.JobDB import JobDB - -class JobDBTestCase(unittest.TestCase): - """ Base class for the JobDB test cases - """ - - def setUp(self): - print - self.jobDB = JobDB('Test',20) - - def createJob(self): - result = self.jobDB.getJobID() - jobID = result['Value'] - jdlfile = open("test.jdl","r") - jdl = jdlfile.read() - jdlfile.close() - result = self.jobDB.insertJobIntoDB(jobID,jdl) - return jobID - -class JobSubmissionCase(JobDBTestCase): - """ TestJobDB represents a test suite for the JobDB database front-end - """ - - def test_getJobID(self): - - result = 
self.jobDB.getJobID() - self.assert_( result['OK']) - self.assertEqual(type(result['Value']),types.IntType) - id1 = result['Value'] - result = self.jobDB.getJobID() - self.assert_( result['OK']) - self.assertEqual(type(result['Value']),types.IntType) - id2 = result['Value'] - self.assertEqual(id2,id1+1) - - def test_insertJobIntoDB(self): - - jobID = self.createJob() - jdlfile = open("test.jdl","r") - jdl = jdlfile.read() - jdlfile.close() - result = self.jobDB.insertJobIntoDB(jobID,jdl) - self.assert_( result['OK'],'Status after insertJobIntoDB') - result = self.jobDB.getJobAttribute(jobID,'Status') - self.assert_( result['OK'],'Status after getJobAttribute') - self.assertEqual(result['Value'],'received','Proper received status') - - def test_addJobToDB(self): - - jobID = self.createJob() - jdlfile = open("test.jdl","r") - jdl = jdlfile.read() - jdlfile.close() - result = self.jobDB.addJobToDB(jobID,jdl) - self.assert_( result['OK'],'Status after addJobToDB') - result = self.jobDB.getJobAttribute(jobID,'Status') - self.assert_( result['OK'],'Status after getJobAttribute') - self.assertEqual(result['Value'],'received','Proper received status') - result = self.jobDB.getJobAttribute(jobID,'MinorStatus') - self.assert_( result['OK'],'Status after getJobAttribute') - self.assertEqual(result['Value'],'Job accepted','Proper received status') - -class JobRemovalCase(JobDBTestCase): - - def test_removeJobFromDB(self): - - for i in range(10): - jobID = self.createJob() - - result = self.jobDB.selectJobs({}) - self.assert_( result['OK'],'Status after selectJobs') - jobs = result['Value'] - for job in jobs: - result = self.jobDB.removeJobFromDB(job) - self.assert_( result['OK'],'Status after removeJobFromDB') - -class JobRescheduleCase(JobDBTestCase): - - def test_rescheduleJob(self): - - jobID = self.createJob() - result = self.jobDB.rescheduleJob(jobID) - self.assert_( result['OK'],'Status after rescheduleJob') - - for i in range(10): - jobID = self.createJob() - result = self.jobDB.addJobToDB(jobID) - - result = self.jobDB.selectJobs({}) - self.assert_( result['OK'],'Status after selectJobs') - jobs = result['Value'] - result = self.jobDB.rescheduleJobs(jobs) - self.assert_( result['OK'],'Status after rescheduleJobs') - -class JobParametersCase(JobDBTestCase): - - def test_countJobs(self): - - result = self.jobDB.countJobs({}) - self.assert_( result['OK'],'Status after countJobs') - njobs = result['Value'] - result = self.jobDB.selectJobs({}) - self.assert_( result['OK'],'Status after selectJobs') - jobs = result['Value'] - self.assertEqual(njobs,len(jobs),'Equality of number of jobs' ) - -class SiteMaskCase(JobDBTestCase): - - def test_setMask(self): - - result = self.jobDB.setMask(["DIRAC.in2p3.fr","DIRAC.cern.ch"]) - self.assert_( result['OK'],'Status after setMask') - result = self.jobDB.getMask() - self.assert_( result['OK'],'Status after getMask') - self.assertEqual(result['Value'], - '[ Requirements = OtherSite == "DIRAC.in2p3.fr" || Other.Site == "DIRAC.cern.ch" ]', - 'Equality of the site mask' ) - - def test_allowSiteInMask(self): - - result = self.jobDB.setMask(["DIRAC.in2p3.fr","DIRAC.cern.ch"]) - self.assert_( result['OK'],'Status after setMask') - result = self.jobDB.banSiteInMask("DIRAC.in2p3.fr") - self.assert_( result['OK'],'Status after banSiteInMask') - result = self.jobDB.getMask() - self.assert_( result['OK'],'Status after getMask') - self.assertEqual(result['Value'], - '[ Requirements = OtherSite == "DIRAC.cern.ch" ]', - 'Equality of the site mask' ) - result = 
self.jobDB.allowSiteInMask("DIRAC.in2p3.fr") - self.assert_( result['OK'],'Status after allowSiteInMask') - result = self.jobDB.getMask() - self.assert_( result['OK'],'Status after getMask') - self.assertEqual(result['Value'], - '[ Requirements = OtherSite == "DIRAC.in2p3.fr" || Other.Site == "DIRAC.cern.ch" ]', - 'Equality of the site mask' ) - -class TaskQueueCase(JobDBTestCase): - - def test_manipulateQueue(self): - - result = self.jobDB.selectQueue('Other.Site == "DIRAC.IN2P3.fr"') - self.assert_( result['OK'],'Status after selectQueue') - queueID_1 = result['Value'] - result = self.jobDB.selectQueue('Other.Site == "DIRAC.IN2P3.fr"') - self.assert_( result['OK'],'Status after selectQueue') - queueID_2 = result['Value'] - self.assertEqual(queueID_1,queueID_2) - - def test_jobsInQueue(self): - - jobID = self.createJob() - result = self.jobDB.addJobToDB(jobID) - self.assert_( result['OK'],'Status after addJobToDB') - result = self.jobDB.selectQueue('Other.Site == "DIRAC.IN2P3.fr"') - self.assert_( result['OK'],'Status after selectQueue') - queueID = result['Value'] - result = self.jobDB.addJobToQueue(jobID,queueID,120) - self.assert_( result['OK'],'Status after addJobToQueue') - result = self.jobDB.deleteJobFromQueue(jobID) - self.assert_( result['OK'],'Status after deleteJobFromQueue') - -class CountJobsCase(JobDBTestCase): - - def test_getCounters(self): - - result = self.jobDB.getCounters(['Status','MinorStatus'],{},'2007-04-22 00:00:00') - self.assert_( result['OK'],'Status after getCounters') - - -if __name__ == '__main__': - - suite = unittest.defaultTestLoader.loadTestsFromTestCase(JobSubmissionCase) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(JobRemovalCase)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(JobRescheduleCase)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(JobParametersCase)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(SiteMaskCase)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(TaskQueueCase)) - suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(CountJobsCase)) - - testResult = unittest.TextTestRunner(verbosity=2).run(suite) diff --git a/WorkloadManagementSystem/DB/test/TestJobLoggingDB.py b/WorkloadManagementSystem/DB/test/TestJobLoggingDB.py deleted file mode 100755 index 7b36c930925..00000000000 --- a/WorkloadManagementSystem/DB/test/TestJobLoggingDB.py +++ /dev/null @@ -1,53 +0,0 @@ -import unittest,types,time,datetime -from DIRAC.WorkloadManagementSystem.DB.JobLoggingDB import JobLoggingDB - -class JobDBTestCase(unittest.TestCase): - """ Base class for the JobDB test cases - """ - - def setUp(self): - print - self.jlogDB = JobLoggingDB('Test',20) - - -class JobLoggingCase(JobDBTestCase): - """ TestJobDB represents a test suite for the JobDB database front-end - """ - - def test_JobStatus(self): - - result = self.jlogDB.addLoggingRecord(1,status="testing", - minor='date=datetime.datetime.utcnow()', - date=datetime.datetime.utcnow(), - source='Unittest') - self.assert_( result['OK']) - date = '2006-04-25 14:20:17' - result = self.jlogDB.addLoggingRecord(1,status="testing", - minor='2006-04-25 14:20:17', - date=date, - source='Unittest') - self.assert_( result['OK']) - result = self.jlogDB.addLoggingRecord(1,status="testing", - minor='No date 1', - source='Unittest') - self.assert_( result['OK']) - result = self.jlogDB.addLoggingRecord(1,status="testing", - minor='No date 2', - source='Unittest') - self.assert_( result['OK']) - result = 
self.jlogDB.getJobLoggingInfo(1) - self.assert_( result['OK']) - #for row in result['Value']: - # print row - - result = self.jlogDB.getWMSTimeStamps(1) - self.assert_( result['OK']) - #print result['Value'] - - -if __name__ == '__main__': - - suite = unittest.defaultTestLoader.loadTestsFromTestCase(JobLoggingCase) -# suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(JobRemovalCase)) - - testResult = unittest.TextTestRunner(verbosity=2).run(suite) diff --git a/WorkloadManagementSystem/DB/test/TestProxyRepository.py b/WorkloadManagementSystem/DB/test/TestProxyRepository.py deleted file mode 100755 index 23a71d45a60..00000000000 --- a/WorkloadManagementSystem/DB/test/TestProxyRepository.py +++ /dev/null @@ -1,35 +0,0 @@ -import unittest,types,time,datetime -from DIRAC.WorkloadManagementSystem.DB.ProxyRepositoryDB import ProxyRepositoryDB - - - - -class ProxyCase(unittest.TestCase): - """ TestJobDB represents a test suite for the JobDB database front-end - """ - - def setUp(self): - self.repository = ProxyRepositoryDB('Test',10) - - - def test_manipulateProxies(self): - - result = self.repository.storeProxy('This is an imitation of a proxy', - 'This is a DN', - '/lhcb' ) - self.assert_( result['OK']) - result = self.repository.getProxy('This is a DN','/lhcb') - self.assert_( result['OK']) - self.assertEqual('This is an imitation of a proxy',result['Value']) - result = self.repository.getUsers() - self.assert_( result['OK']) - result = self.repository.removeProxy(userDN='This is a DN') - self.assert_( result['OK']) - - -if __name__ == '__main__': - - suite = unittest.defaultTestLoader.loadTestsFromTestCase(ProxyCase) -# suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(JobRemovalCase)) - - testResult = unittest.TextTestRunner(verbosity=2).run(suite) diff --git a/WorkloadManagementSystem/DB/test/TestSandboxDB.py b/WorkloadManagementSystem/DB/test/TestSandboxDB.py deleted file mode 100755 index d237c39c304..00000000000 --- a/WorkloadManagementSystem/DB/test/TestSandboxDB.py +++ /dev/null @@ -1,47 +0,0 @@ -import unittest,zlib -from DIRAC.WorkloadManagementSystem.DB.SandboxDB import SandboxDB - -class JobDBTestCase(unittest.TestCase): - """ Base class for the SandboxDB test cases - """ - - def setUp(self): - print - self.sDB = SandboxDB('Test',20) - - -class SandboxCase(JobDBTestCase): - """ TestJobDB represents a test suite for the JobDB database front-end - """ - - def test_uploadFile(self): - - sandbox = 'out' - - #testfile = open('test.jdl','r') - testfile = open('/home/atsareg/distributive/skype-1.3.0.53-1mdk.i586.rpm','r') - body = testfile.read() - #body = zlib.compress(body) - testfile.close() - - result = self.sDB.storeSandboxFile(1,sandbox+'putFile1',body,sandbox) - print result - self.assert_( result['OK']) - - result = self.sDB.getSandboxFile(1,sandbox+'putFile1',sandbox) - self.assert_( result['OK']) - - newbody = result['Value'] - - self.assertEqual(body,newbody) - - result = self.sDB.getFileNames(1,sandbox) - self.assert_( result['OK']) - print result - -if __name__ == '__main__': - - suite = unittest.defaultTestLoader.loadTestsFromTestCase(SandboxCase) -# suite.addTest(unittest.defaultTestLoader.loadTestsFromTestCase(JobRemovalCase)) - - testResult = unittest.TextTestRunner(verbosity=2).run(suite) diff --git a/WorkloadManagementSystem/DB/test/test.jdl b/WorkloadManagementSystem/DB/test/test.jdl deleted file mode 100755 index 92a510fc9f4..00000000000 --- a/WorkloadManagementSystem/DB/test/test.jdl +++ /dev/null @@ -1,18 +0,0 @@ -[ - -Executable = 
"/bin/ls"; -Arguments = "> ls.out"; -InputSandbox = "test.jdl"; -OutputSandbox = "ls.out"; -JobType = "processing"; -JobGroup = "TestJobs"; -DIRACSetup = "Test"; -Requirements = Other.Site == "DIRAC.in2p3.fr"; - -Parameters = [ - Par1 = "Value1"; - par2 = 3 - - ]; - -] diff --git a/WorkloadManagementSystem/Service/JobStateUpdateHandler.py b/WorkloadManagementSystem/Service/JobStateUpdateHandler.py index 87805496b5f..1afa290e97d 100755 --- a/WorkloadManagementSystem/Service/JobStateUpdateHandler.py +++ b/WorkloadManagementSystem/Service/JobStateUpdateHandler.py @@ -13,7 +13,8 @@ __RCSID__ = "$Id$" -from types import * +from types import StringType, IntType, LongType, ListType, DictType +# from types import * import time from DIRAC.Core.DISET.RequestHandler import RequestHandler from DIRAC import gLogger, S_OK, S_ERROR diff --git a/WorkloadManagementSystem/Service/WMSAdministratorHandler.py b/WorkloadManagementSystem/Service/WMSAdministratorHandler.py index 2d06bb0a8d4..2d28b650deb 100755 --- a/WorkloadManagementSystem/Service/WMSAdministratorHandler.py +++ b/WorkloadManagementSystem/Service/WMSAdministratorHandler.py @@ -50,16 +50,15 @@ def initializeWMSAdministratorHandler( serviceInfo ): class WMSAdministratorHandler(RequestHandler): ########################################################################### - types_setMask = [StringTypes] - def export_setSiteMask(self, siteList, comment='No comment'): - """ Set the site mask for matching. The mask is given in a form of Classad - string. + types_setSiteMask = [ListType] + def export_setSiteMask( self, siteList ): + """ Set the site mask for matching. The mask is given in a form of Classad string. """ result = self.getRemoteCredentials() dn = result['DN'] maskList = [ (site,'Active') for site in siteList ] - result = jobDB.setSiteMask(maskList,dn,comment) + result = jobDB.setSiteMask( maskList, dn, 'No comment' ) return result ############################################################################## @@ -67,19 +66,7 @@ def export_setSiteMask(self, siteList, comment='No comment'): def export_getSiteMask(self): """ Get the site mask """ - - result = jobDB.getSiteMask('Active') - return result - - if result['Status'] == "OK": - active_list = result['Value'] - mask = [] - for i in range(1,len(active_list),2): - mask.append(active_list[i]) - - return S_OK(mask) - else: - return S_ERROR('Failed to get the mask from the Job DB') + return jobDB.getSiteMask( 'Active' ) ############################################################################## types_banSite = [StringTypes] @@ -119,19 +106,18 @@ def export_clearMask(self): """ Clear up the entire site mask """ - return jobDB.removeSiteFromMask("All") + return jobDB.removeSiteFromMask( None ) ############################################################################## - types_getSiteMaskLogging = [ list(StringTypes)+[ListType] ] - def export_getSiteMaskLogging(self,sites): + types_getSiteMaskLogging = [ list( StringTypes ) + [ListType] ] + def export_getSiteMaskLogging( self, sites ): """ Get the site mask logging history """ if type(sites) in StringTypes: - msites = [sites] - else: - msites = sites - return jobDB.getSiteMaskLogging(msites) + sites = [sites] + + return jobDB.getSiteMaskLogging( sites ) ############################################################################## types_getSiteMaskSummary = [ ] @@ -161,7 +147,7 @@ def export_getSiteMaskSummary(self): return S_OK(siteDict) ############################################################################## - 
types_getCurrentPilotCounters = [ ] + types_getCurrentPilotCounters = [ DictType ] def export_getCurrentPilotCounters( self, attrDict={}): """ Get pilot counters per Status with attrDict selection. Final statuses are given for the last day. @@ -547,7 +533,8 @@ def export_killPilot(self, pilotRefList ): return result ce = result['Value'] - if gridType in ["LCG","gLite","CREAM"]: + # FIXME: quite hacky. Should be either removed, or based on some flag + if gridType in ["LCG", "gLite", "CREAM", 'ARC']: group = getGroupOption(group,'VOMSRole',group) ret = gProxyManager.getPilotProxyFromVOMSGroup( owner, group ) if not ret['OK']: @@ -659,6 +646,6 @@ def export_getPilotStatistics ( attribute, selectDict ): userName = getUsernameForDN( status['OwnerDN'] ) if userName['OK']: status['OwnerDN'] = userName['Value'] - statistics[status[selector]] = count + statistics[status] = count return S_OK( statistics )
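
Note on the __nonzero__ additions in TimeSeries, FTSJob, Operation and Request above: in Python 2, an object that defines __len__ but not __nonzero__ evaluates to False whenever it is empty, so a freshly created Request or Operation with no sub-files would fail a plain "if obj:" guard even though the object exists. The snippet below is a minimal, standalone illustration of that behaviour; the Container class is hypothetical and not part of DIRAC, and under Python 3 the equivalent hook would be __bool__.

    class Container( object ):
      """ Illustrative container: stays truthy even when empty, like the patched classes above. """

      def __init__( self ):
        self.items = []

      def __len__( self ):
        return len( self.items )

      def __nonzero__( self ):
        # Without this hook, bool( container ) falls back to __len__(),
        # so an empty container is False in "if container:" checks.
        return True

    c = Container()
    print bool( c )  # True, even though the container is empty
    print len( c )   # 0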