From e7fc7149d5ecb45e69dc3056b83334b03223df63 Mon Sep 17 00:00:00 2001 From: Francesco Beneventi Date: Sun, 22 Jun 2025 18:56:00 +0200 Subject: [PATCH 01/10] Fix: back to ubuntu/debian repositories --- Dockerfile | 6 +++--- docker/kairosdb/Dockerfile | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/Dockerfile b/Dockerfile index 3c8c8d6..0105f6f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,13 +3,13 @@ FROM examonhpc/examon:0.2.0 ENV EXAMON_HOME /etc/examon_deploy/examon # Create a backup of the existing sources.list -RUN mv /etc/apt/sources.list /etc/apt/sources.list.backup +#RUN mv /etc/apt/sources.list /etc/apt/sources.list.backup # Create a new sources.list file -RUN touch /etc/apt/sources.list +#RUN touch /etc/apt/sources.list # Debian strech moved to archived -RUN echo "deb https://debian.mirror.garr.it/debian-archive/ stretch main" > /etc/apt/sources.list +#RUN echo "deb https://debian.mirror.garr.it/debian-archive/ stretch main" > /etc/apt/sources.list # Install dependencies diff --git a/docker/kairosdb/Dockerfile b/docker/kairosdb/Dockerfile index 16ab7c6..08a0698 100644 --- a/docker/kairosdb/Dockerfile +++ b/docker/kairosdb/Dockerfile @@ -3,13 +3,13 @@ FROM adoptopenjdk:8-jre-hotspot-focal # Create a backup of the existing sources.list -RUN mv /etc/apt/sources.list /etc/apt/sources.list.backup +#RUN mv /etc/apt/sources.list /etc/apt/sources.list.backup # Create a new sources.list file -RUN touch /etc/apt/sources.list +#RUN touch /etc/apt/sources.list # Add the new server repository for focal packages -RUN echo "deb https://ubuntu.mirror.garr.it/ubuntu/ focal main" > /etc/apt/sources.list +#RUN echo "deb https://ubuntu.mirror.garr.it/ubuntu/ focal main" > /etc/apt/sources.list RUN set -eux; \ apt-get update; \ From 31001a30b14a7a463edf0523d21bdf13a295d890 Mon Sep 17 00:00:00 2001 From: Francesco Beneventi Date: Mon, 23 Jun 2025 15:55:03 +0200 Subject: [PATCH 02/10] Chore: Python3 migration steps --- .gitignore | 3 +- Dockerfile | 43 ++-- VERSION | 2 +- docker-compose.yml | 6 +- docker/examon/supervisor.conf | 2 +- lib/examon-common/.gitignore | 2 - lib/examon-common/README.rst | 3 - lib/examon-common/examon/__init__.py | 5 - lib/examon-common/examon/db/__init__.py | 0 lib/examon-common/examon/db/kairosdb.py | 77 ------- lib/examon-common/examon/plugin/__init__.py | 0 lib/examon-common/examon/plugin/examonapp.py | 73 ------- .../examon/plugin/sensorreader.py | 117 ---------- .../examon/transport/__init__.py | 0 lib/examon-common/examon/transport/mqtt.py | 189 ---------------- lib/examon-common/examon/utils/__init__.py | 0 lib/examon-common/examon/utils/config.py | 48 ----- lib/examon-common/examon/utils/daemon.py | 150 ------------- lib/examon-common/examon/utils/executor.py | 73 ------- lib/examon-common/setup.py | 20 -- web/examon-server/.gitignore | 4 + web/examon-server/doc.md | 154 +++++++++++++ web/examon-server/example_server.conf | 4 + web/examon-server/requirements.txt | 28 +-- web/examon-server/server.py | 202 ++++++++++++------ 25 files changed, 328 insertions(+), 877 deletions(-) delete mode 100644 lib/examon-common/.gitignore delete mode 100644 lib/examon-common/README.rst delete mode 100644 lib/examon-common/examon/__init__.py delete mode 100644 lib/examon-common/examon/db/__init__.py delete mode 100644 lib/examon-common/examon/db/kairosdb.py delete mode 100644 lib/examon-common/examon/plugin/__init__.py delete mode 100644 lib/examon-common/examon/plugin/examonapp.py delete mode 100644 lib/examon-common/examon/plugin/sensorreader.py delete mode 
100644 lib/examon-common/examon/transport/__init__.py delete mode 100644 lib/examon-common/examon/transport/mqtt.py delete mode 100644 lib/examon-common/examon/utils/__init__.py delete mode 100644 lib/examon-common/examon/utils/config.py delete mode 100644 lib/examon-common/examon/utils/daemon.py delete mode 100644 lib/examon-common/examon/utils/executor.py delete mode 100644 lib/examon-common/setup.py create mode 100644 web/examon-server/doc.md diff --git a/.gitignore b/.gitignore index 31698e7..df6cd89 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,6 @@ *.log.* *.pyc .ipynb_* -examon-cache/ -examon-cache/* +examon-cache build site diff --git a/Dockerfile b/Dockerfile index 0105f6f..3b6df34 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,52 +1,35 @@ -FROM examonhpc/examon:0.2.0 +FROM examonhpc/examon:0.3.0 ENV EXAMON_HOME /etc/examon_deploy/examon -# Create a backup of the existing sources.list -#RUN mv /etc/apt/sources.list /etc/apt/sources.list.backup - -# Create a new sources.list file -#RUN touch /etc/apt/sources.list - -# Debian strech moved to archived -#RUN echo "deb https://debian.mirror.garr.it/debian-archive/ stretch main" > /etc/apt/sources.list - - -# Install dependencies -RUN apt-get update && apt-get install -y \ - apt-transport-https \ - ca-certificates \ - libffi-dev \ - build-essential \ - libssl-dev \ - python-dev \ - && rm -rf /var/lib/apt/lists/* - -# copy app +# Copy app ADD ./publishers/random_pub ${EXAMON_HOME}/publishers/random_pub -ADD ./lib/examon-common $EXAMON_HOME/lib/examon-common ADD ./docker/examon/supervisor.conf /etc/supervisor/conf.d/supervisor.conf ADD ./scripts/examon.conf $EXAMON_HOME/scripts/examon.conf ADD ./web $EXAMON_HOME/web -# install -RUN pip --trusted-host pypi.python.org install --upgrade pip==20.1.1 +# Venvs +WORKDIR $EXAMON_HOME/scripts +RUN virtualenv py3_env + ENV PIP $EXAMON_HOME/scripts/ve/bin/pip +ENV S_PIP $EXAMON_HOME/scripts/py3_env/bin/pip +# Install WORKDIR $EXAMON_HOME/lib/examon-common -RUN $PIP install . -RUN pip install . +RUN $S_PIP install . +# Random publisher WORKDIR $EXAMON_HOME/publishers/random_pub RUN $PIP install -r requirements.txt +# Web WORKDIR $EXAMON_HOME/web RUN virtualenv flask -RUN flask/bin/pip --trusted-host pypi.python.org install --upgrade pip==20.1.1 -RUN CASS_DRIVER_BUILD_CONCURRENCY=8 flask/bin/pip --trusted-host pypi.python.org install -r ./examon-server/requirements.txt +RUN CASS_DRIVER_BUILD_CONCURRENCY=8 flask/bin/pip install -r ./examon-server/requirements.txt WORKDIR $EXAMON_HOME/scripts -EXPOSE 1883 9001 +EXPOSE 1883 5000 9001 CMD ["./frontend_ctl.sh", "start"] diff --git a/VERSION b/VERSION index 1e66a61..01e994d 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -v0.3.1 \ No newline at end of file +v0.4.0 \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index b1da1f8..1de513a 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,4 +1,3 @@ -version: "3.8" name: "examon" networks: @@ -11,11 +10,8 @@ services: examon: build: context: . 
- image: examonhpc/examon:0.2.2 + image: examonhpc/examon:0.3.1 volumes: - - type: bind - source: ./lib/examon-common - target: /etc/examon_deploy/examon/lib/examon-common - type: bind source: ./web/examon-server target: /etc/examon_deploy/examon/web/examon-server diff --git a/docker/examon/supervisor.conf b/docker/examon/supervisor.conf index 34a623c..4efbbe7 100644 --- a/docker/examon/supervisor.conf +++ b/docker/examon/supervisor.conf @@ -23,7 +23,7 @@ startsec=2 [program:random_pub] directory=/etc/examon_deploy/examon/publishers/random_pub -command=python ./random_pub.py run +command=/etc/examon_deploy/examon/scripts/ve/bin/python random_pub.py run autostart=true autorestart=true stderr_logfile=/var/log/random_pub.log diff --git a/lib/examon-common/.gitignore b/lib/examon-common/.gitignore deleted file mode 100644 index bf054b8..0000000 --- a/lib/examon-common/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -*.pyc -./examon/version.py diff --git a/lib/examon-common/README.rst b/lib/examon-common/README.rst deleted file mode 100644 index ffdce31..0000000 --- a/lib/examon-common/README.rst +++ /dev/null @@ -1,3 +0,0 @@ -Examon common utilities package -=============================== -v0.2.3 diff --git a/lib/examon-common/examon/__init__.py b/lib/examon-common/examon/__init__.py deleted file mode 100644 index 1f8fc77..0000000 --- a/lib/examon-common/examon/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ - -import logging -from logging import NullHandler - -logging.getLogger(__name__).addHandler(NullHandler()) \ No newline at end of file diff --git a/lib/examon-common/examon/db/__init__.py b/lib/examon-common/examon/db/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/lib/examon-common/examon/db/kairosdb.py b/lib/examon-common/examon/db/kairosdb.py deleted file mode 100644 index 8a37557..0000000 --- a/lib/examon-common/examon/db/kairosdb.py +++ /dev/null @@ -1,77 +0,0 @@ - -import sys -import zlib -import gzip -import json -import requests -import StringIO -import logging - - -class KairosDB: - """ - KairosDB REST client - """ - def __init__(self, server, port, user=None, password=None): - self.server = server - self.port = port - self.user = user - self.password = password - self.s = requests.Session() - if self.password: - self.s.auth = (self.user, self.password) - #self.s.headers.update({'x-test': 'true'}) - self.logger = logging.getLogger(__name__) - self.apis = {} - self.api_server = "http://" + self.server + ":" + self.port - self.apis['post_metrics'] = self.api_server + "/api/v1/datapoints" - self.apis['post_query'] = self.api_server + "/api/v1/datapoints/query" - - def _compress(self, payload): - s = StringIO.StringIO() - with gzip.GzipFile(fileobj=s, mode='w') as g: - g.write(payload) - return s.getvalue() - - def put_metrics(self, metrics, comp=True): - headers = {} - response = None - if comp: - headers = {'content-type': 'application/gzip'} - payload = self._compress(json.dumps(metrics)) - else: - payload = json.dumps(metrics) - try: - self.logger.debug("Inserting %d metrics" % len(metrics)) - response = self.s.post(self.apis['post_metrics'], payload, headers=headers) - response.raise_for_status() - - # # DEBUG: send one metric at time - # for m in metrics: - # pay = [m] - # try: - # response = self.s.post(self.apis['post_metrics'], json.dumps([m]), headers=headers) - # response.raise_for_status() - # except: - # self.logger.error("Exception in post()", exc_info=True) - # self.logger.error("Request payload: %s" % (json.dumps(pay, indent=4))) - # 
self.logger.error("Reason %s" % (response.text)) - - except: - #e = sys.exc_info()[0] - #logger.error("[%s] Exception in post(): %s", "KairosDB", e) - #self.logger.error("Exception in post()", exc_info=True) - self.logger.exception("Exception in post()") - #if response: - # self.logger.error("Reason %s" % (response.text)) - #self.logger.error("Request payload: %s" % (json.dumps(pay, indent=4))) - #print "[%s] Exception in post(): %s" % ("KairosDB", e,) - #print "[%s] Reason: " % ("KairosDB",) - #print response.text - #sys.exit(1) - - def query_metrics(self, query): - self.logger.debug("query metrics: %s" % repr(query)) - headers = {'Accept-Encoding': 'gzip, deflate'} - response = self.s.post(self.apis['post_query'], data=json.dumps(query), headers=headers) - return response.json() diff --git a/lib/examon-common/examon/plugin/__init__.py b/lib/examon-common/examon/plugin/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/lib/examon-common/examon/plugin/examonapp.py b/lib/examon-common/examon/plugin/examonapp.py deleted file mode 100644 index 18d2d97..0000000 --- a/lib/examon-common/examon/plugin/examonapp.py +++ /dev/null @@ -1,73 +0,0 @@ -import os -import sys -import signal -import logging -import collections -from logging.handlers import RotatingFileHandler - -from examon.utils.executor import Executor -from examon.utils.config import Config -from examon.utils.daemon import Daemon - -#import multiprocessing_logging as mp_logging -from concurrent_log_handler import ConcurrentRotatingFileHandler - -class ExamonApp(Executor): - def __init__(self, executor='Daemon', configfilename=None): - if configfilename == None: - self.configfilename = os.path.splitext(os.path.basename(sys.argv[0]))[0] - else: - self.configfilename = configfilename - self.cfg = Config(self.configfilename + '.conf') - self.conf = self.cfg.get_defaults() - self.pidfile = None - self.daemon = None - self.runmode = 'run' - self.logger = logging.getLogger('examon') - super(ExamonApp, self).__init__(executor) - - def parse_opt(self): - self.conf = self.cfg.get_conf() - self.runmode = self.conf['runmode'] - self.pidfile = self.conf['PID_FILENAME'] - self.daemon = Daemon(self.pidfile, signal.SIGINT) - - def examon_tags(self): - return collections.OrderedDict() - - def set_logging(self): - LOGFILE_SIZE_B = int(self.conf['LOGFILE_SIZE_B']) - LOG_LEVEL = getattr(logging, self.conf['LOG_LEVEL'].upper(), None) - #logger = logging.getLogger('examon') - #handler = RotatingFileHandler(self.conf['LOG_FILENAME'], mode='a', maxBytes=LOGFILE_SIZE_B, backupCount=2) - handler = ConcurrentRotatingFileHandler(self.conf['LOG_FILENAME'], mode='a', maxBytes=LOGFILE_SIZE_B, backupCount=2) - #log_formatter = logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - [%(filename)s] - [%(processName)s] %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') - log_formatter = logging.Formatter(fmt='%(levelname)s - %(asctime)s - [%(processName)s] - [%(filename)s] - %(name)s - %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') - handler.setFormatter(log_formatter) - self.logger.addHandler(handler) - self.logger.setLevel(LOG_LEVEL) - # if run print logs also to stdout - if self.runmode == 'run': - handler = logging.StreamHandler(sys.stdout) - handler.setFormatter(log_formatter) - self.logger.addHandler(handler) - #mp_logging.install_mp_handler() - - def run(self): - self.set_logging() - if ('stop' == self.runmode): - print " Terminating daemon..." 
- self.logger.info("Terminating daemon...") - self.daemon.stop() - sys.exit(0) - elif self.runmode in ['run','start','restart']: - if self.runmode == 'start': - print "Daemonize.." - self.daemon.start() - elif self.runmode == 'restart': - print "Restarting Daemon.." - self.daemon.restart() - else: - pass - print "Starting jobs..." - self.exec_par() diff --git a/lib/examon-common/examon/plugin/sensorreader.py b/lib/examon-common/examon/plugin/sensorreader.py deleted file mode 100644 index 2bf06be..0000000 --- a/lib/examon-common/examon/plugin/sensorreader.py +++ /dev/null @@ -1,117 +0,0 @@ -import os -import sys -import copy -import time -import json -import logging -import collections -import thread - -from threading import Timer -from examon.db.kairosdb import KairosDB -from examon.transport.mqtt import Mqtt - - -def timeout_handler(): - logger = logging.getLogger(__name__) - logger.error('Timeout in main loop, exiting..') - logger.debug('Process PID: %d' % os.getpid()) - #sys.exit(1) - #thread.interrupt_main() - os._exit(1) - -class SensorReader: - """ - Examon Sensor adapter - """ - def __init__(self, conf, sensor): - self.conf = copy.deepcopy(conf) - self.sensor = sensor - self.tags = collections.OrderedDict() - self.read_data = None - self.dest_client = None - self.comp = self.conf['COMPRESS'] - self.logger = logging.getLogger(__name__) - - # if self.conf['OUT_PROTOCOL'] == 'kairosdb': - # self.dest_client = KairosDB(self.conf['K_SERVERS'], self.conf['K_PORT'], self.conf['K_USER'], self.conf['K_PASSWORD']) - # elif self.conf['OUT_PROTOCOL'] == 'mqtt': - # # TODO: add MQTT format in conf - # self.dest_client = Mqtt(self.conf['MQTT_BROKER'], self.conf['MQTT_PORT'], format=self.conf['MQTT_FORMAT'], outtopic=self.conf['MQTT_TOPIC']) - # self.dest_client.run() - - def add_tag_v(self, v): - """Sanitize tag values""" - if (v is not None) and (v is not u'') and (v is not 'None'): - ret = v.replace(' ','_').replace('/','_').replace('+','_').replace('#','_') - else: - ret = '_' - return ret - - def add_payload_v(self, v): - """Sanitize payload values""" - if (v is not None) and (v is not u'') and (v is not 'None'): - if isinstance(v, basestring): - ret = v.replace(';','_') - else: - ret = v - else: - ret = '_' - return ret - - def add_tags(self, tags): - self.tags = copy.deepcopy(tags) - - def get_tags(self): - return copy.deepcopy(self.tags) - - def run(self): - if not self.read_data: - raise Exception("'read_data' must be implemented!") - - if self.conf['OUT_PROTOCOL'] == 'kairosdb': - self.dest_client = KairosDB(self.conf['K_SERVERS'], self.conf['K_PORT'], self.conf['K_USER'], self.conf['K_PASSWORD']) - elif self.conf['OUT_PROTOCOL'] == 'mqtt': - # TODO: add MQTT format in conf - self.dest_client = Mqtt(self.conf['MQTT_BROKER'], self.conf['MQTT_PORT'], username=self.conf['MQTT_USER'], password=self.conf['MQTT_PASSWORD'], format=self.conf['MQTT_FORMAT'], outtopic=self.conf['MQTT_TOPIC'], dryrun=self.conf['DRY_RUN']) - self.dest_client.run() - - TS = float(self.conf['TS']) - - while True: - try: - self.logger.debug("Start timeout timer") - timeout_timer = Timer(10*TS, timeout_handler) #timeout after 3*sampling time - timeout_timer.start() - - t0 = time.time() - #if self.read_data: - worker_id, payload = self.read_data(self) - t1 = time.time() - #print "Retrieved and processed %d nodes in %f seconds" % (len(res),(t1-t0),) - self.logger.info("Worker [%s] - Retrieved and processed %d metrics in %f seconds" % (worker_id, len(payload),(t1-t0),)) - #print json.dumps(res) - #sys.exit(0) - t0 = 
time.time() - self.dest_client.put_metrics(payload, comp=self.comp) - t1 = time.time() - #print json.dumps(payload[0:3], indent=4) - # print "Worker %s:...............insert: %d sensors, time: %f sec, insert_rate %f sens/sec" % (worker_id, \ - # len(payload),\ - # (t1-t0),\ - # len(payload)/(t1-t0), ) - self.logger.debug("Worker [%s] - Insert: %d sensors, time: %f sec, insert_rate: %f sens/sec" % (worker_id, \ - len(payload),\ - (t1-t0),\ - len(payload)/(t1-t0), )) - except Exception: - self.logger.exception('Uncaught exception in main loop!') - self.logger.debug("Cancel timeout timer") - timeout_timer.cancel() - continue - - self.logger.debug("Cancel timeout timer") - timeout_timer.cancel() - - self.logger.debug("Start new loop") - time.sleep(TS - (time.time() % TS)) diff --git a/lib/examon-common/examon/transport/__init__.py b/lib/examon-common/examon/transport/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/lib/examon-common/examon/transport/mqtt.py b/lib/examon-common/examon/transport/mqtt.py deleted file mode 100644 index 3ad0efc..0000000 --- a/lib/examon-common/examon/transport/mqtt.py +++ /dev/null @@ -1,189 +0,0 @@ -# -*- coding: utf-8 -*- -""" - Mqtt.py - MQTT protocol handler - - Copyright (c) 2014, francesco.beneventi@unibo.it - -""" - -import sys -import zlib -import gzip -import json -import struct -import StringIO -import logging -import paho.mqtt.client as mosquitto - - - -class Mqtt(object): - """ - MQTT client - """ - def __init__(self, brokerip, brokerport, username=None, password=None, format='csv', intopic=None, outtopic=None, qos=0, retain=False, dryrun=False): - self.brokerip = brokerip - self.brokerport = brokerport - self.intopic = intopic - self.outtopic = outtopic - self.qos = qos - self.retain = retain - self.dryrun = dryrun - self.client = mosquitto.Mosquitto() - if username: - self.client.username_pw_set(username, password) - self.client.on_connect = self.on_connect - self.client.on_message = self.on_message - self.client.on_log = self.on_log - self.status = 1 # ok - self.logger = logging.getLogger(__name__) - - # set msg format: default = 'csv' - if format == 'csv': - self.put_metrics = self._put_metrics_csv - elif format == 'json': - self.put_metrics = self._put_metrics_json - elif format == 'bulk': - self.put_metrics = self._put_metrics_json_bulk - - - def process(self, client, msg): - """ - Stream processing method. Override - """ - pass - - def on_log(self, client, userdata, level, buff): - self.logger.debug('MQTT logs: %s' % (buff)) - - def on_connect(self, client, userdata, flags, rc): # paho - #def on_connect(self, client, userdata, rc): - """ - On connect callback - """ - if int(rc) != 0: - self.logger.error('Error in connect. Result code: %s' % (str(rc))) - self.logger.info('Closing the MQTT connection') - self.client.disconnect() - self.status = 0 # error - else: - self.logger.info("Connected with result code %s" % (str(rc))) - if self.intopic: - self.logger.info("Subscribing to: %s" % (self.intopic)) - self.client.subscribe(self.intopic) - - # The callback for when a PUBLISH message is received from the server. - def on_message(self, client, userdata, msg): - """ - On message callback - """ - self.process(client,msg) - - def _compress(self, payload): - """ - Compress payload. TODO: replace with blosc - """ - s = StringIO.StringIO() - with gzip.GzipFile(fileobj=s, mode='w') as g: - g.write(payload) - return s.getvalue() - - def _put_metrics_csv(self, metrics, comp=False): - """ - One value per message: csv. 
- Topic is a / sequence obtained from metric['tags'] dict - Payload is a string cat ; - """ - if not self.status: - self.logger.error('Bad client status. Exit.') - sys.exit(1) - - for metric in metrics: - # build value - payload = str(metric['value']).encode('utf-8') - # skip if no value - if payload == '': - continue - payload += (";%.3f" % ((metric['timestamp'])/1000)) - payload = str(payload) - if comp: - payload = self._compress(payload) # TODO: find a better way. This manage both strings and floats - # build topic - topic = '/'.join([(val).replace('/','_').encode('utf-8') for pair in metric['tags'].items() for val in pair]) - topic += '/' + (metric['name']).encode('utf-8') - # sanitize - topic = topic.replace(' ','_').replace('+','_').replace('#','_') - topic = (topic).decode('utf-8') - # publish - self.logger.debug('[MqttPub] Topic: %s - Payload: %s' % (topic,str(payload))) - self._publish(topic, payload) - - - def _put_metrics_json(self, metrics, comp=False): - """ - One value per message: json. - Topic is a pre-defined value (outtopic) - Payload is the json obtained from metric - """ - if not self.status: - self.logger.error('Bad client status. Exit.') - sys.exit(1) - - for metric in metrics: - # build topic - topic = self.outtopic - # build value - if comp: - payload = self._compress(json.dumps(metric)) - else: - payload = json.dumps(metric) - # publish - self.logger.debug('[MqttPub] Topic: %s - Payload: %s' % (topic,json.dumps(metric))) - self._publish(topic, payload) - - - def _put_metrics_json_bulk(self, metrics, comp=True): - """ - Multiple values per message. - Topic is a pre-defined value (outtopic) - Payload is the (compressed) list of metrics - """ - if not self.status: - self.logger.error('Bad client status. Exit.') - sys.exit(1) - - # build topic - topic = self.outtopic - # build value - if comp: - payload = self._compress(json.dumps(metrics)) - else: - payload = json.dumps(metrics) - # publish - self.logger.debug('[MqttPub] Topic: %s - Payload: %s' % (topic,json.dumps(metrics))) - self._publish(topic, payload) - - def _publish(self, topic, payload): - if not self.dryrun: - try: - self.client.publish(topic, payload=payload, qos=self.qos, retain=self.retain) - except: - self.logger.exception('Exception in MQTT publish. 
Exit.') - sys.exit(1) - - def run(self): - """ - Connect and start MQTT FSM - """ - rc = -1 - self.logger.info('Connecting to MQTT server: %s:%s' % (self.brokerip,self.brokerport)) - try: - rc = self.client.connect(self.brokerip, port=int(self.brokerport)) - self.logger.debug('Connect rc: %d' % (rc)) - if rc != 0: - raise - except: - self.logger.exception('Exception in MQTT connect, rc: %d' % (rc)) - sys.exit(1) - self.logger.info('MQTT started') - self.client.loop_start() diff --git a/lib/examon-common/examon/utils/__init__.py b/lib/examon-common/examon/utils/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/lib/examon-common/examon/utils/config.py b/lib/examon-common/examon/utils/config.py deleted file mode 100644 index ffa98d1..0000000 --- a/lib/examon-common/examon/utils/config.py +++ /dev/null @@ -1,48 +0,0 @@ - -import argparse -import ConfigParser - - -class Config: - def __init__(self, configfile): - self.configfile = configfile - self.defaults = {} - self.parser = argparse.ArgumentParser() - # default args - self.parser.add_argument('runmode', choices=['run','start','restart','stop'], help='Run mode') - self.parser.add_argument('-b', dest='MQTT_BROKER', help='IP address of the MQTT broker') - self.parser.add_argument('-p', dest='MQTT_PORT', help='Port of the MQTT broker') - self.parser.add_argument('-t', dest='MQTT_TOPIC', help='MQTT topic') - self.parser.add_argument('-s', dest='TS', help='Sampling time (seconds)') - self.parser.add_argument('-x', dest='PID_FILENAME', help='pid filename') - self.parser.add_argument('-l', dest='LOG_FILENAME', help='log filename') - self.parser.add_argument('-d', dest='OUT_PROTOCOL', choices=['mqtt','kairosdb'], default='mqtt', help='select where to send data (default: mqtt)') - self.parser.add_argument('-f', dest='MQTT_FORMAT', choices=['csv','json','bulk'], default='csv', help='MQTT payload format (default: csv)') - self.parser.add_argument('--compress', dest='COMPRESS', action='store_true', default=False, help='enable payload compression (default: False)') - #self.parser.add_argument('--version', action='version', version=version) - self.parser.add_argument('--kairosdb-server', dest='K_SERVERS', help='kairosdb servers') - self.parser.add_argument('--kairosdb-port', dest='K_PORT', help='kairosdb port') - self.parser.add_argument('--kairosdb-user', dest='K_USER', help='kairosdb username') - self.parser.add_argument('--kairosdb-password', dest='K_PASSWORD', help='kairosdb password') - self.parser.add_argument('--logfile-size', dest='LOGFILE_SIZE_B', default=5*1024*1024, help='log file size (max) in bytes') - self.parser.add_argument('--loglevel', dest='LOG_LEVEL', choices=['DEBUG','INFO','WARNING','ERROR','CRITICAL'], default='INFO', help='log level') - self.parser.add_argument('--dry-run', dest='DRY_RUN', action='store_true', default=False, help='Data is not sent to the broker if True (default: False)') - self.parser.add_argument('--mqtt-user', dest='MQTT_USER', help='MQTT username', default=None) - self.parser.add_argument('--mqtt-password', dest='MQTT_PASSWORD', help='MQTT password', default=None) - - def get_defaults(self): - config = ConfigParser.RawConfigParser(allow_no_value=True) - config.optionxform = str #preserve caps - config.read(self.configfile) - for section in config.sections(): - self.defaults.update(dict(config.items(section))) - return self.defaults - - def update_optparser(self, parser): - self.parser = parser - - def get_conf(self): - args = vars(self.parser.parse_args()) - conf = self.get_defaults() - 
conf.update({k: v for k, v in args.items() if v is not None}) - return conf diff --git a/lib/examon-common/examon/utils/daemon.py b/lib/examon-common/examon/utils/daemon.py deleted file mode 100644 index 05a6d9a..0000000 --- a/lib/examon-common/examon/utils/daemon.py +++ /dev/null @@ -1,150 +0,0 @@ -import os -import sys -import signal -import atexit -import time - -from signal import SIGTERM - -class Daemon: - """ - A generic daemon class. - - Usage: subclass the Daemon class and override the run() method - """ - def __init__(self, pidfile, sig=signal.SIGTERM, stdin='/dev/null', stdout='/dev/null', stderr='/dev/null'): - self.stdin = stdin - self.stdout = stdout - self.stderr = stderr - self.pidfile = pidfile - self.sig = sig - - def daemonize(self): - """ - do the UNIX double-fork magic, see Stevens' "Advanced - Programming in the UNIX Environment" for details (ISBN 0201563177) - http://www.erlenstar.demon.co.uk/unix/faq_2.html#SEC16 - """ - try: - pid = os.fork() - if pid > 0: - # exit first parent - sys.exit(0) - except OSError, e: - sys.stderr.write("fork #1 failed: %d (%s)\n" % (e.errno, e.strerror)) - sys.exit(1) - - # decouple from parent environment - #os.chdir("/") - os.setsid() - os.umask(0) - - # do second fork - try: - pid = os.fork() - if pid > 0: - # exit from second parent - sys.exit(0) - except OSError, e: - sys.stderr.write("fork #2 failed: %d (%s)\n" % (e.errno, e.strerror)) - sys.exit(1) - - # redirect standard file descriptors - sys.stdout.flush() - sys.stderr.flush() - si = file(self.stdin, 'r') - so = file(self.stdout, 'a+') - se = file(self.stderr, 'a+', 0) - os.dup2(si.fileno(), sys.stdin.fileno()) - os.dup2(so.fileno(), sys.stdout.fileno()) - os.dup2(se.fileno(), sys.stderr.fileno()) - - # write pidfile - atexit.register(self.delpid) - pid = str(os.getpid()) - #file(self.pidfile,'w+').write("%s\n" % pid) - with open(self.pidfile,'w+') as fp: - fp.write("%s\n" % pid) - - def delpid(self): - os.remove(self.pidfile) - - def check_pid(self, pid): - """ Check For the existence of a unix pid. """ - try: - os.kill(pid, 0) - except OSError: - return False - else: - return True - - def start(self): - """ - Start the daemon - """ - # Check for a pidfile to see if the daemon already runs - try: - pf = file(self.pidfile,'r') - pid = int(pf.read().strip()) - pf.close() - except IOError: - pid = None - - if pid: - if self.check_pid(pid): - #message = "pidfile %s already exist. Daemon already running?\n" - message = "pidfile %s already exist and Daemon is running. Do nothing\n" - sys.stderr.write(message % self.pidfile) - sys.exit(1) - else: - message = "pidfile %s already exist and Daemon is NOT running. Restarting...\n" - sys.stderr.write(message % self.pidfile) - self.stop() - - # Start the daemon - self.daemonize() - self.run() - - def stop(self): - """ - Stop the daemon - """ - # Get the pid from the pidfile - try: - pf = file(self.pidfile,'r') - pid = int(pf.read().strip()) - pf.close() - except IOError: - pid = None - - if not pid: - message = "pidfile %s does not exist. 
Daemon not running?\n" - sys.stderr.write(message % self.pidfile) - return # not an error in a restart - - # Try killing the daemon process - try: - while True: - os.kill(pid, self.sig) - time.sleep(0.1) - except OSError, err: - err = str(err) - if err.find("No such process") > 0: - if os.path.exists(self.pidfile): - os.remove(self.pidfile) - else: - print str(err) - sys.exit(1) - - def restart(self): - """ - Restart the daemon - """ - self.stop() - self.start() - - def run(self): - """ - You should override this method when you subclass Daemon. It will be called after the process has been - daemonized by start() or restart(). - """ \ No newline at end of file diff --git a/lib/examon-common/examon/utils/executor.py b/lib/examon-common/examon/utils/executor.py deleted file mode 100644 index 4273b7f..0000000 --- a/lib/examon-common/examon/utils/executor.py +++ /dev/null @@ -1,73 +0,0 @@ - -import sys -import time -import copy -import logging -from concurrent.futures import ThreadPoolExecutor, as_completed, ProcessPoolExecutor -from multiprocessing import Process - -class Executor(object): - """ - Execute concurrent workers - """ - def __init__(self, executor='ProcessPool', keepalivesec=60): - self.executor = executor - self.workers = [] - self.keepalivesec = keepalivesec - self.logger = logging.getLogger('examon') - - - def add_worker(self, *args): - self.workers.append(copy.deepcopy(args)) - - - def exec_par(self): - if self.executor == 'ProcessPool': - with ProcessPoolExecutor() as pexecutor: - pfutures = [pexecutor.submit(*worker) for worker in self.workers] - results = [r.result() for r in as_completed(pfutures)] - return results - if self.executor == 'Daemon': - daemons = [] - for worker in self.workers: - if len(worker) > 1: - d = Process(target=worker[0], args=worker[1:]) - else: - d = Process(target=worker[0]) - daemons.append({'d': d, 'worker': worker}) - d.daemon = True - d.start() - try: - ''' - Auto-restart on failure. - Check every keepalivesec seconds if the worker is alive, otherwise - we recreate it. - ''' - n_worker = len(self.workers) - if self.keepalivesec > 0: - while 1: - alive_workers = 0 - time.sleep(self.keepalivesec) - for d in daemons: - if d['d'].is_alive() == False: - self.logger.warning("Process [%s], died. Try to restart..." % (d['d'].name)) - if len(d['worker']) > 1: - d_ = Process(target=d['worker'][0], args=d['worker'][1:]) - else: - d_ = Process(target=d['worker'][0]) - d['d'] = d_ - d_.daemon = True - d_.start() - time.sleep(1) - if d_.is_alive() == True: - alive_workers +=1 - else: - alive_workers +=1 - self.logger.info("%d/%d workers alive" % (alive_workers, n_worker)) - - for d in daemons: - d['d'].join() - print "Workers job finished!" - sys.exit(0) - except KeyboardInterrupt: - print "Interrupted.." 
\ No newline at end of file
diff --git a/lib/examon-common/setup.py b/lib/examon-common/setup.py
deleted file mode 100644
index ba38f3b..0000000
--- a/lib/examon-common/setup.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# -*- coding: utf-8 -*-
-from setuptools import setup
-
-setup(name='examon-common',
-      version='v0.2.3',
-      description='Examon common utilities',
-      url='http://github.com/fbeneventi/examon-common',
-      author='Francesco Beneventi',
-      author_email='francesco.beneventi@unibo.it',
-      license='MIT',
-      packages=['examon', 'examon.plugin', 'examon.utils', 'examon.db', 'examon.transport'],
-      install_requires=[
-          'requests == 2.21.0',
-          'paho-mqtt == 1.4.0',
-          'futures == 3.2.0',
-          'setuptools == 40.6.3',
-          'concurrent-log-handler == 0.9.16',
-          'portalocker == 1.7.1'
-      ],
-      zip_safe=False)
diff --git a/web/examon-server/.gitignore b/web/examon-server/.gitignore
index 83658ec..88471e2 100644
--- a/web/examon-server/.gitignore
+++ b/web/examon-server/.gitignore
@@ -1,2 +1,6 @@
 *.pyc
 *.log
+flask
+server.conf
+*.bak
+__pycache__
diff --git a/web/examon-server/doc.md b/web/examon-server/doc.md
new file mode 100644
index 0000000..4ad147d
--- /dev/null
+++ b/web/examon-server/doc.md
@@ -0,0 +1,154 @@
+# Examon API Documentation
+
+## Base URL
+
+http://172.16.47.112:5000/api/v1
+
+## Authentication
+
+The authentication method is basic auth, using the provided username and password.
+
+## Endpoint: POST /examon/jobs/query
+
+This endpoint is used to query job information in the ExaMon system. The job information is stored in a table having the schema of the SLURM [job response API](https://slurm.schedmd.com/archive/slurm-22.05.10/rest_api.html#v0.0.38_job_response_properties) corresponding to the appropriate SLURM version.
+In addition to the standard SLURM job properties, ExaMon adds the `energy` column to the table. It is a stringified JSON object with the following properties:
+
+- `job_id`: The ID of the job.
+- `data_quality_(%)`: The data quality as a percentage.
+- `version`: The version of the energy schema.
+- `total_energy_consumption`: The total energy consumption for the job.
+- `message`: A message about the data quality and score.
+- `unit`: The unit of the energy consumption, such as "Wh".
+
+This endpoint supports pagination. To handle large tables, it is possible to define a time window, using the `tstart` and `tstop` properties, in order to limit the amount of data returned by each request (see the sketch after the property list below).
+
+### Request
+
+The request should be a JSON object with the following properties:
+
+- `tags`: An object where each key is a tag name and the value is an array of tag values. For example, to filter by job_id, you would include `"tags": {"job_id": ["5296"]}`.
+- `time_zone`: A string representing the time zone, such as "Europe/Rome".
+- `aggrby`: This field is currently not used and should be set to null.
+- `metrics`: An array of strings representing the job table name, such as `["job_info_E4red"]`.
+- `limit`: Limits the number of rows in the table to a given value.
+- `tstart`: A number representing the initial bound of the time window used to filter the data, in milliseconds since the Unix epoch - Mandatory.
+- `tstop`: A number representing the final bound of the time window used to filter the data, in milliseconds since the Unix epoch - Optional.
+- `groupby`: An array of objects, each with a `name` and `tags` property. The `name` should be "tag" and the `tags` should be an array of strings representing the selected columns in the table. Use `"*"` as a wildcard to select all columns of the table.
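+A minimal sketch of how a client might page through a large table by sliding the `tstart`/`tstop` window. It is illustrative only, not part of the server API: the endpoint URL, credentials, table name, and start timestamp are placeholder assumptions, and it relies on the `requests` library exactly as in the full example further below.
+
+```python
+import json
+import requests
+from requests.auth import HTTPBasicAuth
+
+API_URL = "http://172.16.47.112:5000/api/v1/examon/jobs/query"  # placeholder
+
+def query_window(tstart, tstop, table="job_info_E4red"):
+    """Illustrative helper: POST one time-windowed query, return decoded rows."""
+    query = {
+        "tags": {},
+        "time_zone": "Europe/Rome",
+        "aggrby": None,
+        "metrics": [table],
+        "limit": None,
+        "tstart": tstart,
+        "tstop": tstop,
+        "groupby": [{"name": "tag", "tags": ["*"]}],
+    }
+    # The server expects the request body to be a JSON-encoded string,
+    # hence the double json.dumps (see the full example below)
+    body = json.dumps(json.dumps(query)).encode("utf-8")
+    resp = requests.post(API_URL, data=body,
+                         auth=HTTPBasicAuth("user", "password"),  # placeholders
+                         headers={"Content-Type": "application/json",
+                                  "Accept-Encoding": "gzip, deflate"})
+    resp.raise_for_status()
+    return resp.json()
+
+# Page through one day in one-hour windows (timestamps in milliseconds)
+HOUR_MS = 3600 * 1000
+t0 = 1700000000000  # placeholder start time
+rows = []
+for start in range(t0, t0 + 24 * HOUR_MS, HOUR_MS):
+    rows.extend(query_window(start, start + HOUR_MS))
+
+# The energy column is a stringified JSON object and can be decoded per row
+for row in rows:
+    if row.get("energy"):
+        energy = json.loads(row["energy"])
+```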
+
+### Response
+
+The response is a serialized Pandas DataFrame in the `records` format: a JSON array of table rows, where each row is an object with a `{Column_Name: Value}` entry for each column selected in the `groupby` field of the request.
+
+### Example
+
+In this example, we query for the `energy` column of the job table `job_info_E4red` corresponding to the job_id `5326`.
+
+Request:
+
+```json
+{
+    "tags": {
+        "job_id": ["5326"]
+    },
+    "time_zone": "Europe/Rome",
+    "aggrby": null,
+    "metrics": ["job_info_E4red"],
+    "limit": null,
+    "tstart": 1000,
+    "tstop": null,
+    "groupby": [
+        {
+            "name": "tag",
+            "tags": ["energy"]
+        }
+    ]
+}
+```
+
+Response:
+
+```json
+[
+    {
+        "energy": "{\"job_id\": 5326, \"data_quality_(%)\": 100, \"version\": \"v0.1\", \"total_energy_consumption\": 6.388100000752343, \"message\": \"Missing nodes (%): 0.000000; Quality score (%): 100.000000; \", \"unit\": \"Wh\"}"
+    }
+]
+```
+
+Notes:
+
+- In this example we filter by job_id, so to extend the database search to the full table it is suggested to use a low value (>0) for the mandatory `tstart` property.
+
+### Full example
+
+This is a full example using the Python language.
+
+```python
+import json
+import requests
+from requests.auth import HTTPBasicAuth
+
+# Endpoint
+api_url = 'http://172.16.47.112:5000/api/v1/examon/jobs/query'
+
+# Replace these values with your actual username and password
+username = ''
+password = ''
+
+# JSON data to be sent in the request body
+data = {
+    "tags": {
+        "job_id": [
+            "5326"
+        ]
+    },
+    "time_zone": "Europe/Rome",
+    "aggrby": None,
+    "metrics": [
+        "job_info_E4red"
+    ],
+    "limit": None,
+    "tstart": 1000,
+    "tstop": None,
+    "groupby": [
+        {
+            "name": "tag",
+            "tags": [
+                "energy"
+            ]
+        }
+    ]
+}
+
+headers = {
+    'Content-Type': 'application/json',
+    'Accept-Encoding': 'gzip, deflate'
+}
+
+# The server parses the request body as a JSON string, so the payload
+# must be JSON-encoded twice:
+json_data = json.dumps(json.dumps(data)).encode("utf-8")
+
+# Set up basic authentication
+auth = HTTPBasicAuth(username, password)
+
+# Send POST request with JSON content and basic authentication
+response = requests.post(api_url, data=json_data, auth=auth, headers=headers)
+
+# Check the response status code
+if response.status_code == 200:
+    print("Request successful. Response:")
+    print(response.json())
+else:
+    print(f"Request failed with status code {response.status_code}. Response content:")
+    print(response.text)
+```
+
+Response:
+
+```
+Request successful.
Response: +[{"energy":"{\"job_id\": 5326, \"data_quality_(%)\": 100, \"version\": \"v0.1\", \"total_energy_consumption\": 6.388100000752343, \"message\": \"Missing nodes (%): 0.000000; Quality score (%): 100.000000; \", \"unit\": \"Wh\"}"}] +``` \ No newline at end of file diff --git a/web/examon-server/example_server.conf b/web/examon-server/example_server.conf index 784742e..b474418 100644 --- a/web/examon-server/example_server.conf +++ b/web/examon-server/example_server.conf @@ -6,3 +6,7 @@ CASSANDRA_USER = '' CASSANDRA_PASSW = '' EXAMON_SERVER_HOST = 0.0.0.0 EXAMON_SERVER_PORT = 5000 +THREADS_NUM = 8 +SCHEDULER_TYPE = SLURM +CACHE_TYPE = simple +CACHE_TIMEOUT = 18000 \ No newline at end of file diff --git a/web/examon-server/requirements.txt b/web/examon-server/requirements.txt index d9fe025..8639ad1 100644 --- a/web/examon-server/requirements.txt +++ b/web/examon-server/requirements.txt @@ -1,20 +1,8 @@ -cassandra-driver==3.19.0 -certifi==2019.9.11 -chardet==3.0.4 -Click==7.0 -Flask==1.1.1 -Flask-HTTPAuth==3.3.0 -futures==3.3.0 -idna==2.8 -itsdangerous==1.1.0 -Jinja2==2.10.1 -MarkupSafe==1.1.1 -numpy==1.16.5 -pandas>=0.24.2 -python-dateutil==2.8.0 -pytz==2019.2 -requests==2.22.0 -six==1.12.0 -urllib3==1.25.6 -waitress==1.3.1 -Werkzeug==0.16.0 +cassandra-driver==3.29.2 +Flask==3.1.0 +Flask-HTTPAuth==4.8.0 +Flask-Caching +pandas==2.2.3 +pytz==2025.1 +requests==2.32.3 +waitress==3.0.2 diff --git a/web/examon-server/server.py b/web/examon-server/server.py index 5dc4325..d2b4ad0 100644 --- a/web/examon-server/server.py +++ b/web/examon-server/server.py @@ -6,6 +6,7 @@ from cassandra.query import dict_factory from cassandra.util import OrderedMapSerializedKey +import os import sys import json import pandas as pd @@ -20,64 +21,70 @@ import logging from logging.handlers import RotatingFileHandler -import ConfigParser +import configparser +from flask_caching import Cache -LOGFILE_SIZE_B = 5*1024*1024 +LOGFILE_SIZE_B = 5 * 1024 * 1024 LOG_LEVEL = logging.INFO LOGFILE = 'server.log' app = Flask(__name__, - static_url_path='', - static_folder='static/docs/html') + static_url_path='', + static_folder='static/docs/html') # enable gzipped responses gzip = Gzip(app) # enable basic auth auth = HTTPBasicAuth() -""" #conf = json.load(open('conf.json')) -c_auth = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASSW) -cluster = Cluster(contact_points=(CASSANDRA_IP,), auth_provider = c_auth) -session = cluster.connect(CASSANDRA_KEY_SPACE) -queries = {} """ +# Configure Flask-Caching +cache = Cache(app, config={ + 'CACHE_TYPE': 'simple', # Use in-memory cache + 'CACHE_DEFAULT_TIMEOUT': 18000 # 300 minutes +}) @auth.verify_password +@cache.memoize() def verify_password(username, password): ret = req.get(AUTH_URL, auth=RHTTPBasicAuth(username, password)) - logger.info('USER: %s ret: %s' % (username,str(ret.status_code),)) + logger.info('USER: %s ret: %s', username, str(ret.status_code)) if ret.status_code == 200: return True - else: - return False + return False + def get_prep_query(session, stmt): global queries query = queries.get(stmt) if query is None: - query = session.prepare(stmt) - queries[stmt]=query + query = session.prepare(stmt) + queries[stmt] = query return query def pandas_factory(colnames, rows): - # Convert tuple items of 'rows' into list (elements of tuples cannot be replaced) rows = [list(i) for i in rows] # Convert only 'OrderedMapSerializedKey' type list elements into dict for idx_row, i_row in enumerate(rows): for idx_value, i_value in enumerate(i_row): - if 
type(i_value) is OrderedMapSerializedKey:
+            if isinstance(i_value, OrderedMapSerializedKey):
                 rows[idx_row][idx_value] = dict(rows[idx_row][idx_value])
-    return [pd.DataFrame(rows, columns=colnames)]
+    df = pd.DataFrame(rows, columns=colnames)
+    df = localize_timestamps(df, 'UTC')
+    return [df]
 
-
-def get_jobs(stmt):
+def get_jobs(stmt, fetch_size=20000):  # Set a default fetch size
     df = pd.DataFrame()
-    for page in session.execute(stmt, timeout=120.0):
-        df = df.append(page, ignore_index=True)
+    # Set the fetch size before executing the query (assigning fetch_size to
+    # the already-executed ResultSet would have no effect on paging)
+    session.default_fetch_size = fetch_size
+    statement = session.execute(stmt, timeout=120.0)
+    for page in statement:
+        df = pd.concat([df, pd.DataFrame(page)], ignore_index=True)
+    logger.info('QUERYBUILDER: Number of records: %s', str(len(df)))
     return df
 
@@ -87,10 +94,11 @@
 def qb_get_tables(query):
     tables = ','.join(query['metrics'])
     return tables
 
+
 def qb_get_columns(query):
     columns = '*'
     if query['groupby']:
-        if type(query['groupby']) == list:
+        if isinstance(query['groupby'], list):
             if len(query['groupby'][0]['tags']) > 0:
                 columns = ','.join(query['groupby'][0]['tags'])
             else:
@@ -102,43 +110,63 @@
                 columns = '*'
     return columns
 
+
+def qb_get_aggrby(query):
+    aggrby = ''
+    if query.get('aggrby') and len(query['aggrby']) > 0:
+        aggrby = str(query['aggrby'][0]['name'])
+    return aggrby
+
+
 def qb_get_where(query):
     _where = ''
     if query['tags'] and (len(query['tags']) > 0):
-        for k,v in query['tags'].iteritems():
+        for k, v in query['tags'].items():
             for i in v:
                 _where += ' AND '
-                if k.lower() in ['user_id','job_id']:
-                    _where += "{} = {}".format(str(k),str(i))
-                elif k == 'node':
-                    _where += "cpus_alloc_layout CONTAINS KEY '{}'".format(str(i))
-                else:
-                    _where += "{} = '{}'".format(str(k),str(i))
+                # Different handling based on scheduler
+                if SCHEDULER_TYPE == 'SLURM':
+                    if k in ['user_id', 'job_id']:
+                        _where += "{} = {}".format(str(k), str(i))
+                    elif k == 'node':
+                        _where += "cpus_alloc_layout CONTAINS KEY '{}'".format(str(i))
+                    else:
+                        _where += "{} = '{}'".format(str(k), str(i))
+                else:  # PBS
+                    if k.lower() in ['user_id', 'exit_status']:
+                        _where += "{} = {}".format(str(k), str(i))
+                    elif k == 'node':
+                        _where += "cpus_alloc_layout CONTAINS KEY '{}'".format(str(i))
+                    else:
+                        _where += "{} = '{}'".format(str(k), str(i))
     return _where
 
+
 def qb_get_tstart(query):
     tstart = ''
     if query['tstart']:
         tstart = query['tstart']
     return tstart
 
+
 def qb_get_tstop(query):
     tstop = ''
     if query['tstop']:
         tstop = query['tstop']
     return tstop
 
+
 def qb_get_limit(query):
     limit = ''
     if query['limit']:
         limit = str(query['limit'])
     return limit
 
+
 def query_builder(query):
-    """build a cassandra query.
+    """Build a cassandra query.
Receive a serialized Query object and return a CQL query statement - """ cass_query = 'SELECT ' if qb_get_columns(query): @@ -149,11 +177,35 @@ def query_builder(query): if qb_get_tstart(query): tstart = qb_get_tstart(query) cass_query += ' WHERE ' - cass_query += '(start_time, end_time) >= ' + "({},{})".format(tstart, tstart) + + aggrby = qb_get_aggrby(query) + if SCHEDULER_TYPE == 'SLURM': + if aggrby == 'started': + cass_query += 'start_time >= ' + "{}".format(tstart) # return started job between dates + else: + cass_query += '(start_time, end_time) >= ' + "({},{})".format(tstart, tstart) + else: # PBS + if aggrby == 'started': + cass_query += 'stime >= ' + "{}".format(tstart) # return started job between dates + else: + cass_query += '(stime, mtime) >= ' + "({},{})".format(tstart, tstart) # executed job between dates + if qb_get_tstop(query): tstop = qb_get_tstop(query) cass_query += ' AND ' - cass_query += '(start_time, end_time) <= ' + "({},{})".format(tstop, tstop) + + aggrby = qb_get_aggrby(query) + if SCHEDULER_TYPE == 'SLURM': + if aggrby == 'started': + cass_query += 'start_time <= ' + "{}".format(tstop) # return started job between dates + else: + cass_query += '(start_time, end_time) <= ' + "({},{})".format(tstop, tstop) + else: # PBS + if aggrby == 'started': + cass_query += 'stime <= ' + "{}".format(tstop) # return started job between dates + else: + cass_query += '(stime, mtime) <= ' + "({},{})".format(tstop, tstop) # executed job between dates + if qb_get_where(query): cass_query += qb_get_where(query) if qb_get_limit(query): @@ -162,19 +214,30 @@ def query_builder(query): cass_query += ' ALLOW FILTERING' return cass_query +def localize_timestamps(df, tz): + for col in df.columns: + if pd.api.types.is_datetime64_any_dtype(df[col]): + # If the column is naive, localize it to UTC + if df[col].dt.tz is None: + df[col] = df[col].dt.tz_localize(tz) + # If the column has a timezone, convert it to UTC + else: + df[col] = df[col].dt.tz_convert(tz) + return df + @app.route('/') @auth.login_required def index(): return "Examon Server" - + @app.route('/docs') @app.route('/') @auth.login_required def serve_sphinx_docs(path='index.html'): return app.send_static_file(path) - + @app.route('/api/v1/examon/jobs/query', methods=['POST']) @auth.login_required def get_jobs_test(): @@ -182,17 +245,17 @@ def get_jobs_test(): logger.error('QUERY: No payload. Response: 400') abort(400) query = json.loads(request.json) - logger.info('QUERY: Received query: %s' % (query,)) + logger.info('QUERY: Received query: %s', query) try: stmt = query_builder(query) - logger.info('QUERYBUILDER: %s' % (stmt,)) + logger.info('QUERYBUILDER: %s', stmt) df_json = get_jobs(stmt).to_json(date_format='iso', orient='records') except Exception as e: - logger.error('QUERY: %s' % (stmt,)) + logger.error('QUERY: %s', stmt) import traceback - print traceback.format_exc() + print(traceback.format_exc()) if hasattr(e, 'message'): - logger.error('CASSANDRA: %s' % (e.message,)) + logger.error('CASSANDRA: %s', e.message) return jsonify(e.message), 400 logger.error('QUERY: response: 400') abort(400) @@ -207,33 +270,55 @@ def get_jobs_test_v2(): logger.error('QUERY: No payload. 
Response: 400') abort(400) query = request.json - logger.info('QUERY: Received query: %s' % (query,)) + logger.info('QUERY: Received query: %s', query) try: stmt = query_builder(query) - logger.info('QUERYBUILDER: %s' % (stmt,)) + logger.info('QUERYBUILDER: %s', stmt) df_ = get_jobs(stmt) - logger.info('QUERYBUILDER: Number of records: %s' % (str(len(df_)),)) - #logger.info('energy type: %s' % (df_['energy'].dtype,)) if 'energy' in df_: df_['energy'] = df_['energy'].apply(lambda x: json.loads(x) if not pd.isnull(x) else {}) df_json = df_.to_json(date_format='iso', orient='records') except Exception as e: - logger.error('QUERY: %s' % (stmt,)) + logger.error('QUERY: %s', stmt) import traceback - print traceback.format_exc() + print(traceback.format_exc()) if hasattr(e, 'message'): - logger.error('CASSANDRA: %s' % (e.message,)) + logger.error('CASSANDRA: %s', e.message) return jsonify(e.message), 400 abort(400) logger.info('QUERY: response: 200') + logger.debug('QUERY: response: %s', json.dumps(df_json, indent=4)) #return jsonify(json.loads(df_json)), 200 return Response(df_json, mimetype='application/json') + if __name__ == '__main__': + # logging + logger = logging.getLogger("waitress") + handler = RotatingFileHandler(LOGFILE, mode='a', maxBytes=LOGFILE_SIZE_B, backupCount=2) + log_formatter = logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s', + datefmt='%m/%d/%Y %I:%M:%S %p') + handler.setFormatter(log_formatter) + logger.addHandler(handler) + logger.setLevel(LOG_LEVEL) + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(log_formatter) + logger.addHandler(handler) + + # Check if configuration file exists + if not os.path.isfile('server.conf'): + logger.error("Configuration file 'server.conf' not found. Please install the workload scheduler plugin and configure the server.conf file.") + sys.exit(1) # Load Config. 
- config = ConfigParser.RawConfigParser() + config = configparser.RawConfigParser() config.read('server.conf') + + # Check if keyspace is defined + if not config.has_option('Server', 'CASSANDRA_KEY_SPACE') or not config.get('Server', 'CASSANDRA_KEY_SPACE'): + logger.error("CASSANDRA_KEY_SPACE is not defined in the configuration file.") + sys.exit(1) + AUTH_URL = config.get('Server', 'AUTH_URL') CASSANDRA_IP = config.get('Server', 'CASSANDRA_IP') CASSANDRA_KEY_SPACE = config.get('Server', 'CASSANDRA_KEY_SPACE') @@ -241,25 +326,20 @@ def get_jobs_test_v2(): CASSANDRA_PASSW = config.get('Server', 'CASSANDRA_PASSW') EXAMON_SERVER_HOST = config.get('Server', 'EXAMON_SERVER_HOST') EXAMON_SERVER_PORT = int(config.get('Server', 'EXAMON_SERVER_PORT')) + THREADS_NUM = int(config.get('Server', 'THREADS_NUM')) + - # logging - logger = logging.getLogger("waitress") - handler = RotatingFileHandler(LOGFILE, mode='a', maxBytes=LOGFILE_SIZE_B, backupCount=2) - log_formatter = logging.Formatter(fmt='%(asctime)s - %(name)s - %(levelname)s - %(message)s', datefmt='%m/%d/%Y %I:%M:%S %p') - handler.setFormatter(log_formatter) - logger.addHandler(handler) - logger.setLevel(LOG_LEVEL) - handler = logging.StreamHandler(sys.stdout) - handler.setFormatter(log_formatter) - logger.addHandler(handler) - #conf = json.load(open('conf.json')) + # Load scheduler type (SLURM or PBS) + SCHEDULER_TYPE = str(config.get('Server', 'SCHEDULER_TYPE', fallback='SLURM')) + logger.info("Starting examon server with scheduler type: %s", SCHEDULER_TYPE) + c_auth = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASSW) - cluster = Cluster(contact_points=(CASSANDRA_IP,), auth_provider = c_auth) + cluster = Cluster(contact_points=(CASSANDRA_IP,), auth_provider=c_auth) session = cluster.connect(CASSANDRA_KEY_SPACE) queries = {} # setup cassandra row factory session.row_factory = pandas_factory # run - serve(app, host=EXAMON_SERVER_HOST, port=EXAMON_SERVER_PORT, threads=8) + serve(app, host=EXAMON_SERVER_HOST, port=EXAMON_SERVER_PORT, threads=THREADS_NUM) From bde0038f399ad78fc879bdd1aa1d8d1c77ccf48e Mon Sep 17 00:00:00 2001 From: Francesco Beneventi Date: Mon, 23 Jun 2025 18:32:28 +0200 Subject: [PATCH 03/10] Fix: updated mosquitto conf (version 2.0.11) --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index 3b6df34..e0a7464 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM examonhpc/examon:0.3.0 +FROM examonhpc/examon:0.3.1 ENV EXAMON_HOME /etc/examon_deploy/examon From 81d7a7fb6f4b9194201a639e5b6a3c74dc654c97 Mon Sep 17 00:00:00 2001 From: Francesco Beneventi Date: Tue, 24 Jun 2025 13:12:17 +0200 Subject: [PATCH 04/10] Update: Docker configurations and enhance documentation --- docker-compose.yml | 2 +- docker/kairosdb/Dockerfile | 9 - docs/Administrators/Getting_started.md | 2 +- docs/overrides/home.html | 491 +++++++++++++++++++------ 4 files changed, 386 insertions(+), 118 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 1de513a..98fe5c0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,7 +10,7 @@ services: examon: build: context: . 
- image: examonhpc/examon:0.3.1 + image: examonhpc/examon:latest volumes: - type: bind source: ./web/examon-server diff --git a/docker/kairosdb/Dockerfile b/docker/kairosdb/Dockerfile index 08a0698..761753d 100644 --- a/docker/kairosdb/Dockerfile +++ b/docker/kairosdb/Dockerfile @@ -2,15 +2,6 @@ FROM adoptopenjdk:8-jre-hotspot-focal -# Create a backup of the existing sources.list -#RUN mv /etc/apt/sources.list /etc/apt/sources.list.backup - -# Create a new sources.list file -#RUN touch /etc/apt/sources.list - -# Add the new server repository for focal packages -#RUN echo "deb https://ubuntu.mirror.garr.it/ubuntu/ focal main" > /etc/apt/sources.list - RUN set -eux; \ apt-get update; \ apt-get install -y --no-install-recommends \ diff --git a/docs/Administrators/Getting_started.md b/docs/Administrators/Getting_started.md index 2ac5fa4..cf9aba6 100644 --- a/docs/Administrators/Getting_started.md +++ b/docs/Administrators/Getting_started.md @@ -9,7 +9,7 @@ This setup will install all server-side components of the ExaMon framework: ## Prerequisites Since Cassandra is the component that requires the majority of resources, you can find more details about the suggested hardware configuration of the system that will host the services here: -[Hardware Configuration](https://cassandra.apache.org/doc/latest/operating/hardware.html#:~:text=While%20Cassandra%20can%20be%20made,at%20least%2032GB%20of%20RAM) +[Hardware Configuration](https://cassandra.apache.org/doc/latest/cassandra/managing/operating/hardware.html) To install all the services needed by ExaMon we will use Docker and Docker Compose: diff --git a/docs/overrides/home.html b/docs/overrides/home.html index 26c5c39..81fd417 100644 --- a/docs/overrides/home.html +++ b/docs/overrides/home.html @@ -317,101 +317,359 @@ .logo-grid { - flex-wrap: wrap; /* Allow flex items to wrap to the next row */ - justify-content: center; /* Center items horizontally within the container */ - } + flex-wrap: wrap; + justify-content: center; + align-items: center; + gap: 1rem; + } .logo-grid a { - flex: 1; /* Distribute available space equally among items */ - text-align: center; /* Center the text within each item */ - margin: 10px; /* Add some spacing between items */ - } + flex: 1; + text-align: center; + margin: 10px; + display: flex; + justify-content: center; + align-items: center; + } - /* Add media query using em units for max-width */ - @media screen and (max-width: 76.25em) { + .logo-grid img { + max-width: 100%; + height: auto; + object-fit: contain; + } - .top-hr { - display: grid; - } - /* Change the order of flex items within .second-row */ - .second-row .feature-item:nth-child(1) { - order: 2; /* Change order for the first .feature-item in the second-row */ - } + /* Modern Footer Styles - Material Theme Compatible */ + .modern-footer { + background-color: var(--md-footer-bg-color); + color: var(--md-footer-fg-color); + margin-top: 4rem; + border-top: 1px solid var(--md-default-fg-color--lightest); + } - .second-row .feature-item:nth-child(2) { - order: 1; /* Change order for the second .feature-item in the second-row */ - } + .footer-main { + padding: 3rem 0 2rem; + } - /* Disable Menu drawer: broken in hompage */ - .md-header__button:not([hidden]) { - display: none; - } + .footer-content { + display: grid; + grid-template-columns: 2fr 1fr 1fr 1fr; + gap: 2rem; + max-width: 1200px; + margin: 0 auto; + padding: 0 1rem; + } - .md-sidebar { - display: none; - } - } + .footer-section { + display: flex; + flex-direction: column; + } + .footer-brand { + 
max-width: 300px; + } - /* footer */ + .footer-logo { + max-height: 60px; + width: auto; + margin-bottom: 1rem; + filter: brightness(0.9); + object-fit: contain; + max-width: 100%; + } - footer { - background-color: #000000de; - color: #fff; - padding: 20px 0; + /* Dark mode logo adjustment */ + [data-md-color-scheme="slate"] .footer-logo { + filter: brightness(1.2); } - .footer-content { - display: flex; - justify-content: space-between; - align-items: center; - max-width: 1200px; - margin: 0 auto; + .footer-description { + color: var(--md-footer-fg-color--light); + line-height: 1.6; + margin-bottom: 1.5rem; + font-size: 0.9rem; + } + + .footer-title { + color: var(--md-footer-fg-color); + font-size: 1.1rem; + font-weight: 600; + margin-bottom: 1rem; + position: relative; + text-align: left; } - .logo-section img { - max-height: 50px; - margin-right: 20px; + .footer-title::after { + content: ''; + position: absolute; + bottom: -0.5rem; + left: 0; + width: 2rem; + height: 2px; + background: var(--md-primary-fg-color); + border-radius: 1px; } - nav ul { + .footer-links { list-style: none; + padding: 0; + margin: 0; + } + + .footer-links li { + margin-bottom: 0.7rem; + } + + .footer-links a { + color: var(--md-footer-fg-color--light); + text-decoration: none; + font-size: 0.9rem; + transition: all 0.3s ease; + position: relative; + } + + .footer-links a:hover { + color: var(--md-footer-fg-color); + padding-left: 0; + } + + .footer-social { display: flex; + gap: 1rem; + margin-top: 1rem; + } + + .social-link { + display: flex; + align-items: center; justify-content: center; - flex-wrap: wrap; /* Allow flex items to wrap to the next row */ + width: 2.5rem; + height: 2.5rem; + /* background: var(--md-default-fg-color--lightest); */ + background: var(--md-footer-fg-color--light); + border-radius: 50%; + color: var(--md-footer-fg-color--lighter); + text-decoration: none; + transition: all 0.3s ease; + flex-shrink: 0; } - nav li { - margin: 0 10px; + .social-link .twemoji { + width: 1.2rem; + height: 1.2rem; + color: var(--md-footer-fg-color--lighter); + display: flex; + align-items: center; + justify-content: center; + flex-shrink: 0; + } + + .social-link .twemoji svg { + width: 100%; + height: 100%; + object-fit: contain; + } + + .social-link:hover { + background: var(--md-primary-fg-color); + color: var(--md-primary-bg-color); + transform: translateY(-2px); + box-shadow: 0 4px 12px var(--md-shadow-z2); + } + + .social-link:hover .twemoji { + color: var(--md-primary-bg-color); } - nav a { + .footer-bottom { + background: var(--md-default-bg-color--lighter); + border-top: 1px solid var(--md-default-fg-color--lightest); + padding: 1.5rem 0; + } + + /* Dark mode footer bottom adjustment */ + [data-md-color-scheme="slate"] .footer-bottom { + background: var(--md-default-bg-color--light); + } + + .footer-bottom-content { + display: flex; + justify-content: space-between; + align-items: center; + max-width: 1200px; + margin: 0 auto; + padding: 0 1rem; + } + + .footer-copyright p { + margin: 0.2rem 0; color: var(--md-footer-fg-color--lighter); + font-size: 0.85rem; + } + + .footer-copyright a { + color: var(--md-footer-fg-color--light); text-decoration: none; - font-size: 0.8rem; - transition: all 0.3s ease-in-out; + transition: color 0.3s ease; } - nav a:hover { - color: #0088cc; + .footer-copyright a:hover { + color: var(--md-footer-fg-color); } - .social-media { + .footer-legal { display: flex; + align-items: center; + gap: 0.5rem; + } + + .footer-legal a { + color: 
var(--md-footer-fg-color--lighter); + text-decoration: none; + font-size: 0.85rem; + transition: color 0.3s ease; } - .social-icon { - color: #fff; - font-size: 1.5rem; - margin-right: 10px; - transition: all 0.3s ease-in-out; + .footer-legal a:hover { + color: var(--md-footer-fg-color); } - .social-icon:hover { - color: #0088cc; + .separator { + color: var(--md-footer-fg-color--lightest); + font-size: 0.85rem; } + /* --- Responsive Design --- */ + @media screen and (max-width: 1024px) { + /* -- Tablet and Mobile -- */ + + /* Hide navigation */ + .md-sidebar--primary { + display: none; + } + + .top-hr { + display: flex; + flex-direction: column; + gap: 2rem; + margin-right: auto; + margin-left: auto; + padding: 0 .2rem; + max-width: 61rem; + } + + .second-row .feature-item:nth-child(1) { order: 2; } + .second-row .feature-item:nth-child(2) { order: 1; } + + .top-hr .feature-item { + margin-bottom: 1.5rem; + text-align: center; + } + + .feature-item img { + max-width: 100%; + height: auto; + object-fit: contain; + max-height: 400px; + } + + .logo-grid { + display: flex; + flex-direction: column; + gap: 2rem; + width: 100%; + } + + .logo-grid a { + flex: none; + margin: 0; + width: 60%; + max-width: 250px; + } + + .logo-grid img { + width: 100%; + max-height: 160px; + height: auto; + } + + .footer-content { + grid-template-columns: 1fr 1fr; + } + + .footer-brand { + grid-column: 1 / -1; + max-width: 100%; + display: flex; + flex-direction: column; + align-items: center; + } + + .footer-brand .footer-title { + text-align: center; + } + + .footer-brand .footer-title::after { + left: 50%; + transform: translateX(-50%); + } + } + + @media screen and (max-width: 768px) { + /* -- Mobile Only -- */ + + .top-hr { + display: grid; + } + + .footer-content { + grid-template-columns: 1fr; + } + + .footer-section { + text-align: center; + } + + .footer-title { + text-align: center; + } + + .footer-title::after { + left: 50%; + transform: translateX(-50%); + } + + .footer-brand { + text-align: center; + } + + .footer-logo { + max-height: 50px; + } + + .footer-bottom-content { + flex-direction: column; + gap: 1rem; + text-align: center; + } + + .footer-social { + justify-content: center; + flex-wrap: wrap; + } + + .logo-grid a { + width: 80%; + max-width: 300px; + } + + .md-header__button:not([hidden]) { + display: none; + } + .md-sidebar { + display: none; + } + } @@ -577,11 +835,6 @@

Partners

-
-
-
-
-

European Projects

@@ -606,53 +859,77 @@

European Projects

{% block footer %}
-
+{% endblock %}
\ No newline at end of file
From 39fb738b2e6b93973bdf6e291eac4c8c423eba75 Mon Sep 17 00:00:00 2001
From: Francesco Beneventi
Date: Wed, 25 Jun 2025 11:23:17 +0200
Subject: [PATCH 05/10] Enhance documentation: Add 'Contributing' section and update plugin dev section

---
 docs/Plugins/examon_pub.ipynb |  8 ++--
 docs/contributing.md          | 88 +++++++++++++++++++++++++++++++++++
 docs/overrides/home.html      |  4 +-
 mkdocs.yml                    |  1 +
 web/.gitkeep                  |  0
 5 files changed, 95 insertions(+), 6 deletions(-)
 create mode 100644 docs/contributing.md
 create mode 100644 web/.gitkeep

diff --git a/docs/Plugins/examon_pub.ipynb b/docs/Plugins/examon_pub.ipynb
index 28681d2..3e0eadc 100644
--- a/docs/Plugins/examon_pub.ipynb
+++ b/docs/Plugins/examon_pub.ipynb
@@ -6,12 +6,12 @@
   "metadata": {},
   "source": [
    "# Example plugin\n",
-    "This notebook shows how to create a simple Examon publisher using Python (v3)\n",
+    "This notebook shows how to create a simple Examon publisher using Python.\n",
    "\n",
    "## Install \n",
-    "Install the publisher library.\n",
+    "Install the latest Examon publisher library.\n",
    "\n",
-    "NOTE: This is a development release so the final API may be different in future versions."
+    "https://github.com/ExamonHPC/examon-common\n"
   ]
  },
 {
@@ -23,7 +23,7 @@
   },
   "outputs": [],
   "source": [
-    "! python -m pip install --upgrade https://github.com/fbeneventi/releases/releases/latest/download/examon-common-py3.zip"
+    "! python -m pip install --upgrade git+https://github.com/ExamonHPC/examon-common.git@master"
   ]
 },
 {
diff --git a/docs/contributing.md b/docs/contributing.md
new file mode 100644
index 0000000..0480051
--- /dev/null
+++ b/docs/contributing.md
@@ -0,0 +1,88 @@
+# Contributing to ExaMon
+
+First off, thank you for considering contributing to our project!
+
+## How Can I Contribute?
+
+### Reporting Bugs
+
+Before creating bug reports, please check the issue list as you might find out that you don't need to create one. When you are creating a bug report, please include as many details as possible:
+
+* Use a clear and descriptive title
+* Describe the exact steps which reproduce the problem
+* Provide specific examples to demonstrate the steps
+* Describe the behavior you observed after following the steps
+* Explain which behavior you expected to see instead and why
+* Include screenshots if possible
+
+### Suggesting Enhancements
+
+If you have a suggestion for the project, we'd love to hear about it. Please include:
+
+* A clear and detailed explanation of the feature
+* The motivation behind this feature
+* Any alternative solutions you've considered
+* If applicable, examples from other projects
+
+### Pull Request Process
+
+1. Fork the repository and create your branch from `master`
+2. If you've added code that should be tested, add tests
+3. Ensure the test suite passes
+4. Update the documentation if needed
+5. Issue that pull request!
+
+#### Pull Request Guidelines
+
+* Follow our coding standards (see below)
+* Include relevant issue numbers in your PR description
+* Update the README.md with details of changes if applicable
+* The PR must pass all CI/CD checks [TBD]
+* Wait for review from maintainers
+
+### Development Setup
+
+1. Fork and clone the repo
+2. Create a branch: `git checkout -b my-branch-name`
+
+### Coding Standards
+
+* Use consistent code formatting
+* Write clear commit messages following [Conventional Commits](https://www.conventionalcommits.org/) or at least the basic specification as in the [Commit Messages](#commit-messages) section.
+* Comment your code where necessary +* Write tests for new features +* Keep the code simple and maintainable + +### Commit Messages + +Basic specification example: + +``` +type(scope): description +[optional body] +[optional footer] +``` + +The type should be one of the following: + +| Type | Description | +|------|-------------| +| add | Introduces a new feature or functionality | +| fix | Patches a bug or resolves an issue | +| change | Modifies existing functionality or behavior | +| remove | Deletes or deprecates functionality | +| merge | Combines branches or resolves conflicts | +| doc | Updates documentation or comments | + + +### First Time Contributors + +Looking for work? Check out our issues labeled `good first issue` or `help wanted`. + +## License + +By contributing, you agree that your contributions will be licensed under the same license that covers the project. + +## Questions? + +Don't hesitate to contact the project maintainers if you have any questions! diff --git a/docs/overrides/home.html b/docs/overrides/home.html index 81fd417..b3632c4 100644 --- a/docs/overrides/home.html +++ b/docs/overrides/home.html @@ -900,7 +900,7 @@
  • Administrator Guide
  • User Guide
  • Plugin Development
  • -
  • Query Examples
  • +
  • Tutorials
  • @@ -911,7 +911,7 @@
  • Contact Us
  • Community
  • Report Issues
  • -
  • Contributing
  • +
  • Contributing
  • 
diff --git a/mkdocs.yml b/mkdocs.yml
index f063876..dde70b3 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -71,6 +71,7 @@ nav:
     - Credits: "credits.md"
     - Contact Us: "contactus.md"
     - Publications: "publications.md"
+    - Contributing: "contributing.md"
 
   - Blog:
     - blog/index.md
diff --git a/web/.gitkeep b/web/.gitkeep
new file mode 100644
index 0000000..e69de29
From 17a79c23e4c1a69d816f11597662b8cd860e9a4b Mon Sep 17 00:00:00 2001
From: Francesco Beneventi
Date: Wed, 2 Jul 2025 20:21:57 +0200
Subject: [PATCH 06/10] Update README.md

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 69aa7fe..3948cdd 100644
--- a/README.md
+++ b/README.md
@@ -32,7 +32,7 @@ It can be disabled as described in the [Enable/disable plugins](#enabledisable-t
 ## Prerequisites
 
 Since Cassandra is the component that requires the majority of resources, you can find more details about the suggested hardware configuration of the system that will host the services here:
-[Hardware Configuration](https://cassandra.apache.org/doc/latest/operating/hardware.html#:~:text=While%20Cassandra%20can%20be%20made,at%20least%2032GB%20of%20RAM)
+[Hardware Configuration](https://cassandra.apache.org/doc/latest/cassandra/managing/operating/hardware.html)
 
 To install all the services needed by ExaMon we will use Docker and Docker Compose:
From 4da6de7950b5de18c7aca3046de77d8a8b3f4f9d Mon Sep 17 00:00:00 2001
From: Francesco Beneventi
Date: Mon, 14 Jul 2025 18:42:07 +0200
Subject: [PATCH 07/10] Enhance: Improved cassandra connection and query execution in the examon server

---
 web/examon-server/server.py | 116 ++++++++++++++++++++++++++++++------
 1 file changed, 98 insertions(+), 18 deletions(-)

diff --git a/web/examon-server/server.py b/web/examon-server/server.py
index d2b4ad0..b6d418c 100644
--- a/web/examon-server/server.py
+++ b/web/examon-server/server.py
@@ -1,8 +1,9 @@
 #!flask/bin/python
 
 from flask import Flask, jsonify, request, abort, Response
-from cassandra.cluster import Cluster
+from cassandra.cluster import Cluster, OperationTimedOut, NoHostAvailable
 from cassandra.auth import PlainTextAuthProvider
+from cassandra.policies import ExponentialReconnectionPolicy
 from cassandra.query import dict_factory
 from cassandra.util import OrderedMapSerializedKey
 
@@ -24,6 +25,8 @@ import configparser
 
 from flask_caching import Cache
 
+import time
+
 LOGFILE_SIZE_B = 5 * 1024 * 1024
 LOG_LEVEL = logging.INFO
 
@@ -77,15 +80,37 @@ def pandas_factory(colnames, rows):
     return [df]
 
 
-def get_jobs(stmt, fetch_size=20000): # Set a default fetch size
+def get_jobs(stmt, fetch_size=20000, max_retries=3): # Set a default fetch size
+    """Execute query with automatic retry on connection failures."""
+
     df = pd.DataFrame()
-    # Set the fetch size for the query
-    statement = session.execute(stmt, timeout=120.0)
-    statement.fetch_size = fetch_size
-    for page in statement:
-        df = pd.concat([df, pd.DataFrame(page)], ignore_index=True)
-    logger.info('QUERYBUILDER: Number of records: %s', str(len(df)))
-    return df
+
+    for attempt in range(max_retries):
+        try:
+            # Discard rows collected by a previous failed attempt so a
+            # retry does not append duplicate pages
+            df = pd.DataFrame()
+            # Set the fetch size for the query
+            statement = session.execute(stmt, timeout=120.0)
+            statement.fetch_size = fetch_size
+            for page in statement:
+                df = pd.concat([df, pd.DataFrame(page)], ignore_index=True)
+            logger.info('QUERYBUILDER: Number of records: %s', str(len(df)))
+            return df
+
+        except (OperationTimedOut, NoHostAvailable) as e:
+            logger.warning(f'Connection issue during query execution (attempt {attempt + 1}/{max_retries}): {e}')
+
+            if attempt < max_retries - 1:
+                # Short delay before retry to allow driver reconnection
+                time.sleep(1)
+                continue
+            else:
+                logger.error(f'Query failed after {max_retries} attempts due to connection issues')
+                raise
+        except Exception as e:
+            # For non-connection related errors, don't retry
+            logger.error(f'Query failed with non-connection error: {e}')
+            raise
 
 
 def qb_get_tables(query):
@@ -249,16 +274,24 @@ def get_jobs_test():
     try:
         stmt = query_builder(query)
         logger.info('QUERYBUILDER: %s', stmt)
-        df_json = get_jobs(stmt).to_json(date_format='iso', orient='records')
+        df_json = get_jobs(stmt, max_retries=5).to_json(date_format='iso', orient='records')
     except Exception as e:
         logger.error('QUERY: %s', stmt)
         import traceback
         print(traceback.format_exc())
+
+        # Check if it's a connection-related error
+        if isinstance(e, (OperationTimedOut, NoHostAvailable)):
+            logger.error('CASSANDRA CONNECTION: %s', str(e))
+            return jsonify({'error': 'Database temporarily unavailable, please try again later'}), 503
+
+        # Handle other errors
         if hasattr(e, 'message'):
             logger.error('CASSANDRA: %s', e.message)
-            return jsonify(e.message), 400
-        logger.error('QUERY: response: 400')
-        abort(400)
+            return jsonify({'error': e.message}), 400
+        else:
+            logger.error('QUERY: Unexpected error: %s', str(e))
+            return jsonify({'error': 'Query execution failed'}), 400
 
     logger.info('QUERY: response: 200')
     return jsonify(df_json), 200
@@ -282,16 +315,59 @@ def get_jobs_test_v2():
         logger.error('QUERY: %s', stmt)
         import traceback
         print(traceback.format_exc())
+
+        # Check if it's a connection-related error
+        if isinstance(e, (OperationTimedOut, NoHostAvailable)):
+            logger.error('CASSANDRA CONNECTION: %s', str(e))
+            return jsonify({'error': 'Database temporarily unavailable, please try again later'}), 503
+
+        # Handle other errors
         if hasattr(e, 'message'):
             logger.error('CASSANDRA: %s', e.message)
-            return jsonify(e.message), 400
-        abort(400)
+            return jsonify({'error': e.message}), 400
+        else:
+            logger.error('QUERY: Unexpected error: %s', str(e))
+            return jsonify({'error': 'Query execution failed'}), 400
 
     logger.info('QUERY: response: 200')
     logger.debug('QUERY: response: %s', json.dumps(df_json, indent=4))
     #return jsonify(json.loads(df_json)), 200
     return Response(df_json, mimetype='application/json')
 
 
+def connect_to_cassandra_with_retry(cassandra_ip, cassandra_user, cassandra_passw, cassandra_keyspace, max_retries=30, initial_delay=1, max_delay=60):
+    """Connect to Cassandra with retry logic and exponential backoff.
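+
+    Retries up to max_retries times, doubling the delay between attempts
+    from initial_delay seconds up to a cap of max_delay seconds. Returns a
+    (cluster, session) tuple on success; once all attempts are exhausted,
+    the last connection error is re-raised.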
+
+    """
+    delay = initial_delay
+    last_exception = None
+
+    for attempt in range(max_retries):
+        try:
+            logger.info(f"Attempting to connect to Cassandra (attempt {attempt + 1}/{max_retries})...")
+
+            c_auth = PlainTextAuthProvider(username=cassandra_user, password=cassandra_passw)
+            cluster = Cluster(contact_points=(cassandra_ip,), auth_provider=c_auth, reconnection_policy=ExponentialReconnectionPolicy(base_delay=1, max_delay=60))
+            session = cluster.connect(cassandra_keyspace)
+
+            logger.info("Successfully connected to Cassandra")
+            return cluster, session
+
+        except Exception as e:
+            last_exception = e
+            logger.warning(f"Failed to connect to Cassandra (attempt {attempt + 1}/{max_retries}): {e}")
+
+            if attempt < max_retries - 1:  # Don't sleep on the last attempt
+                logger.info(f"Retrying in {delay} seconds...")
+                time.sleep(delay)
+
+                # Exponential backoff with max limit
+                delay = min(delay * 2, max_delay)
+
+    # If we get here, all retries failed
+    logger.error(f"Failed to connect to Cassandra after {max_retries} attempts")
+    raise last_exception
+
+
 if __name__ == '__main__':
     # logging
     logger = logging.getLogger("waitress")
@@ -334,9 +410,13 @@ def get_jobs_test_v2():
     SCHEDULER_TYPE = str(config.get('Server', 'SCHEDULER_TYPE', fallback='SLURM'))
     logger.info("Starting examon server with scheduler type: %s", SCHEDULER_TYPE)
 
-    c_auth = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASSW)
-    cluster = Cluster(contact_points=(CASSANDRA_IP,), auth_provider=c_auth)
-    session = cluster.connect(CASSANDRA_KEY_SPACE)
+    # Connect to Cassandra with retry logic
+    cluster, session = connect_to_cassandra_with_retry(
+        cassandra_ip=CASSANDRA_IP,
+        cassandra_user=CASSANDRA_USER,
+        cassandra_passw=CASSANDRA_PASSW,
+        cassandra_keyspace=CASSANDRA_KEY_SPACE
+    )
 
     queries = {}
     # setup cassandra row factory
From e9ae388cc80e6bb9dba75dd21e3589ee1679b82f Mon Sep 17 00:00:00 2001
From: Francesco Beneventi
Date: Tue, 15 Jul 2025 10:15:09 +0200
Subject: [PATCH 08/10] Update: examon-cli to the latest version v0.2.0

---
 .../docs/html/downloads/examon-cli-stable.zip | Bin 3979 -> 4962 bytes
 1 file changed, 0 insertions(+), 0 deletions(-)

diff --git a/web/examon-server/static/docs/html/downloads/examon-cli-stable.zip b/web/examon-server/static/docs/html/downloads/examon-cli-stable.zip
index 9dfd98eb881943f222f9734d5e22782cb4305ee6..e422aff55b286cca764f86b773de1373d29d3142 100644
Binary files a/web/examon-server/static/docs/html/downloads/examon-cli-stable.zip and b/web/examon-server/static/docs/html/downloads/examon-cli-stable.zip differ
From f15fcede6e9700784a92340aa7cc05a07835b938 Mon Sep 17 00:00:00 2001
From: Francesco Beneventi
Date: Sun, 12 Oct 2025 01:22:53 +0200
Subject: [PATCH 09/10] Update: Dockerfile to version 0.3.2

---
 Dockerfile | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index e0a7464..e9e4125 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,4 +1,4 @@
-FROM examonhpc/examon:0.3.1
+FROM examonhpc/examon:0.3.2
 
 ENV EXAMON_HOME /etc/examon_deploy/examon
 
@@ -10,7 +10,7 @@ ADD ./web $EXAMON_HOME/web
 
 # Venvs
 WORKDIR $EXAMON_HOME/scripts
-RUN virtualenv py3_env
+RUN virtualenv -p $(which python) py3_env
 
 ENV PIP $EXAMON_HOME/scripts/ve/bin/pip
 ENV S_PIP $EXAMON_HOME/scripts/py3_env/bin/pip
@@ -25,7 +25,7 @@ RUN $PIP install -r requirements.txt
 
 # Web
 WORKDIR $EXAMON_HOME/web
-RUN virtualenv flask
+RUN virtualenv -p $(which python) flask
 RUN CASS_DRIVER_BUILD_CONCURRENCY=8 flask/bin/pip install -r ./examon-server/requirements.txt
 
 WORKDIR $EXAMON_HOME/scripts
From b98080dca1c296034201133bebe6cb02cdc7a97b Mon Sep 17 00:00:00 2001
From: Francesco Beneventi
Date: Sun, 12 Oct 2025 01:53:57 +0200
Subject: [PATCH 10/10] Update: docs

---
 docs/credits.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/docs/credits.md b/docs/credits.md
index d8bf5f6..07758b2 100644
--- a/docs/credits.md
+++ b/docs/credits.md
@@ -5,5 +5,5 @@ This work is supported by the EU FETHPC projects:
 - [MULTITHERMAN (g.a. 291125)](https://cordis.europa.eu/project/id/291125)
 - [ANTAREX (g.a. 671623)](https://antarex.fe.up.pt/)
 - [IOTWINS (g.a. 857191)](https://www.iotwins.eu/)
-- [REGALE (g.a. 956560)](https://regale-project.eu/)
+- [REGALE (g.a. 956560)](https://regale-project.eu/)
+- [The Italian Ministry of Enterprises and Made in Italy ("MIMIT")](https://www.mimit.gov.it/en/)
 - [GRAPH MASSIVIZER (g.a. 101093202)](https://graph-massivizer.eu/)
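
A note on using the behavior introduced in PATCH 07 from the client side: the server now answers 503 with `{'error': 'Database temporarily unavailable, please try again later'}` while Cassandra is unreachable, so callers can pair the server-side retries with their own backoff. The following is a minimal sketch only; the base URL, endpoint path, and payload are illustrative assumptions, not values taken from these patches, and the 130 s timeout is simply chosen to exceed the server's 120 s Cassandra query timeout:

```python
import time

import requests  # third-party HTTP client, assumed installed

# Hypothetical deployment values: adjust host, port, and route to match
# the endpoint actually exposed by your examon-server instance.
SERVER_URL = "http://localhost:5000/query"


def query_with_retry(payload, max_retries=5, delay=1.0, max_delay=30.0):
    """POST a query and back off while the server reports 503."""
    for attempt in range(max_retries):
        resp = requests.post(SERVER_URL, json=payload, timeout=130)
        if resp.status_code != 503:
            resp.raise_for_status()  # fail fast on non-retryable errors (e.g. 400)
            return resp.json()
        # 503 means Cassandra is temporarily unreachable on the server side
        time.sleep(delay)
        delay = min(delay * 2, max_delay)  # exponential backoff, capped
    raise RuntimeError(f"server still unavailable after {max_retries} attempts")
```

This mirrors the server's own policy (bounded attempts, doubled delay with a cap), so a transient Cassandra outage is absorbed by whichever side recovers first.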