Commit
Merge pull request #11 from klebgenomics/develop
Bump to v1.3.0
tomdstanton authored Nov 5, 2024
2 parents aa7a840 + 0122b62 commit ee7deb0
Showing 32 changed files with 644 additions and 1,091 deletions.
100 changes: 0 additions & 100 deletions INSTALL.md

This file was deleted.

272 changes: 70 additions & 202 deletions README.md

Large diffs are not rendered by default.

37 changes: 19 additions & 18 deletions __init__.py
@@ -2,12 +2,12 @@
import os
import json
import multiprocessing
import sys
from datetime import datetime
from collections import OrderedDict


Config = configparser.ConfigParser()
Config.read('applications/kaptive/settings.ini')
Config.read('applications/kaptive_web/settings.ini')
queue_path = Config.get('Path', 'queue_path')
upload_path = Config.get('Path', 'upload_path')

@@ -17,9 +17,9 @@ def save_json_to_file(f, json_string):
try:
with open(f, 'wt+') as file:
json.dump(json_string, file, indent=4)
print("Wrote to file: " + f)
print("Wrote to file: " + f, file=sys.stderr)
except (IOError, OSError) as e:
print("Error writing file: " + f + "; " + str(e))
print("Error writing file: " + f + "; " + str(e), file=sys.stderr)


# Read JSON Object from a file
@@ -29,13 +29,13 @@ def read_json_from_file(f):
with open(f, 'r') as file:
data = json.load(file, object_pairs_hook=OrderedDict)
except ValueError as e:
print("Error parsing file " + f + ": " + str(e))
print("Error parsing file " + f + ": " + str(e), file=sys.stderr)
if not f.endswith('.bak'):
os.remove(f)
data = read_json_from_file((f + '.bak'))
save_json_to_file(f, data)
except (IOError, OSError) as e:
print("Error reading file " + f + ": " + str(e))
print("Error reading file " + f + ": " + str(e), file=sys.stderr)
if not f.endswith('.bak'):
os.remove(f)
data = read_json_from_file((f + '.bak'))
@@ -45,7 +45,8 @@ def read_json_from_file(f):

job_queue_path = os.path.join(queue_path, 'queue')
available_worker = multiprocessing.cpu_count() - 1
if os.path.exists(job_queue_path): # and os.path.getsize(job_queue_path) > 2: catches empty queue (i.e. if file contains {})
if os.path.exists(
job_queue_path): # and os.path.getsize(job_queue_path) > 2: catches empty queue (i.e. if file contains {})
data = OrderedDict() # read_json_from_file returns an OrderedDict even if empty, no need to declare here.
# Put the jobs in processing back to the job queue
data = read_json_from_file(job_queue_path)
@@ -58,7 +59,7 @@ def read_json_from_file(f):
data['Available worker'] = available_worker
data['Last update (worker)'] = str(datetime.now().strftime('%d %b %Y %H:%M:%S'))
save_json_to_file(job_queue_path, data)
print("Queue file updated.")
print("Queue file updated.", file=sys.stderr)

for i in data['Job queue']:
job_list_path = os.path.join(upload_path, i[0], 'job_list.json')
@@ -75,16 +76,16 @@ def read_json_from_file(f):
job_name = j['Fasta file']
job_seq = j['Job seq']
save_json_to_file(job_list_path, job_data)
print("Fixed coruppted data in job list.")
print("Fixed corrupted data in job list.", file=sys.stderr)
break
else:
data = OrderedDict()
data['Job queue'] = []
data['Processing queue'] = []
data['Last update (queue)'] = str(datetime.now().strftime('%d %b %Y %H:%M:%S'))
data['Total worker'] = available_worker
data['Available worker'] = available_worker
data['Last update (worker)'] = str(datetime.now().strftime('%d %b %Y %H:%M:%S'))
data = OrderedDict({
'Job queue': [],
'Processing queue': [],
'Last update (queue)': str(datetime.now().strftime('%d %b %Y %H:%M:%S')),
'Total worker': available_worker,
'Available worker': available_worker,
'Last update (worker)': str(datetime.now().strftime('%d %b %Y %H:%M:%S'))
})
save_json_to_file(job_queue_path, data)
print("Queue file created.")

print("Queue file created.", file=sys.stderr)
97 changes: 44 additions & 53 deletions controllers/default.py
@@ -6,6 +6,7 @@
import time
from collections import OrderedDict
from datetime import datetime
from pathlib import Path

queue_lock = threading.Lock()
job_list_lock = threading.Lock()
@@ -14,7 +15,7 @@

