-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Extracting tags for OpsLearning + tokenizer
- Loading branch information
1 parent
659b667
commit 520af39
Showing
2 changed files
with
399 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,397 @@ | ||
import requests | ||
from django.core.management.base import BaseCommand | ||
from api.logger import logger | ||
import pandas as pd | ||
import numpy as np | ||
from retrying import retry | ||
import time | ||
import nltk | ||
from nltk.tokenize import LineTokenizer | ||
from api.models import CronJob, CronJobStatus | ||
from rest_framework.authtoken.models import Token | ||
from django.contrib.auth.models import User | ||
|
||
|
||
CRON_NAME = "extract_tags_for_ops_learnings" | ||
CLASSIFY_URL = "https://dreftagging.azurewebsites.net/classify" | ||
GO_API_URL = "https://goadmin.ifrc.org/api/v2/" | ||
OPS_LEARNING_URL = GO_API_URL + "ops-learning/" | ||
LIMIT_200 = "/?limit=200/" | ||
|
||
|
||
class Command(BaseCommand): | ||
help = "Extracting tags for Operational Learnings" | ||
|
||
def set_auth_token(self): | ||
user = User.objects.filter(is_superuser=True).first() | ||
api_key = Token.objects.filter(user=user)[0].key | ||
self.go_authorization_token = {"Authorization" : "Token " + api_key} | ||
|
||
def fetch_data(self, dref_final_report, appeal, ops_learning): | ||
|
||
def fetchUrl(field): | ||
return requests.get(field, headers=self.go_authorization_token).json() | ||
|
||
def fetchField(field): | ||
dict_field = [] | ||
temp_dict = requests.get(GO_API_URL + field + LIMIT_200, headers=self.go_authorization_token).json() | ||
while temp_dict['next']: | ||
dict_field.extend(temp_dict['results']) | ||
temp_dict = fetchUrl(temp_dict['next']) | ||
dict_field.extend(temp_dict['results']) | ||
return pd.DataFrame.from_dict(dict_field) | ||
|
||
# read dref final reports, to extract learnings in planned interventions | ||
logger.info('Fetching DREF Final Reports from GO') | ||
dref_final_report = fetchField(dref_final_report) | ||
|
||
# read appeals to verify which drefs (appeals) are public and which drefs (appeals) are silent | ||
logger.info('Fetching Appeals from GO') | ||
appeals = fetchField(appeal) | ||
|
||
# read ops learning to verify which drefs have already been processed | ||
logger.info('Fetching Operational Learnings from GO') | ||
ops_learning = fetchField(ops_learning) | ||
ops_learning['appeal_code'] = [x['code'] for x in ops_learning['appeal']] | ||
|
||
return dref_final_report, appeals, ops_learning | ||
|
||
def filter_final_report(self, final_report, appeal, ops_learning, final_report_is_published=True, appeal_is_published=True, in_ops_learning=False): | ||
|
||
if final_report_is_published: | ||
logger.info('Filtering only DREF Final Reports that have been closed') | ||
mask = [x for x in final_report['is_published']] | ||
final_report = final_report[mask] | ||
|
||
if appeal_is_published: | ||
logger.info('Filtering only DREF Final Reports that are public') | ||
mask = [x in list(appeal['code']) for x in final_report['appeal_code']] | ||
final_report = final_report[mask] | ||
|
||
if not in_ops_learning: | ||
logger.info('Filtering only DREF Final Reports that have not been processed yet for operational learning') | ||
list_new_reports = np.setdiff1d(final_report['appeal_code'].unique(),ops_learning['appeal_code'].unique()) | ||
|
||
# only reports that are not processed yet | ||
mask = [x in list_new_reports for x in final_report['appeal_code']] | ||
final_report = final_report[mask] | ||
|
||
if final_report.empty: | ||
logger.warning('There were not find any DREF Final Reports after the filtering process') | ||
return None | ||
|
||
else: | ||
filtered_final_report = final_report[['appeal_code', 'planned_interventions']] | ||
logger.info('There were found %s reports after the filtering process', str(len(filtered_final_report))) | ||
return filtered_final_report | ||
|
||
def split_rows(self, filtered_final_report): | ||
|
||
def split_planned_interventions(df): | ||
logger.info('Splitting DREF Final Reports per planned intervention') | ||
df = df.explode(column='planned_interventions', ignore_index=True) | ||
|
||
df['Sector'] = [x['title_display'] for x in df['planned_interventions']] | ||
df['Lessons Learnt'] = [x['lessons_learnt'] for x in df['planned_interventions']] | ||
df['Challenges'] = [x['challenges'] for x in df['planned_interventions']] | ||
|
||
mask_1 = [pd.notna(x) for x in df['Lessons Learnt']] | ||
mask_2 = [pd.notna(x) for x in df['Challenges']] | ||
mask = [x or y for x, y in zip(mask_1, mask_2)] | ||
df = df[mask] | ||
df.drop(columns='planned_interventions', inplace=True) | ||
|
||
df = df.melt(id_vars=['appeal_code', 'Sector'], value_vars=['Lessons Learnt', 'Challenges'], var_name='Finding', value_name='Excerpts') | ||
df = df[pd.notna(df['Excerpts'])] | ||
return df | ||
|
||
def split_excerpts(df): | ||
logger.info('Splitting unique learnings in each planned intervention') | ||
df['Excerpts_ind'] = [LineTokenizer(blanklines='discard').tokenize(x) for x in df['Excerpts']] | ||
df = df.explode(column='Excerpts_ind', ignore_index=True) | ||
df.drop(columns='Excerpts', inplace=True) | ||
|
||
# remove strings that have less than 5 characters | ||
df['Excerpts_ind'] = [np.nan if pd.notna(x) and len(x) < 5 else x for x in df['Excerpts_ind']] | ||
df = df[pd.notna(df['Excerpts_ind'])] | ||
|
||
# catching go format (bullet point) | ||
df['Excerpts_ind'] = [x[2:] if x.startswith('•\t') else x for x in df['Excerpts_ind']] | ||
|
||
# catching other formats | ||
df['Excerpts_ind'] = [x[1:] if x.startswith(tuple(['-', '•', '▪', ' '])) else x for x in df['Excerpts_ind']] | ||
|
||
df['Excerpts'] = [x.strip() for x in df['Excerpts_ind']] | ||
|
||
df.drop(columns='Excerpts_ind', inplace=True) | ||
|
||
return df | ||
|
||
final_report_interventions = split_planned_interventions(filtered_final_report) | ||
final_report_learnings = split_excerpts(final_report_interventions) | ||
|
||
if final_report_interventions.empty: | ||
logger.warning('There were not found any learnings on the DREF Final Reports planned interventions') | ||
return None | ||
else: | ||
final_report_learnings = split_excerpts(final_report_interventions) | ||
return final_report_learnings | ||
|
||
def tag_data(self, df, tagging, tagging_api_endpoint): | ||
logger.info('Tagging learnings with PER framework') | ||
|
||
headers = { | ||
'accept': 'application/json', | ||
'Content-Type': 'application/json', | ||
} | ||
|
||
url = tagging_api_endpoint | ||
|
||
df[tagging] = None | ||
df.reset_index(inplace=True, drop=True) | ||
|
||
for i in range(0, len(df)): | ||
data = "\""+df['Excerpts'].iloc[i]+"\"" | ||
data = data.encode('utf-8') | ||
response = requests.post(url, headers=headers, data=data) | ||
if (response.status_code == 201) and len(response.json()[0]['tags']) > 0: | ||
df.loc[i, tagging] = response.json()[0]['tags'][0] | ||
|
||
df['Institution'] = np.empty(len(df)) | ||
|
||
for i in range(0, len(df)): | ||
if (df['PER - Component'].iloc[i] == 'Activation of Regional and International Support'): | ||
df.loc[i, 'Institution'] = 'Secretariat' | ||
else: | ||
df.loc[i, 'Institution'] = 'National Society' | ||
|
||
tagged_data = df | ||
return tagged_data | ||
|
||
def fetch_complementary_data(self, per_formcomponent, primary_sector): | ||
logger.info('Fetching complementary data on PER components ids, sectors ids, finding ids, organisations ids') | ||
|
||
def fetchUrl(field): | ||
return requests.get(field).json() | ||
|
||
def fetchField(field): | ||
dict_field = [] | ||
temp_dict = requests.get(GO_API_URL + field + LIMIT_200).json() | ||
while temp_dict['next']: | ||
dict_field.extend(temp_dict['results']) | ||
temp_dict = fetchUrl(temp_dict['next']) | ||
dict_field.extend(temp_dict['results']) | ||
return pd.DataFrame.from_dict(dict_field) | ||
|
||
per_formcomponent = fetchField(per_formcomponent) | ||
|
||
go_sectors = fetchUrl(GO_API_URL + primary_sector) | ||
|
||
dict_per = dict(zip(per_formcomponent['title'], per_formcomponent['id'])) | ||
|
||
dict_sector = {item['label']: item['key'] for item in go_sectors} | ||
|
||
dict_finding = { | ||
'Lessons Learnt': 1, | ||
'Challenges': 2 | ||
} | ||
|
||
dict_org = { | ||
'Secretariat': 1, | ||
'National Society': 2 | ||
} | ||
|
||
mapping_per = { | ||
"Activation of Regional and International Support": "Activation of regional and international support", | ||
"Affected Population Selection": "Affected population selection", | ||
"Business Continuity": "Business continuity", | ||
"Cash and Voucher Assistance": "Cash Based Intervention (CBI)", | ||
"Communications in Emergencies": "Communication in emergencies", | ||
"Coordination with Authorities": "Coordination with authorities", | ||
"Coordination with External Agencies and NGOs": "Coordination with External Agencies and NGOs", | ||
"Coordination with Local Community Level Responders": "Coordination with local community level responders", | ||
"Coordination with Movement": "Coordination with Movement", | ||
"DRM Laws, Advocacy and Dissemination": "DRM Laws, Advocacy and Dissemination", | ||
"Early Action Mechanisms": "Early Action Mechanisms", | ||
"Emergency Needs Assessment and Planning": "Emergency Needs Assessment", | ||
"Emergency Operations Centre (EOC)": "Emergency Operations Centre (EOC)", | ||
"Emergency Response Procedures (SOP)": "Emergency Response Procedures (SOPs)", | ||
"Finance and Admin. Policy and Emergency Procedures": "Finance and Admin policy and emergency procedures", | ||
"Hazard, Context and Risk Analysis, Monitoring and Early Warning": "Hazard, Context and Risk Analysis, Monitoring and Early Warning", | ||
"Information and Communication Technology (ICT)": "Information and Communication Technology (ICT)", | ||
"Information Management": "Information Management (IM)", | ||
"Logistics - Logistics Management": "LOGISTICS MANAGEMENT", | ||
"Logistics - Procurement": "PROCUREMENT", | ||
"Logistics - Warehouse and Stock Management": "WAREHOUSE AND STOCK MANAGEMENT", | ||
"Mapping of NS Capacities": "Mapping of NS capacities", | ||
"NS Specific Areas of Intervention": "NS-specific areas of intervention", | ||
"Operations Monitoring, Evaluation, Reporting and Learning": "Operations Monitoring, Evaluation, Reporting and Learning", | ||
"Pre-Disaster Meetings and Agreements": "Pre-disaster meetings and agreements", | ||
"Preparedness Plans and Budgets": "Preparedness plans and budgets", | ||
"Quality and Accountability": "Quality and accountability", | ||
"RC Auxiliary Role, Mandate and Law": "RC auxiliary role, Mandate and Law", | ||
"Resources Mobilisation": "Resource Mobilisation", | ||
"Response and Recovery Planning": "Response and recovery planning", | ||
"Risk Management": "Risk management", | ||
"Safety and Security Management": "Safety and security management", | ||
"Staff and Volunteer Management": "Staff and volunteer management", | ||
"Testing and Learning": "Testing and Learning", | ||
"Cooperation with Private Sector": "Cooperation with private sector", | ||
"Disaster Risk Management Strategy": "DRM Strategy", | ||
"Logistics - Supply Chain Management": "SUPPLY CHAIN MANAGEMENT", | ||
"Logistics - Transportation Management": "FLEET AND TRANSPORTATION MANAGEMENT", | ||
"Scenario Planning": "Scenario planning", | ||
"Civil Military Relations": "Civil Military Relations", | ||
"Disaster Risk Management Policy": "DRM Policy", | ||
"information and Communication Technology (ICT)": "Information and Communication Technology (ICT)", | ||
"Coordination with local community level responders": "Coordination with local community level responders", | ||
"Emergency Response Procedures (SOPs)": "Emergency Response Procedures (SOPs)", | ||
"Logistics - Transport": "FLEET AND TRANSPORTATION MANAGEMENT", | ||
"Unknown": None, | ||
"Business continuity": "Business continuity", | ||
"emergency Response Procedures (SOP)": "Emergency Response Procedures (SOPs)", | ||
"National Society Specific Areas of intervention": "NS-specific areas of intervention" | ||
} | ||
|
||
mapping_sector = { | ||
"Strategies for implementation": None, # No direct match found # no sector(?) | ||
"Disaster Risk Reduction and Climate Action": "DRR", | ||
"Health": "Health (public)", | ||
"Livelihoods and Basic Needs": "Livelihoods and basic needs", | ||
"Migration and Displacement": "Migration", | ||
"Protection, Gender and Inclusion": "PGI", | ||
"Shelter and Settlements": "Shelter", | ||
"Water Sanitation and Hygiene": "WASH", | ||
"Secretariat Services": None, # No direct match found # need to bring it out as IFRC learning | ||
"National Society Strengthening": "NS Strengthening", | ||
"Water, Sanitation And Hygiene": "WASH", | ||
"Protection, Gender And Inclusion": "PGI", | ||
"Shelter Housing And Settlements": "Shelter", | ||
"Livelihoods And Basic Needs": "Livelihoods and basic needs", | ||
"Community Engagement And Accountability": "CEA", | ||
"Multi-purpose Cash": "Livelihoods and basic needs", # No direct match found | ||
"Risk Reduction, Climate Adaptation And Recovery": "DRR", | ||
"Migration": "Migration", | ||
"Education": "Education", | ||
"Shelter and Basic Household Items": "Shelter", | ||
"Multi Purpose Cash": "Livelihoods and basic needs", | ||
"Environmental Sustainability": None, | ||
"Migration And Displacement": "Migration", | ||
"Coordination And Partnerships":"NS Strengthening", | ||
} | ||
|
||
return mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding | ||
|
||
def format_data(self, df, mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding): | ||
logger.info('Formatting data to upload to GO Operational Learning Table') | ||
|
||
df.loc[:, 'mapped_per'] = [mapping_per[x] if pd.notna(x) else None for x in df['PER - Component']] | ||
df.loc[:, 'id_per'] = [dict_per[x] if pd.notna(x) else None for x in df['mapped_per']] | ||
df.loc[:, 'mapped_sector'] = [mapping_sector[x] if pd.notna(x) else None for x in df['Sector']] | ||
df.loc[:, 'id_sector'] = [dict_sector[x] if pd.notna(x) else None for x in df['mapped_sector']] | ||
df.loc[:, 'id_institution'] = [dict_org[x] for x in df['Institution']] | ||
df.loc[:, 'id_finding'] = [dict_finding[x] for x in df['Finding']] | ||
|
||
formatted_data = df[['appeal_code', 'Excerpts', 'id_per', 'id_sector', 'id_institution', 'id_finding']] | ||
|
||
return formatted_data | ||
|
||
def manage_duplicates(self, df): | ||
|
||
df = df.groupby(['appeal_code', 'Excerpts', 'id_finding'], as_index=False).agg(list).reset_index() | ||
df.drop(columns=['index'], inplace=True) | ||
|
||
df['id_per'] = [list(set([y for y in x if pd.notna(y)])) for x in df['id_per']] | ||
df['id_sector'] = [list(set([y for y in x if pd.notna(y)])) for x in df['id_sector']] | ||
df['id_institution'] = [list(set([y for y in x if pd.notna(y)])) for x in df['id_institution']] | ||
|
||
deduplicated_data = df | ||
|
||
return deduplicated_data | ||
|
||
def post_to_api(self, df, api_post_endpoint): | ||
logger.info('Posting data to GO Operational Learning API') | ||
|
||
url = api_post_endpoint | ||
|
||
myobj = {} | ||
for i in range(0, len(df)): | ||
myobj[i] = {"learning": df['Excerpts'].iloc[i], | ||
"learning_validated": df['Excerpts'].iloc[i], | ||
"appeal_code": df['appeal_code'].iloc[i], | ||
"type": int(df['id_finding'].iloc[i]), | ||
"type_validated": int(df['id_finding'].iloc[i]), | ||
"sector": df['id_sector'].iloc[i], | ||
"sector_validated": df['id_sector'].iloc[i], | ||
"per_component": df['id_per'].iloc[i], | ||
"per_component_validated": df['id_per'].iloc[i], | ||
"organization": df['id_institution'].iloc[i], | ||
"organization_validated": df['id_institution'].iloc[i], | ||
"is_validated": False} | ||
|
||
# Define a retry decorator | ||
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000, stop_max_attempt_number=5) | ||
def post_request(x): | ||
response = requests.post(url, json=myobj[x], headers=self.go_authorization_token) | ||
response.raise_for_status() # Raise HTTPError for bad responses | ||
return response | ||
|
||
for x in range(0, len(myobj)): | ||
try: | ||
response = post_request(x) | ||
logger.info("Response status: %s" % response.status_code) | ||
time.sleep(1) | ||
except requests.exceptions.HTTPError as errh: | ||
print(f"HTTP Error: {errh}") | ||
time.sleep(5) # Wait before retrying | ||
except requests.exceptions.RequestException as err: | ||
print(f"Request Exception: {err}") | ||
time.sleep(5) # Wait before retrying | ||
|
||
def handle(self, *args, **options): | ||
logger.info("Starting extracting tags for ops learnings") | ||
self.set_auth_token() | ||
|
||
# Step 0: Setup | ||
nltk.download('punkt') | ||
|
||
# Step 1: Fetch Data | ||
final_report, appeal, ops_learning = self.fetch_data('dref-final-report', 'appeal', 'ops-learning') | ||
filtered_data = self.filter_final_report(final_report, appeal, ops_learning, final_report_is_published=True, appeal_is_published=True, in_ops_learning=False) | ||
|
||
rows = 0 | ||
if filtered_data is not None: | ||
# Step 2: Data Preprocessing | ||
split_learnings = self.split_rows(filtered_data) | ||
|
||
if split_learnings is not None: | ||
# Step 3: Tagging | ||
tagged_data = self.tag_data(split_learnings,'PER - Component' , CLASSIFY_URL) | ||
|
||
# Step 4: Post Processing | ||
mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding = self.fetch_complementary_data('per-formcomponent', 'primarysector') | ||
formatted_data = self.format_data(tagged_data, mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding) | ||
deduplicated_data = self.manage_duplicates(formatted_data) | ||
|
||
# Step 5: Post to API Endpoint | ||
self.post_to_api(deduplicated_data, OPS_LEARNING_URL) | ||
|
||
rows = len(deduplicated_data) if deduplicated_data else 0 | ||
logger.info("%s new Operational Learnings ingested to GO API" % rows) | ||
|
||
if rows == 0: | ||
body = { | ||
"name": CRON_NAME, | ||
"message": ("No new learnings added. Done processing ops learnings.",), | ||
"num_result": rows, | ||
"status": CronJobStatus.WARNED | ||
} | ||
else: | ||
body = { | ||
"name": CRON_NAME, | ||
"message": ("Done processing ops learnings",), | ||
"num_result": rows, | ||
"status": CronJobStatus.SUCCESSFUL, | ||
} | ||
|
||
CronJob.sync_cron(body) |
Oops, something went wrong.