class DataProcess(DataPull):

    def __init__(self):
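+        # Assumes DataPull.__init__ provides self.codes and self.debug, which the
+        # processing methods below rely on (they were previously loaded here).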
+        super().__init__()
+        self.blocks = self.process_shps()
+        self.process_lodes()
-        self.mov_file_url = "https://www2.census.gov/ces/movs/movs_st_main2005.csv"
-        self.mov_file_path = "data/raw/movs_st_main2005.csv"
-        self.shape_file_url = "https://www2.census.gov/geo/tiger/GENZ2018/shp/cb_2018_us_state_500k.zip"
-        self.shape_file_path = "data/shape_files/states.zip"
-        self.state_code_file_path = "data/external/state_code.parquet"
-        self.blocks_file_path = "data/processed/blocks.parquet"
-        self.lodes_file_path = "data/processed/lodes.parquet"
-
-        self.mov = self.load_mov_data()
-        self.shp = self.load_shape_data()
-        self.codes = self.load_state_codes()
-        self.blocks = self.load_blocks_data()
-        self.lodes = self.load_lodes_data()
-        self.df = self.create_graph_dataset()

-    def retrieve_shps(self, blocks):
+    def process_shps(self) -> pl.DataFrame:
-        for state, name in self.codes.select(pl.col("fips", "state_name")).rows():
-            print(f"Processing {name}, {state}")
-            url = f"https://www2.census.gov/geo/tiger/TIGER2023/TABBLOCK20/tl_2023_{str(state).zfill(2)}_tabblock20.zip"
-            file_name = f"data/shape_files/{name}_{str(state).zfill(2)}.zip"
-            self.retrieve_file(url, file_name)
-            tmp = gpd.read_file(file_name, engine="pyogrio")
-            tmp = tmp.set_crs(3857, allow_override=True)
-            tmp_shp = tmp[["STATEFP20", "GEOID20", "geometry"]].copy()
-            tmp_shp["centroid"] = tmp_shp.centroid
-            tmp_shp["lon"] = tmp_shp.centroid.x
-            tmp_shp["lat"] = tmp_shp.centroid.y
-            tmp_block = pl.from_pandas(tmp_shp[["STATEFP20", "GEOID20", "lon", "lat"]])
-            blocks = pl.concat([blocks, tmp_block], how="vertical")
-            print(f"Finished processing {state}")
-        blocks.write_parquet(self.blocks_file_path)
+        empty_df = [
+            pl.Series("STATEFP20", [], dtype=pl.String),
+            pl.Series("GEOID20", [], dtype=pl.String),
+            pl.Series("lon", [], dtype=pl.Float64),
+            pl.Series("lat", [], dtype=pl.Float64),
+        ]
+        blocks = pl.DataFrame(empty_df).clear()
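+        # Rebuild the block-centroid table only when no cached parquet exists.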
+        if not os.path.exists("data/processed/blocks.parquet"):
+            for state, name in self.codes.select(pl.col("fips", "state_name")).rows():
+                file_name = f"data/shape_files/block_{name}_{str(state).zfill(2)}.zip"
+                tmp = gpd.read_file(file_name, engine="pyogrio")
+                tmp = tmp.set_crs(3857, allow_override=True)
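+                # NOTE: set_crs only relabels the CRS without reprojecting, so the
+                # TIGER/Line coordinates (and the centroids below) stay in degree lon/lat.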
+                tmp_shp = tmp[["STATEFP20", "GEOID20", "geometry"]].copy()
+                tmp_shp["centroid"] = tmp_shp.centroid
+                tmp_shp["lon"] = tmp_shp.centroid.x
+                tmp_shp["lat"] = tmp_shp.centroid.y
+                tmp_block = pl.from_pandas(tmp_shp[["STATEFP20", "GEOID20", "lon", "lat"]])
+                blocks = pl.concat([blocks, tmp_block], how="vertical")
+                print("\033[0;36mPROCESS: \033[0m" + f"Finished processing {name} Shapes")
+            blocks.sort(by=["STATEFP20", "GEOID20"]).write_parquet("data/processed/blocks.parquet")
+            return blocks
+        else:
+            return pl.read_parquet("data/processed/blocks.parquet")

-    def process_lodes(self, lodes):
-
-        for state, name, fips in self.codes.select(pl.col("state_abbr", "state_name", "fips")).rows():
-            for year in range(2005, 2020):
-                url = f"https://lehd.ces.census.gov/data/lodes/LODES8/{state}/od/{state}_od_main_JT00_{year}.csv.gz"
-                file_name = f"data/raw/lodes_{state}_{year}.csv.gz"
-                try:
-                    self.retrieve_file(url, file_name)
-                except:
-                    print(f"Failed to download {name}, {state}, {year}")
-                    continue
-                value = self.process_lodes(file_name)
-                tmp_df = pl.DataFrame([
-                    pl.Series("state", [state], dtype=pl.String),
-                    pl.Series("fips", [fips], dtype=pl.String),
-                    pl.Series("state_abbr", [name], dtype=pl.String),
-                    pl.Series("year", [year], dtype=pl.Int64),
-                    pl.Series("avg_distance", [value], dtype=pl.Float64),
-                ])
-                lodes = pl.concat([lodes, tmp_df], how="vertical")
-                print(f"Finished processing {name}, {state}, {year}")
-        lodes.write_parquet(self.lodes_file_path)
-
-    def process_lodes(self, path):
+    def process_lodes(self):
+        empty_df = [
+            pl.Series("state", [], dtype=pl.String),
+            pl.Series("fips", [], dtype=pl.String),
+            pl.Series("state_abbr", [], dtype=pl.String),
+            pl.Series("year", [], dtype=pl.Int64),
+            pl.Series("avg_distance", [], dtype=pl.Float64),
+        ]
+        lodes = pl.DataFrame(empty_df).clear()
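+        # Same caching pattern as process_shps: skip the rebuild when the parquet exists.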
+        if not os.path.exists("data/processed/lodes.parquet"):
+            for state, name, fips in self.codes.select(pl.col("state_abbr", "state_name", "fips")).rows():
+                for year in range(2005, 2020):
+                    file_name = f"data/raw/lodes_{state}_{year}.csv.gz"
+                    try:
+                        value = self.process_block(file_name)
+                    except Exception:
+                        print("\033[1;33mWARNING: \033[0m" + f"Failed to process lodes_{name}_{state}_{year}")
+                        continue
+                    tmp_df = pl.DataFrame([
+                        pl.Series("state", [state], dtype=pl.String),
+                        pl.Series("fips", [fips], dtype=pl.String),
+                        pl.Series("state_abbr", [name], dtype=pl.String),
+                        pl.Series("year", [year], dtype=pl.Int64),
+                        pl.Series("avg_distance", [value], dtype=pl.Float64),
+                    ])
+                    lodes = pl.concat([lodes, tmp_df], how="vertical")
+                    if self.debug:
+                        print("\033[0;36mINFO: \033[0m" + f"Finished processing lodes {name} for {year}")
+            lodes.sort(by=["state", "year"]).write_parquet("data/processed/lodes.parquet")

+    def process_block(self, path) -> float:
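        # Join each home/work block pair to its centroid coordinates, compute a
        # great-circle distance per pair, and return total distance over total jobs.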
        df = pl.read_csv(path, ignore_errors=True)
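        # S000 is the total job count for each home-to-work block pair in LODES OD files.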
        df = df.rename({"S000": "total_jobs"}).select(pl.col("w_geocode", "h_geocode", "total_jobs"))
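        # Attach work-side (w_) and home-side (h_) block centroids to each pair.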
-
        dest = self.blocks.rename({"GEOID20": "w_geocode", "lon": "w_lon", "lat": "w_lat"})
        dest = dest.with_columns((pl.col("w_geocode").cast(pl.Int64)).alias("w_geocode"))
-
        origin = self.blocks.rename({"GEOID20": "h_geocode", "lon": "h_lon", "lat": "h_lat"})
        origin = origin.with_columns((pl.col("h_geocode").cast(pl.Int64)).alias("h_geocode"))
-
        df = df.join(origin, on="h_geocode", how="left")
        df = df.join(dest, on="w_geocode", how="left")
        df = df.with_columns(
-            (6371.01 * np.arccos(
-                np.sin(pl.col("h_lat")) * np.sin(pl.col("w_lat")) +
-                np.cos(pl.col("h_lat")) * np.cos(pl.col("w_lat")) *
-                np.cos(pl.col("h_lon") - pl.col("w_lon"))
-            )).alias("distance")
-        )
-
+            # Spherical law of cosines with Earth radius 6371.01 km; the trig terms
+            # need radians, so the degree coordinates are converted first.
+            (6371.01 * np.arccos(
+                np.sin(np.radians(pl.col("h_lat"))) * np.sin(np.radians(pl.col("w_lat"))) +
+                np.cos(np.radians(pl.col("h_lat"))) * np.cos(np.radians(pl.col("w_lat"))) *
+                np.cos(np.radians(pl.col("h_lon")) - np.radians(pl.col("w_lon")))
+            )
+            ).alias("distance")
+        )
        df = df.filter(pl.col("distance").is_not_nan())  # arccos yields NaN when rounding pushes its argument past ±1
        df = df.select(pl.col("distance").sum().alias("total_distance"),
                       pl.col("total_jobs").sum().alias("total_jobs"))
        value = df.select((pl.col("total_distance") / pl.col("total_jobs")).alias("avg_distance")).item()
        return value
+
+if __name__ == "__main__":
+    DataProcess()