Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added search quality testing pipeline #1774

Merged
merged 11 commits into from
Jul 6, 2024
14 changes: 12 additions & 2 deletions backend/danswer/configs/app_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,18 @@
# NOTE: this is used if and only if the vespa config server is accessible via a
# different host than the main vespa application
VESPA_CONFIG_SERVER_HOST = os.environ.get("VESPA_CONFIG_SERVER_HOST") or VESPA_HOST
VESPA_PORT = os.environ.get("VESPA_PORT") or "8081"
VESPA_TENANT_PORT = os.environ.get("VESPA_TENANT_PORT") or "19071"


def get_vespa_port() -> str:
hagen-danswer marked this conversation as resolved.
Show resolved Hide resolved
return os.environ.get("VESPA_PORT") or "8081"


def get_vespa_tenant_port() -> str:
return os.environ.get("VESPA_TENANT_PORT") or "19071"


# VESPA_PORT = os.environ.get("VESPA_PORT") or "8081"
# VESPA_TENANT_PORT = os.environ.get("VESPA_TENANT_PORT") or "19071"
# The default below is for dockerized deployment
VESPA_DEPLOYMENT_ZIP = (
os.environ.get("VESPA_DEPLOYMENT_ZIP") or "/app/danswer/vespa-app.zip"
Expand Down
2 changes: 1 addition & 1 deletion backend/danswer/db/connector_credential_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ def add_credential_to_connector(
credential_id: int,
cc_pair_name: str | None,
is_public: bool,
user: User,
user: User | None,
hagen-danswer marked this conversation as resolved.
Show resolved Hide resolved
db_session: Session,
) -> StatusResponse[int]:
connector = fetch_connector_by_id(connector_id, db_session)
Expand Down
7 changes: 7 additions & 0 deletions backend/danswer/db/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ def get_sqlalchemy_engine() -> Engine:
return _SYNC_ENGINE


def get_sqlalchemy_engine_for_port_number(port: str) -> Engine:
hagen-danswer marked this conversation as resolved.
Show resolved Hide resolved
global _SYNC_ENGINE
connection_string = build_connection_string(db_api=SYNC_DB_API, port=port)
_SYNC_ENGINE = create_engine(connection_string, pool_size=40, max_overflow=10)
return _SYNC_ENGINE


def get_sqlalchemy_async_engine() -> AsyncEngine:
global _ASYNC_ENGINE
if _ASYNC_ENGINE is None:
Expand Down
68 changes: 48 additions & 20 deletions backend/danswer/document_index/vespa/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
import requests
from retry import retry

from danswer.configs.app_configs import get_vespa_port
from danswer.configs.app_configs import get_vespa_tenant_port
from danswer.configs.app_configs import LOG_VESPA_TIMING_INFORMATION
from danswer.configs.app_configs import VESPA_CONFIG_SERVER_HOST
from danswer.configs.app_configs import VESPA_HOST
from danswer.configs.app_configs import VESPA_PORT
from danswer.configs.app_configs import VESPA_TENANT_PORT
from danswer.configs.chat_configs import DOC_TIME_DECAY
from danswer.configs.chat_configs import EDIT_KEYWORD_QUERY
from danswer.configs.chat_configs import HYBRID_ALPHA
Expand Down Expand Up @@ -70,24 +70,44 @@
from danswer.utils.batching import batch_generator
from danswer.utils.logger import setup_logger

# from danswer.configs.app_configs import VESPA_PORT
# from danswer.configs.app_configs import VESPA_TENANT_PORT

logger = setup_logger()

VESPA_DIM_REPLACEMENT_PAT = "VARIABLE_DIM"
DANSWER_CHUNK_REPLACEMENT_PAT = "DANSWER_CHUNK_NAME"
DOCUMENT_REPLACEMENT_PAT = "DOCUMENT_REPLACEMENT"
DATE_REPLACEMENT = "DATE_REPLACEMENT"


# config server
VESPA_CONFIG_SERVER_URL = f"http://{VESPA_CONFIG_SERVER_HOST}:{VESPA_TENANT_PORT}"
VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"
# VESPA_CONFIG_SERVER_URL = f"http://{VESPA_CONFIG_SERVER_HOST}:{VESPA_TENANT_PORT}"
# VESPA_APPLICATION_ENDPOINT = f"{VESPA_CONFIG_SERVER_URL}/application/v2"
def vespa_application_endpoint_builder() -> str:
vespa_tenant_port = get_vespa_tenant_port()
url = f"http://{VESPA_CONFIG_SERVER_HOST}:{vespa_tenant_port}"
return f"{url}/application/v2"


# main search application
VESPA_APP_CONTAINER_URL = f"http://{VESPA_HOST}:{VESPA_PORT}"
# VESPA_APP_CONTAINER_URL = f"http://{VESPA_HOST}:{VESPA_PORT}"
# danswer_chunk below is defined in vespa/app_configs/schemas/danswer_chunk.sd
DOCUMENT_ID_ENDPOINT = (
f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid"
)
SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/"
# DOCUMENT_ID_ENDPOINT = (
# f"{VESPA_APP_CONTAINER_URL}/document/v1/default/{{index_name}}/docid"
# )
def document_id_endpoint_builder() -> str:
vespa_port = get_vespa_port()
vespa_app_container_url = f"http://{VESPA_HOST}:{vespa_port}"
return f"{vespa_app_container_url}/document/v1/default/{{index_name}}/docid"


# SEARCH_ENDPOINT = f"{VESPA_APP_CONTAINER_URL}/search/"
def search_endpoint_builder() -> str:
vespa_port = get_vespa_port()
vespa_app_container_url = f"http://{VESPA_HOST}:{vespa_port}"
return f"{vespa_app_container_url}/search/"


_BATCH_SIZE = 128 # Specific to Vespa
_NUM_THREADS = (
Expand All @@ -108,7 +128,7 @@ class _VespaUpdateRequest:
update_request: dict[str, dict]


@retry(tries=3, delay=1, backoff=2)
@retry(tries=5, delay=10, backoff=2)
hagen-danswer marked this conversation as resolved.
Show resolved Hide resolved
def _does_document_exist(
doc_chunk_id: str,
index_name: str,
Expand All @@ -117,8 +137,14 @@ def _does_document_exist(
"""Returns whether the document already exists and the users/group whitelists
Specifically in this case, document refers to a vespa document which is equivalent to a Danswer
chunk. This checks for whether the chunk exists already in the index"""
doc_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{doc_chunk_id}"
doc_fetch_response = http_client.get(doc_url)
doc_url = (
f"{document_id_endpoint_builder().format(index_name=index_name)}/{doc_chunk_id}"
)
try:
hagen-danswer marked this conversation as resolved.
Show resolved Hide resolved
doc_fetch_response = http_client.get(doc_url)
except Exception as e:
raise e

if doc_fetch_response.status_code == 404:
return False

Expand Down Expand Up @@ -151,7 +177,7 @@ def _get_vespa_chunks_by_document_id(
) -> list[dict]:
# Constructing the URL for the Visit API
# NOTE: visit API uses the same URL as the document API, but with different params
url = DOCUMENT_ID_ENDPOINT.format(index_name=index_name)
url = document_id_endpoint_builder().format(index_name=index_name)

# build the list of fields to retrieve
field_set_list = (
Expand Down Expand Up @@ -248,7 +274,7 @@ def _delete_vespa_doc_chunks(
for chunk_id in doc_chunk_ids:
try:
res = http_client.delete(
f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{chunk_id}"
f"{document_id_endpoint_builder().format(index_name=index_name)}/{chunk_id}"
)
res.raise_for_status()
except httpx.HTTPStatusError as e:
Expand Down Expand Up @@ -367,7 +393,7 @@ def _index_vespa_chunk(
DOCUMENT_SETS: {document_set: 1 for document_set in chunk.document_sets},
}

vespa_url = f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_chunk_id}"
vespa_url = f"{document_id_endpoint_builder().format(index_name=index_name)}/{vespa_chunk_id}"
logger.debug(f'Indexing to URL "{vespa_url}"')
res = http_client.post(
vespa_url, headers=json_header, json={"fields": vespa_document_fields}
Expand Down Expand Up @@ -625,7 +651,7 @@ def _vespa_hit_to_inference_chunk(hit: dict[str, Any]) -> InferenceChunk:
)


@retry(tries=3, delay=1, backoff=2)
@retry(tries=10, delay=2, backoff=2)
def _query_vespa(query_params: Mapping[str, str | int | float]) -> list[InferenceChunk]:
if "query" in query_params and not cast(str, query_params["query"]).strip():
raise ValueError("No/empty query received")
Expand All @@ -640,7 +666,7 @@ def _query_vespa(query_params: Mapping[str, str | int | float]) -> list[Inferenc
)

response = requests.post(
SEARCH_ENDPOINT,
search_endpoint_builder(),
json=params,
)
try:
Expand Down Expand Up @@ -684,7 +710,7 @@ def _query_vespa(query_params: Mapping[str, str | int | float]) -> list[Inferenc
@retry(tries=3, delay=1, backoff=2)
def _inference_chunk_by_vespa_id(vespa_id: str, index_name: str) -> InferenceChunk:
res = requests.get(
f"{DOCUMENT_ID_ENDPOINT.format(index_name=index_name)}/{vespa_id}"
f"{document_id_endpoint_builder().format(index_name=index_name)}/{vespa_id}"
)
res.raise_for_status()

Expand Down Expand Up @@ -755,7 +781,9 @@ def ensure_indices_exist(
index_embedding_dim: int,
secondary_index_embedding_dim: int | None,
) -> None:
deploy_url = f"{VESPA_APPLICATION_ENDPOINT}/tenant/default/prepareandactivate"
deploy_url = (
f"{vespa_application_endpoint_builder()}/tenant/default/prepareandactivate"
)
logger.debug(f"Sending Vespa zip to {deploy_url}")

vespa_schema_path = os.path.join(
Expand Down Expand Up @@ -943,7 +971,7 @@ def update(self, update_requests: list[UpdateRequest]) -> None:
processed_updates_requests.append(
_VespaUpdateRequest(
document_id=document_id,
url=f"{DOCUMENT_ID_ENDPOINT.format(index_name=self.index_name)}/{doc_chunk_id}",
url=f"{document_id_endpoint_builder().format(index_name=self.index_name)}/{doc_chunk_id}",
update_request=update_dict,
)
)
Expand Down
8 changes: 5 additions & 3 deletions backend/scripts/save_load_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from danswer.configs.app_configs import POSTGRES_PASSWORD
from danswer.configs.app_configs import POSTGRES_PORT
from danswer.configs.app_configs import POSTGRES_USER
from danswer.document_index.vespa.index import DOCUMENT_ID_ENDPOINT
from danswer.document_index.vespa.index import document_id_endpoint_builder
from danswer.utils.logger import setup_logger

logger = setup_logger()
Expand Down Expand Up @@ -64,7 +64,7 @@ def save_vespa(filename: str) -> None:
while continuation is not None:
if continuation:
params = {"continuation": continuation}
response = requests.get(DOCUMENT_ID_ENDPOINT, params=params)
response = requests.get(document_id_endpoint_builder(), params=params)
response.raise_for_status()
found = response.json()
continuation = found.get("continuation")
Expand All @@ -86,7 +86,9 @@ def load_vespa(filename: str) -> None:
new_doc = json.loads(line.strip())
doc_id = new_doc["update"].split("::")[-1]
response = requests.post(
DOCUMENT_ID_ENDPOINT + "/" + doc_id, headers=headers, json=new_doc
document_id_endpoint_builder() + "/" + doc_id,
headers=headers,
json=new_doc,
)
response.raise_for_status()

Expand Down
Loading
Loading