# Read config file
Config = configparser.ConfigParser()
Config.read('applications/kaptive/settings.ini')
Config.read('applications/kaptive_web/settings.ini')
base_path = Config.get('Path', 'base_path')
reference_database_path = Config.get('Path', 'reference_database_path')
upload_path = Config.get('Path', 'upload_path')
@@ -27,67 +28,45 @@
job_queue_path = os.path.join(queue_path, 'queue')

# Setup logger
logger = logging.getLogger("kaptive")
logger = logging.getLogger('kaptive_web') # Setup a new logger
logger.propagate = False # Prevents multi printing, see: https://stackoverflow.com/questions/6729268/log-messages-appearing-twice-with-python-logging
logger.setLevel(logging.DEBUG)

# Get number of tblastn or blastn running (for debug purpose only)
procs = subprocess.check_output(['ps', 'uaxw']).decode('utf-8').splitlines()
blast_procs = [proc for proc in procs if 'blast' in proc]
blast_count = len(blast_procs)
if blast_count > 0:
logger.debug(' Blast found: ' + str(blast_count))
if not logger.handlers: # Prevents multi printing, see: https://stackoverflow.com/questions/6729268/log-messages-appearing-twice-with-python-logging
logger.addHandler(handler := logging.StreamHandler())
handler.setFormatter(
logging.Formatter(
"%(asctime)s %(levelname)s [%(filename)s.%(funcName)s:%(lineno)d] %(message)s",
"%Y-%m-%d %H:%M:%S"
)
)


def index():
return dict()


def jobs():
import uuid
import fnmatch
import re
import uuid

if request.vars.message is not None:
response.flash = request.vars.message

# Generate an UUID for each job submission
session.uuid = str(uuid.uuid4())

# -------------------------------------------------------------------------
# Get a list of reference database files, order by file name but with all
# Klebsiella databases listed first.
# - Copy the database files to this folder.
# - Name the first one (default) 1-xxxx, second one 2-xxxx, so on so forth.
# -------------------------------------------------------------------------
filelist_klebsiella = dict()
filelist_other = dict()
logger.debug(f'[{session.uuid}] Reference database file found:')
for f in sorted(os.listdir(reference_database_path)):
if os.path.isfile(os.path.join(reference_database_path, f)) and fnmatch.fnmatch(f, '*.gbk'):
fname = re.sub('\.gbk$', '', f)
fname = re.sub('_', ' ', fname)
fname = re.sub('\d-', '', fname)
fname = fname.replace(' k ', ' K ').replace(' o ', ' O ')
logger.debug(f'[{session.uuid}] Database Name: ' + f)
if 'klebsiella' in fname.lower():
filelist_klebsiella[f] = fname
else:
filelist_other[f] = fname
# Create sorted list of tuples with all Klebsiella databases preceeding databases of other genus
filelist_klebsiella_sorted = sorted(filelist_klebsiella.items(), key=lambda k: k[0])
filelist_other_sorted = sorted(filelist_other.items(), key=lambda k: k[0])
filelist_sorted = filelist_klebsiella_sorted + filelist_other_sorted
# Merge filelist dicts as this is used below and later
filelist = filelist_klebsiella.copy()
filelist.update(filelist_other)
filelist = {
str(i): i.stem.replace('_', ' ') for i in Path(reference_database_path).glob('*.gbk')
}

if len(filelist) == 0:
logger.error(f'[{session.uuid}] No reference database file found.')
response.flash = 'Internal error. No reference database file found. Please contact us.'

# Create the form
fields = [Field('job_name', label=T('Job name (optional)')),
Field('assembly','upload', requires=[IS_NOT_EMPTY()], label=T('Assembly file*'), custom_store=upload_file),
Field('reference', requires=IS_IN_SET(filelist_sorted, zero=None), label=T('Reference database'))
Field('reference', requires=IS_IN_SET(filelist, zero=None), label=T('Reference database'))
]
if captcha:
fields.append(captcha_field()) # Google reCaptcha v2
@@ -104,19 +83,27 @@ def jobs():
compression = get_compression_type(file_path)
if compression == 'zip':
process_zip_file(file_dir, file_path)
logger.debug(f'[{session.uuid}] Zip file uploaded: ' + request.vars.assembly.filename)
logger.debug(f'[{session.uuid}] Zip file uploaded: {request.vars.assembly.filename}')
elif compression == 'gz':
process_gz_file(file_dir, request.vars.assembly.filename)
logger.debug(f'[{session.uuid}] GZip file uploaded: ' + request.vars.assembly.filename)
logger.debug(f'[{session.uuid}] GZip file uploaded: {request.vars.assembly.filename}')

