diff --git a/api/management/commands/tag_ops_learnings.py b/api/management/commands/tag_ops_learnings.py
index 5f3f34434b..7ad16c3701 100644
--- a/api/management/commands/tag_ops_learnings.py
+++ b/api/management/commands/tag_ops_learnings.py
@@ -1,16 +1,17 @@
-import requests
-from django.core.management.base import BaseCommand
-from api.logger import logger
-import pandas as pd
-import numpy as np
-from retrying import retry
 import time
+
 import nltk
+import numpy as np
+import pandas as pd
+import requests
+from django.contrib.auth.models import User
+from django.core.management.base import BaseCommand
 from nltk.tokenize import LineTokenizer
-
-from api.models import CronJob, CronJobStatus
 from rest_framework.authtoken.models import Token
-from django.contrib.auth.models import User
+from retrying import retry
+
+from api.logger import logger
+from api.models import CronJob, CronJobStatus
 
 CRON_NAME = "extract_tags_for_ops_learnings"
 CLASSIFY_URL = "https://dreftagging.azurewebsites.net/classify"
@@ -25,7 +26,7 @@ class Command(BaseCommand):
 
     def set_auth_token(self):
         user = User.objects.filter(is_superuser=True).first()
         api_key = Token.objects.filter(user=user)[0].key
-        self.go_authorization_token = {"Authorization" : "Token " + api_key}
+        self.go_authorization_token = {"Authorization": "Token " + api_key}
 
     def fetch_data(self, dref_final_report, appeal, ops_learning):
@@ -35,95 +36,102 @@ def fetchUrl(field):
         def fetchField(field):
             dict_field = []
             temp_dict = requests.get(GO_API_URL + field + LIMIT_200, headers=self.go_authorization_token).json()
-            while temp_dict['next']:
-                dict_field.extend(temp_dict['results'])
-                temp_dict = fetchUrl(temp_dict['next'])
-            dict_field.extend(temp_dict['results'])
+            while temp_dict["next"]:
+                dict_field.extend(temp_dict["results"])
+                temp_dict = fetchUrl(temp_dict["next"])
+            dict_field.extend(temp_dict["results"])
             return pd.DataFrame.from_dict(dict_field)
 
         # read dref final reports, to extract learnings in planned interventions
-        logger.info('Fetching DREF Final Reports from GO')
+        logger.info("Fetching DREF Final Reports from GO")
         dref_final_report = fetchField(dref_final_report)
 
         # read appeals to verify which drefs (appeals) are public and which drefs (appeals) are silent
-        logger.info('Fetching Appeals from GO')
+        logger.info("Fetching Appeals from GO")
         appeals = fetchField(appeal)
 
         # read ops learning to verify which drefs have already been processed
-        logger.info('Fetching Operational Learnings from GO')
+        logger.info("Fetching Operational Learnings from GO")
         ops_learning = fetchField(ops_learning)
-        ops_learning['appeal_code'] = [x['code'] for x in ops_learning['appeal']]
+        ops_learning["appeal_code"] = [x["code"] for x in ops_learning["appeal"]]
 
         return dref_final_report, appeals, ops_learning
 
-    def filter_final_report(self, final_report, appeal, ops_learning, final_report_is_published=True, appeal_is_published=True, in_ops_learning=False):
+    def filter_final_report(
+        self, final_report, appeal, ops_learning, final_report_is_published=True, appeal_is_published=True, in_ops_learning=False
+    ):
         if final_report_is_published:
-            logger.info('Filtering only DREF Final Reports that have been closed')
-            mask = [x for x in final_report['is_published']]
+            logger.info("Filtering only DREF Final Reports that have been closed")
+            mask = [x for x in final_report["is_published"]]
             final_report = final_report[mask]
 
         if appeal_is_published:
-            logger.info('Filtering only DREF Final Reports that are public')
-            mask = [x in list(appeal['code']) for x in final_report['appeal_code']]
+            logger.info("Filtering only DREF Final Reports that are public")
+            mask = [x in list(appeal["code"]) for x in final_report["appeal_code"]]
             final_report = final_report[mask]
 
         if not in_ops_learning:
-            logger.info('Filtering only DREF Final Reports that have not been processed yet for operational learning')
-            list_new_reports = np.setdiff1d(final_report['appeal_code'].unique(),ops_learning['appeal_code'].unique())
+            logger.info("Filtering only DREF Final Reports that have not been processed yet for operational learning")
+            list_new_reports = np.setdiff1d(final_report["appeal_code"].unique(), ops_learning["appeal_code"].unique())
             # only reports that are not processed yet
