Skip to content

Commit

Permalink
Add more logging and remove asyncio (#86)
Browse files Browse the repository at this point in the history
The asyncio.run call is not actually doing anything here. There is a call
to asyncio within the vespa sdk and the concern is that they could be
interfering with each other. I havent managed to prove this, so I'm also
adding a bunch of logging to get some extra information if the error comes
up again.
  • Loading branch information
olaughter authored Jan 31, 2024
1 parent cfd679a commit cf0f32a
Showing 1 changed file with 19 additions and 10 deletions.
29 changes: 19 additions & 10 deletions src/index/vespa_.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,11 @@ def _get_vespa_instance() -> Vespa:
)


async def _batch_ingest(vespa: Vespa, to_process: Mapping[SchemaName, list]):
def _batch_ingest(vespa: Vespa, to_process: Mapping[SchemaName, list]):
responses: list[VespaResponse] = []
for schema in _SCHEMAS_TO_PROCESS:
if documents := to_process[schema]:
_LOGGER.info(f"Processing {schema}, with {len(documents)} documents")
responses.extend(
vespa.feed_batch(
batch=list(documents),
Expand All @@ -277,13 +278,19 @@ async def _batch_ingest(vespa: Vespa, to_process: Mapping[SchemaName, list]):
)
)

errors = [(r.status_code, r.json) for r in responses if r.status_code >= 300]
if errors:
_LOGGER.error(
"Indexing Failed",
extra={"props": {"error_responses": errors}},
)
raise VespaIndexError("Indexing Failed")
errors = [(r.status_code, r.json) for r in responses if r.status_code >= 300]
if errors:
_LOGGER.error(
"Indexing Failed",
extra={
"props": {
"error_responses": errors,
"document_counts": f"{schema}: {[len(i) for i in to_process.values()]}",
"schema": schema,
},
}
)
raise VespaIndexError("Indexing Failed")


def populate_vespa(
Expand Down Expand Up @@ -321,7 +328,9 @@ def populate_vespa(
)

if len(to_process[DOCUMENT_PASSAGE_SCHEMA]) >= config.VESPA_DOCUMENT_BATCH_SIZE:
asyncio.run(_batch_ingest(vespa, to_process))
_batch_ingest(vespa, to_process)
_LOGGER.info(f"Clearing batch with length: {len(to_process[DOCUMENT_PASSAGE_SCHEMA])}")
to_process.clear()

asyncio.run(_batch_ingest(vespa, to_process))
_LOGGER.info("Final ingest batch")
_batch_ingest(vespa, to_process)

0 comments on commit cf0f32a

Please sign in to comment.