Skip to content

Commit

Permalink
Bug/WI-67: Logging and forcing tapis token retrieval on every request (
Browse files Browse the repository at this point in the history
…#7)

* Tapis token for cached files; logging improvements
* Address review comments
* Remove build file
  • Loading branch information
chandra-tacc authored Nov 3, 2023
1 parent 3689ac1 commit 3cc684f
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
name: Build on push to auth-check
name: Build on push to experiment
on:
push:
branches: [ auth-check ]
branches: [ experiment ]

jobs:
build_commit:
Expand Down
2 changes: 2 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ flask==2.0.3
gunicorn==20.1.0
requests==2.27.0
pandas==1.3.5
retrying==1.3.4
Werkzeug==2.0.3
47 changes: 38 additions & 9 deletions src/app.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from datetime import datetime
from flask import Flask, jsonify, request
import os
import pandas as pd
import csv
import logging

# from data_processing import *
from data_loading import *


# ----------------------------------------------------------------------------
# DATA PARAMETERS
# ----------------------------------------------------------------------------
Expand Down Expand Up @@ -118,6 +121,25 @@

# Flask application setup: route both the app logger and werkzeug's logger
# through gunicorn's error-log handlers so all output lands in one stream.
app = Flask(__name__)
app.debug = True
gunicorn_logger = logging.getLogger('gunicorn.error')
app.logger = logging.getLogger("datastore_app")
app.logger.handlers = gunicorn_logger.handlers
app.logger.setLevel(logging.DEBUG)

logger = logging.getLogger('werkzeug')
# BUG FIX: the original wrote `logger.addHandler = gunicorn_logger.handlers`,
# which clobbers the addHandler *method* with a list and never attaches the
# handlers. Assign the handlers list, mirroring the app.logger setup above.
logger.handlers = gunicorn_logger.handlers
logger.setLevel(logging.DEBUG)

@app.before_request
def before_request_log():
    """Log every incoming request (remote address, method, URL) at DEBUG."""
    message = '{} "{} {}"'.format(request.remote_addr, request.method, request.url)
    app.logger.debug(message)

@app.after_request
def after_request_log(response):
    """Log the completed request together with its response status code.

    Returns the response unchanged so the after_request chain continues.
    """
    message = '{} "{} {}" {}'.format(
        request.remote_addr, request.method, request.url, response.status_code)
    app.logger.debug(message)
    return response



# APIS: try to load new data, if doesn't work, get most recent
@app.route("/api/apis")
Expand All @@ -130,15 +152,17 @@ def api_imaging():
global api_data_index
global api_data_cache
try:
tapis_token = get_tapis_token(request)
if not api_data_index['imaging'] or not check_data_current(datetime.strptime(api_data_index['imaging'], datetime_format)):
api_date = datetime.now().strftime(datetime_format)
imaging_data = get_api_imaging_data(request)
imaging_data = get_api_imaging_data(tapis_token)
if imaging_data:
app.logger.info(f"Caching imaging report data. Date: {api_date}")
api_data_cache['imaging'] = imaging_data
api_data_index['imaging'] = api_date
return jsonify({'date': api_data_index['imaging'], 'data': api_data_cache['imaging']})
except Exception as e:
traceback.print_exc()
app.logger.error(("Error in imaging API request: {0}").format(str(e)))
return jsonify('error: {}'.format(e))

@app.route("/api/consort")
Expand All @@ -147,10 +171,12 @@ def api_consort():
global api_data_index
global api_data_cache
# try:
tapis_token = get_tapis_token(request)
if not api_data_index['consort'] or not check_data_current(datetime.strptime(api_data_index['consort'], datetime_format)):
api_date = datetime.now().strftime(datetime_format)
consort_data_json = get_api_consort_data(request)
consort_data_json = get_api_consort_data(tapis_token)
if consort_data_json:
app.logger.info(f"Caching consort report data. Date: {api_date}")
api_data_cache['consort'] = consort_data_json
api_data_index['consort'] = api_date
return jsonify({'date': api_data_index['consort'], 'data': api_data_cache['consort']})
Expand All @@ -165,10 +191,12 @@ def api_blood():
global api_data_index
global api_data_cache
try:
tapis_token = get_tapis_token(request)
if not api_data_index['blood'] or not check_data_current(datetime.strptime(api_data_index['blood'], datetime_format)):
api_date = datetime.now().strftime(datetime_format)
blood_data, blood_data_request_status = get_api_blood_data(request)
blood_data, blood_data_request_status = get_api_blood_data(tapis_token)
if blood_data:
app.logger.info(f"Caching blood api response data. Date: {api_date}")
api_data_index['blood'] = api_date
api_data_cache['blood'] = blood_data

Expand All @@ -180,7 +208,7 @@ def api_blood():

return jsonify({'date': api_data_index['blood'], 'data': api_data_cache['blood']})
except Exception as e:
traceback.print_exc()
app.logger.error(("Error in blood API request: {0}").format(str(e)))
return jsonify('error: {}'.format(e))


Expand All @@ -192,20 +220,22 @@ def api_subjects():
global subjects_raw_cols_for_reports

try:
tapis_token = get_tapis_token(request)
if not api_data_index['subjects'] or not check_data_current(datetime.strptime(api_data_index['subjects'], datetime_format)):
api_date = datetime.now().strftime(datetime_format)
latest_subjects_json = get_api_subjects_json(request)
latest_subjects_json = get_api_subjects_json(tapis_token)
if latest_subjects_json:
# latest_data = create_clean_subjects(latest_subjects_json, screening_sites, display_terms_dict, display_terms_dict_multi)
latest_data = process_subjects_data(latest_subjects_json,subjects_raw_cols_for_reports,screening_sites, display_terms_dict, display_terms_dict_multi)

app.logger.info(f"Caching subjects api response data. Date: {api_date}")
api_data_cache['subjects'] = latest_data
api_data_index['subjects'] = api_date

return jsonify({'date': api_data_index['subjects'], 'data': api_data_cache['subjects']})
except Exception as e:
traceback.print_exc()
app.logger.error(("Error in subjects API request: {0}").format(str(e)))
return jsonify('error: {}'.format(e))

def api_tester():

global local_subjects_data
Expand Down Expand Up @@ -236,6 +266,5 @@ def api_simple():
else:
return jsonify('not found')


if __name__ == "__main__":
app.run(host='0.0.0.0')
81 changes: 44 additions & 37 deletions src/data_loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@

import datetime
from datetime import datetime
from retrying import retry

import logging
logger = logging.getLogger(__name__)
files_api_root = os.environ.get('FILES_API_ROOT')
portal_api_root = os.environ.get('PORTAL_API_ROOT')
logger = logging.getLogger("datastore_app")


# ----------------------------------------------------------------------------
# Updating data checks
Expand Down Expand Up @@ -198,12 +200,11 @@ def get_local_subjects_raw(data_directory):
# LOAD DATA FROM API
# ----------------------------------------------------------------------------

def get_api_consort_data(api_request,
def get_api_consort_data(tapis_token,
report='consort',
report_suffix = 'consort-data-[mcc]-latest.csv'):
'''Load data for a specified consort file. Handle 500 server errors'''
try:
tapis_token = get_tapis_token(api_request)

if tapis_token:
cosort_columns = ['source','target','value', 'mcc']
Expand All @@ -219,7 +220,7 @@ def get_api_consort_data(api_request,
for mcc in mcc_list:
filename = report_suffix.replace('[mcc]',str(mcc))
csv_url = '/'.join([files_api_root, report, filename])
csv_request = requests.get(csv_url, headers={'X-Tapis-Token': tapis_token})
csv_request = make_report_data_request(csv_url, tapis_token)
csv_content = csv_request.content
try:
csv_df = pd.read_csv(io.StringIO(csv_content.decode('utf-8')), usecols=[0,1,2], header=None)
Expand All @@ -239,7 +240,7 @@ def get_api_consort_data(api_request,
return consort_data_json

else:
logger.warning("Unauthorized attempt to access Consort data")
logger.exception("Unauthorized attempt to access Consort data")
return None

except Exception as e:
Expand All @@ -250,11 +251,9 @@ def get_api_consort_data(api_request,

## Function to rebuild dataset from apis

def get_api_imaging_data(api_request):
def get_api_imaging_data(tapis_token):
''' Load data from imaging api. Return bad status notice if hits Tapis API'''
try:
tapis_token = get_tapis_token(api_request)

try:
if tapis_token:
api_dict = {
'subjects':{'subjects1': 'subjects-1-latest.json','subjects2': 'subjects-2-latest.json'},
Expand All @@ -264,15 +263,15 @@ def get_api_imaging_data(api_request):

# IMAGING
imaging_filepath = '/'.join([files_api_root,'imaging',api_dict['imaging']['imaging']])
imaging_request = requests.get(imaging_filepath, headers={'X-Tapis-Token': tapis_token})
imaging_request = make_report_data_request(imaging_filepath, tapis_token)
if imaging_request.status_code == 200:
imaging = pd.read_csv(io.StringIO(imaging_request.content.decode('utf-8')))
else:
return {'status':'500', 'source': api_dict['imaging']['imaging']}


qc_filepath = '/'.join([files_api_root,'imaging',api_dict['imaging']['qc']])
qc_request = requests.get(qc_filepath, headers={'X-Tapis-Token': tapis_token})
qc_request = make_report_data_request(qc_filepath, tapis_token)
if qc_request.status_code == 200:
qc = pd.read_csv(io.StringIO(qc_request.content.decode('utf-8')))
else:
Expand All @@ -286,7 +285,7 @@ def get_api_imaging_data(api_request):

return imaging_data_json
else:
logger.warning("Unauthorized attempt to access Imaging data")
logger.exception("Unauthorized attempt to access Imaging data")
return None

except Exception as e:
Expand All @@ -295,12 +294,10 @@ def get_api_imaging_data(api_request):


## Function to rebuild dataset from apis
def get_api_blood_data(api_request):
def get_api_blood_data(tapis_token):
''' Load blood data from api'''
try:
current_datetime = datetime.now()
tapis_token = get_tapis_token(api_request)

if tapis_token:
api_dict = {
'subjects':{'subjects1': 'subjects-1-latest.json','subjects2': 'subjects-2-latest.json'},
Expand All @@ -310,10 +307,10 @@ def get_api_blood_data(api_request):

# BLOOD
blood1_filepath = '/'.join([files_api_root,'blood',api_dict['blood']['blood1']])
blood1_request = requests.get(blood1_filepath, headers={'X-Tapis-Token': tapis_token})
blood1_request = make_report_data_request(blood1_filepath, tapis_token)

blood2_filepath = '/'.join([files_api_root,'blood',api_dict['blood']['blood2']])
blood2_request = requests.get(blood2_filepath, headers={'X-Tapis-Token': tapis_token})
blood2_request = make_report_data_request(blood2_filepath, tapis_token)

if blood1_request.status_code == 200:
blood1 = blood1_request.json()
Expand Down Expand Up @@ -345,7 +342,7 @@ def get_api_blood_data(api_request):

return blood_data_json, request_status
else:
logger.warning("Unauthorized attempt to access Blood data")
logger.exception("Unauthorized attempt to access Blood data")
return None

except Exception as e:
Expand All @@ -354,23 +351,21 @@ def get_api_blood_data(api_request):



def get_api_subjects_json(api_request):
def get_api_subjects_json(tapis_token):
''' Load subjects data from api. Note data needs to be cleaned, etc. to create properly formatted data product'''
try:
tapis_token = get_tapis_token(api_request)

try:
if tapis_token:
# Load Json Data
subjects1_filepath = '/'.join([files_api_root,'subjects','subjects-1-latest.json'])
subjects1_request = requests.get(subjects1_filepath, headers={'X-Tapis-Token': tapis_token})
subjects1_request = make_report_data_request(subjects1_filepath, tapis_token)
if subjects1_request.status_code == 200:
subjects1 = subjects1_request.json()
else:
return None
# return {'status':'500', 'source': api_dict['subjects']['subjects1']}

subjects2_filepath = '/'.join([files_api_root,'subjects','subjects-2-latest.json'])
subjects2_request = requests.get(subjects2_filepath, headers={'X-Tapis-Token': tapis_token})
subjects2_request = make_report_data_request(subjects2_filepath, tapis_token)
if subjects2_request.status_code == 200:
subjects2 = subjects2_request.json()
else:
Expand All @@ -382,26 +377,38 @@ def get_api_subjects_json(api_request):

return subjects_json
else:
logger.warning("Unauthorized attempt to access Subjects data")
logger.exception("Unauthorized attempt to access Subjects data")
return None

except Exception as e:
traceback.print_exc()
return None

# Retry handler for requests
@retry(wait_exponential_multiplier=500, wait_exponential_max=5000, stop_max_attempt_number=3)
def make_request_with_retry(url, cookies, timeout=10):
    '''GET *url* with exponential backoff (3 attempts, 0.5s to 5s waits).

    Parameters
    ----------
    url : str
        Fully-qualified URL to request.
    cookies : dict
        Cookies forwarded with the request (carries the portal session id).
    timeout : float, optional
        Seconds before the request is aborted. The original call passed no
        timeout, so a hung upstream server could block a worker forever;
        a timeout also lets the @retry decorator actually get a chance to retry.

    Returns
    -------
    requests.Response
    '''
    return requests.get(url, cookies=cookies, timeout=timeout)

# Get Tapis token if authorized to access data files
def get_tapis_token(api_request):
    '''Get a Tapis token using the request's session cookie.

    The source span contained both the removed legacy implementation and the
    current one concatenated (a diff-scrape artifact); only the current
    implementation is kept here.

    Parameters
    ----------
    api_request : flask.Request
        Incoming request; its 'coresessionid' cookie authenticates the call.

    Returns
    -------
    str
        The Tapis token issued by the portal auth endpoint.

    Raises
    ------
    Exception
        If the session cookie is missing, or (via raise_for_status) if the
        portal responds with an error status — i.e. the session is not
        authenticated.
    '''
    session_id = api_request.cookies.get("coresessionid")
    if session_id is None:
        raise Exception("Missing session id")
    cookies = {'coresessionid':session_id}
    response = make_request_with_retry(portal_api_root + '/auth/tapis/', cookies)

    response.raise_for_status()
    tapis_token = response.json()['token']
    logger.info("Received tapis token.")
    return tapis_token

def make_report_data_request(url, tapis_token, timeout=10):
    '''GET a report file from the files API, authenticating with a Tapis token.

    Parameters
    ----------
    url : str
        Full URL of the report file.
    tapis_token : str
        Token sent in the X-Tapis-Token header.
    timeout : float, optional
        Seconds before the request is aborted; the original call had no
        timeout and could block a worker indefinitely on a hung server.

    Returns
    -------
    requests.Response
        The raw response; callers inspect status_code / content themselves.
    '''
    # Lazy %-style args: the message is only formatted if the level is emitted.
    logger.info("Sending request to %s", url)
    response = requests.get(url, headers={'X-Tapis-Token': tapis_token}, timeout=timeout)
    logger.info("Response status code: %s", response.status_code)
    return response

# ----------------------------------------------------------------------------
# PROCESS SUBJECTS DATA
Expand Down

0 comments on commit 3cc684f

Please sign in to comment.