-            mask = [x in list_new_reports for x in final_report['appeal_code']]
+            mask = [x in list_new_reports for x in final_report["appeal_code"]]
             final_report = final_report[mask]
 
         if final_report.empty:
-            logger.warning('There were not find any DREF Final Reports after the filtering process')
+            logger.warning("No DREF Final Reports were found after the filtering process")
             return None
         else:
-            filtered_final_report = final_report[['appeal_code', 'planned_interventions']]
-            logger.info('There were found %s reports after the filtering process', str(len(filtered_final_report)))
+            filtered_final_report = final_report[["appeal_code", "planned_interventions"]]
+            logger.info("Found %s reports after the filtering process", str(len(filtered_final_report)))
             return filtered_final_report
 
     def split_rows(self, filtered_final_report):
         def split_planned_interventions(df):
-            logger.info('Splitting DREF Final Reports per planned intervention')
-            df = df.explode(column='planned_interventions', ignore_index=True)
+            logger.info("Splitting DREF Final Reports per planned intervention")
+            df = df.explode(column="planned_interventions", ignore_index=True)
 
-            df['Sector'] = [x['title_display'] for x in df['planned_interventions']]
-            df['Lessons Learnt'] = [x['lessons_learnt'] for x in df['planned_interventions']]
-            df['Challenges'] = [x['challenges'] for x in df['planned_interventions']]
+            df["Sector"] = [x["title_display"] for x in df["planned_interventions"]]
+            df["Lessons Learnt"] = [x["lessons_learnt"] for x in df["planned_interventions"]]
+            df["Challenges"] = [x["challenges"] for x in df["planned_interventions"]]
 
-            mask_1 = [pd.notna(x) for x in df['Lessons Learnt']]
-            mask_2 = [pd.notna(x) for x in df['Challenges']]
+            mask_1 = [pd.notna(x) for x in df["Lessons Learnt"]]
+            mask_2 = [pd.notna(x) for x in df["Challenges"]]
             mask = [x or y for x, y in zip(mask_1, mask_2)]
             df = df[mask]
 
-            df.drop(columns='planned_interventions', inplace=True)
-
-            df = df.melt(id_vars=['appeal_code', 'Sector'], value_vars=['Lessons Learnt', 'Challenges'], var_name='Finding', value_name='Excerpts')
-            df = df[pd.notna(df['Excerpts'])]
+            df.drop(columns="planned_interventions", inplace=True)
+
+            df = df.melt(
+                id_vars=["appeal_code", "Sector"],
+                value_vars=["Lessons Learnt", "Challenges"],
+                var_name="Finding",
+                value_name="Excerpts",
+            )
+            df = df[pd.notna(df["Excerpts"])]
 
             return df
 
         def split_excerpts(df):
-            logger.info('Splitting unique learnings in each planned intervention')
-            df['Excerpts_ind'] = [LineTokenizer(blanklines='discard').tokenize(x) for x in df['Excerpts']]
-            df = df.explode(column='Excerpts_ind', ignore_index=True)
-            df.drop(columns='Excerpts', inplace=True)
+            logger.info("Splitting unique learnings in each planned intervention")
+            df["Excerpts_ind"] = [LineTokenizer(blanklines="discard").tokenize(x) for x in df["Excerpts"]]
+            df = df.explode(column="Excerpts_ind", ignore_index=True)
+            df.drop(columns="Excerpts", inplace=True)
 
             # remove strings that have less than 5 characters
-            df['Excerpts_ind'] = [np.nan if pd.notna(x) and len(x) < 5 else x for x in df['Excerpts_ind']]
-            df = df[pd.notna(df['Excerpts_ind'])]
+            df["Excerpts_ind"] = [np.nan if pd.notna(x) and len(x) < 5 else x for x in df["Excerpts_ind"]]
+            df = df[pd.notna(df["Excerpts_ind"])]
 
             # catching go format (bullet point)
-            df['Excerpts_ind'] = [x[2:] if x.startswith('•\t') else x for x in df['Excerpts_ind']]
+            df["Excerpts_ind"] = [x[2:] if x.startswith("•\t") else x for x in df["Excerpts_ind"]]
             # catching other formats
