Skip to content

Commit

Permalink
monitoring: ES indices
Browse files Browse the repository at this point in the history
* Adds ES indices monitoring.

Co-Authored-by: Peter Weber <[email protected]>
  • Loading branch information
rerowep committed Feb 27, 2023
1 parent c4453b1 commit a9230c3
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 55 deletions.
25 changes: 11 additions & 14 deletions rero_ils/modules/monitoring/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class Monitoring(object):
The main idea here is to check the consistency between the database and
the search index. We need to check that all documents presents in the
database are also present in the search index and vice versa.
Addidionaly timestamps could be accessed for monitoring of execution
Furthermore, timestamps could be accessed for monitoring of execution
times of selected functions.
"""

Expand All @@ -73,7 +73,7 @@ class Monitoring(object):
def __init__(self, time_delta=1):
"""Constructor.
:param time_delta: Minutes tu substract from DB EScreation time.
:param time_delta: Minutes to subtract from DB ES creation time.
"""
self.time_delta = int(time_delta)

Expand All @@ -95,8 +95,7 @@ def __str__(self):
db_es = info.get('db-es', '')
count_db = info.get('db', '')
msg = f'{db_es:>7} {doc_type:>6} {count_db:>10}'
index = info.get('index', '')
if index:
if index := info.get('index', ''):
msg += f' {index:>25} {info.get("es", ""):>10}'
result += msg + '\n'
return msg_head + result
Expand Down Expand Up @@ -147,7 +146,7 @@ def get_all_pids(cls, doc_type, with_deleted=False, limit=100000,
:param with_deleted: get also deleted pids.
:param limit: Limit sql query to count size.
:param date: Get all pids <= date.
:returns: pid gernerator.
:returns: pid generator.
"""
query = PersistentIdentifier.query.filter_by(pid_type=doc_type)
if not with_deleted:
Expand Down Expand Up @@ -196,17 +195,16 @@ def get_es_db_missing_pids(self, doc_type, with_deleted=False):
pids_es.pop(pid)
else:
pids_db.append(pid)
pids_es = [v for v in pids_es]
return pids_es, pids_db, pids_es_double, index
return list(pids_es), pids_db, pids_es_double, index

def info(self, with_deleted=False, difference_db_es=False):
"""Info.
Get count details for all records rest endpoints in json format.
:param with_deleted: count also deleted items in database.
:return: dictionary with database, elasticsearch and databse minus
elasticsearch count informations.
:return: dictionary with database, elasticsearch and database minus
elasticsearch count information.
"""
info = {}
for doc_type, endpoint in current_app.config.get(
Expand All @@ -219,8 +217,7 @@ def info(self, with_deleted=False, difference_db_es=False):
doc_type, with_deleted=with_deleted, date=date)
count_db = count_db if isinstance(count_db, int) else 0
info[doc_type]['db'] = count_db
index = endpoint.get('search_index', '')
if index:
if index := endpoint.get('search_index', ''):
count_es = self.get_es_count(index, date=date)
count_es = count_es if isinstance(count_es, int) else 0
db_es = count_db - count_es
Expand All @@ -245,11 +242,11 @@ def info(self, with_deleted=False, difference_db_es=False):
return info

