Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[integration] Move pilot submission to Monitoring (ES backend) #5788

Merged
merged 16 commits into from
Jan 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ The given periods above are also the default periods in the code.
Enable WMSHistory monitoring
============================

You have to add ``Monitoring`` to the ``Backends`` option of WorkloadManagemet/StatesAccountingAgent.
You have to add ``Monitoring`` to the ``Backends`` option of WorkloadManagement/StatesAccountingAgent.
If you do so, this agent will collect information using the JobDB and send it to the Elasticsearch database.
This same agent can also report to the MySQL backend of the Accounting system (which is in fact the default).

Expand Down Expand Up @@ -112,11 +112,12 @@ You can configure the MQ in the local dirac.cfg file where the agent is running:
}
}

A dashboard for WMSHistory monitoring ``WMSDashboard`` is available `here <https://github.com/DIRACGrid/DIRAC/tree/integration/dashboards/WMSDashboard>`__ for import both as a JSON file and as a NDJSON (as support for JSON is being removed in the latest versions of Kibana).
The dashboard is not compatible with older versions of ElasticSearch (such as ES6).
To import it in the Kibana UI, go to Management -> Saved Objects -> Import and import the JSON file.
*Kibana dashboard for WMSHistory*
A dashboard for WMSHistory monitoring ``WMSDashboard`` is available `here <https://github.com/DIRACGrid/DIRAC/tree/integration/dashboards/WMSDashboard>`__ for import both as a JSON file and as a NDJSON (as support for JSON is being removed in the latest versions of Kibana).
The dashboard is not compatible with older versions of ElasticSearch (such as ES6).
To import it in the Kibana UI, go to Management -> Saved Objects -> Import and import the JSON file.

Note: the JSON file already contains the index patterns needed for the visualizations. You may need to adapt the index patterns to your existing ones.
Note: the JSON file already contains the index patterns needed for the visualizations. You may need to adapt the index patterns to your existing ones.


Enable Component monitoring
Expand Down Expand Up @@ -153,7 +154,11 @@ In order to enable RMSMonitoring we need to set value of ``EnableRMSMonitoring``
}
}

Enable Pilot Submission Monitoring
==================================

In order to enable the monitoring of the pilot submission so that they will be sent to ES backend (by default they are sent to Accounting), you need to set
``sendPilotSubmissionMonitoring = True`` for this option in WorkloadManagement/SiteDirector.

Accessing the Monitoring information
=====================================
Expand Down
3 changes: 2 additions & 1 deletion docs/source/DeveloperGuide/Systems/Monitoring/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,12 @@ A new monitoring type can be added:

self.setKeyFields( ['cond1', 'cond2'] )
self.setMonitoringFields( [ 'ex1' ] )
- create the plotter: MonitoringSystem/Client/private/Plotters/ExamplePlotter.py
- create the plotter: MonitoringSystem/private/Plotters/ExamplePlotter.py
Note: The file name must ends with Plotter word.
You have to implement two functions:

def _reportExample( self, reportRequest ):

def _plotExample( self, reportRequest, plotInfo, filename ):

In the Monitoring page you will see and Example. But if you want to rename it:
Expand Down
5 changes: 0 additions & 5 deletions src/DIRAC/MonitoringSystem/Client/MonitoringReporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@
a MQ service is available, if the MQ is not working a failover will be performed.

"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

__RCSID__ = "$Id$"

import threading
import json
Expand Down
30 changes: 30 additions & 0 deletions src/DIRAC/MonitoringSystem/Client/Types/PilotMonitoring.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
"""
Monitoring Type for Pilot Submission
"""

from DIRAC.MonitoringSystem.Client.Types.BaseType import BaseType


class PilotMonitoring(BaseType):
def __init__(self):

super().__init__()

self.keyFields = ["HostName", "SiteDirector", "Site", "CE", "Queue", "Status"]

self.monitoringFields = ["NumTotal", "NumSucceeded"]

self.index = "pilotstats_index"

self.addMapping(
{
"HostName": {"type": "keyword"},
"SiteDirector": {"type": "keyword"},
"Site": {"type": "keyword"},
"CE": {"type": "keyword"},
"Queue": {"type": "keyword"},
"Status": {"type": "keyword"},
}
)

self.checkType()
3 changes: 0 additions & 3 deletions src/DIRAC/MonitoringSystem/Client/Types/WMSHistory.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@

Filled by the agent "WorkloadManagement/StatesAccountingAgent"
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from DIRAC.MonitoringSystem.Client.Types.BaseType import BaseType

Expand Down
5 changes: 0 additions & 5 deletions src/DIRAC/MonitoringSystem/DB/MonitoringDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@


"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

__RCSID__ = "$Id$"

import time
import calendar
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""
This class is used to define the plot using the plot attributes.
"""

from DIRAC import S_OK

from DIRAC.MonitoringSystem.Client.Types.PilotMonitoring import PilotMonitoring
from DIRAC.MonitoringSystem.private.Plotters.BasePlotter import BasePlotter


class WMSHistoryPlotter(BasePlotter):

"""
.. class:: PilotMonitoringPlotter

