Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Fix #4333] Implement asynchronous search reindex functionality using celery #4368

Merged
merged 15 commits into from
Jul 31, 2018
2 changes: 1 addition & 1 deletion conftest.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
import pytest
from django.conf import settings
from rest_framework.test import APIClient

try:
Expand Down Expand Up @@ -46,7 +47,6 @@ def pytest_configure(config):
def settings_modification(settings):
settings.CELERY_ALWAYS_EAGER = True


@pytest.fixture
def api_client():
return APIClient()
58 changes: 0 additions & 58 deletions readthedocs/core/management/commands/reindex_elasticsearch.py

This file was deleted.

22 changes: 22 additions & 0 deletions readthedocs/projects/migrations/0028_importedfile_modified_date.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9.13 on 2018-07-27 09:54
from __future__ import unicode_literals

from django.db import migrations, models
import django.utils.timezone


class Migration(migrations.Migration):

dependencies = [
('projects', '0027_add_htmlfile_model'),
]

operations = [
migrations.AddField(
model_name='importedfile',
name='modified_date',
field=models.DateTimeField(auto_now=True, default=django.utils.timezone.now, verbose_name='Modified date'),
preserve_default=False,
),
]
2 changes: 2 additions & 0 deletions readthedocs/projects/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from django.contrib.auth.models import User
from django.core.urlresolvers import NoReverseMatch, reverse
from django.db import models
from django.utils import timezone
from django.utils.encoding import python_2_unicode_compatible
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
Expand Down Expand Up @@ -911,6 +912,7 @@ class ImportedFile(models.Model):
path = models.CharField(_('Path'), max_length=255)
md5 = models.CharField(_('MD5 checksum'), max_length=255)
commit = models.CharField(_('Commit'), max_length=255)
modified_date = models.DateTimeField(_('Modified date'), auto_now=True)

def get_absolute_url(self):
return resolve(project=self.project, version_slug=self.version.slug, filename=self.path)
Expand Down
4 changes: 4 additions & 0 deletions readthedocs/projects/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,7 @@
project_import = django.dispatch.Signal(providing_args=["project"])

files_changed = django.dispatch.Signal(providing_args=["project", "files"])

bulk_post_create = django.dispatch.Signal(providing_args=["instance_list"])