-            df['Excerpts_ind'] = [x[1:] if x.startswith(tuple(['-', '•', '▪', ' '])) else x for x in df['Excerpts_ind']]
+            df["Excerpts_ind"] = [x[1:] if x.startswith(tuple(["-", "•", "▪", " "])) else x for x in df["Excerpts_ind"]]
 
-            df['Excerpts'] = [x.strip() for x in df['Excerpts_ind']]
+            df["Excerpts"] = [x.strip() for x in df["Excerpts_ind"]]
 
-            df.drop(columns='Excerpts_ind', inplace=True)
+            df.drop(columns="Excerpts_ind", inplace=True)
 
             return df
@@ -131,18 +139,18 @@ def split_excerpts(df):
         final_report_learnings = split_excerpts(final_report_interventions)
 
         if final_report_interventions.empty:
-            logger.warning('There were not found any learnings on the DREF Final Reports planned interventions')
+            logger.warning("No learnings were found in the DREF Final Reports planned interventions")
             return None
         else:
             final_report_learnings = split_excerpts(final_report_interventions)
             return final_report_learnings
 
     def tag_data(self, df, tagging, tagging_api_endpoint):
-        logger.info('Tagging learnings with PER framework')
+        logger.info("Tagging learnings with PER framework")
 
         headers = {
-            'accept': 'application/json',
-            'Content-Type': 'application/json',
+            "accept": "application/json",
+            "Content-Type": "application/json",
         }
 
         url = tagging_api_endpoint
@@ -151,25 +159,25 @@ def tag_data(self, df, tagging, tagging_api_endpoint):
         df.reset_index(inplace=True, drop=True)
 
         for i in range(0, len(df)):
-            data = "\""+df['Excerpts'].iloc[i]+"\""
-            data = data.encode('utf-8')
+            data = '"' + df["Excerpts"].iloc[i] + '"'
+            data = data.encode("utf-8")
             response = requests.post(url, headers=headers, data=data)
-            if (response.status_code == 201) and len(response.json()[0]['tags']) > 0:
-                df.loc[i, tagging] = response.json()[0]['tags'][0]
+            if (response.status_code == 201) and len(response.json()[0]["tags"]) > 0:
+                df.loc[i, tagging] = response.json()[0]["tags"][0]
 
-        df['Institution'] = np.empty(len(df))
+        df["Institution"] = np.empty(len(df))
 
         for i in range(0, len(df)):
-            if (df['PER - Component'].iloc[i] == 'Activation of Regional and International Support'):
-                df.loc[i, 'Institution'] = 'Secretariat'
+            if df["PER - Component"].iloc[i] == "Activation of Regional and International Support":
+                df.loc[i, "Institution"] = "Secretariat"
             else:
-                df.loc[i, 'Institution'] = 'National Society'
+                df.loc[i, "Institution"] = "National Society"
 
         tagged_data = df
 
         return tagged_data
 
     def fetch_complementary_data(self, per_formcomponent, primary_sector):
-        logger.info('Fetching complementary data on PER components ids, sectors ids, finding ids, organisations ids')
+        logger.info("Fetching complementary data on PER components ids, sectors ids, finding ids, organisations ids")
 
         def fetchUrl(field):
             return requests.get(field).json()
@@ -177,29 +185,23 @@ def fetchUrl(field):
         def fetchField(field):
             dict_field = []
             temp_dict = requests.get(GO_API_URL + field + LIMIT_200).json()
-            while temp_dict['next']:
-                dict_field.extend(temp_dict['results'])
-                temp_dict = fetchUrl(temp_dict['next'])
-            dict_field.extend(temp_dict['results'])
+            while temp_dict["next"]:
+                dict_field.extend(temp_dict["results"])
+                temp_dict = fetchUrl(temp_dict["next"])
+            dict_field.extend(temp_dict["results"])
             return pd.DataFrame.from_dict(dict_field)
 
         per_formcomponent = fetchField(per_formcomponent)
         go_sectors = fetchUrl(GO_API_URL + primary_sector)
 
-        dict_per = dict(zip(per_formcomponent['title'], per_formcomponent['id']))
+        dict_per = dict(zip(per_formcomponent["title"], per_formcomponent["id"]))
 
-        dict_sector = {item['label']: item['key'] for item in go_sectors}
+        dict_sector = {item["label"]: item["key"] for item in go_sectors}
 
