-
Notifications
You must be signed in to change notification settings - Fork 0
/
count_lengths_of_roads.py
175 lines (147 loc) · 7.59 KB
/
count_lengths_of_roads.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
import os
import sys
from ecoshard import geoprocessing
from osgeo import gdal
from osgeo import ogr
from osgeo import osr
import geopandas as gpd
WORK = {
'IDN': {
'road_vector_path': r"D:\repositories\wwf-sipa\data\infrastructure_polygons\IDN_All_Roads_Merged.gpkg",
'conservation_rasters': [
r"D:\repositories\wwf-sipa\workspace_postprocess_number_of_people_benefitting\working_dir\coastal_benefit_areas_idn_dem_10_IDN_conservation_inf_service_overlap_count.tif",
r"D:\repositories\wwf-sipa\workspace_postprocess_number_of_people_benefitting\working_dir\10_IDN_conservation_inf_service_overlap_count_downstream_mask.tif",
],
'restoration_rasters': [
r"D:\repositories\wwf-sipa\workspace_postprocess_number_of_people_benefitting\working_dir\coastal_benefit_areas_idn_dem_10_IDN_restoration_service_overlap_count.tif",
r"D:\repositories\wwf-sipa\workspace_postprocess_number_of_people_benefitting\working_dir\10_IDN_restoration_service_overlap_count_downstream_mask.tif",
],
'target_epsg': 23830,
'municipalities_path': (
r"D:\repositories\wwf-sipa\data\admin_boundaries\IDN_adm1.gpkg",
'NAME_1',),
},
'PH': {
'road_vector_path': r"D:\repositories\wwf-sipa\data\infrastructure_polygons\PH_All_Roads_Merged.gpkg",
'conservation_rasters': [
r"D:\repositories\wwf-sipa\workspace_postprocess_number_of_people_benefitting\working_dir\coastal_benefit_areas_ph_dem_10_PH_conservation_inf_service_overlap_count.tif",
r"D:\repositories\wwf-sipa\workspace_postprocess_number_of_people_benefitting\working_dir\10_PH_conservation_inf_service_overlap_count_downstream_mask.tif",
],
'restoration_rasters': [
r"D:\repositories\wwf-sipa\workspace_postprocess_number_of_people_benefitting\working_dir\coastal_benefit_areas_ph_dem_10_PH_restoration_service_overlap_count.tif",
r"D:\repositories\wwf-sipa\workspace_postprocess_number_of_people_benefitting\working_dir\10_PH_restoration_service_overlap_count_downstream_mask.tif",
],
'target_epsg': 3121,
'municipalities_path': (
r"D:\repositories\wwf-sipa\data\admin_boundaries\PH_gdam2.gpkg",
'NAME_2',),
},
}
WORKING_DIR = 'road_length_workspace'
os.makedirs(WORKING_DIR, exist_ok=True)
def _merge_masks_op(a, b):
return (a >= 1) | (b >= 1)
def progress_callback(complete, message, _):
"""
A callback function that displays the progress.
Parameters:
- complete: A float representing the percentage of the task completed.
- message: A string message (not always provided).
- _: An ignored user data parameter.
"""
print(f"Progress: {complete*100:.2f}%", end="\r")
sys.stdout.flush()
def main():
for country_id in ['PH', 'IDN']:
for raster_list_id in ['conservation_rasters', 'restoration_rasters']:
raster_list = WORK[country_id][raster_list_id]
mask_raster_path = os.path.join(WORKING_DIR, f'{raster_list_id}.tif')
geoprocessing.raster_calculator(
[(path, 1) for path in raster_list], _merge_masks_op,
mask_raster_path,
gdal.GDT_Byte, 0)
# Open the raster file
raster = gdal.Open(mask_raster_path)
band = raster.GetRasterBand(1)
srs = osr.SpatialReference()
srs.ImportFromWkt(raster.GetProjection())
print(srs)
# Create the destination data source
target_layername = f'{country_id}_{raster_list_id}'
driver = ogr.GetDriverByName("GPKG")
overlap_polygon_path = os.path.join(
WORKING_DIR, f'{target_layername}.gpkg')
target_vector = driver.CreateDataSource(overlap_polygon_path)
target_layer = target_vector.CreateLayer(
target_layername, srs=srs)
# Add a field
fd = ogr.FieldDefn("DN", ogr.OFTInteger)
target_layer.CreateField(fd)
dst_field = 0
# Create a mask band that excludes the NoData value
nodata = band.GetNoDataValue()
if nodata is not None:
# Create a temporary dataset to hold the mask
drv = gdal.GetDriverByName('MEM')
mask_ds = drv.Create('', raster.RasterXSize, raster.RasterYSize, 1, gdal.GDT_Byte)
mask_band = mask_ds.GetRasterBand(1)
# Use raster calculator to create a mask: 1 for data pixels, 0 for nodata pixels
mask_band.WriteArray(
(band.ReadAsArray() != nodata).astype(int))
# Polygonize
gdal.Polygonize(
band, mask_band, target_layer, dst_field, [],
callback=progress_callback)
target_layer = None
target_vector = None
band = None
raster = None
# Load the line vector layer
line_gdf = gpd.read_file(WORK[country_id]['road_vector_path'])
# Load the polygon vector layer (used for clipping)
polygon_gdf = gpd.read_file(overlap_polygon_path)
if line_gdf.crs != polygon_gdf.crs:
print('reproject')
line_gdf = line_gdf.to_crs(polygon_gdf.crs)
print('do clip')
# Perform the clipping operation
clipped_lines = gpd.clip(line_gdf, polygon_gdf)
print('save')
# Save the clipped output
clipped_lines.to_file(os.path.join(WORKING_DIR, f'{country_id}_{raster_list_id}.gpkg'))
print('all done')
def summarize_length_by_region(
original_line_vector_path, clipped_line_vector_path, target_epsg, summary_vector_path, summary_vector_field,
target_table_path):
with open(target_table_path, 'w') as table:
table.write('region name,beneficiary road length,total road length\n')
# Load the line vector layer
print('read files')
original_line_gdf = gpd.read_file(original_line_vector_path)
clipped_line_gdf = gpd.read_file(clipped_line_vector_path)
summary_gdf = gpd.read_file(summary_vector_path)
print(f'project to {target_epsg}')
clipped_line_gdf = clipped_line_gdf.to_crs(epsg=target_epsg)
original_line_gdf = original_line_gdf.to_crs(epsg=target_epsg)
summary_gdf = summary_gdf.to_crs(epsg=target_epsg)
for field_id in summary_gdf[summary_vector_field].unique():
print(f'clip {field_id}')
subset_gdf = summary_gdf[
summary_gdf[summary_vector_field] == field_id]
subset_road = gpd.clip(clipped_line_gdf, subset_gdf)
original_road = gpd.clip(original_line_gdf, subset_gdf)
table.write(f'{field_id},{subset_road.length.sum()/1000},{original_road.length.sum()/1000}\n')
if __name__ == '__main__':
for country_id in ['PH', 'IDN']:
for raster_list_id in ['conservation_rasters', 'restoration_rasters']:
road_vector_path = WORK[country_id]['road_vector_path']
clipped_road_vector_path = os.path.join(WORKING_DIR, f'{country_id}_{raster_list_id}.gpkg')
summary_vector_path, summary_vector_field = WORK[country_id]['municipalities_path']
target_table_path = os.path.join(
WORKING_DIR, f'{country_id}_{raster_list_id}_summary.csv')
summarize_length_by_region(
road_vector_path,
clipped_road_vector_path, WORK[country_id]['target_epsg'],
summary_vector_path, summary_vector_field,
target_table_path)
#main()