cli.py
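"""Command-line interface for uploading .dcoraid-task files to a DCOR instance"""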
from __future__ import annotations

import argparse
from argparse import RawTextHelpFormatter
import logging
import pathlib
import sys
import threading
import time
import traceback

import urllib3.exceptions
import requests.exceptions

from .api import CKANAPI
from .loggers import setup_logging
from .upload import task
from ._version import version


def monitor_upload_progress(upload_job):
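    """Print the progress of `upload_job` while its resources are transferred"""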
    newline = False
    while upload_job.state == "parcel":
        time.sleep(1)
    prev_msg = ""
    while upload_job.state == "transfer":
        new_msg = upload_job.get_progress_string()
        if new_msg != prev_msg:
            # Only print something if it is new.
            prev_msg = new_msg
            print(new_msg, end="\r", flush=True)
            newline = True
        time.sleep(1)
    if newline:
        # Only print a new line if we printed something before.
        print("")


def ascertain_state_or_bust(upload_job, state):
    """Raise an exception if `upload_job.state` does not equal `state`"""
    if upload_job.state != state:
        if upload_job.state == "error":
            raise ValueError(f"Job {upload_job} encountered an error:\n"
                             f"{upload_job.traceback}")
        else:
            raise ValueError(
                f"The upload job {upload_job} should be in the state "
                + f"'{state}', but its state is '{upload_job.state}'!"
            )


def upload_task(path_task: str | pathlib.Path = None,
                server: str = "dcor.mpl.mpg.de",
                api_key: str = None,
                cache_dir: str | pathlib.Path = None,
                ret_job: bool = False,
                retries_num: int = 10,
                retries_wait: float = 30.0):
    """Upload a .dcoraid-task file to a DCOR instance

    Parameters
    ----------
    path_task: str or pathlib.Path
        path to the .dcoraid-task file
    server: str
        DCOR server to use (e.g. dcor.mpl.mpg.de)
    api_key: str
        user API token to use for the upload
    cache_dir: str or pathlib.Path
        directory used for caching
    ret_job: bool
        whether to return the :class:`.UploadJob` instance
    retries_num: int
        number of times to retry when connection problems are encountered
    retries_wait: float
        seconds to wait in-between retries
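
    Examples
    --------
    A hypothetical programmatic call (the task path and API token below
    are placeholders)::

        upload_task(path_task="upload_job.dcoraid-task",
                    server="dcor.mpl.mpg.de",
                    api_key="XXXX-XXXX",
                    ret_job=True)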
"""
# Initialize with None, otherwise we might get issues if parsing
# of the arguments fails or in `finally`.
path_error = None
uj = None
exit_status = 1 # fails by default if there is no success
try:
if path_task is None or api_key is None:
parser = upload_task_parser()
args = parser.parse_args()
path_task = args.path_task
server = args.server
api_key = args.api_key
path_task = pathlib.Path(path_task)
path_error = path_task.parent / (path_task.name + "_error.txt")
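        # Retry the whole transfer up to `retries_num` times, waiting
        # `retries_wait` seconds after each connection-related failure.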
        for retry in range(retries_num):
            try:
                print("Initializing.")
                # set up the api
                api = CKANAPI(server, api_key=api_key)
                # load the .dcoraid-task file
                uj = task.load_task(path_task,
                                    api=api,
                                    update_dataset_id=True,
                                    cache_dir=cache_dir,
                                    )
                print(f"Dataset ID is {uj.dataset_id}.")
                print("Compressing uncompressed resources.")
                uj.task_compress_resources()
                ascertain_state_or_bust(uj, "parcel")
                print("Uploading resources.")
                # thread that prints the upload progress
                monitor_thread = threading.Thread(
                    target=monitor_upload_progress,
                    name="Upload Monitor",
                    args=(uj,),
                    daemon=True)
                monitor_thread.start()
                uj.task_upload_resources()
                monitor_thread.join()
                ascertain_state_or_bust(uj, "online")
                print("Verifying upload.")
                uj.task_verify_resources()
                ascertain_state_or_bust(uj, "done")
                print("Done.")
            except (urllib3.exceptions.HTTPError,
                    requests.exceptions.RequestException) as e:
                path_error.write_text(f"Encountered Exception: {e}")
                print(f"Encountered a transfer error. Retrying {retry + 1}...")
                time.sleep(retries_wait)
                httperror = e
                continue
            except BaseException as e:
                # let the outer try-except clause handle all other errors
                raise e
            else:
                # Upload successfully completed
                break
        else:
            # We only get here if we "continued" through the entire loop,
            # which means no successful upload and only HTTPErrors.
            raise httperror
    except SystemExit as e:
        # e.code is 0 if the user just passed --help or --version
        exit_status = e.code
    except BaseException:
        # Write errors to the error file
        print(traceback.format_exc())
        if path_error is not None:
            path_error.write_text(traceback.format_exc())
    else:
        if path_error.exists():
            path_error.unlink(missing_ok=True)
        exit_status = 0
    finally:
        if ret_job and not exit_status:
            return uj
        # return sys.exit for testing (monkeypatched)
        return sys.exit(exit_status)


def upload_task_parser():
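    """Return the command-line argument parser for dcoraid-upload-task"""
    # Configure logging for dcoraid, dclab, and (at INFO level) requests.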
    setup_logging("dcoraid")
    setup_logging("dclab")
    setup_logging("requests", level=logging.INFO)
    descr = (
        "Upload a .dcoraid-task file to a DCOR instance. Example usage::\n"
        + "\n dcoraid-upload-task upload_job.dcoraid-task "
        + "dcor-dev.mpl.mpg.de eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJqdGkiO"
    )
    parser = argparse.ArgumentParser(description=descr,
                                     formatter_class=RawTextHelpFormatter)
    parser.add_argument('path_task', metavar="PATH", type=str,
                        help='The .dcoraid-task file')
    parser.add_argument('server', metavar="SERVER", type=str,
                        help='DCOR instance to upload to',
                        default="dcor.mpl.mpg.de")
    parser.add_argument('api_key', metavar="API_KEY", type=str,
                        help='Your DCOR API token')
    parser.add_argument('--cache_dir', metavar="CACHE_DIR", type=str,
                        help='Cache directory for data compression')
    parser.add_argument('--version', action='version',
                        version=f'dcoraid-upload-task {version}')
    return parser
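

# Example invocation (a sketch; assumes the ``dcoraid-upload-task`` entry
# point named in the parser's usage text; the API token is a placeholder
# and --cache_dir is optional):
#
#   dcoraid-upload-task upload_job.dcoraid-task dcor.mpl.mpg.de \
#       <API_TOKEN> --cache_dir /path/to/cache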