-        dict_finding = {
-            'Lessons Learnt': 1,
-            'Challenges': 2
-        }
+        dict_finding = {"Lessons Learnt": 1, "Challenges": 2}
 
-        dict_org = {
-            'Secretariat': 1,
-            'National Society': 2
-        }
+        dict_org = {"Secretariat": 1, "National Society": 2}
 
         mapping_per = {
             "Activation of Regional and International Support": "Activation of regional and international support",
@@ -250,7 +252,7 @@ def fetchField(field):
             "Unknown": None,
             "Business continuity": "Business continuity",
             "emergency Response Procedures (SOP)": "Emergency Response Procedures (SOPs)",
-            "National Society Specific Areas of intervention": "NS-specific areas of intervention"
+            "National Society Specific Areas of intervention": "NS-specific areas of intervention",
         }
 
         mapping_sector = {
@@ -277,57 +279,59 @@ def fetchField(field):
             "Multi Purpose Cash": "Livelihoods and basic needs",
             "Environmental Sustainability": None,
             "Migration And Displacement": "Migration",
-            "Coordination And Partnerships":"NS Strengthening",
+            "Coordination And Partnerships": "NS Strengthening",
         }
 
         return mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding
 
     def format_data(self, df, mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding):
-        logger.info('Formatting data to upload to GO Operational Learning Table')
+        logger.info("Formatting data to upload to GO Operational Learning Table")
 
-        df.loc[:, 'mapped_per'] = [mapping_per[x] if pd.notna(x) else None for x in df['PER - Component']]
-        df.loc[:, 'id_per'] = [dict_per[x] if pd.notna(x) else None for x in df['mapped_per']]
-        df.loc[:, 'mapped_sector'] = [mapping_sector[x] if pd.notna(x) else None for x in df['Sector']]
-        df.loc[:, 'id_sector'] = [dict_sector[x] if pd.notna(x) else None for x in df['mapped_sector']]
-        df.loc[:, 'id_institution'] = [dict_org[x] for x in df['Institution']]
-        df.loc[:, 'id_finding'] = [dict_finding[x] for x in df['Finding']]
+        df.loc[:, "mapped_per"] = [mapping_per[x] if pd.notna(x) else None for x in df["PER - Component"]]
+        df.loc[:, "id_per"] = [dict_per[x] if pd.notna(x) else None for x in df["mapped_per"]]
+        df.loc[:, "mapped_sector"] = [mapping_sector[x] if pd.notna(x) else None for x in df["Sector"]]
+        df.loc[:, "id_sector"] = [dict_sector[x] if pd.notna(x) else None for x in df["mapped_sector"]]
+        df.loc[:, "id_institution"] = [dict_org[x] for x in df["Institution"]]
+        df.loc[:, "id_finding"] = [dict_finding[x] for x in df["Finding"]]
 
-        formatted_data = df[['appeal_code', 'Excerpts', 'id_per', 'id_sector', 'id_institution', 'id_finding']]
+        formatted_data = df[["appeal_code", "Excerpts", "id_per", "id_sector", "id_institution", "id_finding"]]
 
         return formatted_data
 
     def manage_duplicates(self, df):
-        df = df.groupby(['appeal_code', 'Excerpts', 'id_finding'], as_index=False).agg(list).reset_index()
-        df.drop(columns=['index'], inplace=True)
+        df = df.groupby(["appeal_code", "Excerpts", "id_finding"], as_index=False).agg(list).reset_index()
+        df.drop(columns=["index"], inplace=True)
 
-        df['id_per'] = [list(set([y for y in x if pd.notna(y)])) for x in df['id_per']]
-        df['id_sector'] = [list(set([y for y in x if pd.notna(y)])) for x in df['id_sector']]
-        df['id_institution'] = [list(set([y for y in x if pd.notna(y)])) for x in df['id_institution']]
+        df["id_per"] = [list(set([y for y in x if pd.notna(y)])) for x in df["id_per"]]
+        df["id_sector"] = [list(set([y for y in x if pd.notna(y)])) for x in df["id_sector"]]
+        df["id_institution"] = [list(set([y for y in x if pd.notna(y)])) for x in df["id_institution"]]
 
         deduplicated_data = df
 
         return deduplicated_data
 
     def post_to_api(self, df, api_post_endpoint):