bulk_post_delete = django.dispatch.Signal(providing_args=["instance_list"])
30 changes: 22 additions & 8 deletions readthedocs/projects/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
from .constants import LOG_TEMPLATE
from .exceptions import RepositoryError
from .models import ImportedFile, Project, Domain, Feature, HTMLFile
from .signals import before_vcs, after_vcs, before_build, after_build, files_changed
from .signals import before_vcs, after_vcs, before_build, after_build, files_changed, \
bulk_post_create, bulk_post_delete
from readthedocs.builds.constants import (
BUILD_STATE_BUILDING, BUILD_STATE_CLONING, BUILD_STATE_FINISHED,
BUILD_STATE_INSTALLING, LATEST, LATEST_VERBOSE_NAME, STABLE_VERBOSE_NAME)
Expand Down Expand Up @@ -986,6 +987,7 @@ def _manage_imported_files(version, path, commit):
:param commit: Commit that updated path
"""
changed_files = set()
created_html_files = []
for root, __, filenames in os.walk(path):
for filename in filenames:
if fnmatch.fnmatch(filename, '*.html'):
Expand Down Expand Up @@ -1015,15 +1017,27 @@ def _manage_imported_files(version, path, commit):
obj.commit = commit
obj.save()

# Delete the HTMLFile first from previous versions
HTMLFile.objects.filter(project=version.project,
version=version
).exclude(commit=commit).delete()
if model_class == HTMLFile:
# the `obj` is HTMLFile, so add it to the list
created_html_files.append(obj)

# Send bulk_post_create signal for bulk indexing to Elasticsearch
bulk_post_create.send(sender=HTMLFile, instance_list=created_html_files)

# Delete the HTMLFile first from previous commit and
# send bulk_post_delete signal for bulk removing from Elasticsearch
delete_queryset = (HTMLFile.objects.filter(project=version.project, version=version)
.exclude(commit=commit))
# Keep the objects into memory to send it to signal
instance_list = list(delete_queryset)
# Safely delete from database
delete_queryset.delete()
# Always pass the list of instance, not queryset.
bulk_post_delete.send(sender=HTMLFile, instance_list=instance_list)

# Delete ImportedFiles from previous versions
ImportedFile.objects.filter(project=version.project,
version=version
).exclude(commit=commit).delete()
(ImportedFile.objects.filter(project=version.project, version=version)
.exclude(commit=commit).delete())
changed_files = [
resolve_path(
version.project, filename=file, version_slug=version.slug,
Expand Down
1 change: 1 addition & 0 deletions readthedocs/search/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
default_app_config = 'readthedocs.search.apps.SearchConfig'
10 changes: 10 additions & 0 deletions readthedocs/search/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"""Project app config"""

from django.apps import AppConfig


class SearchConfig(AppConfig):
name = 'readthedocs.search'

def ready(self):
from .signals import index_html_file, remove_html_file
27 changes: 5 additions & 22 deletions readthedocs/search/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from elasticsearch_dsl.query import SimpleQueryString, Bool

from readthedocs.projects.models import Project, HTMLFile
from .conf import SEARCH_EXCLUDED_FILE

from readthedocs.search.faceted_search import ProjectSearch, FileSearch
from .conf import SEARCH_EXCLUDED_FILE
from .mixins import RTDDocTypeMixin

project_conf = settings.ES_INDEXES['project']
project_index = Index(project_conf['name'])
Expand All @@ -17,7 +17,7 @@


@project_index.doc_type
class ProjectDocument(DocType):
class ProjectDocument(RTDDocTypeMixin, DocType):

class Meta(object):
model = Project
Expand Down Expand Up @@ -47,11 +47,12 @@ def faceted_search(cls, query, language=None, using=None, index=None):


@page_index.doc_type
class PageDocument(DocType):
class PageDocument(RTDDocTypeMixin, DocType):

class Meta(object):
model = HTMLFile
fields = ('commit',)
ignore_signals = settings.ES_PAGE_IGNORE_SIGNALS

project = fields.KeywordField(attr='project.slug')
version = fields.KeywordField(attr='version.slug')
Expand Down Expand Up @@ -121,21 +122,3 @@ def get_queryset(self):
queryset = (queryset.filter(project__documentation_type='sphinx')
.exclude(name__in=SEARCH_EXCLUDED_FILE))
return queryset

def update(self, thing, refresh=None, action='index', **kwargs):
"""Overwrite in order to index only certain files"""
# Object not exist in the provided queryset should not be indexed
# TODO: remove this overwrite when the issue has been fixed
# See below link for more information
# https://github.com/sabricot/django-elasticsearch-dsl/issues/111
# Moreover, do not need to check if its a delete action
# Because while delete action, the object is already remove from database
if isinstance(thing, HTMLFile) and action != 'delete':
# Its a model instance.
queryset = self.get_queryset()
obj = queryset.filter(pk=thing.pk)
if not obj.exists():
return None

return super(PageDocument, self).update(thing=thing, refresh=refresh,
action=action, **kwargs)
Empty file.
Empty file.
101 changes: 101 additions & 0 deletions readthedocs/search/management/commands/reindex_elasticsearch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import datetime
import logging

from celery import chord, chain
from django.apps import apps
from django.conf import settings
from django.core.management import BaseCommand
from django.utils import timezone
from django_elasticsearch_dsl.registries import registry

from ...tasks import (index_objects_to_es, switch_es_index, create_new_es_index,
index_missing_objects)
from ...utils import chunk_queryset

log = logging.getLogger(__name__)


class Command(BaseCommand):

@staticmethod
def _get_indexing_tasks(app_label, model_name, queryset, document_class, index_name):
queryset = queryset.values_list('id', flat=True)
chunked_queryset = chunk_queryset(queryset, settings.ES_TASK_CHUNK_SIZE)

for chunk in chunked_queryset:
data = {
'app_label': app_label,
'model_name': model_name,
'document_class': document_class,
'index_name': index_name,
'objects_id': list(chunk)
}
yield index_objects_to_es.si(**data)

def _run_reindex_tasks(self, models):
for doc in registry.get_documents(models):
queryset = doc().get_queryset()
# Get latest object from the queryset
index_time = timezone.now()

app_label = queryset.model._meta.app_label
model_name = queryset.model.__name__

index_name = doc._doc_type.index
timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
new_index_name = "{}_{}".format(index_name, timestamp)

pre_index_task = create_new_es_index.si(app_label=app_label,
model_name=model_name,
index_name=index_name,
new_index_name=new_index_name)

indexing_tasks = self._get_indexing_tasks(app_label=app_label, model_name=model_name,
queryset=queryset,
document_class=str(doc),
index_name=new_index_name)

post_index_task = switch_es_index.si(app_label=app_label, model_name=model_name,
index_name=index_name,
new_index_name=new_index_name)

# Task to run in order to add the objects
# that has been inserted into database while indexing_tasks was running
# We pass the creation time of latest object, so its possible to index later items
missed_index_task = index_missing_objects.si(app_label=app_label,
model_name=model_name,
document_class=str(doc),
index_generation_time=index_time)

# http://celery.readthedocs.io/en/latest/userguide/canvas.html#chords
chord_tasks = chord(header=indexing_tasks, body=post_index_task)
# http://celery.readthedocs.io/en/latest/userguide/canvas.html#chain
chain(pre_index_task, chord_tasks, missed_index_task).apply_async()

message = ("Successfully issued tasks for {}.{}, total {} items"
.format(app_label, model_name, queryset.count()))
log.info(message)

def add_arguments(self, parser):
parser.add_argument(
'--models',
dest='models',
type=str,
nargs='*',
help=("Specify the model to be updated in elasticsearch."
"The format is <app_label>.<model_name>")
)

def handle(self, *args, **options):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs a docstring showing how to run it.

"""
Index models into Elasticsearch index asynchronously using celery.

You can specify model to get indexed by passing
`--model <app_label>.<model_name>` parameter.
Otherwise, it will reindex all the models
"""
models = None
if options['models']:
models = [apps.get_model(model_name) for model_name in options['models']]

self._run_reindex_tasks(models=models)
Loading