Skip to content

Commit

Permalink
Fix comments from Michael
Browse files Browse the repository at this point in the history
  • Loading branch information
sunank200 committed Nov 23, 2023
1 parent 44c4bf2 commit c036d67
Show file tree
Hide file tree
Showing 5 changed files with 205 additions and 169 deletions.
4 changes: 2 additions & 2 deletions airflow/dags/ingestion/ask-astro-load-stackoverflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
ask_astro_env = os.environ.get("ASK_ASTRO_ENV", "dev")

_WEAVIATE_CONN_ID = f"weaviate_{ask_astro_env}"
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDev")
WEAVIATE_CLASS = os.environ.get("WEAVIATE_CLASS", "DocsDevAnkit")
ask_astro_weaviate_hook = AskAstroWeaviateHook(_WEAVIATE_CONN_ID)
stackoverflow_cutoff_date = "2021-09-01"

Expand Down Expand Up @@ -38,7 +38,7 @@ def ask_astro_load_stackoverflow():
"""

stack_overflow_docs = (
task(stack_overflow.extract_stack_overflow)
task(stack_overflow.extract_stack_overflow_archive)
.partial(stackoverflow_cutoff_date=stackoverflow_cutoff_date)
.expand(tag=stackoverflow_tags)
)
Expand Down
12 changes: 11 additions & 1 deletion airflow/dags/ingestion/ask-astro-load.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import datetime
import json
import logging
import os
from pathlib import Path

import pandas as pd
from include.tasks import split
Expand Down Expand Up @@ -81,7 +83,15 @@ def get_schema_and_process(schema_file: str) -> list:
:param schema_file: path to the schema JSON file
"""
class_objects = ask_astro_weaviate_hook.get_schema(schema_file="include/data/schema.json")
try:
class_objects = json.loads(Path(schema_file).read_text())
except FileNotFoundError:
logger.error(f"Schema file {schema_file} not found.")
raise
except json.JSONDecodeError:
logger.error(f"Invalid JSON in the schema file {schema_file}.")
raise

class_objects["classes"][0].update({"class": WEAVIATE_CLASS})

if "classes" not in class_objects:
Expand Down
2 changes: 1 addition & 1 deletion airflow/dags/monitor/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@

import firebase_admin
import requests
from weaviate_provider.hooks.weaviate import WeaviateHook

from airflow.decorators import dag, task
from airflow.exceptions import AirflowException
from airflow.models import TaskInstance
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator
from airflow.providers.weaviate.hooks.weaviate import WeaviateHook
from airflow.utils.context import Context
from airflow.utils.trigger_rule import TriggerRule

Expand Down
2 changes: 1 addition & 1 deletion airflow/include/tasks/extract/blogs.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def extract_astro_blogs(blog_cutoff_date: datetime) -> list[pd.DataFrame]:
df = pd.DataFrame(zip(links, dates), columns=["docLink", "date"])

df["date"] = pd.to_datetime(df["date"]).dt.date
df = df[df["date"] > blog_cutoff_date.date()]
df = df[df["date"] > blog_cutoff_date]
df.drop("date", inplace=True, axis=1)
df.drop_duplicates(inplace=True)

Expand Down
Loading

0 comments on commit c036d67

Please sign in to comment.