@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-import os
+import os
 import pandas as pd
 from dask import delayed
 import re
@@ -9,55 +9,77 @@
 import shutil
 
 
-
 HOME = os.environ["HOME"]
 INTAKE_CMIP5_DIR = f"{HOME}/.intake_cmip5"
 
+
 @functools.lru_cache(maxsize=1024, typed=False)
 def _parse_dirs(root_dir):
-    institution_dirs = [os.path.join(root_dir, activity, institution)
-                        for activity in os.listdir(root_dir)
-                        for institution in os.listdir(os.path.join(root_dir, activity))
-                        if os.path.isdir(os.path.join(root_dir, activity, institution))]
-
-    model_dirs = [os.path.join(institution_dir, model)
-                  for institution_dir in institution_dirs
-                  for model in os.listdir(institution_dir)
-                  if os.path.isdir(os.path.join(institution_dir, model))]
-
-    experiment_dirs = [os.path.join(model_dir, exp)
-                       for model_dir in model_dirs
-                       for exp in os.listdir(model_dir)
-                       if os.path.isdir(os.path.join(model_dir, exp))]
-
-    freq_dirs = [os.path.join(experiment_dir, freq)
-                 for experiment_dir in experiment_dirs
-                 for freq in os.listdir(experiment_dir)
-                 if os.path.isdir(os.path.join(experiment_dir, freq))]
-
-    realm_dirs = [os.path.join(freq_dir, realm)
-                  for freq_dir in freq_dirs
-                  for realm in os.listdir(freq_dir)
-                  if os.path.isdir(os.path.join(freq_dir, realm))]
-
+    institution_dirs = [
+        os.path.join(root_dir, activity, institution)
+        for activity in os.listdir(root_dir)
+        for institution in os.listdir(os.path.join(root_dir, activity))
+        if os.path.isdir(os.path.join(root_dir, activity, institution))
+    ]
+
+    model_dirs = [
+        os.path.join(institution_dir, model)
+        for institution_dir in institution_dirs
+        for model in os.listdir(institution_dir)
+        if os.path.isdir(os.path.join(institution_dir, model))
+    ]
+
+    experiment_dirs = [
+        os.path.join(model_dir, exp)
+        for model_dir in model_dirs
+        for exp in os.listdir(model_dir)
+        if os.path.isdir(os.path.join(model_dir, exp))
+    ]
+
+    freq_dirs = [
+        os.path.join(experiment_dir, freq)
+        for experiment_dir in experiment_dirs
+        for freq in os.listdir(experiment_dir)
+        if os.path.isdir(os.path.join(experiment_dir, freq))
+    ]
+
+    realm_dirs = [
+        os.path.join(freq_dir, realm)
+        for freq_dir in freq_dirs
+        for realm in os.listdir(freq_dir)
+        if os.path.isdir(os.path.join(freq_dir, realm))
+    ]
+
     return realm_dirs
-
+
+
 def _get_entry(directory):
-    dir_split = directory.split('/')
+    dir_split = directory.split("/")
     entry = {}
-    entry['realm'] = dir_split[-1]
-    entry['frequency'] = dir_split[-2]
-    entry['experiment'] = dir_split[-3]
-    entry['model'] = dir_split[-4]
-    entry['institution'] = dir_split[-5]
+    entry["realm"] = dir_split[-1]
+    entry["frequency"] = dir_split[-2]
+    entry["experiment"] = dir_split[-3]
+    entry["model"] = dir_split[-4]
+    entry["institution"] = dir_split[-5]
     return entry
-
+
+
 @delayed
 def parse_directory(directory):
-    exclude = set(["files", "latests"]) # directories to exclude
+    exclude = set(["files", "latests"])  # directories to exclude
 
-    columns = ["ensemble", "experiment", "file_basename", "file_fullpath",
-               "frequency", "institution", "model", "root", "realm", "varname"]
+    columns = [
+        "ensemble",
+        "experiment",
+        "file_basename",
+        "file_fullpath",
+        "frequency",
+        "institution",
+        "model",
+        "root",
+        "realm",
+        "varname",
+    ]
     df = pd.DataFrame(columns=columns)
 
     entry = _get_entry(directory)
@@ -68,17 +90,18 @@ def parse_directory(directory):
         if not files:
             continue
         sfiles = sorted([f for f in files if os.path.splitext(f)[1] == ".nc"])
-        if not sfiles: continue
+        if not sfiles:
+            continue
 
         fs = []
         for f in sfiles:
             try:
                 f_split = f.split("_")
-                entry['varname'] = f_split[0]
-                entry['ensemble'] = f_split[-2]
-                entry['root'] = root
-                entry['file_basename'] = f
-                entry['file_fullpath'] = os.path.join(root, f)
+                entry["varname"] = f_split[0]
+                entry["ensemble"] = f_split[-2]
+                entry["root"] = root
+                entry["file_basename"] = f
+                entry["file_fullpath"] = os.path.join(root, f)
                 fs.append(entry)
             except:
                 continue
@@ -88,34 +111,45 @@ def parse_directory(directory):
         else:
             temp_df = pd.DataFrame()
             temp_df.columns = df.columns
-        df = pd.concat([temp_df, df], ignore_index=True)
+        df = pd.concat([temp_df, df], ignore_index=True, sort=False)
     return df
 
-def _persist_database(df):
-    vYYYYMMDD = r'v\d{4}\d{2}\d{2}'
-    vN = r'v\d{1}'
-    v = re.compile( "|".join([vYYYYMMDD, vN])) # Combine both regex into one
+
+def _persist_database(df, path):
+    vYYYYMMDD = (
+        r"v\d{4}\d{2}\d{2}"
+    )  # TODO: Very dangerous in case the root dir matches the pattern
+    vN = r"v\d{1}"
+    v = re.compile("|".join([vYYYYMMDD, vN]))  # Combine both regex into one
     df["version"] = df.root.str.findall(v)
-    df["version"] = df["version"].apply(lambda x: x[0] if x else 'v0')
-    sorted_df = df.sort_values("version").drop_duplicates(subset="file_basename", keep="last")\
-                .reset_index(drop=True)
+    df["version"] = df["version"].apply(lambda x: x[0] if x else "v0")
+    sorted_df = (
+        df.sort_values("version")
+        .drop_duplicates(subset="file_basename", keep="last")
+        .reset_index(drop=True)
+    )
+
+    if path:
+        INTAKE_CMIP5_DIR = path
+
     print(f"**** Persisting CMIP5 database in {INTAKE_CMIP5_DIR} ****")
 
     if os.path.isdir(INTAKE_CMIP5_DIR):
         shutil.rmtree(INTAKE_CMIP5_DIR)
     os.makedirs(INTAKE_CMIP5_DIR, exist_ok=True)
-
+
     sorted_df.to_csv(f"{INTAKE_CMIP5_DIR}/clean_cmip5_database.csv", index=False)
     df.to_csv(f"{INTAKE_CMIP5_DIR}/raw_cmip5_database.csv", index=False)
-
+
     return sorted_df
 
-def create_CMIP5Database(root_dir=None):
+
+def create_CMIP5Database(root_dir=None, db_path=None):
     if not os.path.exists(root_dir):
         raise NotADirectoryError(f"{root_dir} does not exist")
-
+
     dirs = _parse_dirs(root_dir)
     dfs = [parse_directory(directory) for directory in dirs]
     df = dd.from_delayed(dfs).compute()
-    df = _persist_database(df)
-    return df
+    df = _persist_database(df, db_path)
+    return df
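For reference, a minimal usage sketch of the updated entry point follows. The import path and the directory paths are hypothetical placeholders; only create_CMIP5Database, root_dir, and the new db_path argument come from the diff above.

    # Hypothetical import path; adjust to wherever this module lives in the package.
    from intake_cmip5.database import create_CMIP5Database

    # Walk a CMIP5 archive root and persist the clean/raw CSV catalogs
    # under a custom directory instead of the default ~/.intake_cmip5.
    df = create_CMIP5Database(root_dir="/data/cmip5", db_path="/tmp/intake_cmip5_db")
    print(df[["model", "experiment", "varname", "version"]].head())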