
Helpers for bulk methods such as async_bulk sleep in a blocking manner, preventing graceful shutdown #2484

Open
artem-shelkovnikov opened this issue Mar 22, 2024 · 2 comments
Labels: Area: Helpers · Category: Enhancement · good first issue

Comments

@artem-shelkovnikov (Member)

We tried using the async_bulk and async_streaming_bulk helpers to ingest data into Elasticsearch, and they work great, but we've found that they prevent our code from shutting down gracefully when Ctrl+C is pressed.

Example code that sleeps: https://github.com/elastic/elasticsearch-py/blob/main/elasticsearch/_async/helpers.py#L241

It would be great to have a way to:

  • Either define how the sleep happens by passing a sleep function into the client
  • Or make the Elasticsearch client internally cancel all sleeps when the client is closed
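To illustrate the first option, here is a minimal sketch of what a user-supplied, cancellable sleep function could look like. `make_cancellable_sleep` and the `shutdown` event are hypothetical names for illustration, not part of the elasticsearch-py API; the point is a drop-in replacement for `asyncio.sleep` that returns early once shutdown is requested:

```python
import asyncio


def make_cancellable_sleep(shutdown: asyncio.Event):
    """Build a sleep coroutine that wakes early when `shutdown` is set.

    Hypothetical sketch: a hook like this is what the issue asks to be
    able to pass into the bulk helpers in place of asyncio.sleep().
    """

    async def sleep(delay: float) -> None:
        try:
            # Returns as soon as shutdown is requested, or after `delay`.
            await asyncio.wait_for(shutdown.wait(), timeout=delay)
        except asyncio.TimeoutError:
            pass  # slept the full duration without interruption

    return sleep


async def demo() -> float:
    shutdown = asyncio.Event()
    sleep = make_cancellable_sleep(shutdown)
    loop = asyncio.get_running_loop()
    # Simulate Ctrl+C arriving 0.05s into a 10s retry backoff.
    loop.call_later(0.05, shutdown.set)
    start = loop.time()
    await sleep(10.0)
    return loop.time() - start
```

With such a hook, a long retry backoff ends almost immediately after shutdown is signalled, instead of blocking the event loop's shutdown for the full delay.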
@artem-shelkovnikov (Member, Author)

Example code that our product calls when using the client: https://github.com/elastic/connectors/blob/main/connectors/es/sink.py (see the comment at the top of the file on how we collect and ingest data).

In short, we have a SyncOrchestrator class that internally creates two classes:

  • Extractor. This class is responsible for providing a generator that returns documents from a third-party system and puts them into the MemQueue.
  • Sink. This class is responsible for picking up data from the MemQueue and sending it in batches to Elasticsearch. Right now it just sends a regular bulk request: https://github.com/elastic/connectors/blob/main/connectors/es/sink.py#L149, but ideally we'd love to switch to a helper from the Python client.
  • MemQueue itself is there to provide backpressure, limiting both the number of items that can be in the queue AND the total size of the items in the queue; this way we can, to some extent, control the memory usage of the framework.
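The backpressure idea behind MemQueue can be sketched as a small asyncio queue that caps both item count and total payload size; producers block in `put` until a consumer frees room. This is a simplified illustration, not the connectors implementation:

```python
import asyncio


class MemQueue:
    """Simplified sketch of a size-and-count bounded queue (hypothetical,
    not the connectors implementation). Producers block in put() until
    both limits have room, which applies backpressure upstream."""

    def __init__(self, maxitems: int, maxsize: int):
        self.maxitems = maxitems  # max number of queued items
        self.maxsize = maxsize    # max total bytes across queued items
        self.size = 0             # current total bytes in the queue
        self.items: asyncio.Queue = asyncio.Queue()
        self.not_full = asyncio.Condition()

    async def put(self, item: bytes) -> None:
        async with self.not_full:
            # Block until both the count limit and the size limit allow it.
            await self.not_full.wait_for(
                lambda: self.items.qsize() < self.maxitems
                and self.size + len(item) <= self.maxsize
            )
            self.size += len(item)
            self.items.put_nowait(item)

    async def get(self) -> bytes:
        item = await self.items.get()
        async with self.not_full:
            self.size -= len(item)
            # Wake any producer waiting for room.
            self.not_full.notify_all()
        return item
```

A consumer draining the queue via `get()` is what unblocks a stalled producer, so memory usage stays within the configured bounds regardless of how fast the extractor produces documents.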

@pquentin (Member)

pquentin commented May 17, 2024

Sorry for the delay, Artem. I would be happy to implement the first option, allowing the sleep function to be user-defined. Silently cancelling all sleeps/bulks isn't something we'd want in the general case.
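Until the helpers accept a sleep hook, the usual shutdown pattern is to run the ingestion coroutine as a task and cancel it on Ctrl+C. A sketch, independent of the helpers (`run_with_graceful_shutdown` is an illustrative name): this only terminates promptly if every await inside the task, including the helpers' retry sleeps, is cancellable, which is the crux of this issue.

```python
import asyncio
import signal


async def run_with_graceful_shutdown(coro) -> None:
    """Run `coro` as a task and cancel it when SIGINT (Ctrl+C) arrives.

    Sketch of the generic asyncio shutdown pattern; prompt termination
    still depends on the task's internal awaits being cancellable.
    """
    task = asyncio.ensure_future(coro)
    loop = asyncio.get_running_loop()
    # Unix-only: route Ctrl+C to task cancellation instead of KeyboardInterrupt.
    loop.add_signal_handler(signal.SIGINT, task.cancel)
    try:
        await task
    except asyncio.CancelledError:
        pass  # cancelled: the graceful-shutdown path
    finally:
        loop.remove_signal_handler(signal.SIGINT)
```

`loop.add_signal_handler` is only available on Unix event loops; on Windows a different mechanism (e.g. catching `KeyboardInterrupt`) is needed.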

@pquentin added the Area: Helpers, Category: Enhancement, and good first issue labels on Jul 2, 2024