# Get a list of fasta files
fastalist = [f for f in os.listdir(os.path.join(upload_path, session.uuid))
if os.path.isfile(os.path.join(upload_path, session.uuid, f))]
fastafiles = []

allowed_characters = ' a-zA-Z0-9_.-'
fastafiles_invalid = []

no_of_fastas = 0
for f in fastalist:
if is_file_fasta(os.path.join(upload_path, session.uuid, f)):

# Validate inputs
if re.search(fr'[^{allowed_characters}]', f):
fastafiles_invalid.append(f)

# Spaces and hashes cause problems, so rename files to be spaceless and hashless, if needed.
if ' ' in f:
new_f = f.replace(' ', '_')
@@ -139,18 +126,22 @@ def jobs():
if no_of_fastas == 0:
logger.error(f'[{session.uuid}] No fasta file found in uploaded file.')
redirect(URL(r=request, f='jobs', vars=dict(message=T("No fasta file was found in the uploaded file."))))
fastafiles_string = ', '.join(fastafiles)
if fastafiles_invalid:
fastafiles_invalid_str = ', '.join(fastafiles_invalid)
error_msg = f'Input file contains invalid characters: {fastafiles_invalid_str}. Please include only {allowed_characters}'
logger.error(f'[{session.uuid}] {error_msg}')
redirect(URL(r=request, f='jobs', vars=dict(message=T(error_msg))))

logger.debug(f'[{session.uuid}] Selected reference database: ' + request.vars.reference)

# Save job details to a JSON file
build_meta_json(session.uuid, request.vars.job_name, fastafiles_string, no_of_fastas,
filelist.get(request.vars.reference, None), submit_time)
build_meta_json(session.uuid, request.vars.job_name, fastafiles, filelist[request.vars.reference], submit_time)

# Create empty result file
create_table_file(request.vars.reference)
create_table_file()

# Build job list
build_job_dict(session.uuid, request.vars.reference, submit_time, fastafiles, no_of_fastas, upload_path)
build_job_dict(session.uuid, request.vars.reference, submit_time, fastafiles, upload_path)

# Add job to job queue
add_job_to_queue(job_queue_path, session.uuid, no_of_fastas)
@@ -205,7 +196,7 @@ def confirmation():

if os.path.exists(result_json_path) and (succeeded_jobs + failed_jobs == total_jobs): # If job finished
content = ''
result_data = read_json_from_file(result_json_path)
result_data = read_json_from_file(result_json_path, json_lines=True)
result_status = 1
elif pending_jobs == 0 and running_jobs == 0:
content = ''
@@ -217,11 +208,11 @@ def confirmation():
logger.debug(f'[{session.uuid}] No available worker. Job is in the queue.')
result_status = 2
else:
content = 'Processing your job, it usually takes ~1 minute for each assembly file to complete. ' \
content = 'Processing your job, it usually takes < a few seconds for each assembly file to complete. ' \
'This page will refresh every ' + str(refresh_time / 1000) + \
' seconds until the process is completed. Please do not close this page or start a new job.'
if os.path.exists(result_json_path):
result_data = read_json_from_file(result_json_path)
result_data = read_json_from_file(result_json_path, json_lines=True)
else:
logger.debug(f'[{session.uuid}] Cannot find final result JSON file.')
result_status = 2
@@ -296,16 +287,16 @@ def download():
@cache.action()
def get_svg():
uuid = request.args(0)
assemble_name = request.args(1)
path = os.path.join(upload_path, uuid, 'locus_image', assemble_name + '.svg')
assembly_name = request.args(1)
path = os.path.join(upload_path, uuid, 'locus_image', assembly_name + '.svg')
return response.stream(path)


@cache.action()
def get_png():
uuid = request.args(0)
assemble_name = request.args(1)
path = os.path.join(upload_path, uuid, 'locus_image', assemble_name + '.png')
assembly_name = request.args(1)
path = os.path.join(upload_path, uuid, 'locus_image', assembly_name + '.png')
if os.path.exists(path):
return response.stream(path)
else:
Binary file removed extras/Example_broken_assembly.png
Binary file removed extras/Example_close_match.png
Binary file removed extras/Example_more_distant_match.png
Binary file removed extras/Example_novel_locus.png
Binary file removed extras/Example_novel_variant.png
Binary file removed extras/Example_variant_database_run.png