def check(self, with_deleted=False, difference_db_es=False):
"""Compaire elasticsearch with database counts.
"""Compare elasticsearch with database counts.
:param with_deleted: count also deleted items in database.
:return: dictionary with all document types with a difference in
databse and elasticsearch counts.
database and elasticsearch counts.
"""
checks = {}
for info, data in self.info(
Expand Down Expand Up @@ -313,6 +310,6 @@ def print_missing(self, doc_type):
)
if 'DB' in missing and missing["DB"]:
click.secho(
f'DB mising {doc_type}: {", ".join(missing["DB"])}',
f'DB missing {doc_type}: {", ".join(missing["DB"])}',
fg='red'
)
22 changes: 14 additions & 8 deletions rero_ils/modules/monitoring/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@ def monitoring():
@click.option('-m', '--missing', 'missing', is_flag=True, default=False,
help='Display missing pids.')
@click.option('-d', '--delay', 'delay', default=1,
help='Get ES and DB counts from delay miniutes in the past.')
help='Get ES and DB counts from delay min minutes in the past.')
@with_appcontext
def es_db_counts_cli(missing, delay):
"""Print ES and DB counts.
Prints a table representation of database and elasticsearch counts.
Columes:
Columns:
1. database count minus elasticsearch count
2. document type
3. database count
Expand Down Expand Up @@ -75,7 +75,7 @@ def es_db_counts_cli(missing, delay):
@monitoring.command('es_db_missing')
@click.argument('doc_type')
@click.option('-d', '--delay', 'delay', default=1,
help='Get ES and DB counts from delay miniutes in the past.')
help='Get ES and DB counts from delay minutes in the past.')
@with_appcontext
def es_db_missing_cli(doc_type, delay):
"""Print missing pids informations."""
Expand All @@ -86,23 +86,29 @@ def es_db_missing_cli(doc_type, delay):
@monitoring.command('time_stamps')
@with_appcontext
def time_stamps_cli():
"""Print time_stampss informations."""
cache = current_cache.get('timestamps')
if cache:
"""Print time_stamps information."""
if cache := current_cache.get('timestamps'):
for key, value in cache.items():
time = value.pop('time')
args = [f'{k}={v}' for k, v in value.items()]
click.echo(f'{time}: {key} {" | ".join(args)}')


@monitoring.command()
@monitoring.command('es')
@with_appcontext
def es():
"""Displays elastic search cluster info."""
"""Displays Elasticsearch cluster info."""
for key, value in current_search_client.cluster.health().items():
click.echo(f'{key:<33}: {value}')


@monitoring.command('es_indices')
@with_appcontext
def es_indices():
"""Displays Elasticsearch indices info."""
click.echo(current_search_client.cat.indices(s='index'))


@monitoring.command()
@with_appcontext
def redis():
Expand Down
67 changes: 39 additions & 28 deletions rero_ils/modules/monitoring/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def db_connections():
def es_db_counts():
"""Display count for elasticsearch and documents.
Displays for all document types defind in config.py following informations:
Displays for all document types defined in config.py following information:
- index name for document type
- count of records in database
- count of records in elasticsearch
Expand All @@ -124,7 +124,7 @@ def check_es_db_counts():
If there are no problems the status in returned data will be `green`,
otherwise the status will be `red` and in the returned error
links will be provided with more detailed informations.
links will be provided with more detailed information.
:return: jsonified health status for elasticsearch and database counts
"""
result = {'data': {'status': 'green'}}
Expand Down Expand Up @@ -152,9 +152,9 @@ def check_es_db_counts():
_external=True
)
errors.append({
'id': 'DB_ES_COUNTER_MISSMATCH',
'id': 'DB_ES_COUNTER_MISMATCH',
'links': links,
'code': 'DB_ES_COUNTER_MISSMATCH',
'code': 'DB_ES_COUNTER_MISMATCH',
'title': "DB items counts don't match ES items count.",
'details': msg
})
Expand Down Expand Up @@ -197,11 +197,11 @@ def check_es_db_counts():
def missing_pids(doc_type):
"""Displays details of counts for document type.
Following informations will be displayed:
Following information will be displayed:
- missing pids in database
- missing pids in elasticsearch
- pids indexed multiple times in elasticsearch
If possible, direct links will be provieded to the corresponding records.
If possible, direct links will be provided to the corresponding records.
This view needs an logged in system admin.
:param doc_type: Document type to display.
Expand All @@ -226,24 +226,23 @@ def missing_pids(doc_type):
'details': res.get('ERROR')
}
}
else:
data = {'DB': [], 'ES': [], 'ES duplicate': []}
for pid in res.get('DB'):
if api_url:
data['DB'].append(f'{api_url}?q=pid:"{pid}"')
else:
data['DB'].append(pid)
for pid in res.get('ES'):
if api_url:
data['ES'].append(f'{api_url}{pid}')
else:
data['ES'].append(pid)
for pid in res.get('ES duplicate'):
if api_url:
data['ES duplicate'].append(f'{api_url}?q=pid:"{pid}"')
else:
data['ES duplicate'].append(pid)
return jsonify({'data': data})
data = {'DB': [], 'ES': [], 'ES duplicate': []}
for pid in res.get('DB'):
if api_url:
data['DB'].append(f'{api_url}?q=pid:"{pid}"')
else:
data['DB'].append(pid)
for pid in res.get('ES'):
if api_url:
data['ES'].append(f'{api_url}{pid}')
else:
data['ES'].append(pid)
for pid in res.get('ES duplicate'):
if api_url:
data['ES duplicate'].append(f'{api_url}?q=pid:"{pid}"')
else:
data['ES duplicate'].append(pid)
return jsonify({'data': data})


@api_blueprint.route('/redis')
Expand All @@ -263,14 +262,27 @@ def redis():
@api_blueprint.route('/es')
@check_authentication
def elastic_search():
"""Displays elastic search cluster info.
"""Displays Elasticsearch cluster info.
:return: jsonified elastic search cluster info.
:return: jsonified Elasticsearch cluster info.
"""
info = current_search_client.cluster.health()
return jsonify({'data': info})


