ingest_raw_data.py
import os
import shutil
from datetime import datetime
from pathlib import Path

import git
import pytz
import yaml
from PIL import Image

import gcp_utils

metadata_file_name = 'metadata.yaml'
tmp_directory = Path('./tmp')
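
# NOTE: gcp_utils is a helper module local to this repository. From its usage
# below it is assumed to provide list_files(bucket_name, prefix) -> iterable of
# object names and remote_folder_exists(remote_dir, folder) -> bool; see
# gcp_utils.py for the actual definitions.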

def process_zips(gcp_bucket):
    """Process every zipped stack found under raw-data/ in the GCP bucket."""
    files = gcp_utils.list_files(gcp_bucket.split('gs://')[1], 'raw-data')
    for zipped_stack in files:
        if zipped_stack == 'raw-data/':
            # skip the placeholder entry for the folder itself
            continue
        process_zip(gcp_bucket, os.path.join(gcp_bucket, zipped_stack))

def process_zip(gcp_bucket, zipped_stack):
    """Unzip a single stack, convert its images to greyscale, and upload it to
    processed-data/ in the GCP bucket along with updated metadata."""
    start_dt = datetime.now()

    assert "gs://" in zipped_stack
    assert "gs://" in gcp_bucket

    # clean up the tmp directory
    try:
        shutil.rmtree(tmp_directory.as_posix())
    except FileNotFoundError:
        pass
    tmp_directory.mkdir()

    # annotation stacks are identified by 'dmg' in their file names
    is_annotation = 'dmg' in zipped_stack

    # derive the stack id by stripping the extension and any known suffixes
    stack_id = Path(zipped_stack).name.split('.')[0]
    split_strings = ['_8bit', '-mtrxdmg', '_dmg', '-dmg']
    for s in split_strings:
        stack_id = stack_id.split(s)[0]

    stack_dir = Path(tmp_directory, stack_id)

    if not is_annotation and gcp_utils.remote_folder_exists(os.path.join(gcp_bucket, 'processed-data'),
                                                            '/'.join([stack_id, 'images'])):
        print("{} has already been processed! Skipping...".format(os.path.join(stack_id, 'images')))
    elif is_annotation and gcp_utils.remote_folder_exists(os.path.join(gcp_bucket, 'processed-data'),
                                                          '/'.join([stack_id, 'annotations'])):
        print("{} has already been processed! Skipping...".format(os.path.join(stack_id, 'annotations')))
    else:
        # copy the zipped stack locally, extract it with 7-Zip, then delete the archive
        os.system("gsutil -m cp -r '{}' '{}'".format(zipped_stack, tmp_directory.as_posix()))
        os.system("7za x -y -o'{}' '{}'".format(stack_dir.as_posix(),
                                                Path(tmp_directory, Path(zipped_stack).name).as_posix()))
        os.remove(Path(tmp_directory, Path(zipped_stack).name).as_posix())

        unzipped_dir = next(stack_dir.iterdir())
        original_number_of_files_in_zip = len(list(unzipped_dir.iterdir()))

        temp_file_name = './temp.tif'
        for f in Path(unzipped_dir).iterdir():
            if not f.name.endswith('.tif'):
                # remove any non-image files
                os.remove(f.as_posix())
            else:
                # Convert every image to greyscale (some already are, some aren't).
                # Converting in place with Image.open(f).convert("L").save(f) raised
                #   TypeError: argument 5 should be a str, not PosixPath
                # for some images, apparently due to their metadata or compression
                # settings. As a workaround, copy the image to a plain string path,
                # convert it there, then move it back over the original.
                shutil.copyfile(f, temp_file_name)
                os.remove(f)
                Image.open(temp_file_name).convert("L").save(temp_file_name)
                os.rename(temp_file_name, f)
        # rename the extracted folder to 'images' or 'annotations'
        shutil.move(unzipped_dir.as_posix(),
                    Path(unzipped_dir.parent, 'annotations' if is_annotation else 'images').as_posix())

        # fetch the existing metadata file, if there is one
        os.system("gsutil -m cp -r '{}' '{}'".format(
            os.path.join(gcp_bucket, 'processed-data/', stack_id, metadata_file_name),
            Path(tmp_directory, stack_id).as_posix()))
        try:
            with Path(tmp_directory, stack_id, metadata_file_name).open('r') as f:
                metadata = yaml.safe_load(f)
        except FileNotFoundError:
            metadata = {}

        metadata.update({
            'annotations' if is_annotation else 'images': {
                'gcp_bucket': gcp_bucket,
                'zipped_stack_file': zipped_stack,
                'created_datetime': datetime.now(pytz.UTC).strftime('%Y%m%dT%H%M%SZ'),
                'original_number_of_files_in_zip': original_number_of_files_in_zip,
                'number_of_images': len(list(Path(unzipped_dir.parent,
                                                  'annotations' if is_annotation else 'images').iterdir())),
                'git_hash': git.Repo(search_parent_directories=True).head.object.hexsha},
            'elapsed_minutes': round((datetime.now() - start_dt).total_seconds() / 60, 1)
        })

        with Path(tmp_directory, stack_id, metadata_file_name).open('w') as f:
            yaml.safe_dump(metadata, f)
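        # For reference, the resulting metadata.yaml is laid out roughly as below
        # (yaml.safe_dump sorts keys; all values here are illustrative, not from a
        # real run). Note that elapsed_minutes sits at the top level of the file,
        # not inside the per-stack 'images'/'annotations' block:
        #
        #   elapsed_minutes: 1.5
        #   images:
        #     created_datetime: 20200101T120000Z
        #     gcp_bucket: gs://example-bucket
        #     git_hash: 0123456789abcdef0123456789abcdef01234567
        #     number_of_images: 100
        #     original_number_of_files_in_zip: 100
        #     zipped_stack_file: gs://example-bucket/raw-data/example-stack.7z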
os.system("gsutil -m cp -n -r '{}' '{}'".format(unzipped_dir.parent.as_posix(),
os.path.join(gcp_bucket, 'processed-data/')))
print('\n Ingest Raw Data Metadata:')
print(metadata)
print('\n')
shutil.rmtree(tmp_directory.as_posix())

if __name__ == "__main__":
    import argparse
    import sys

    argparser = argparse.ArgumentParser(sys.argv[0])
    argparser.add_argument(
        '--gcp-bucket',
        type=str,
        help='The GCP bucket where the raw data is located and where the processed stacks will be stored.')
    argparser.add_argument(
        '--zipped-stack',
        type=str,
        default='',
        help='The zipped stack (.zip or .7z) to process; if omitted, every stack under raw-data/ is processed.')
    kw_args = argparser.parse_args().__dict__

    if kw_args['zipped_stack'] == '':
        process_zips(gcp_bucket=kw_args['gcp_bucket'])
    else:
        process_zip(gcp_bucket=kw_args['gcp_bucket'],
                    zipped_stack=kw_args['zipped_stack'])
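
# Example invocations (bucket and file names below are illustrative):
#   python ingest_raw_data.py --gcp-bucket gs://example-bucket
#   python ingest_raw_data.py --gcp-bucket gs://example-bucket \
#       --zipped-stack gs://example-bucket/raw-data/example-stack.7z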