Skip to content

Commit

Permalink
resources: speed up the indexing
Browse files Browse the repository at this point in the history
* Replaces flush indexing by an es parameter `refresh='true'`.
* Changes the status in the document index instead of reindex the whole
  document for the circulation operations.
* Updates the maked value of an holding only if the `_masked` field has
  been touched.
* Adds various optimizations.

Signed-off-by: Johnny Mariéthoz <[email protected]>
  • Loading branch information
jma committed Jun 9, 2021
1 parent e38417b commit 5573032
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 83 deletions.
17 changes: 2 additions & 15 deletions rero_ils/modules/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,13 @@
from elasticsearch.helpers import expand_action as default_expand_action
from flask import current_app
from invenio_db import db
from invenio_indexer import current_record_to_index
from invenio_indexer.api import RecordIndexer
from invenio_indexer.signals import before_record_index
from invenio_indexer.utils import _es7_expand_action
from invenio_pidstore.errors import PIDDoesNotExistError
from invenio_pidstore.models import PersistentIdentifier, PIDStatus
from invenio_records.api import Record
from invenio_records_rest.utils import obj_or_import_string
from invenio_search import current_search
from invenio_search.api import RecordsSearch
from jsonschema.exceptions import ValidationError
from kombu.compat import Consumer
Expand Down Expand Up @@ -86,11 +84,6 @@ class Meta:

default_filter = None

@classmethod
def flush(cls):
"""Flush index."""
current_search.flush_and_refresh(cls.Meta.index)


class IlsRecord(Record):
"""ILS Record class."""
Expand Down Expand Up @@ -453,21 +446,15 @@ class IlsRecordsIndexer(RecordIndexer):

def index(self, record):
"""Indexing a record."""
return_value = super().index(record)
index_name, doc_type = current_record_to_index(record)
# TODO: Do we need to flush everytime the ES index?
# Tests depends on this at the moment.
current_search.flush_and_refresh(index_name)
return_value = super().index(record, arguments=dict(refresh='true'))
return return_value

def delete(self, record):
"""Delete a record.
:param record: Record instance.
"""
return_value = super().delete(record)
index_name, doc_type = current_record_to_index(record)
current_search.flush_and_refresh(index_name)
return_value = super().delete(record, refresh='true')
return return_value

def bulk_index(self, record_id_iterator, doc_type=None):
Expand Down
6 changes: 0 additions & 6 deletions rero_ils/modules/holdings/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from dateutil.relativedelta import relativedelta
from flask import current_app
from flask_babelex import gettext as _
from invenio_search import current_search
from invenio_search.api import RecordsSearch
from jinja2 import Environment

Expand Down Expand Up @@ -77,11 +76,6 @@ class Meta:

default_filter = None

@classmethod
def flush(cls):
"""Flush indexes."""
current_search.flush_and_refresh(cls.Meta.index)


class Holding(IlsRecord):
"""Holding class."""
Expand Down
116 changes: 63 additions & 53 deletions rero_ils/modules/items/api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from functools import partial

from elasticsearch.exceptions import NotFoundError
from invenio_search import current_search
from invenio_search import current_search_client

from .circulation import ItemCirculation
from .issue import ItemIssue
Expand Down Expand Up @@ -60,14 +60,6 @@ class Meta:

default_filter = None

@classmethod
def flush(cls):
"""Flush indexes."""
from rero_ils.modules.holdings.api import HoldingsSearch
current_search.flush_and_refresh(DocumentsSearch.Meta.index)
current_search.flush_and_refresh(HoldingsSearch.Meta.index)
current_search.flush_and_refresh(cls.Meta.index)


class Item(ItemCirculation, ItemIssue):
"""Item class."""
Expand Down Expand Up @@ -203,55 +195,74 @@ class ItemsIndexer(IlsRecordsIndexer):

record_cls = Item

def index(self, record):
"""Index an item."""
from ...documents.api import DocumentsIndexer
from ...holdings.api import Holding, HoldingsSearch
@classmethod
def _es_item(cls, record):
"""Get the item from the corresponding index.
:param record: an item object
:returns: the elasticsearch document or {}
"""
try:
es_item = current_search_client.get(
ItemsSearch.Meta.index, record.id)
return es_item['_source']
except NotFoundError:
return {}

@classmethod
def _update_status_in_doc(cls, record, es_item):
"""Update the status of a given item in the document index.
# get the old holding record if exists
items_search = ItemsSearch(). \
filter('term', pid=record.get('pid')). \
source('holding').execute().hits
:param record: an item object
:param es_item: a dict of the elasticsearch item
"""
# retrieve the document in the corresponding es index
document_pid = extracted_data_from_ref(record.get('document'))
doc = next(
DocumentsSearch()
.extra(version=True)
.filter('term', pid=document_pid)
.scan()
)
# update the item status in the document
data = doc.to_dict()
for hold in data.get('holdings', []):
for item in hold.get('items', []):
if item['pid'] == record.pid:
item['status'] = record['status']
break
else:
continue
break
# reindex the document with the same version
current_search_client.index(
index=DocumentsSearch.Meta.index,
id=doc.meta.id,
body=data,
version=doc.meta.version,
version_type='external_gte')

