-
Notifications
You must be signed in to change notification settings - Fork 0
/
downstream-beneficiaries.py
284 lines (241 loc) · 10.2 KB
/
downstream-beneficiaries.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
# coding=UTF-8
import argparse
import logging
import os
import numpy
import pygeoprocessing
import pygeoprocessing.routing
import taskgraph
from osgeo import gdal
logging.basicConfig(level=logging.INFO)
FLOAT32_NODATA = float(numpy.finfo(numpy.float32).min)
BYTE_NODATA = 255
LOGGER = logging.getLogger(__name__)
ALGORITHMS = {
"d8": {
"flow_dir": pygeoprocessing.routing.flow_dir_d8,
"flow_accumulation": pygeoprocessing.routing.flow_accumulation_d8,
},
"mfd": {
"flow_dir": pygeoprocessing.routing.flow_dir_mfd,
"flow_accumulation": pygeoprocessing.routing.flow_accumulation_mfd,
}
}
def _sum_population_counts(masked_pop_path):
pixel_sum = 0
masked_pop_nodata = pygeoprocessing.get_raster_info(
masked_pop_path)['nodata'][0]
for block_info, pop_block in pygeoprocessing.iterblocks(
(masked_pop_path, 1)):
valid_pixels = ~numpy.isclose(pop_block, masked_pop_nodata)
# Casting pixel values to ints to minimize numerical error when
# summing.
pixel_sum += numpy.sum(pop_block[valid_pixels].astype(numpy.uint32))
return pixel_sum
def _mask_pop_downstream_of_aois(flow_accum_path, pop_count_path,
target_mask_path):
flow_accum_nodata = pygeoprocessing.get_raster_info(
flow_accum_path)['nodata'][0]
pop_count_nodata = pygeoprocessing.get_raster_info(
pop_count_path)['nodata'][0]
def _mask(flow_accum_array, pop_count_array):
output = numpy.full(flow_accum_array.shape, FLOAT32_NODATA,
dtype=numpy.float32)
pixels_with_aoi_upstream = (
~numpy.isclose(flow_accum_array, flow_accum_nodata) &
~numpy.isclose(pop_count_array, pop_count_nodata) &
(flow_accum_array > 0))
output[pixels_with_aoi_upstream] = (
pop_count_array[pixels_with_aoi_upstream])
return output
pygeoprocessing.raster_calculator(
[(flow_accum_path, 1), (pop_count_path, 1)], _mask, target_mask_path,
gdal.GDT_Float32, pop_count_nodata)
people_downstream_of_aois = _sum_population_counts(target_mask_path)
print(f'People downstream of AOIs: {people_downstream_of_aois}')
def mask_areas_of_interest(aoi_path, target_path):
aoi_nodata = pygeoprocessing.get_raster_info(aoi_path)['nodata'][0]
def _convert(array):
result = numpy.full(array.shape, 0, dtype=numpy.uint8)
result[array == 1] = 1
result[numpy.isclose(array, aoi_nodata)] = BYTE_NODATA
return result
pygeoprocessing.raster_calculator(
[(aoi_path, 1)], _convert, target_path, gdal.GDT_Byte, BYTE_NODATA)
def convert_population_units(
population_raster, target_population_density, to_density=True):
# assume linearly projected
# TODO: how would we handle some other projection, where pixels are not
# equal in size?
raster_info = pygeoprocessing.get_raster_info(population_raster)
pixel_area = abs(raster_info['pixel_size'][0] *
raster_info['pixel_size'][1])
pop_nodata = raster_info['nodata'][0]
def _convert(pop_array):
result = numpy.full(pop_array.shape, FLOAT32_NODATA,
dtype=numpy.float32)
valid_mask = ~numpy.isclose(pop_array, pop_nodata)
if to_density:
result[valid_mask] = pop_array[valid_mask] / pixel_area
else:
result[valid_mask] = pop_array[valid_mask] * pixel_area
return result
pygeoprocessing.raster_calculator(
[(population_raster, 1)], _convert, target_population_density,
gdal.GDT_Float32, FLOAT32_NODATA)
def calculate_downstream_beneficiaries(
dem_path, population_path, areas_of_interest_path, workspace_dir,
algorithm='mfd', n_workers=-1):
algorithm = algorithm.lower()
if algorithm not in ALGORITHMS.keys():
raise ValueError(f"Invalid routing algorithm: {algorithm}")
taskgraph_dir = os.path.join(workspace_dir, '.taskgraph')
if not os.path.exists(taskgraph_dir):
os.makedirs(taskgraph_dir)
graph = taskgraph.TaskGraph(
taskgraph_dir, n_workers=n_workers)
# raster_calculator: convert the population to population per unit area
population_density_path = os.path.join(
workspace_dir, 'population_density.tif')
pop_density_task = graph.add_task(
convert_population_units,
args=(population_path, population_density_path, True),
target_path_list=[population_density_path],
task_name='convert population count to density',
dependent_task_list=[]
)
# align the input raster stack
aligned_dem_path = os.path.join(
workspace_dir, 'aligned_dem.tif')
aligned_population_density_path = os.path.join(
workspace_dir, 'aligned_population_density.tif')
aligned_areas_of_interest_path = os.path.join(
workspace_dir, 'aligned_areas_of_interest.tif')
dem_info = pygeoprocessing.get_raster_info(dem_path)
align_task = graph.add_task(
pygeoprocessing.align_and_resize_raster_stack,
kwargs={
'base_raster_path_list': [
dem_path, population_path, areas_of_interest_path],
'target_raster_path_list': [
aligned_dem_path, aligned_population_density_path,
aligned_areas_of_interest_path],
'resample_method_list': ['bilinear', 'bilinear', 'near'],
'target_pixel_size': dem_info['pixel_size'],
'bounding_box_mode': 'intersection',
'raster_align_index': 0,
'target_projection_wkt': dem_info['projection_wkt']
},
target_path_list=[
aligned_dem_path, aligned_population_density_path,
aligned_areas_of_interest_path,
],
task_name='align rasters',
dependent_task_list=[pop_density_task]
)
# raster_calculator: mask to create raster of 1-value and 0-values.
masked_areas_of_interest_path = os.path.join(
workspace_dir, 'masked_areas_of_interest.tif')
masked_aoi_task = graph.add_task(
mask_areas_of_interest,
args=(aligned_areas_of_interest_path, masked_areas_of_interest_path),
target_path_list=[masked_areas_of_interest_path],
task_name='mask areas of interest',
dependent_task_list=[align_task]
)
# raster_calculator: convert the population back to population count
population_count_path = os.path.join(
workspace_dir, f'aligned_population_count.tif')
pop_count_task = graph.add_task(
convert_population_units,
args=(aligned_population_density_path, population_count_path, False),
target_path_list=[population_density_path],
task_name='convert population density to count',
dependent_task_list=[align_task]
)
# fill the dem
filled_dem_path = os.path.join(
workspace_dir, 'filled_dem.tif')
filled_dem_task = graph.add_task(
pygeoprocessing.routing.fill_pits,
args=((aligned_dem_path, 1), filled_dem_path, workspace_dir),
target_path_list=[filled_dem_path],
task_name='fill pits',
dependent_task_list=[align_task]
)
# flow_direction
flow_dir_path = os.path.join(
workspace_dir, f'flow_dir_{algorithm}.tif')
flow_dir_task = graph.add_task(
ALGORITHMS[algorithm]['flow_dir'],
args=((filled_dem_path, 1), flow_dir_path, workspace_dir),
target_path_list=[flow_dir_path],
task_name=f'flow direction {algorithm}',
dependent_task_list=[filled_dem_task]
)
# weighted flow accumulation.
# This is used to generate a mask of what is downstream of the areas of
# interest.
flow_accum_path = os.path.join(
workspace_dir, 'flow_accumulation.tif')
flow_accum_task = graph.add_task(
ALGORITHMS[algorithm]['flow_accumulation'],
args=((flow_dir_path, 1), flow_accum_path,
(masked_areas_of_interest_path, 1)),
target_path_list=[flow_accum_path],
task_name='weighted flow accumulation',
dependent_task_list=[flow_dir_task, masked_aoi_task]
)
# Mask populations downstream of areas of interest.
masked_pop_path = os.path.join(
workspace_dir, 'pop_downstream_of_areas_of_interest.tif')
masked_pop_path = graph.add_task(
_mask_pop_downstream_of_aois,
args=(flow_accum_path, population_count_path, masked_pop_path),
target_path_list=[masked_pop_path],
task_name='identify pixels downstream of AOIs',
dependent_task_list=[flow_accum_task, pop_count_task]
)
graph.join()
graph.close()
def main():
parser = argparse.ArgumentParser(
description=(
'Count the number of people downstream of pixels of interest.'),
prog='downstream-beneficiaries.py')
parser.add_argument(
'--parallelize', default=False, action='store_true', help=(
'Whether to engage multiple CPU cores for computation'))
parser.add_argument(
'--dem', help='The path to the linearly-projected DEM raster to use.',
required=True)
parser.add_argument(
'--population', help='Raster of population counts per pixel.',
required=True)
parser.add_argument(
'--algorithm', help=(
'The routing algorithm to use. One of "D8" or "MFD". '
'Default: MFD.'),
default="MFD", required=False)
parser.add_argument(
'--areas-of-interest', help=(
'Raster indicating areas of interest. Pixel values of 1 '
'an area of interest, anything else is not an area of interest.'),
required=True)
parser.add_argument('workspace', help='The target workspace directory')
args = parser.parse_args()
calculate_downstream_beneficiaries(
dem_path=args.dem,
population_path=args.population,
areas_of_interest_path=args.areas_of_interest,
workspace_dir=args.workspace,
algorithm=args.algorithm,
n_workers=-1 if not args.parallelize else 2
)
if __name__ == '__main__':
main()
#calculate_downstream_beneficiaries(
# 'DEM_Colombia300m.tif',
# 'LandscanPopulation2017_Colombia.tif',
# 'MaskServiceProvHotspots.tif',
# 'downstream-beneficiaries-workspace')