-
Notifications
You must be signed in to change notification settings - Fork 4
/
tasks.py
executable file
·156 lines (132 loc) · 5.31 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
"""
Asynchronous tasks that update data in Open Humans.
These tasks:
1. delete any current files in OH if they match the planned upload filename
2. adds a data file
"""
import logging
import os
import json
import shutil
import tempfile
from django.utils import lorem_ipsum
import textwrap
import requests
from celery import shared_task
from django.conf import settings
from open_humans.models import OpenHumansMember
from datetime import datetime
from demotemplate.settings import rr
# Set up logging.
logger = logging.getLogger(__name__)
@shared_task
def xfer_to_open_humans(oh_id, num_submit=0, logger=None, **kwargs):
"""
Transfer data to Open Humans.
num_submit is an optional parameter in case you want to resubmit failed
tasks (see comments in code).
"""
print('Trying to copy data for {} to Open Humans'.format(oh_id))
oh_member = OpenHumansMember.objects.get(oh_id=oh_id)
# Make a tempdir for all temporary files.
# Delete this even if an exception occurs.
tempdir = tempfile.mkdtemp()
try:
add_data_to_open_humans(oh_member, tempdir)
finally:
shutil.rmtree(tempdir)
# Note: Want to re-run tasks in case of a failure?
# You can resubmit a task by calling it again. (Be careful with recursion!)
# e.g. to give up, resubmit, & try again after 10s if less than 5 attempts:
# if num_submit < 5:
# num_submit += 1
# xfer_to_open_humans.apply_async(
# args=[oh_id, num_submit], kwargs=kwargs, countdown=10)
# return
def add_data_to_open_humans(oh_member, tempdir):
"""
Add demonstration file to Open Humans.
This might be a good place to start editing, to add your own project data.
This template is written to provide the function with a tempdir that
will be cleaned up later. You can use the tempdir to stage the creation of
files you plan to upload to Open Humans.
"""
# Create example file.
data_filepath, data_metadata = make_example_datafile(tempdir)
# Remove any files with this name previously added to Open Humans.
delete_oh_file_by_name(oh_member, filename=os.path.basename(data_filepath))
# Upload this file to Open Humans.
upload_file_to_oh(oh_member, data_filepath, data_metadata)
def make_datafile(user_data, metadata, tempdir):
"""
Make a user data file in the tempdir.
"""
filename = 'user_data_' + datetime.today().strftime('%Y%m%d')
filepath = os.path.join(tempdir, filename)
with open(filepath, 'w') as f:
f.write(user_data)
return filepath, metadata
def make_example_datafile(tempdir):
"""
Make a lorem-ipsum file in the tempdir, for demonstration purposes.
"""
filepath = os.path.join(tempdir, 'example_data.txt')
paras = lorem_ipsum.paragraphs(3, common=True)
output_text = '\n'.join(['\n'.join(textwrap.wrap(p)) for p in paras])
with open(filepath, 'w') as f:
f.write(output_text)
metadata = {
'tags': ['example', 'text', 'demo'],
'description': 'File with lorem ipsum text for demonstration purposes',
}
return filepath, metadata
def delete_oh_file_by_name(oh_member, filename):
"""
Delete all project files matching the filename for this Open Humans member.
This deletes files this project previously added to the Open Humans
member account, if they match this filename. Read more about file deletion
API options here:
https://www.openhumans.org/direct-sharing/oauth2-data-upload/#deleting-files
"""
req = requests.post(
settings.OH_DELETE_FILES,
params={'access_token': oh_member.get_access_token()},
data={'project_member_id': oh_member.oh_id,
'file_basename': filename})
req.raise_for_status()
def upload_file_to_oh(oh_member, filepath, metadata):
"""
This demonstrates using the Open Humans "large file" upload process.
The small file upload process is simpler, but it can time out. This
alternate approach is required for large files, and still appropriate
for small files.
This process is "direct to S3" using three steps: 1. get S3 target URL from
Open Humans, 2. Perform the upload, 3. Notify Open Humans when complete.
"""
# Get the S3 target from Open Humans.
upload_url = '{}?access_token={}'.format(
settings.OH_DIRECT_UPLOAD, oh_member.get_access_token())
req1 = requests.post(
upload_url,
data={'project_member_id': oh_member.oh_id,
'filename': os.path.basename(filepath),
'metadata': json.dumps(metadata)})
req1.raise_for_status()
# Upload to S3 target.
with open(filepath, 'rb') as fh:
req2 = requests.put(url=req1.json()['url'], data=fh)
req2.raise_for_status()
# Report completed upload to Open Humans.
complete_url = ('{}?access_token={}'.format(
settings.OH_DIRECT_UPLOAD_COMPLETE, oh_member.get_access_token()))
req3 = requests.post(
complete_url,
data={'project_member_id': oh_member.oh_id,
'file_id': req1.json()['id']})
req3.raise_for_status()
logger.debug('Upload done: "{}" for member {}.'.format(
os.path.basename(filepath), oh_member.oh_id))
@shared_task
def make_request_respectful_get(url, realms, **kwargs):
r = rr.get(url=url, realms=realms, **kwargs)
logger.debug('Request completed. Response: {}'.format(r.text))