It is used to crate the plots.

param: str _typeName monitoring type
param: list _typeKeyFields list of keys what we monitor (list of attributes)
"""

_typeName = "PilotMonitoring"
_typeKeyFields = PilotMonitoring().keyFields

def reportNumberOfSubmissions(self, reportRequest):
"""It is used to retrieve the data from the database.

:param dict reportRequest: contains attributes used to create the plot.
:return: S_OK or S_ERROR {'data':value1, 'granularity':value2} value1 is a dictionary, value2 is the bucket length
"""
retVal = self._getTimedData(
startTime=reportRequest["startTime"],
endTime=reportRequest["endTime"],
selectField="NumTotal",
preCondDict=reportRequest["condDict"],
metadataDict=None,
)
if not retVal["OK"]:
return retVal
dataDict, granularity = retVal["Value"]
return S_OK({"data": dataDict, "granularity": granularity})

def _plotNumberOfSubmissions(self, reportRequest, plotInfo, filename):
"""It creates the plot.

:param dict reportRequest: plot attributes
:param dict plotInfo: contains all the data which are used to create the plot
:param str filename:
:return: S_OK or S_ERROR { 'plot' : value1, 'thumbnail' : value2 } value1 and value2 are TRUE/FALSE
"""
metadata = {
"title": "Pilot Submissions by %s" % reportRequest["grouping"],
"starttime": reportRequest["startTime"],
"endtime": reportRequest["endTime"],
"span": plotInfo["granularity"],
"skipEdgeColor": True,
"ylabel": "Submissions",
}

plotInfo["data"] = self._fillWithZero(
granularity=plotInfo["granularity"],
startEpoch=reportRequest["startTime"],
endEpoch=reportRequest["endTime"],
dataDict=plotInfo["data"],
)

return self._generateStackedLinePlot(filename=filename, dataDict=plotInfo["data"], metadata=metadata)

def reportNumSucceeded(self, reportRequest):
"""It is used to retrieve the data from the database.

:param dict reportRequest: contains attributes used to create the plot.
:return: S_OK or S_ERROR {'data':value1, 'granularity':value2} value1 is a dictionary, value2 is the bucket length
"""
retVal = self._getTimedData(
startTime=reportRequest["startTime"],
endTime=reportRequest["endTime"],
selectField="NumSucceeded",
preCondDict=reportRequest["condDict"],
metadataDict=None,
)
if not retVal["OK"]:
return retVal
dataDict, granularity = retVal["Value"]
return S_OK({"data": dataDict, "granularity": granularity})

def _plotNumSucceeded(self, reportRequest, plotInfo, filename):
"""It creates the plot.

:param dict reportRequest: plot attributes
:param dict plotInfo: contains all the data which are used to create the plot
:param str filename:
:return: S_OK or S_ERROR { 'plot' : value1, 'thumbnail' : value2 } value1 and value2 are TRUE/FALSE
"""
metadata = {
"title": "SuSubmissions by %s" % reportRequest["grouping"],
"starttime": reportRequest["startTime"],
"endtime": reportRequest["endTime"],
"span": plotInfo["granularity"],
"skipEdgeColor": True,
"ylabel": "submissions",
}

plotInfo["data"] = self._fillWithZero(
granularity=plotInfo["granularity"],
startEpoch=reportRequest["startTime"],
endEpoch=reportRequest["endTime"],
dataDict=plotInfo["data"],
)

return self._generateStackedLinePlot(filename=filename, dataDict=plotInfo["data"], metadata=metadata)
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
"""
This class is used to define the plot using the plot attributes.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from DIRAC import S_OK

from DIRAC.MonitoringSystem.Client.Types.WMSHistory import WMSHistory
from DIRAC.MonitoringSystem.private.Plotters.BasePlotter import BasePlotter

__RCSID__ = "$Id$"


class WMSHistoryPlotter(BasePlotter):

Expand Down
75 changes: 71 additions & 4 deletions src/DIRAC/WorkloadManagementSystem/Agent/SiteDirector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,18 @@
from concurrent.futures import ThreadPoolExecutor

import DIRAC
from DIRAC import S_OK, gConfig
from DIRAC import S_OK, S_ERROR, gConfig
from DIRAC.ConfigurationSystem.Client.Helpers import CSGlobals, Registry
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations
from DIRAC.ConfigurationSystem.Client.Helpers.Resources import getCESiteMapping
from DIRAC.Core.Base.AgentModule import AgentModule
from DIRAC.Core.Utilities.Time import dateTime, second
from DIRAC.Core.Utilities.Time import dateTime, second, toEpoch
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
from DIRAC.FrameworkSystem.Client.ProxyManagerClient import gProxyManager
from DIRAC.AccountingSystem.Client.Types.Pilot import Pilot as PilotAccounting
from DIRAC.AccountingSystem.Client.Types.PilotSubmission import PilotSubmission as PilotSubmissionAccounting
from DIRAC.MonitoringSystem.Client.MonitoringReporter import MonitoringReporter
from DIRAC.MonitoringSystem.Client.Types.PilotMonitoring import PilotMonitoring as PilotSubmissionMonitoring
from DIRAC.AccountingSystem.Client.DataStoreClient import gDataStoreClient
from DIRAC.WorkloadManagementSystem.Client import PilotStatus
from DIRAC.WorkloadManagementSystem.Client.MatcherClient import MatcherClient
Expand Down Expand Up @@ -87,7 +89,7 @@ def __init__(self, *args, **kwargs):
self.getOutput = False
self.sendAccounting = True
self.sendSubmissionAccounting = True

