[Fix #4333] Implement asynchronous search reindex functionality using celery #4368
Conversation
I have tested with about 10K files on a single Celery host with 4 workers. The indexing finished within 20 seconds.
I like the approach of using the chain/chord to handle indexing and index creation. Does this work locally with CELERY_ALWAYS_EAGER (I believe Rob mentioned that in his blog post), or will we just use the existing DED indexing locally?
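A minimal sketch of that chain/chord shape (the task bodies and the reindex helper here are illustrative stand-ins, not the PR's actual code):

from celery import Celery, chain, chord

app = Celery('example')  # stand-in for the project's Celery app

@app.task
def create_new_es_index(index_name, new_index_name):
    ...  # create the new index with the right mappings

@app.task
def index_objects_to_es(index_name, object_ids):
    ...  # bulk-index the objects with these IDs into index_name

@app.task
def switch_es_index(index_name, new_index_name):
    ...  # repoint the alias at the new index and drop the old one

def reindex(index_name, new_index_name, id_chunks):
    # Create the index, index all ID chunks in parallel, then switch
    # the alias only after every chunk has finished.
    chain(
        create_new_es_index.si(index_name, new_index_name),
        chord(
            [index_objects_to_es.si(new_index_name, ids) for ids in id_chunks],
            switch_es_index.si(index_name, new_index_name),
        ),
    ).apply_async()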
I'm a little concerned about the complexity of this approach. Is there a reason we're using chunks here when we already have a domain object to chunk on, the Version? This feels like extra work to do and code to maintain, when we already have an existing way to think about this problem.
This approach also doesn't use the same code path for indexing as our application code, so now we have two different ways of indexing files, which doesn't seem great.
It really feels like all we need is:
- A management command that iterates over versions and sends them to be indexed
- A Celery task that takes a version and indexes all the files in that version, which is called both in production and in reindexing (see the sketch after this list)
I'm happy to talk about this more. There are likely some design decisions that you made that I don't understand :)
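A rough sketch of that suggested shape (the task body and command wiring are assumptions; Version is the real readthedocs.builds model):

from celery import Celery
from django.core.management.base import BaseCommand

from readthedocs.builds.models import Version

app = Celery('example')  # stand-in for the project's Celery app

@app.task(queue='web')
def index_version_files(version_pk):
    """Index every file in one version; used after builds and in reindexing."""
    ...

class Command(BaseCommand):
    help = 'Reindex all versions into Elasticsearch.'

    def handle(self, *args, **options):
        # Stream primary keys rather than loading whole Version objects.
        for version_pk in Version.objects.values_list('pk', flat=True).iterator():
            index_version_files.delay(version_pk)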
import_projects.py

print("unsuccessful", name)

def fetch(url=None, all_projects=[]):
all_projects should be defined outside the function; it's confusing as defined here.
Oops! This file was pushed by mistake. I wrote it for local fetching purposes. Removing it.
@staticmethod
def _get_models(args):
    for model_name in args:
        yield apps.get_model(model_name)
This doesn't need to be broken out into its own function. It just increases complexity for little value.
"The format is <app_label>.<model_name>") | ||
) | ||
|
||
def handle(self, *args, **options): |
Needs a docstring showing how to run it.
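For example, something along these lines (the --models flag is assumed from the <app_label>.<model_name> help text above):

def handle(self, *args, **options):
    """
    Reindex registered documents into Elasticsearch.

    Usage:

        ./manage.py reindex_elasticsearch
        ./manage.py reindex_elasticsearch --models projects.Project

    --models takes <app_label>.<model_name> labels; with no arguments,
    every registered document's model is reindexed.
    """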
    ),
}

def _get_actions(self, object_list, action, index_name=None):
Why are we overriding this? Needs a docstring.
for document in documents:
    if str(document) == document_class:
        return document
What is the logic here for? Will there be multiple documents for a model ever? Also needs a docstring :)
Yes, there can be multiple documents for a model:
class FooDocument(DocType):
    ...
    class Meta:
        model = Bar

class NewFooDocument(DocType):
    ...
    class Meta:
        model = Bar
def _run_reindex_tasks(self, models):
    for doc in registry.get_documents(models):
        qs = doc().get_queryset()
        instance_ids = list(qs.values_list('id', flat=True))
Is this not going to use a ton of memory? This is creating the entire list of objects in memory all at once, instead of streaming them.
It's not actually creating a list of objects, just a list of integers. I think integers take much less memory.
I'm pretty worried this is going to be both slow & perhaps incredibly memory intensive in production when we have 2 million objects in memory here. I'd like to find another approach that streams the data if possible, but we can try this for now and see what happens.
@ericholscher I think we can try this now and check if anything goes wrong. I have tested with 2 million integers; it takes about 16 MB of RAM, so I think it's not a big issue.
@ericholscher I have optimized the memory usage in 143ce7f
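One way to stream instead (a sketch; the chunk size and the indexing task name are hypothetical):

def chunked_ids(queryset, chunk_size=500):
    """Yield lists of primary keys without materializing them all at once."""
    chunk = []
    for pk in queryset.values_list('pk', flat=True).iterator():
        chunk.append(pk)
        if len(chunk) == chunk_size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk

# for ids in chunked_ids(doc().get_queryset()):
#     index_objects_to_es.delay(new_index_name, ids)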
readthedocs/search/tasks.py

@app.task(queue='web')
def switch_es_index_task(app_label, model_name, index_name, new_index_name):
We don't put task at the end of our tasks.
So what suffix do we put to indicate that it's a task? I thought it's clearer to understand from the code that it's a task, so it should not be called directly.
We don't use any suffix in our code. We should be keeping the same coding standards here as elsewhere. If we want to change this, we need to change it everywhere; otherwise it's even more confusing having half of our tasks end in task.
Yeah, fixed it in the latest commit.
Another thought: should this be implemented as a contribution to DED with a Celery backend, instead of our own custom logic? It sounds like we might be able to do it with a setting: https://github.com/sabricot/django-elasticsearch-dsl#elasticsearch_dsl_signal_processor. We could also perhaps add a …
Sure, I will surely port it to DED.
That's really true, but it needs some time to get reviewed and merged into master. I will open a PR there soon, but to get our deployment out soon, we can keep it here until then.
We can always deploy a forked version while we wait for it to get accepted. It's true that we don't want to be waiting for review & merge from them, but perhaps it will be quick.
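For reference, the DED setting mentioned above points at a signal processor class, so a Celery backend could look roughly like this (the two task names are hypothetical):

from django_elasticsearch_dsl.signals import RealTimeSignalProcessor

class CelerySignalProcessor(RealTimeSignalProcessor):
    """Defer index updates to Celery instead of doing them in-request."""

    def handle_save(self, sender, instance, **kwargs):
        update_search_index.delay(sender._meta.label, instance.pk)

    def handle_delete(self, sender, instance, **kwargs):
        remove_from_search_index.delay(sender._meta.label, instance.pk)

# settings.py:
# ELASTICSEARCH_DSL_SIGNAL_PROCESSOR = 'myapp.search.CelerySignalProcessor'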
This is a general-purpose management command for indexing all types of documents like …
Yeah, that's true. This management command actually reindexes all the documents. On the other side, …
As mentioned above, the management command is general purpose, so something specially for …
Yes, it works with CELERY_ALWAYS_EAGER.
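In local settings that amounts to running tasks eagerly, e.g.:

# Local development settings: execute Celery tasks synchronously,
# so no worker process is needed.
CELERY_ALWAYS_EAGER = True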
Right, but that doesn't scale properly right now. It currently sends an HTTP request per file saved during a project build. We need to batch it, which could use similar logic to this.
I understand, but how do we get the files in batches? Is there any signal that is sent in batch?
We should just update the code to keep track of the files that are changed, as it works now. We can use the existing …
Good catch @ericholscher. I will fix it in the synchronous task. Thanks.
@ericholscher I have fixed the issues you mentioned and fixed the tests. Also added a comment in #4264 (comment) for testing it in the backlog. r?
@ericholscher I have also fixed #4409 with 612cfb8, so there will not be any auto-indexing to Elasticsearch in local development.
queryset = doc().get_queryset()
# Get latest object from the queryset
latest_object = queryset.latest('modified_date')
latest_object_time = latest_object.modified_date
This should just use the current time, not query the entire queryset for the time. This will likely be quite a slow query in prod.
Yeah! I was also thinking about it.
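i.e. roughly (a minimal sketch):

from django.utils import timezone

# Record the reindex start time up front instead of querying the
# whole queryset for the newest modified_date.
index_time = timezone.now()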
@ericholscher Fixed the …
This looks good. 👍
The management command reindex_elasticsearch has been rewritten from scratch using Celery tasks. The idea is taken from @robhudson's blog post and heavily inspired by the code of mozilla/zamboni.
Some methods of django-elasticsearch-dsl need to be overridden in order to support zero-downtime rebuild (django-es/django-elasticsearch-dsl#75). I am hoping to send a PR upstream. This fixes #4333.
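For reference, the zero-downtime switch reduces to an atomic alias flip; a rough sketch with elasticsearch-dsl, names assumed:

from elasticsearch_dsl.connections import connections

def switch_es_index(alias, old_index, new_index):
    """Atomically repoint the alias at the new index, then drop the old one."""
    es = connections.get_connection()
    es.indices.update_aliases(body={
        'actions': [
            {'remove': {'index': old_index, 'alias': alias}},
            {'add': {'index': new_index, 'alias': alias}},
        ],
    })
    es.indices.delete(index=old_index)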
@ericholscher @rtfd/core r?