old_holdings_pid = None
if items_search.total.value:
old_holdings_pid = items_search[0].holding.pid
def index(self, record):
"""Index an item.
:param record: an item object
"returns: the elastiscsearch client result
"""
# get previous indexed version
es_item = self._es_item(record)

# call the parent
return_value = super().index(record)

# reindex document in background
document_pid = extracted_data_from_ref(record.get('document'))
uid = Document.get_id_by_pid(document_pid)
DocumentsIndexer().index_by_id(uid)
current_search.flush_and_refresh(HoldingsSearch.Meta.index)
current_search.flush_and_refresh(DocumentsSearch.Meta.index)
# set holding masking for standard holdings
new_holdings_pid = extracted_data_from_ref(record['holding']['$ref'])
holding = Holding.get_record_by_pid(new_holdings_pid)
if holding.get('holdings_type') == 'standard':
number_of_unmasked_items = \
Item.get_number_masked_items_by_holdings_pid(new_holdings_pid)
update_holdings = False
# masking holding if all items are masked
if not number_of_unmasked_items and not holding.get('_masked'):
holding['_masked'] = True
holding.update(
data=holding, dbcommit=True, reindex=True)
# unmask holding if at least one of its items is unmasked
elif number_of_unmasked_items and holding.get('_masked'):
holding['_masked'] = False
holding.update(
data=holding, dbcommit=True, reindex=True)
# check if old holding can be deleted
if old_holdings_pid and new_holdings_pid != old_holdings_pid:
old_holding_rec = Holding.get_record_by_pid(old_holdings_pid)
try:
# TODO: Need to split DB and elasticsearch deletion.
old_holding_rec.delete(
force=False, dbcommit=True, delindex=True)
except IlsRecordError.NotDeleted:
pass
# fast document reindex for circulation operations
if es_item and record.get('status') != es_item.get('status'):
self._update_status_in_doc(record, es_item)
return return_value

# reindex document for non circulation operations
document_pid = extracted_data_from_ref(record.get('document'))
doc = Document.get_record_by_pid(document_pid)
doc.reindex()
return return_value

def delete(self, record):
Expand All @@ -266,7 +277,6 @@ def delete(self, record):
document_pid = rec_with_refs['document']['pid']
document = Document.get_record_by_pid(document_pid)
document.reindex()
current_search.flush_and_refresh(DocumentsSearch.Meta.index)

holding = rec_with_refs.get('holding', '')
if holding:
Expand Down
4 changes: 0 additions & 4 deletions rero_ils/modules/items/api/circulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,9 @@ def change_status_commit_and_reindex(self):
Commits and reindex the item.
This method is executed after every successfull circulation action.
"""
from . import ItemsSearch
current_search.flush_and_refresh(
current_circulation.loan_search_cls.Meta.index)
self.status_update(self, dbcommit=True, reindex=True, forceindex=True)
ItemsSearch.flush()

def prior_validate_actions(self, **kwargs):
"""Check if the validate action can be executed or not."""
Expand Down Expand Up @@ -1038,7 +1036,6 @@ def action_filter(self, action, organisation_pid, library_pid, loan,
if action == 'checkout':
if not circ_policy.can_checkout:
data['action_validated'] = False

if action == 'receive':
if (
circ_policy.can_checkout and
Expand All @@ -1048,7 +1045,6 @@ def action_filter(self, action, organisation_pid, library_pid, loan,
):
data['action_validated'] = False
data['new_action'] = 'checkout'

return data

@property
Expand Down
8 changes: 4 additions & 4 deletions rero_ils/modules/items/api/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from ...api import IlsRecord
from ...holdings.models import HoldingTypes
from ...item_types.api import ItemType
from ...libraries.api import Library
from ...locations.api import Location
from ...organisations.api import Organisation
from ...utils import date_string_to_utc, extracted_data_from_ref, \
Expand Down Expand Up @@ -357,7 +356,7 @@ def get_item_by_barcode(cls, barcode, organisation_pid):

def get_organisation(self):
"""Shortcut to the organisation of the item location."""
return self.get_library().get_organisation()
return Organisation.get_record_by_pid(self.organisation_pid)

def get_library(self):
"""Shortcut to the library of the item location."""
Expand Down Expand Up @@ -475,8 +474,9 @@ def holding_library_pid(self):
@property
def organisation_pid(self):
"""Get organisation pid for item."""
library = Library.get_record_by_pid(self.library_pid)
return library.organisation_pid
if self.get('organisation'):
return extracted_data_from_ref(self.get('organisation'))
return self.get_library().organisation_pid

@property
def organisation_view(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/ui/holdings/test_holdings_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,4 @@ def test_holding_delete_after_item_edition(
assert item.holding_pid == holding_lib_fully.pid

holding = Holding.get_record_by_pid(holding_lib_saxon.pid)
assert not holding
assert not holding.get_number_of_items()

0 comments on commit 5573032

Please sign in to comment.