-        logger.info('Posting data to GO Operational Learning API')
+        logger.info("Posting data to GO Operational Learning API")
 
         url = api_post_endpoint
 
         myobj = {}
         for i in range(0, len(df)):
-            myobj[i] = {"learning": df['Excerpts'].iloc[i],
-                        "learning_validated": df['Excerpts'].iloc[i],
-                        "appeal_code": df['appeal_code'].iloc[i],
-                        "type": int(df['id_finding'].iloc[i]),
-                        "type_validated": int(df['id_finding'].iloc[i]),
-                        "sector": df['id_sector'].iloc[i],
-                        "sector_validated": df['id_sector'].iloc[i],
-                        "per_component": df['id_per'].iloc[i],
-                        "per_component_validated": df['id_per'].iloc[i],
-                        "organization": df['id_institution'].iloc[i],
-                        "organization_validated": df['id_institution'].iloc[i],
-                        "is_validated": False}
+            myobj[i] = {
+                "learning": df["Excerpts"].iloc[i],
+                "learning_validated": df["Excerpts"].iloc[i],
+                "appeal_code": df["appeal_code"].iloc[i],
+                "type": int(df["id_finding"].iloc[i]),
+                "type_validated": int(df["id_finding"].iloc[i]),
+                "sector": df["id_sector"].iloc[i],
+                "sector_validated": df["id_sector"].iloc[i],
+                "per_component": df["id_per"].iloc[i],
+                "per_component_validated": df["id_per"].iloc[i],
+                "organization": df["id_institution"].iloc[i],
+                "organization_validated": df["id_institution"].iloc[i],
+                "is_validated": False,
+            }
 
         # Define a retry decorator
         @retry(wait_exponential_multiplier=1000, wait_exponential_max=10000, stop_max_attempt_number=5)
@@ -353,11 +357,13 @@ def handle(self, *args, **options):
         self.set_auth_token()
 
         # Step 0: Setup
-        nltk.download('punkt')
+        nltk.download("punkt")
 
         # Step 1: Fetch Data
-        final_report, appeal, ops_learning = self.fetch_data('dref-final-report', 'appeal', 'ops-learning')
-        filtered_data = self.filter_final_report(final_report, appeal, ops_learning, final_report_is_published=True, appeal_is_published=True, in_ops_learning=False)
+        final_report, appeal, ops_learning = self.fetch_data("dref-final-report", "appeal", "ops-learning")
+        filtered_data = self.filter_final_report(
+            final_report, appeal, ops_learning, final_report_is_published=True, appeal_is_published=True, in_ops_learning=False
+        )
 
         rows = 0
         if filtered_data is not None:
@@ -366,11 +372,15 @@ def handle(self, *args, **options):
 
             if split_learnings is not None:
                 # Step 3: Tagging
-                tagged_data = self.tag_data(split_learnings,'PER - Component' , CLASSIFY_URL)
+                tagged_data = self.tag_data(split_learnings, "PER - Component", CLASSIFY_URL)
 
                 # Step 4: Post Processing
-                mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding = self.fetch_complementary_data('per-formcomponent', 'primarysector')
-                formatted_data = self.format_data(tagged_data, mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding)
+                mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding = self.fetch_complementary_data(
+                    "per-formcomponent", "primarysector"
+                )
+                formatted_data = self.format_data(
+                    tagged_data, mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding
+                )
                 deduplicated_data = self.manage_duplicates(formatted_data)
 
                 # Step 5: Post to API Endpoint
@@ -384,7 +394,7 @@ def handle(self, *args, **options):
                 "name": CRON_NAME,
                 "message": ("No new learnings added. Done processing ops learnings.",),
                 "num_result": rows,
-                "status": CronJobStatus.WARNED
+                "status": CronJobStatus.WARNED,
             }
         else:
             body = {
diff --git a/deploy/helm/ifrcgo-helm/values.yaml b/deploy/helm/ifrcgo-helm/values.yaml
index b05be39adc..7ba2cd2a3f 100644
--- a/deploy/helm/ifrcgo-helm/values.yaml
+++ b/deploy/helm/ifrcgo-helm/values.yaml
@@ -150,8 +150,6 @@ cronjobs:
     schedule: '0 0 * * 0'
   - command: 'ingest_icrc'
     schedule: '0 3 * * 0'
-  - command: 'tag_ops_learnings'
-    schedule: '4 4 * * 4'
 
 elasticsearch:
   enabled: true