diff --git a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py
index c5ef5d32..533ece43 100644
--- a/airflow/dags/ingestion/ask-astro-load-stackoverflow.py
+++ b/airflow/dags/ingestion/ask-astro-load-stackoverflow.py
@@ -10,7 +10,7 @@
 ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

 _WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
-WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
+WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDevAnkit")

 ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)

 stackoverflow_cutoff_date = "2021-09-01"
@@ -38,7 +38,7 @@ def ask_astro_load_stackoverflow():
     """

     stack_overflow_docs = (
-        task(stack_overflow.extract_stack_overflow)
+        task(stack_overflow.extract_stack_overflow_archive)
         .partial(stackoverflow_cutoff_date=stackoverflow_cutoff_date)
         .expand(tag=stackoverflow_tags)
     )
diff --git a/airflow/dags/ingestion/ask-astro-load.py b/airflow/dags/ingestion/ask-astro-load.py
index 3d0167d1..817659be 100644
--- a/airflow/dags/ingestion/ask-astro-load.py
+++ b/airflow/dags/ingestion/ask-astro-load.py
@@ -1,8 +1,10 @@
 from __future__ import annotations

 import datetime
+import json
 import logging
 import os
+from pathlib import Path

 import pandas as pd
 from include.tasks import split
@@ -81,7 +83,15 @@ def get_schema_and_process(schema_file: str) -> list:
         """
         :param schema_file: path to the schema JSON file
         """
-        class_objects = ask_astro_weaviate_hook.get_schema(schema_file="include/data/schema.json")
+        try:
+            class_objects = json.loads(Path(schema_file).read_text())
+        except FileNotFoundError:
+            logger.error(f"Schema file {schema_file} not found.")
+            raise
+        except json.JSONDecodeError:
+            logger.error(f"Invalid JSON in the schema file {schema_file}.")
+            raise
+
         class_objects["classes"][0].update({"class": WEAVIATE_CLASS})

         if "classes" not in class_objects:
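The get_schema_and_process change above inlines the schema read that previously went through AskAstroWeaviateHook.get_schema. A minimal sketch of the resulting behaviour (the path and class name shown come from elsewhere in the DAG and are illustrative here):

    import json
    from pathlib import Path

    schema_file = "include/data/schema.json"  # illustrative path
    class_objects = json.loads(Path(schema_file).read_text())

    # The DAG then overrides the class name so one schema file can serve multiple
    # environments, using the WEAVIATE_CLASS value read at the top of the DAG.
    class_objects["classes"][0].update({"class": "DocsDev"})
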
diff --git a/airflow/dags/monitor/monitor.py b/airflow/dags/monitor/monitor.py
index c38cd631..0403efa8 100644
--- a/airflow/dags/monitor/monitor.py
+++ b/airflow/dags/monitor/monitor.py
@@ -9,12 +9,12 @@
 import firebase_admin
 import requests
-from weaviate_provider.hooks.weaviate import WeaviateHook

 from airflow.decorators import dag, task
 from airflow.exceptions import AirflowException
 from airflow.models import TaskInstance
 from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
+from airflow.providers.weaviate.hooks.weaviate import WeaviateHook
 from airflow.utils.context import Context
 from airflow.utils.trigger_rule import TriggerRule
diff --git a/airflow/include/tasks/extract/blogs.py b/airflow/include/tasks/extract/blogs.py
index bec05921..1d8d3cd1 100644
--- a/airflow/include/tasks/extract/blogs.py
+++ b/airflow/include/tasks/extract/blogs.py
@@ -48,7 +48,7 @@ def extract_astro_blogs(blog_cutoff_date: datetime) -> list[pd.DataFrame]:

     df = pd.DataFrame(zip(links, dates), columns=["docLink", "date"])
     df["date"] = pd.to_datetime(df["date"]).dt.date
-    df = df[df["date"] > blog_cutoff_date.date()]
+    df = df[df["date"] > blog_cutoff_date]
     df.drop("date", inplace=True, axis=1)
     df.drop_duplicates(inplace=True)
diff --git a/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py b/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py
index 8eaa8cfc..90c354a5 100644
--- a/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py
+++ b/airflow/include/tasks/extract/utils/weaviate/ask_astro_weaviate_hook.py
@@ -1,8 +1,6 @@
 from __future__ import annotations

-import json
 import logging
-from pathlib import Path
 from typing import Any

 import pandas as pd
@@ -22,23 +20,6 @@ def __init__(self, *args, **kwargs):
         self.logger = logging.getLogger("airflow.task")
         self.client = self.get_client()

-    def get_schema(self, schema_file: str) -> list:
-        """
-        Reads the schema from a JSON file.
-
-        :param schema_file: path to the schema JSON file.
-        """
-        try:
-            class_objects = json.loads(Path(schema_file).read_text())
-        except FileNotFoundError:
-            self.logger.error(f"Schema file {schema_file} not found.")
-            raise
-        except json.JSONDecodeError:
-            self.logger.error(f"Invalid JSON in the schema file {schema_file}.")
-            raise
-
-        return class_objects
-
     def compare_schema_subset(self, class_object: Any, class_schema: Any) -> bool:
         """
         Recursively check if requested schema/object is a subset of the current schema.
@@ -130,58 +111,72 @@ def create_schema(self, class_objects: list, existing: str = "ignore") -> None:
                 self.client.schema.create_class(class_object)
             self.logger.info(f"Created/updated class {class_name}")

-    def prepare_data_for_ingestion(
-        self, dfs: list[pd.DataFrame], class_name: str, existing: str, uuid_column: str, vector_column: str
+    def generate_uuids(
+        self,
+        df: pd.DataFrame,
+        class_name: str,
+        column_subset: list[str] | None = None,
+        vector_column: str | None = None,
+        uuid_column: str | None = None,
     ) -> tuple[pd.DataFrame, str]:
         """
-        Prepares data for ingestion into Weaviate.
-
-        :param dfs: A list of dataframes from downstream dynamic tasks.
-        :param class_name: The name of the class to import data.
-        :param existing: Strategy to handle existing data ('skip', 'replace', 'upsert').
-        :param uuid_column: Name of the column containing the UUID.
-        :param vector_column: Name of the column containing the vector data.
-        :return: A concatenated and processed DataFrame ready for ingestion.
+        Adds UUIDs to a DataFrame, useful for upsert operations where UUIDs must be known before ingestion.
+        By default, UUIDs are generated using a custom function if 'uuid_column' is not specified.
+        The function can potentially ingest the same data multiple times with different UUIDs.
+
+        :param df: A dataframe with data to generate a UUID from.
+        :param class_name: The name of the class used as part of the uuid namespace.
+        :param uuid_column: Name of the column to create. Default is 'id'.
+        :param column_subset: A list of columns to use for UUID generation. By default, all columns except
+            vector_column will be used.
+        :param vector_column: Name of the column containing the vector data. If specified the vector will be
+            removed prior to generating the uuid.
         """
-        if existing not in ["skip", "replace", "upsert"]:
-            raise AirflowException("Invalid parameter for 'existing'. Choices are 'skip', 'replace', 'upsert'")
+        column_names = df.columns.to_list()

-        df = pd.concat(dfs, ignore_index=True)
+        column_subset = column_subset or column_names
+        column_subset.sort()

         if uuid_column is None:
-            column_names = df.columns.to_list()
-            column_names.sort()
+            self.logger.info(f"No uuid_column provided. Generating UUIDs as column name {uuid_column}.")
             df = df[column_names]
-
-            self.logger.info("No uuid_column provided. Generating UUIDs for ingest.")
             if "id" in column_names:
                 raise AirflowException("Property 'id' already in dataset. Consider renaming or specify 'uuid_column'.")
             else:
                 uuid_column = "id"

-            df[uuid_column] = df.drop(columns=[vector_column], inplace=False, errors="ignore").apply(
-                lambda row: generate_uuid5(identifier=row.to_dict(), namespace=class_name), axis=1
+        if uuid_column in column_names:
+            raise AirflowException(
+                f"Property {uuid_column} already in dataset. Consider renaming or specify a different 'uuid_column'."
             )

-        df.drop_duplicates(inplace=True)
+        df[uuid_column] = (
+            df[column_subset]
+            .drop(columns=[vector_column], inplace=False, errors="ignore")
+            .apply(lambda row: generate_uuid5(identifier=row.to_dict(), namespace=class_name), axis=1)
+        )

         return df, uuid_column

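The new generate_uuids method keys each row's UUID off its content via weaviate's generate_uuid5, so re-ingesting an identical chunk always yields the same ID. A rough sketch of the idea, with made-up column values and the DocsDev class name as the namespace:

    import pandas as pd
    from weaviate.util import generate_uuid5

    # Two chunk rows for one document; column names are illustrative.
    df = pd.DataFrame(
        [
            {"docLink": "https://example.com/a", "content": "chunk one"},
            {"docLink": "https://example.com/a", "content": "chunk two"},
        ]
    )

    # Same row content + same namespace (the class name) always yields the same UUID,
    # which is what lets the upsert logic compare new chunks against stored ones.
    df["id"] = df.apply(lambda row: generate_uuid5(identifier=row.to_dict(), namespace="DocsDev"), axis=1)
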
-    def handle_upsert(self, df: pd.DataFrame, class_name: str, doc_key: str, uuid_column: str) -> (pd.DataFrame, dict):
+    def identify_upsert_targets(
+        self, df: pd.DataFrame, class_name: str, doc_key: str, uuid_column: str
+    ) -> pd.DataFrame:
         """
         Handles the 'upsert' operation for data ingestion.

         :param df: The DataFrame containing the data to be upserted.
         :param class_name: The name of the class to import data.
-        :param doc_key: The document key used for upsert operation.
+        :param doc_key: The document key used for upsert operation. This is a property of the data that
+            uniquely identifies all chunks associated with one document.
         :param uuid_column: The column name containing the UUID.
-        :return: The DataFrame filtered for objects to insert.
         """
-        if doc_key is None:
-            raise AirflowException("Must specify 'doc_key' if 'existing=upsert'.")
+        if doc_key is None or doc_key not in df.columns:
+            raise AirflowException("The 'doc_key' is not specified or not in the dataset.")
+
+        if uuid_column is None or uuid_column not in df.columns:
+            raise AirflowException("The 'uuid_column' is not specified or not in the dataset.")

-        if df[[doc_key, uuid_column]].duplicated().any():
-            raise AirflowException("Duplicate rows found. Remove duplicates before ingest.")
+        df = df.drop_duplicates(subset=[doc_key, uuid_column], keep="first")

         current_schema = self.client.schema.get(class_name=class_name)
         doc_key_schema = [prop for prop in current_schema["properties"] if prop["name"] == doc_key]
@@ -191,19 +186,26 @@ def handle_upsert(self, df: pd.DataFrame, class_name: str, doc_key: str, uuid_co
         elif doc_key_schema[0]["tokenization"] != "field":
             raise AirflowException("Tokenization for provided doc_key is not set to 'field'. Cannot upsert safely.")

-        objects_to_upsert = self._objects_to_upsert(df, class_name, doc_key, uuid_column)
+        ids_df = df.groupby(doc_key)[uuid_column].apply(set).reset_index(name="new_ids")
+        ids_df["existing_ids"] = ids_df[doc_key].apply(
+            lambda x: self._query_objects(value=x, doc_key=doc_key, uuid_column=uuid_column, class_name=class_name)
+        )
+
+        ids_df["objects_to_insert"] = ids_df.apply(lambda x: list(x.new_ids.difference(x.existing_ids)), axis=1)
+        ids_df["objects_to_delete"] = ids_df.apply(lambda x: list(x.existing_ids.difference(x.new_ids)), axis=1)
+        ids_df["unchanged_objects"] = ids_df.apply(lambda x: x.new_ids.intersection(x.existing_ids), axis=1)

-        return df[df[uuid_column].isin(objects_to_upsert["objects_to_insert"])], objects_to_upsert
+        return ids_df[[doc_key, "objects_to_insert", "objects_to_delete", "unchanged_objects"]]

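The ids_df frame built above reduces upsert planning to per-document set arithmetic. With illustrative UUIDs, the three derived columns behave like this:

    # Chunks computed from the incoming document vs. chunks currently stored for the same doc_key.
    new_ids = {"uuid-a", "uuid-b"}
    existing_ids = {"uuid-b", "uuid-c"}

    objects_to_insert = new_ids - existing_ids   # {"uuid-a"}  -> ingested
    objects_to_delete = existing_ids - new_ids   # {"uuid-c"}  -> removed after a clean ingest
    unchanged_objects = new_ids & existing_ids   # {"uuid-b"}  -> left untouched
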
-    def batch_process_data(
+    def batch_ingest(
         self,
         df: pd.DataFrame,
         class_name: str,
         uuid_column: str,
-        vector_column: str,
-        batch_params: dict,
         existing: str,
-        verbose: bool,
+        vector_column: str | None = None,
+        batch_params: dict = {},
+        verbose: bool = False,
     ) -> (list, Any):
         """
         Processes the DataFrame and batches the data for ingestion into Weaviate.
@@ -215,37 +217,48 @@
         :param batch_params: Parameters for batch configuration.
         :param existing: Strategy to handle existing data ('skip', 'replace', 'upsert').
         :param verbose: Whether to print verbose output.
-        :return: List of any objects that failed to be added to the batch.
         """
-        self.client.batch.configure(**batch_params)
+        batch = self.client.batch.configure(**batch_params)

         batch_errors = []

-        with self.client.batch as batch:
-            for row_id, row in df.iterrows():
-                data_object = row.to_dict()
-                uuid = data_object[uuid_column]
+        for row_id, row in df.iterrows():
+            data_object = row.to_dict()
+            uuid = data_object.pop(uuid_column)
+            vector = data_object.pop(vector_column, None)

-                # Check if the uuid exists and handle accordingly
-                if self.client.data_object.exists(uuid=uuid, class_name=class_name):
+            try:
+                if self.client.data_object.exists(uuid=uuid, class_name=class_name) is True:
                     if existing == "skip":
-                        if verbose:
-                            self.logger.warning(f"UUID {uuid} exists. Skipping.")
+                        if verbose is True:
+                            self.logger.warning(f"UUID {uuid} exists. Skipping.")
                         continue
                     elif existing == "replace":
-                        if verbose:
-                            self.logger.warning(f"UUID {uuid} exists. Overwriting.")
+                        # Default for weaviate is replace existing
+                        if verbose is True:
+                            self.logger.warning(f"UUID {uuid} exists. Overwriting.")

-                    vector = data_object.pop(vector_column, None)
-                    uuid = data_object.pop(uuid_column)
+            except Exception as e:
+                if verbose:
+                    self.logger.error(f"Failed to add row {row_id} with UUID {uuid}. Error: {e}")
+                batch_errors.append({"uuid": uuid, "result": {"errors": str(e)}})
+                continue

-                try:
-                    batch.add_data_object(class_name=class_name, uuid=uuid, data_object=data_object, vector=vector)
-                    if verbose:
-                        self.logger.info(f"Added row {row_id} with UUID {uuid} for batch import.")
-                except Exception as e:
-                    if verbose:
-                        self.logger.error(f"Failed to add row {row_id} with UUID {uuid}. Error: {e}")
-                    batch_errors.append({"row_id": row_id, "uuid": uuid, "error": str(e)})
+            try:
+                added_row = batch.add_data_object(
+                    class_name=class_name, uuid=uuid, data_object=data_object, vector=vector
+                )
+                if verbose is True:
+                    self.logger.info(f"Added row {row_id} with UUID {added_row} for batch import.")
+
+            except Exception as e:
+                if verbose:
+                    self.logger.error(f"Failed to add row {row_id} with UUID {uuid}. Error: {e}")
+                batch_errors.append({"uuid": uuid, "result": {"errors": str(e)}})
+
+        results = batch.create_objects()
+
+        if len(results) > 0:
+            batch_errors += self.process_batch_errors(results=results, verbose=verbose)

         return batch_errors
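batch_ingest now drives the weaviate-client v3 batch API directly instead of using the batch context manager, so the per-object results can be inspected afterwards. A minimal sketch of that pattern, assuming a reachable local Weaviate instance (URL and data are illustrative):

    import weaviate

    client = weaviate.Client("http://localhost:8080")  # illustrative URL

    # configure() returns the batch object; the hook forwards batch_params here.
    batch = client.batch.configure(batch_size=100, dynamic=True)

    # The hook also passes a deterministic uuid and, optionally, a pre-computed vector.
    batch.add_data_object(class_name="DocsDev", data_object={"content": "hello"})

    # create_objects() flushes the batch and returns per-object results,
    # which process_batch_errors() scans for an "errors" key.
    results = batch.create_objects()
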
@@ -255,18 +268,21 @@ def process_batch_errors(self, results: list, verbose: bool) -> list:

         :param results: Results from the batch operation.
         :param verbose: Flag to enable verbose logging.
-        :return: List of error messages.
         """
-        batch_errors = []
+        errors = []
         for item in results:
             if "errors" in item["result"]:
-                item_error = {"id": item["id"], "errors": item["result"]["errors"]}
+                item_error = {"uuid": item["id"], "errors": item["result"]["errors"]}
                 if verbose:
-                    self.logger.info(item_error)
-                batch_errors.append(item_error)
-        return batch_errors
-
-    def handle_upsert_rollback(self, objects_to_upsert: dict, class_name: str, verbose: bool) -> None:
+                    self.logger.info(
+                        f"Error occurred in batch process for {item['id']} with error {item['result']['errors']}"
+                    )
+                errors.append(item_error)
+        return errors
+
+    def handle_upsert_rollback(
+        self, objects_to_upsert: pd.DataFrame, batch_errors: list, class_name: str, verbose: bool
+    ) -> list:
         """
         Handles rollback of inserts in case of errors during upsert operation.

@@ -274,24 +290,54 @@ def handle_upsert_rollback(self, objects_to_upsert: dict, class_name: str, verbo
         :param class_name: Name of the class in Weaviate.
         :param verbose: Flag to enable verbose logging.
         """
-        for uuid in objects_to_upsert["objects_to_insert"]:
-            self.logger.info(f"Removing id {uuid} for rollback.")
-            if self.client.data_object.exists(uuid=uuid, class_name=class_name):
-                self.client.data_object.delete(uuid=uuid, class_name=class_name, consistency_level="ALL")
-            elif verbose:
-                self.logger.info(f"UUID {uuid} does not exist. Skipping deletion.")
-
-        for uuid in objects_to_upsert["objects_to_delete"]:
-            if verbose:
-                self.logger.info(f"Deleting id {uuid} for successful upsert.")
-            if self.client.data_object.exists(uuid=uuid, class_name=class_name):
-                self.client.data_object.delete(uuid=uuid, class_name=class_name)
-            elif verbose:
-                self.logger.info(f"UUID {uuid} does not exist. Skipping deletion.")
+        rollback_errors = []
+
+        error_uuids = {error["uuid"] for error in batch_errors}
+
+        objects_to_upsert["rollback_doc"] = objects_to_upsert.objects_to_insert.apply(
+            lambda x: any(error_uuids.intersection(x))
+        )
+
+        objects_to_upsert["successful_doc"] = objects_to_upsert.objects_to_insert.apply(
+            lambda x: error_uuids.isdisjoint(x)
+        )
+
+        rollback_objects = objects_to_upsert[objects_to_upsert.rollback_doc].objects_to_insert.to_list()
+        rollback_objects = {item for sublist in rollback_objects for item in sublist}
+
+        delete_objects = objects_to_upsert[objects_to_upsert.successful_doc].objects_to_delete.to_list()
+        delete_objects = {item for sublist in delete_objects for item in sublist}
+
+        for uuid in rollback_objects:
+            try:
+                if self.client.data_object.exists(uuid=uuid, class_name=class_name):
+                    self.logger.info(f"Removing id {uuid} for rollback.")
+                    self.client.data_object.delete(uuid=uuid, class_name=class_name, consistency_level="ALL")
+                elif verbose:
+                    self.logger.info(f"UUID {uuid} does not exist. Skipping deletion during rollback.")
+            except Exception as e:
+                rollback_errors.append({"uuid": uuid, "result": {"errors": str(e)}})
+                if verbose:
+                    self.logger.info(f"Error in rolling back id {uuid}. Error: {str(e)}")
+
+        for uuid in delete_objects:
+            try:
+                if self.client.data_object.exists(uuid=uuid, class_name=class_name):
+                    if verbose:
+                        self.logger.info(f"Deleting id {uuid} for successful upsert.")
+                    self.client.data_object.delete(uuid=uuid, class_name=class_name)
+                elif verbose:
+                    self.logger.info(f"UUID {uuid} does not exist. Skipping deletion.")
+            except Exception as e:
+                rollback_errors.append({"uuid": uuid, "result": {"errors": str(e)}})
+                if verbose:
+                    self.logger.info(f"Error in rolling back id {uuid}. Error: {str(e)}")
+
+        return rollback_errors

     def ingest_data(
         self,
-        dfs: list[pd.DataFrame],
+        dfs: list[pd.DataFrame] | pd.DataFrame,
         class_name: str,
         existing: str = "skip",
         doc_key: str = None,
@@ -301,45 +347,63 @@
         verbose: bool = True,
     ) -> list:
         """
-        This task concatenates multiple dataframes from upstream dynamic tasks and vectorized with import to weaviate.
-        The operator returns a list of any objects that failed to import.
-
-        A 'uuid' is generated based on the content and metadata (the git sha, document url, the document source and a
-        concatenation of the headers) and Weaviate will create the vectors.
-
-        Upsert and logic relies on a 'doc_key' which is a uniue representation of the document. Because documents can
-        be represented as multiple chunks (each with a UUID which is unique in the DB) the doc_key is a way to represent
-        all chunks associated with an ingested document.
-
-        :param dfs: A list of dataframes from downstream dynamic tasks
-        :param class_name: The name of the class to import data. Class should be created with weaviate schema.
-        :param existing: Whether to 'upsert', 'skip' or 'replace' any existing documents. Default is 'skip'.
-        :param doc_key: If using upsert you must specify a doc_key which uniquely identifies a document which may or may
-            not include multiple (unique) chunks.
-        :param vector_column: For pre-embedded data specify the name of the column containing the embedding vector
-        :param uuid_column: For data with pre-generated UUID specify the name of the column containing the UUID
-        :param batch_params: Additional parameters to pass to the weaviate batch configuration
-        :param verbose: Whether to print verbose output
+        Ingests data into Weaviate, handling upserts and rollbacks, and returns a list of objects that failed to import.
+        This function ingests data from pandas DataFrame(s) into a specified class in Weaviate. It supports various
+        modes of handling existing data (upsert, skip, replace). Upsert logic uses 'doc_key' as a unique document
+        identifier, enabling document-level atomicity during ingestion. Rollback is performed for any document
+        encountering errors during ingest. The function returns a list of objects that failed to import for further
+        handling.
+
+        :param dfs: A single pandas DataFrame or a list of pandas DataFrames to be ingested.
+        :param class_name: Name of the class in Weaviate schema where data is to be ingested.
+        :param existing: Strategy for handling existing data: 'upsert', 'skip', or 'replace'. Default is 'skip'.
+        :param doc_key: Column in DataFrame uniquely identifying each document, required for 'upsert' operations.
+        :param uuid_column: Column with pre-generated UUIDs. If not provided, UUIDs will be generated.
+        :param vector_column: Column with embedding vectors for pre-embedded data.
+        :param batch_params: Additional parameters for Weaviate batch configuration.
+        :param verbose: Flag to enable verbose output during the ingestion process.
         """
         global objects_to_upsert

+        if existing not in ["skip", "replace", "upsert"]:
+            raise AirflowException("Invalid parameter for 'existing'. Choices are 'skip', 'replace', 'upsert'")

-        df, uuid_column = self.prepare_data_for_ingestion(dfs, class_name, existing, uuid_column, vector_column)
+        df = pd.concat(dfs, ignore_index=True)
+
+        if uuid_column is None:
+            df, uuid_column = self.generate_uuids(
+                df=df, class_name=class_name, vector_column=vector_column, uuid_column=uuid_column
+            )

         if existing == "upsert":
-            df, objects_to_upsert = self.handle_upsert(df, class_name, doc_key, uuid_column)
+            objects_to_upsert = self.identify_upsert_targets(
+                df=df, class_name=class_name, doc_key=doc_key, uuid_column=uuid_column
+            )
+
+            objects_to_insert = {item for sublist in objects_to_upsert.objects_to_insert.tolist() for item in sublist}
+
+            # subset df with only objects that need to be inserted
+            df = df[df[uuid_column].isin(objects_to_insert)]

         self.logger.info(f"Passing {len(df)} objects for ingest.")

-        batch_errors = self.batch_process_data(
-            df, class_name, uuid_column, vector_column, batch_params, existing, verbose
+        batch_errors = self.batch_ingest(
+            df=df,
+            class_name=class_name,
+            uuid_column=uuid_column,
+            vector_column=vector_column,
+            batch_params=batch_params,
+            existing=existing,
+            verbose=verbose,
         )

-        batch_errors += self.process_batch_errors(batch_errors, verbose)
+        batch_errors = self.process_batch_errors(results=batch_errors, verbose=True)

         if existing == "upsert" and batch_errors:
-            self.logger.warning("Error during upsert. Rolling back all inserts.")
-            self.handle_upsert_rollback(objects_to_upsert, class_name, verbose)
+            self.logger.warning("Error during upsert. Rolling back all inserts for docs with errors.")
+            self.handle_upsert_rollback(
+                objects_to_upsert=objects_to_upsert, batch_errors=batch_errors, class_name=class_name, verbose=verbose
+            )

         return batch_errors
@@ -361,41 +425,6 @@ def _query_objects(self, value: Any, doc_key: str, class_name: str, uuid_column:

         return {additional["_additional"]["id"] for additional in existing_uuids}

-    def _objects_to_upsert(self, df: pd.DataFrame, class_name: str, doc_key: str, uuid_column: str) -> dict:
-        """
-        Identify the objects that need to be inserted, deleted, or remain unchanged in the Weaviate database for an
-        upsert operation. This method processes the given DataFrame to determine which objects (based on their UUIDs)
-        should be inserted into, deleted from, or left unchanged in the database. It groups the DataFrame by the
-        document key and then compares the resulting groups against existing data in Weaviate to determine the
-        required action for each group.
-
-        :param df: The DataFrame containing the data to be upserted.
-        :param class_name: The name of the class to query.
-        :param doc_key: The name of the property to query.
-        :param uuid_column: The name of the column containing the UUID.
-        """
-        if doc_key is None:
-            # Return an empty dictionary or handle the situation as needed
-            return {}
-        ids_df = df.groupby(doc_key)[uuid_column].apply(set).reset_index(name="new_ids")
-        ids_df["existing_ids"] = ids_df[doc_key].apply(
-            lambda x: self._query_objects(value=x, doc_key=doc_key, uuid_column=uuid_column, class_name=class_name)
-        )
-
-        ids_df["objects_to_insert"] = ids_df.apply(lambda x: list(x.new_ids.difference(x.existing_ids)), axis=1)
-        ids_df["objects_to_delete"] = ids_df.apply(lambda x: list(x.existing_ids.difference(x.new_ids)), axis=1)
-        ids_df["unchanged_objects"] = ids_df.apply(lambda x: x.new_ids.intersection(x.existing_ids), axis=1)
-
-        objects_to_insert = [item for sublist in ids_df.objects_to_insert.tolist() for item in sublist]
-        objects_to_delete = [item for sublist in ids_df.objects_to_delete.tolist() for item in sublist]
-        unchanged_objects = [item for sublist in ids_df.unchanged_objects.tolist() for item in sublist]
-
-        return {
-            "objects_to_insert": objects_to_insert,
-            "objects_to_delete": objects_to_delete,
-            "unchanged_objects": unchanged_objects,
-        }
-
     def import_baseline(
         self,
         seed_baseline_url: str,
@@ -411,11 +440,8 @@
         This task ingests data from a baseline of pre-embedded data. This is useful for evaluation and baselining
         changes over time. This function is used as a python_callable with the weaviate_import decorator. The returned
         dictionary is passed to the WeaviateImportDataOperator for ingest. The operator returns a list of any objects
-        that failed to import.
-
-        seed_baseline_url is a URI for a parquet file of pre-embedded data.
-
-        Any existing documents are replaced. The assumption is that this is a first import of data and older data
+        that failed to import. seed_baseline_url is a URI for a parquet file of pre-embedded data. Any existing
+        documents are replaced. The assumption is that this is a first import of data and older data
         should be removed.

         :param class_name: The name of the class to import data. Class should be created with weaviate schema.
@@ -442,7 +468,7 @@
         df = pd.read_parquet(seed_filename)

         return self.ingest_data(
-            dfs=[df],
+            dfs=df,
             class_name=class_name,
             existing=existing,
             doc_key=doc_key,
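Putting the pieces together, a typical upsert call through the hook looks roughly like this, using the module-level ask_astro_weaviate_hook instance created in the ingestion DAGs (DataFrame contents and class name are illustrative):

    import pandas as pd

    # One or more chunk rows per document; "docLink" plays the role of doc_key.
    df = pd.DataFrame([{"docLink": "https://example.com/a", "content": "hello world"}])

    # Returns a list of objects that failed to import; documents with failures are rolled back.
    batch_errors = ask_astro_weaviate_hook.ingest_data(
        dfs=[df],
        class_name="DocsDev",
        existing="upsert",
        doc_key="docLink",
    )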