Extracting tags for OpsLearning
szabozoltan69 committed Feb 27, 2024
1 parent 4820e44 commit 2adf926
Showing 3 changed files with 899 additions and 131 deletions.
397 changes: 397 additions & 0 deletions api/management/commands/tag_ops_learnings.py
@@ -0,0 +1,397 @@
import requests
import json
from django.core.management.base import BaseCommand
from api.logger import logger
import pandas as pd
import numpy as np
from retrying import retry
import time
import nltk
from nltk.tokenize import sent_tokenize
from api.models import CronJob, CronJobStatus
from rest_framework.authtoken.models import Token
from django.contrib.auth.models import User


CRON_NAME = "extract_tags_for_ops_learnings"
CLASSIFY_URL = "https://dreftagging.azurewebsites.net/classify"
GO_API_URL = "https://goadmin.ifrc.org/api/v2/"
OPS_LEARNING_URL = GO_API_URL + "ops-learning/"
LIMIT_200 = "/?limit=200"  # query suffix: trailing slash for the endpoint plus a 200-item page size


class Command(BaseCommand):
help = "Extracting tags for Operational Learnings"

def set_auth_token(self):
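"""Build an Authorization header from the API token of the first superuser."""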
user = User.objects.filter(is_superuser=True).first()
api_key = Token.objects.filter(user=user).first().key
self.go_authorization_token = {"Authorization": "Token " + api_key}

def fetch_data(self, dref_final_report, appeal, ops_learning):

def fetchUrl(field):
return requests.get(field, headers=self.go_authorization_token).json()

def fetchField(field):
dict_field = []
temp_dict = requests.get(GO_API_URL + field + LIMIT_200, headers=self.go_authorization_token).json()
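# Walk DRF pagination: collect each page's results and follow 'next' until it is null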
while temp_dict['next']:
dict_field.extend(temp_dict['results'])
temp_dict = fetchUrl(temp_dict['next'])
dict_field.extend(temp_dict['results'])
return pd.DataFrame.from_dict(dict_field)

# read dref final reports, to extract learnings in planned interventions
logger.info('Fetching DREF Final Reports from GO')
dref_final_report = fetchField(dref_final_report)

# read appeals to determine which drefs (appeals) are public and which are silent
logger.info('Fetching Appeals from GO')
appeals = fetchField(appeal)

# read ops learning to verify which drefs have already been processed
logger.info('Fetching Operational Learnings from GO')
ops_learning = fetchField(ops_learning)
ops_learning['appeal_code'] = [x['code'] for x in ops_learning['appeal']]

return dref_final_report, appeals, ops_learning

def filter_final_report(self, final_report, appeal, ops_learning, final_report_is_published=True, appeal_is_published=True, in_ops_learning=False):
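"""Keep only final reports that are published, whose appeal is public, and that are not yet present in the ops-learning table."""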

if final_report_is_published:
logger.info('Filtering only DREF Final Reports that have been closed')
final_report = final_report[final_report['is_published']]

if appeal_is_published:
logger.info('Filtering only DREF Final Reports that are public')
mask = [x in list(appeal['code']) for x in final_report['appeal_code']]
final_report = final_report[mask]

if not in_ops_learning:
logger.info('Filtering only DREF Final Reports that have not been processed yet for operational learning')
list_new_reports = np.setdiff1d(final_report['appeal_code'].unique(), ops_learning['appeal_code'].unique())

# only reports that are not processed yet
mask = [x in list_new_reports for x in final_report['appeal_code']]
final_report = final_report[mask]

if final_report.empty:
logger.warning('No DREF Final Reports were found after the filtering process')
return None

else:
filtered_final_report = final_report[['appeal_code', 'planned_interventions']]
logger.info('Found %s reports after the filtering process', len(filtered_final_report))
return filtered_final_report

def split_rows(self, filtered_final_report):
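"""Explode each final report into one row per planned intervention, then into one row per individual lesson-learnt / challenge sentence."""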

def split_planned_interventions(df):
logger.info('Splitting DREF Final Reports per planned intervention')
df = df.explode(column='planned_interventions', ignore_index=True)

df['Sector'] = [x['title_display'] for x in df['planned_interventions']]
df['Lessons Learnt'] = [x['lessons_learnt'] for x in df['planned_interventions']]
df['Challenges'] = [x['challenges'] for x in df['planned_interventions']]

mask_1 = [pd.notna(x) for x in df['Lessons Learnt']]
mask_2 = [pd.notna(x) for x in df['Challenges']]
mask = [x or y for x, y in zip(mask_1, mask_2)]
df = df[mask]
df.drop(columns='planned_interventions', inplace=True)

df = df.melt(id_vars=['appeal_code', 'Sector'], value_vars=['Lessons Learnt', 'Challenges'], var_name='Finding', value_name='Excerpts')
df = df[pd.notna(df['Excerpts'])]
return df

def split_excerpts(df):
logger.info('Splitting unique learnings in each planned intervention')
df['Excerpts_ind'] = [sent_tokenize(x) for x in df['Excerpts']]
df = df.explode(column='Excerpts_ind', ignore_index=True)
df.drop(columns='Excerpts', inplace=True)

# remove strings with fewer than 5 characters
df['Excerpts_ind'] = [np.nan if pd.notna(x) and len(x) < 5 else x for x in df['Excerpts_ind']]
df = df[pd.notna(df['Excerpts_ind'])]

# catching go format (bullet point)
df['Excerpts_ind'] = [x[2:] if x.startswith('•\t') else x for x in df['Excerpts_ind']]

# catching other formats
df['Excerpts_ind'] = [x[1:] if x.startswith(tuple(['-', '•', '▪', ' '])) else x for x in df['Excerpts_ind']]

df['Excerpts'] = [x.strip() for x in df['Excerpts_ind']]

df.drop(columns='Excerpts_ind', inplace=True)

return df

final_report_interventions = split_planned_interventions(filtered_final_report)

if final_report_interventions.empty:
logger.warning('No learnings were found in the planned interventions of the DREF Final Reports')
return None
else:
final_report_learnings = split_excerpts(final_report_interventions)
return final_report_learnings

def tag_data(self, df, tagging, tagging_api_endpoint):
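"""POST each excerpt to the tagging endpoint, store the first returned tag in the `tagging` column, then derive the Institution column."""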
logger.info('Tagging learnings with PER framework')

headers = {
'accept': 'application/json',
'Content-Type': 'application/json',
}

url = tagging_api_endpoint

df[tagging] = None
df.reset_index(inplace=True, drop=True)

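# POST each excerpt separately; on a 201 response carrying at least one tag, keep the first tag returned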
for i in range(0, len(df)):
data = json.dumps(df['Excerpts'].iloc[i]).encode('utf-8')  # JSON-encode so embedded quotes cannot break the payload
response = requests.post(url, headers=headers, data=data)
if (response.status_code == 201) and len(response.json()[0]['tags']) > 0:
df.loc[i, tagging] = response.json()[0]['tags'][0]

# Learnings tagged with this component describe IFRC-level support; everything else is attributed to the National Society
df['Institution'] = np.where(
df['PER - Component'] == 'Activation of Regional and International Support',
'Secretariat',
'National Society',
)

tagged_data = df
return tagged_data

def fetch_complementary_data(self, per_formcomponent, primary_sector):
logger.info('Fetching complementary data on PER components ids, sectors ids, finding ids, organisations ids')

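# Same pagination helpers as in fetch_data, but these reference endpoints are queried without the Authorization header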
def fetchUrl(field):
return requests.get(field).json()

def fetchField(field):
dict_field = []
temp_dict = requests.get(GO_API_URL + field + LIMIT_200).json()
while temp_dict['next']:
dict_field.extend(temp_dict['results'])
temp_dict = fetchUrl(temp_dict['next'])
dict_field.extend(temp_dict['results'])
return pd.DataFrame.from_dict(dict_field)

per_formcomponent = fetchField(per_formcomponent)

go_sectors = fetchUrl(GO_API_URL + primary_sector)

dict_per = dict(zip(per_formcomponent['title'], per_formcomponent['id']))

dict_sector = {item['label']: item['key'] for item in go_sectors}

dict_finding = {
'Lessons Learnt': 1,
'Challenges': 2
}

dict_org = {
'Secretariat': 1,
'National Society': 2
}

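# Map component names as returned by the classifier (including spelling variants) onto the exact titles used by GO's per-formcomponent endpoint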
mapping_per = {
"Activation of Regional and International Support": "Activation of regional and international support",
"Affected Population Selection": "Affected population selection",
"Business Continuity": "Business continuity",
"Cash and Voucher Assistance": "Cash Based Intervention (CBI)",
"Communications in Emergencies": "Communication in emergencies",
"Coordination with Authorities": "Coordination with authorities",
"Coordination with External Agencies and NGOs": "Coordination with External Agencies and NGOs",
"Coordination with Local Community Level Responders": "Coordination with local community level responders",
"Coordination with Movement": "Coordination with Movement",
"DRM Laws, Advocacy and Dissemination": "DRM Laws, Advocacy and Dissemination",
"Early Action Mechanisms": "Early Action Mechanisms",
"Emergency Needs Assessment and Planning": "Emergency Needs Assessment",
"Emergency Operations Centre (EOC)": "Emergency Operations Centre (EOC)",
"Emergency Response Procedures (SOP)": "Emergency Response Procedures (SOPs)",
"Finance and Admin. Policy and Emergency Procedures": "Finance and Admin policy and emergency procedures",
"Hazard, Context and Risk Analysis, Monitoring and Early Warning": "Hazard, Context and Risk Analysis, Monitoring and Early Warning",
"Information and Communication Technology (ICT)": "Information and Communication Technology (ICT)",
"Information Management": "Information Management (IM)",
"Logistics - Logistics Management": "LOGISTICS MANAGEMENT",
"Logistics - Procurement": "PROCUREMENT",
"Logistics - Warehouse and Stock Management": "WAREHOUSE AND STOCK MANAGEMENT",
"Mapping of NS Capacities": "Mapping of NS capacities",
"NS Specific Areas of Intervention": "NS-specific areas of intervention",
"Operations Monitoring, Evaluation, Reporting and Learning": "Operations Monitoring, Evaluation, Reporting and Learning",
"Pre-Disaster Meetings and Agreements": "Pre-disaster meetings and agreements",
"Preparedness Plans and Budgets": "Preparedness plans and budgets",
"Quality and Accountability": "Quality and accountability",
"RC Auxiliary Role, Mandate and Law": "RC auxiliary role, Mandate and Law",
"Resources Mobilisation": "Resource Mobilisation",
"Response and Recovery Planning": "Response and recovery planning",
"Risk Management": "Risk management",
"Safety and Security Management": "Safety and security management",
"Staff and Volunteer Management": "Staff and volunteer management",
"Testing and Learning": "Testing and Learning",
"Cooperation with Private Sector": "Cooperation with private sector",
"Disaster Risk Management Strategy": "DRM Strategy",
"Logistics - Supply Chain Management": "SUPPLY CHAIN MANAGEMENT",
"Logistics - Transportation Management": "FLEET AND TRANSPORTATION MANAGEMENT",
"Scenario Planning": "Scenario planning",
"Civil Military Relations": "Civil Military Relations",
"Disaster Risk Management Policy": "DRM Policy",
"information and Communication Technology (ICT)": "Information and Communication Technology (ICT)",
"Coordination with local community level responders": "Coordination with local community level responders",
"Emergency Response Procedures (SOPs)": "Emergency Response Procedures (SOPs)",
"Logistics - Transport": "FLEET AND TRANSPORTATION MANAGEMENT",
"Unknown": None,
"Business continuity": "Business continuity",
"emergency Response Procedures (SOP)": "Emergency Response Procedures (SOPs)",
"National Society Specific Areas of intervention": "NS-specific areas of intervention"
}

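# Map sector titles as they appear in final reports (including historical spellings) onto the labels of GO's primarysector endpoint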
mapping_sector = {
"Strategies for implementation": None, # No direct match found # no sector(?)
"Disaster Risk Reduction and Climate Action": "DRR",
"Health": "Health (public)",
"Livelihoods and Basic Needs": "Livelihoods and basic needs",
"Migration and Displacement": "Migration",
"Protection, Gender and Inclusion": "PGI",
"Shelter and Settlements": "Shelter",
"Water Sanitation and Hygiene": "WASH",
"Secretariat Services": None, # No direct match found # need to bring it out as IFRC learning
"National Society Strengthening": "NS Strengthening",
"Water, Sanitation And Hygiene": "WASH",
"Protection, Gender And Inclusion": "PGI",
"Shelter Housing And Settlements": "Shelter",
"Livelihoods And Basic Needs": "Livelihoods and basic needs",
"Community Engagement And Accountability": "CEA",
"Multi-purpose Cash": "Livelihoods and basic needs", # No direct match found
"Risk Reduction, Climate Adaptation And Recovery": "DRR",
"Migration": "Migration",
"Education": "Education",
"Shelter and Basic Household Items": "Shelter",
"Multi Purpose Cash": "Livelihoods and basic needs",
"Environmental Sustainability": None,
"Migration And Displacement": "Migration",
"Coordination And Partnerships":"NS Strengthening",
}

return mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding

def format_data(self, df, mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding):
logger.info('Formatting data to upload to GO Operational Learning Table')

df.loc[:, 'mapped_per'] = [mapping_per[x] if pd.notna(x) else None for x in df['PER - Component']]
df.loc[:, 'id_per'] = [dict_per[x] if pd.notna(x) else None for x in df['mapped_per']]
df.loc[:, 'mapped_sector'] = [mapping_sector[x] if pd.notna(x) else None for x in df['Sector']]
df.loc[:, 'id_sector'] = [dict_sector[x] if pd.notna(x) else None for x in df['mapped_sector']]
df.loc[:, 'id_institution'] = [dict_org[x] for x in df['Institution']]
df.loc[:, 'id_finding'] = [dict_finding[x] for x in df['Finding']]

formatted_data = df[['appeal_code', 'Excerpts', 'id_per', 'id_sector', 'id_institution', 'id_finding']]

return formatted_data

def manage_duplicates(self, df):
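"""Collapse rows sharing the same excerpt and finding type, collecting their PER component, sector and institution ids into de-duplicated lists."""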

df = df.groupby(['appeal_code', 'Excerpts', 'id_finding'], as_index=False).agg(list)

df['id_per'] = [list(set([y for y in x if pd.notna(y)])) for x in df['id_per']]
df['id_sector'] = [list(set([y for y in x if pd.notna(y)])) for x in df['id_sector']]
df['id_institution'] = [list(set([y for y in x if pd.notna(y)])) for x in df['id_institution']]

deduplicated_data = df

return deduplicated_data

def post_to_api(self, df, api_post_endpoint):
logger.info('Posting data to GO Operational Learning API')

url = api_post_endpoint

myobj = {}
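# Build one payload per row; the *_validated fields mirror the raw values so reviewers start from the machine-generated tags, with is_validated left False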
for i in range(0, len(df)):
myobj[i] = {"learning": df['Excerpts'].iloc[i],
"learning_validated": df['Excerpts'].iloc[i],
"appeal_code": df['appeal_code'].iloc[i],
"type": int(df['id_finding'].iloc[i]),
"type_validated": int(df['id_finding'].iloc[i]),
"sector": df['id_sector'].iloc[i],
"sector_validated": df['id_sector'].iloc[i],
"per_component": df['id_per'].iloc[i],
"per_component_validated": df['id_per'].iloc[i],
"organization": df['id_institution'].iloc[i],
"organization_validated": df['id_institution'].iloc[i],
"is_validated": False}

# Define a retry decorator
@retry(wait_exponential_multiplier=1000, wait_exponential_max=10000, stop_max_attempt_number=5)
def post_request(x):
response = requests.post(url, json=myobj[x], headers=self.go_authorization_token)
response.raise_for_status() # Raise HTTPError for bad responses
return response

for x in range(0, len(myobj)):
try:
response = post_request(x)
logger.info("Response status: %s" % response.status_code)
time.sleep(1)
except requests.exceptions.HTTPError as errh:
logger.error("HTTP Error: %s" % errh)
time.sleep(5)  # Pause before moving on to the next record
except requests.exceptions.RequestException as err:
logger.error("Request Exception: %s" % err)
time.sleep(5)  # Pause before moving on to the next record

def handle(self, *args, **options):
logger.info("Starting to extract tags for ops learnings")
self.set_auth_token()

# Step 0: Setup
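# sent_tokenize requires NLTK's punkt sentence tokenizer models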
nltk.download('punkt')

# Step 1: Fetch Data
final_report, appeal, ops_learning = self.fetch_data('dref-final-report', 'appeal', 'ops-learning')
filtered_data = self.filter_final_report(final_report, appeal, ops_learning, final_report_is_published=True, appeal_is_published=True, in_ops_learning=False)

rows = 0
if filtered_data is not None:
# Step 2: Data Preprocessing
split_learnings = self.split_rows(filtered_data)

if split_learnings is not None:
# Step 3: Tagging
tagged_data = self.tag_data(split_learnings, 'PER - Component', CLASSIFY_URL)

# Step 4: Post Processing
mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding = self.fetch_complementary_data('per-formcomponent', 'primarysector')
formatted_data = self.format_data(tagged_data, mapping_per, dict_per, mapping_sector, dict_sector, dict_org, dict_finding)
deduplicated_data = self.manage_duplicates(formatted_data)

# Step 5: Post to API Endpoint
self.post_to_api(deduplicated_data, OPS_LEARNING_URL)

rows = len(deduplicated_data) if deduplicated_data is not None else 0
logger.info("%s new Operational Learnings ingested to GO API" % rows)

if rows == 0:
body = {
"name": CRON_NAME,
"message": "No new learnings added. Done processing ops learnings.",
"num_result": rows,
"status": CronJobStatus.WARNED
}
else:
body = {
"name": CRON_NAME,
"message": "Done processing ops learnings",
"num_result": rows,
"status": CronJobStatus.SUCCESSFUL,
}

CronJob.sync_cron(body)