Skip to content
44 changes: 40 additions & 4 deletions engine/clients/elasticsearch/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,40 @@
ELASTIC_PORT = 9200
ELASTIC_INDEX = "bench"
ELASTIC_USER = "elastic"
ELASTIC_PASSWORD = "passwd"
import os

from elasticsearch import Elasticsearch

ELASTIC_PORT = int(os.getenv("ELASTIC_PORT", 9200))
ELASTIC_INDEX = os.getenv("ELASTIC_INDEX", "bench")
ELASTIC_USER = os.getenv("ELASTIC_USER", "elastic")
ELASTIC_PASSWORD = os.getenv("ELASTIC_PASSWORD", "passwd")
ELASTIC_API_KEY = os.getenv("ELASTIC_API_KEY", None)
ELASTIC_TIMEOUT = int(os.getenv("ELASTIC_TIMEOUT", 90))


def get_es_client(host, connection_params):
client: Elasticsearch = None
init_params = {
**{
"verify_certs": False,
"request_timeout": ELASTIC_TIMEOUT,
"retry_on_timeout": True,
},
**connection_params,
}
if host.startswith("http"):
url = ""
else:
url = "http://"
url += f"{host}:{ELASTIC_PORT}"
if ELASTIC_API_KEY is None:
client = Elasticsearch(
url,
basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
**init_params,
)
else:
client = Elasticsearch(
url,
api_key=ELASTIC_API_KEY,
**init_params,
)
return client
21 changes: 2 additions & 19 deletions engine/clients/elasticsearch/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,7 @@
from engine.base_client import IncompatibilityError
from engine.base_client.configure import BaseConfigurator
from engine.base_client.distances import Distance
from engine.clients.elasticsearch.config import (
ELASTIC_INDEX,
ELASTIC_PASSWORD,
ELASTIC_PORT,
ELASTIC_USER,
)
from engine.clients.elasticsearch.config import ELASTIC_INDEX, get_es_client


class ElasticConfigurator(BaseConfigurator):
Expand All @@ -25,19 +20,7 @@ class ElasticConfigurator(BaseConfigurator):

def __init__(self, host, collection_params: dict, connection_params: dict):
super().__init__(host, collection_params, connection_params)
init_params = {
**{
"verify_certs": False,
"request_timeout": 90,
"retry_on_timeout": True,
},
**connection_params,
}
self.client = Elasticsearch(
f"http://{host}:{ELASTIC_PORT}",
basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
**init_params,
)
self.client = get_es_client(host, connection_params)

def clean(self):
try:
Expand Down
18 changes: 6 additions & 12 deletions engine/clients/elasticsearch/search.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
import copy
import multiprocessing as mp
import uuid
from typing import List, Tuple

from elasticsearch import Elasticsearch

from engine.base_client.search import BaseSearcher
from engine.clients.elasticsearch.config import (
ELASTIC_INDEX,
ELASTIC_PASSWORD,
ELASTIC_PORT,
ELASTIC_USER,
)
from engine.clients.elasticsearch.config import ELASTIC_INDEX, get_es_client
from engine.clients.elasticsearch.parser import ElasticConditionParser


Expand Down Expand Up @@ -38,12 +34,10 @@ def init_client(cls, host, distance, connection_params: dict, search_params: dic
},
**connection_params,
}
cls.client: Elasticsearch = Elasticsearch(
f"http://{host}:{ELASTIC_PORT}",
basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
**init_params,
)
cls.search_params = search_params
cls.client = get_es_client(host, connection_params)
cls.search_params = copy.deepcopy(search_params)
# pop parallel
cls.search_params.pop("parallel", "1")

@classmethod
def search_one(cls, vector, meta_conditions, top) -> List[Tuple[int, float]]:
Expand Down
21 changes: 2 additions & 19 deletions engine/clients/elasticsearch/upload.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,7 @@
from elasticsearch import Elasticsearch

from engine.base_client.upload import BaseUploader
from engine.clients.elasticsearch.config import (
ELASTIC_INDEX,
ELASTIC_PASSWORD,
ELASTIC_PORT,
ELASTIC_USER,
)
from engine.clients.elasticsearch.config import ELASTIC_INDEX, get_es_client


class ClosableElastic(Elasticsearch):
Expand All @@ -28,19 +23,7 @@ def get_mp_start_method(cls):

@classmethod
def init_client(cls, host, distance, connection_params, upload_params):
init_params = {
**{
"verify_certs": False,
"request_timeout": 90,
"retry_on_timeout": True,
},
**connection_params,
}
cls.client = Elasticsearch(
f"http://{host}:{ELASTIC_PORT}",
basic_auth=(ELASTIC_USER, ELASTIC_PASSWORD),
**init_params,
)
cls.client = get_es_client(host, connection_params)
cls.upload_params = upload_params

@classmethod
Expand Down