
Compatibility with Python multiprocessing #72

Open
redbaron4 opened this issue Mar 14, 2022 · 4 comments

@redbaron4

redbaron4 commented Mar 14, 2022

We have a custom Python script that performs some calculations on elements of an index. During the course of the calculations, it needs to fetch a list of timestamps matching a search criterion from a backing index. This is done using the helpers.scan() paradigm.

Since a search can take a long time (we are searching among millions of documents), our idea was to create a multiprocessing.Pool and then use map to run the searches in parallel (we use 3 workers).
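For reference, the failing pattern looked roughly like this (a minimal sketch with illustrative names and a placeholder query; the real script builds the search criteria from index data):

# Sketch only: one global client created before the fork and shared by all workers
import multiprocessing

from elasticsearch import Elasticsearch, helpers

ES = Elasticsearch("http://localhost:9200")  # single global instance

queries = [{"query": {"match_all": {}}}]  # placeholder search bodies


def fetch_timestamps(query):
    # Each worker streams the matching documents with helpers.scan()
    return [hit["_source"]["@timestamp"]
            for hit in helpers.scan(ES, index="conn-*", query=query)]


if __name__ == "__main__":
    with multiprocessing.Pool(3) as pool:  # 3 workers, as described above
        results = pool.map(fetch_timestamps, queries)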

This scheme worked up to Elasticsearch 7.17. After upgrading to Elasticsearch 8.1.0, we updated the script dependency (elasticsearch-py to 8.1.0) and noticed that random searches began failing with an "unable to deserialize" error.

Unable to deserialize as JSON: b'{"_scroll_id":"FGluY2x1ZGVfY29udGV4dF91dWlkDnF1ZXJ5VGhlbkZldGNoAxZjLW9VV25EMVJEQ09WVzRUbDRjSDVBAAAAAAAAHpsWQ3JRSHo1eEFTbC0zYVM1aXFhUnFldxY1dUhYV1FXeVN1LVZEWVM3TUN3ZmRnAAAAAAAAIWgWZ0lhNDIycXRSeW1DMnlDZ1VYMEJSZxZLdktJUVNlSVFoLWNxaGdzbXZnMFlRAAAAAAAAKToWX3QzaVBjRzBUQW1aMWdJZTI3MzVVdw==","took":4245,"timed_out":false,"_shards":{"total":15,"successful":15,"skipped":12,"failed":0},"hits":{"total":{"value":1609,"relation":"eq"},"max_score":null,"hits":[LONG HIT]}}HTTP/1.1 200 OK\r\nX-elastic-product: Elasticsearch\r\ncontent-type: application/vnd.elasticsearch+json;compatible-with=8\r\ncontent-length: 140256\r\n\r\n{"_scroll_id":"FGluY2x1ZGVfY29udGV4dF91dWlkDnF1ZXJ5VGhlbkZldGNoAxZjLW9VV25EMVJEQ09WVzRUbDRjSDVBAAAAAAAAHp0WQ3JRSHo1eEFTbC0zYVM1aXFhUnFldxZSWjBnV0haWFJ5U2Q5Tm5wd3BBbzhBAAAAAAAAFHsWN1Z5clJ3WTdSc2FOc0c5S3VfNG1IQRZLdktJUVNlSVFoLWNxaGdzbXZnMFlRAAAAAAAAKTsWX3QzaVBjRzBUQW1aMWdJZTI3MzVVdw==","took":4434,"timed_out":false,"_shards":{"total":15,"successful":15,"skipped":12,"failed":0},"hits":{"total":{"value":3243,"relation":"eq"},"max_score":null,"hits":[{"_index":"conn-2022.03.12","_id":"MXv3en8BEUCwvNkwmp_B","_score":null,"_source":{"@timestamp":"2022-03-11T21:51:54.599Z"},"sort":[562]},{"_index":"conn-2022.03.12","_id":"K2r8en8Bn-M7Q9BPMNt3","_score":null,"_source":{"@timestamp":"2022-03-11T21:56:54.730Z"},"sort":[732]},{"_index":"conn-2022.03.12","_id":"f3Ipe38Bn-M7Q

Notice that the response appears to have the body of another response appended to it (which is probably what causes the deserialization error).

There is no error if we set the number of workers to 1, which makes me suspect that the transport is not playing well with multiple workers spawned using multiprocessing.

We initialize the Elasticsearch instance once globally and then each "worker" uses that instance to perform its searches. Any ideas how we can make this scheme play well with the transport library?

UPDATE

We modified the script so that each worker creates its own Elasticsearch() instance at spawn time (in addition to the one created by the script). The workers only ever use their own instances, and the script now works correctly.
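Roughly what we ended up with (a sketch, assuming a Pool initializer is used to build the per-worker client; names are illustrative):

# Sketch only: every worker process gets its own client, so nothing is shared across the fork
import multiprocessing

from elasticsearch import Elasticsearch, helpers

WORKER_ES = None  # per-process client, set once in each worker

queries = [{"query": {"match_all": {}}}]  # placeholder search bodies


def init_worker(connection_string):
    global WORKER_ES
    WORKER_ES = Elasticsearch(connection_string)


def fetch_timestamps(query):
    return [hit["_source"]["@timestamp"]
            for hit in helpers.scan(WORKER_ES, index="conn-*", query=query)]


if __name__ == "__main__":
    with multiprocessing.Pool(3, initializer=init_worker,
                              initargs=("http://localhost:9200",)) as pool:
        results = pool.map(fetch_timestamps, queries)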

@sethmlarson
Contributor

This library is thread-safe but isn't safe to fork/access from multiple different processes, so your approach of creating a separate instance per forked process is correct.

What configuration were you using in 7.17 that was working but now isn't in 8.0+?

@redbaron4
Author

@sethmlarson The earlier approach was to have a single global Elasticsearch() instance and then fork workers, all of which accessed that same global instance. This worked well up to 7.17 but broke after upgrading to 8.1. To fix it, we changed to one instance per worker (in addition to the global instance).

Would it be possible to add information regarding thread/process safety to the manual?

@sethmlarson
Contributor

@redbaron4 I understand. Could you copy and paste the code you were using in 7.17 so I can see how the client was configured and try to reproduce the problem?

@redbaron4
Author

redbaron4 commented Mar 16, 2022

Sorry I misunderstood your earlier comment.

Here's how I configure the client:

ES.Elasticsearch(
    connection_string, sniff_on_connection_fail=True,
    sniff_on_start=True, sniffer_timeout=5 * timeout,
    timeout=timeout, sniff_timeout=timeout // 2,
    max_retries=retries, retry_on_timeout=True)

where timeout, retries and connection_string are user-supplied options. The configuration line is the same for both versions of elasticsearch.

The usage pattern is that this is part of a function init_es which is called once at program startup. The resulting instance is stored in a global variable and is accessible via another function, get_es:

_ESINSTANCE = None


def init_es(connection_string, timeout=60, retries=5):
    timeout = util.cast_int(timeout, lower=10, upper=3600, default=60)
    retries = util.cast_int(retries, lower=1, upper=10, default=5)
    global _ESINSTANCE
    if _ESINSTANCE is None:
        _ESINSTANCE = ES.Elasticsearch(
            connection_string, sniff_on_connection_fail=True,
            sniff_on_start=True, sniffer_timeout=5 * timeout,
            timeout=timeout, sniff_timeout=timeout // 2,
            max_retries=retries, retry_on_timeout=True)


def get_es():
    global _ESINSTANCE
    if _ESINSTANCE is None:
        raise ValueError("Global ES not initialized. Call init_es first")
    return _ESINSTANCE

Any function that needs to use the client calls get_es() and then uses the returned client. Specifically, each worker may call the following function repeatedly (ESD is elasticsearch_dsl):

def lookup_details(hit_dict, index_name, summ_cols, search_cols,
                   timestamp_col="@timestamp", fetch_cols=None):
    """
    Function that is used to lookup event details (from original index)
    :param hit_dict: A dictionary consisting of one "hit"
    :param index_name: The name of index to search for details
    :param summ_cols: A list of columns that will be picked up from
                      "hit" to be searched in the index
    :param search_cols: A list of columns that will be searched. The length
                        should be same as `summ_cols`
    :param timestamp_col: The name of column in the index which has timestamp
                          info. Defaults to "@timestamp". Is used to
                          create filter based on first_seen and last_seen
                          entries in "hit"
    :param fetch_cols: A list of attributes to fetch from the index for
                       matches. Defaults to None which means all attributes
                       are fetched.
    Returns list of "hits" from searched index
    """
    if len(summ_cols) != len(search_cols):
        raise ValueError("summ_cols should be same length as search_cols")
    es = get_es()
    fseen = hit_dict["first_seen"]
    lseen = hit_dict["last_seen"]
    # Collect values of columns to be queried
    qry_cols = [hit_dict[x] for x in summ_cols]
    src_srch = ESD.Search(using=es, index=index_name)
    if fetch_cols:
        src_srch = src_srch.source(includes=fetch_cols)
    src_fltr = src_srch.filter(
        "range", **{timestamp_col: {"lte": lseen, "gte": fseen}})
    # Build rest of search filter
    entry_dict = {}
    for entry in zip(search_cols, qry_cols):
        d = dict([(entry[0], [entry[1], ], )])
        entry_dict.update(d)
    entry_fltr = None
    for d in entry_dict:
        if entry_fltr is None:
            exist = getattr(src_fltr, "filter")
        else:
            exist = getattr(entry_fltr, "filter")
        entry_fltr = exist("terms", **{d: entry_dict[d]})
    return [x.to_dict() for x in entry_fltr.scan()]

This is the function that receives the garbled response when scan() is called and long-running searches (5-10 s) are run on 2 or more workers.
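For completeness, here is roughly how the per-worker fix from the update maps onto this code (a sketch; es_module, worker_task and work_items are placeholders, and the inherited parent instance has to be dropped before init_es is called, otherwise the worker would keep the client copied across the fork):

# Sketch only: re-initialize the module-level client inside a Pool initializer
import multiprocessing

import es_module  # hypothetical module holding init_es / get_es / lookup_details


def init_worker(connection_string, timeout, retries):
    es_module._ESINSTANCE = None  # discard the instance inherited from the parent
    es_module.init_es(connection_string, timeout=timeout, retries=retries)


def worker_task(hit_dict):
    # Placeholder wrapper: the real call passes the actual index name and column lists
    return es_module.lookup_details(hit_dict, "conn-*", ["id.orig_h"], ["source.ip"])


if __name__ == "__main__":
    work_items = []  # list of "hit" dicts produced by the summary search
    with multiprocessing.Pool(3, initializer=init_worker,
                              initargs=("http://localhost:9200", 60, 5)) as pool:
        results = pool.map(worker_task, work_items)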