@api_blueprint.route('/es_indices')
@check_authentication
def elastic_search_indices():
"""Displays Elasticsearch indices info.
:return: jsonified Elasticsearch indices info.
"""
info = current_search_client.cat.indices(
bytes='b', format='json', s='index')
info = {data['index']: data for data in info}
return jsonify({'data': info})


@api_blueprint.route('/timestamps')
@check_authentication
def timestamps():
Expand All @@ -281,8 +293,7 @@ def timestamps():
:return: jsonified timestamps.
"""
data = {}
time_stamps = current_cache.get('timestamps')
if time_stamps:
if time_stamps := current_cache.get('timestamps'):
for name, values in time_stamps.items():
# make the name safe for JSON export
name = name.replace('-', '_')
Expand Down
2 changes: 1 addition & 1 deletion rero_ils/modules/notifications/subclasses/reminder.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def _exists_similar_notification(self):

def get_communication_channel(self):
"""Get the communication channel to dispatch the notification."""
# For REMINDERS notification, the communication channel tu use is
# For REMINDERS notification, the communication channel to use is
# define into the corresponding circulation policy
if self._cipo_reminder:
channel = self._cipo_reminder.get('communication_channel')
Expand Down
2 changes: 1 addition & 1 deletion rero_ils/modules/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def process_bulk_queue(version_type=None, queue=None, es_bulk_kwargs=None,
"""Process bulk indexing queue.
:param str version_type: Elasticsearch version type.
:param Queue queue: Queue tu use.
:param Queue queue: Queue to use.
:param str routing_key: Routing key to use.
:param dict es_bulk_kwargs: Passed to
:func:`elasticsearch:elasticsearch.helpers.bulk`.
Expand Down
2 changes: 1 addition & 1 deletion tests/api/items/test_items_rest_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

def test_item_dumps(client, item_lib_martigny, org_martigny,
librarian_martigny):
"""Test item dumps and elastic search version."""
"""Test item dumps and Elasticsearch version."""
item_dumps = Item(item_lib_martigny.dumps()).replace_refs()

assert item_dumps.get('organisation').get('pid') == org_martigny.pid
Expand Down
4 changes: 2 additions & 2 deletions tests/api/test_monitoring_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,9 @@ def test_monitoring_check_es_db_counts(app, client, contribution_person_data,
assert get_json(res) == {
'data': {'status': 'red'},
'errors': [{
'code': 'DB_ES_COUNTER_MISSMATCH',
'code': 'DB_ES_COUNTER_MISMATCH',
'details': 'There are 1 items from cont missing in ES.',
'id': 'DB_ES_COUNTER_MISSMATCH',
'id': 'DB_ES_COUNTER_MISMATCH',
'links': {
'about': 'http://localhost/monitoring/check_es_db_counts',
'cont': 'http://localhost/monitoring/missing_pids/cont'
Expand Down

0 comments on commit a9230c3

Please sign in to comment.