self.sendSubmissionMonitoring = False
self.siteClient = None
self.rssClient = None
self.rssFlag = None
Expand Down Expand Up @@ -192,6 +194,9 @@ def beginExecution(self):
self.sendSubmissionAccounting = self.am_getOption(
"SendPilotSubmissionAccounting", self.sendSubmissionAccounting
)
self.sendSubmissionMonitoring = self.am_getOption(
"SendPilotSubmissionMonitoring", self.sendSubmissionMonitoring
)

# Get the site description dictionary
siteNames = None
Expand Down Expand Up @@ -270,6 +275,8 @@ def beginExecution(self):
self.log.always("Pilot accounting sending requested")
if self.sendSubmissionAccounting:
self.log.always("Pilot submission accounting sending requested")
if self.sendSubmissionMonitoring:
self.log.always("Pilot submission monitoring sending requested")

self.log.always("MaxPilotsToSubmit:", self.maxPilotsToSubmit)

Expand Down Expand Up @@ -716,7 +723,15 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue):
0,
"Failed",
)

if self.sendSubmissionMonitoring:
self.sendPilotSubmissionMonitoring(
self.queueDict[queue]["Site"],
self.queueDict[queue]["CEName"],
self.queueDict[queue]["QueueName"],
pilotsToSubmit,
0,
"Failed",
)
self.failedQueues[queue] += 1
return submitResult

Expand All @@ -739,6 +754,15 @@ def _submitPilotsToQueue(self, pilotsToSubmit, ce, queue):
len(pilotList),
"Succeeded",
)
if self.sendSubmissionMonitoring:
self.sendPilotSubmissionMonitoring(
self.queueDict[queue]["Site"],
self.queueDict[queue]["CEName"],
self.queueDict[queue]["QueueName"],
len(pilotList),
len(pilotList),
"Succeeded",
)

return S_OK((pilotList, stampDict))

Expand Down Expand Up @@ -1359,3 +1383,46 @@ def sendPilotSubmissionAccounting(self, siteName, ceName, queueName, numTotal, n
if not result["OK"]:
self.log.error("Error in Commit:" + result["Message"])
return result

def sendPilotSubmissionMonitoring(self, siteName, ceName, queueName, numTotal, numSucceeded, status):
"""Sends pilot submission records to monitoring

:param str siteName: Site name
:param str ceName: CE name
:param str queueName: queue Name
:param int numTotal: Total number of submission
:param int numSucceeded: Total number of submission succeeded
:param str status: 'Succeeded' or 'Failed'

:returns: S_OK / S_ERROR
"""

pilotMonitoringReporter = MonitoringReporter(monitoringType="PilotMonitoring")

if hasattr(self, "_AgentModule__moduleProperties"):
siteDirName = self.am_getModuleParam("agentName")
else: # In case it is not executed as agent
siteDirName = "Client"

pilotMonitoringData = [
{
"HostName": DIRAC.siteName(),
"SiteDirector": siteDirName,
"Site": siteName,
"CE": ceName,
"Queue": queueName,
"Status": status,
"NumTotal": numTotal,
"NumSucceded": numSucceeded,
"timestamp": int(toEpoch(dateTime())),
}
]
pilotMonitoringReporter.addRecord(pilotMonitoringData)
result = pilotMonitoringReporter.commit()

self.log.verbose("Committing pilot submission to monitoring")
if not result["OK"]:
self.log.error("Couldn't commit pilot submission to monitoring", result["Message"])
return S_ERROR()
self.log.verbose("Done committing to monitoring")
return S_OK()
2 changes: 2 additions & 0 deletions src/DIRAC/WorkloadManagementSystem/ConfigTemplate.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ Agents
SendPilotAccounting = True
# Boolean value that indicates if the pilot submission statistics will be sended for accounting
SendPilotSubmissionAccounting = True
# Boolean value that indicates if the pilot submission statistics will be sended for monitoring
SendPilotSubmissionMonitoring = False
}
##END
##BEGIN StatesAccountingAgent
Expand Down
6 changes: 0 additions & 6 deletions tests/Integration/Monitoring/Test_MonitoringDB.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
""" Test for MonitoringDB
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

__RCSID__ = "$Id$"

import time
import json
import pytest
Expand Down
Loading