-
Notifications
You must be signed in to change notification settings - Fork 6
/
Copy pathmain.py
288 lines (238 loc) · 10.6 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
import requests
import json
import re
import time
import os
from datetime import datetime, timedelta
import pandas as pd
# import pyarrow
# 用来把英文和数字翻译成中文存到文件名里。也许还有别的用途。
translation_table = {
'lastdate': '最后日期',
'historycurve': '迁徙规模指数',
'cityrank': '市百分比',
'provincerank': '省百分比',
'country': '国家',
'province': '省',
'city': '市',
'move_in': '迁入',
'move_out': '迁出',
# 如果你有其他省市数据需要获取,请修改这里
'360000': '江西省',
'360100': '南昌市',
'360700': '赣州市',
'360900': '宜春市',
'360800': '吉安市',
'361100': '上饶市',
'361000': '抚州市',
'360400': '九江市',
'360200': '景德镇市',
'360300': '萍乡市',
'360500': '新余市',
'360600': '鹰潭市',
}
class Types:
"""
存储我们已知和要请求的一些数据。
"""
# 请不要修改data_type
data_type = { # 'lastdate', # 虽然 lastdate 确实是一个接口,但是我们在每次爬取时
# 只需在最开始调用一次就可以了。所以我们在设计时排除之。
# 'historycurve', # 虽然 historycurve 确实是一个接口,但是这个接口会返回
# 所有日期的这个数据。所以只用调用一次就可以了
'cityrank',
'provincerank'}
# 请不要修改dt
dt = {'country',
'province',
'city'}
# 如果你有其他省市数据需要获取,请修改这里
region = {
# 使用Types.region.keys()来找到有哪些省份;使用Types.region["360000"]来查询它是省还是市。
"360000": "province",
'360100': 'city',
'360700': 'city',
'360900': 'city',
'360800': 'city',
'361100': 'city',
'361000': 'city',
'360400': 'city',
'360200': 'city',
'360300': 'city',
'360500': 'city',
'360600': 'city',
}
# 请不要修改move_type
move_type = {'move_in',
'move_out'}
def generate_date_range(start_date_str:str, end_date_str:str):
"""
输入两端的日期,输出包括这两天在内的所有日期。
:param start_date_str: 起始日期,长度为8、格式为年月日的字符串,如"20201231"。
:param end_date_str: 结束日期,格式同上。
:return: 一个list,包含从起始日期开始到结束日期为止的所有日期,格式同上。
"""
# 将输入的日期字符串转换为datetime对象
start_date = datetime.strptime(start_date_str, '%Y%m%d')
end_date = datetime.strptime(end_date_str, '%Y%m%d')
# 初始化日期列表,包含起始日期
date_list = [start_date.strftime('%Y%m%d')]
# 生成日期范围
current_date = start_date
while current_date < end_date:
current_date += timedelta(days=1)
date_list.append(current_date.strftime('%Y%m%d'))
return date_list
def get_timestamp():
"""
获得当前时间戳。
:return: 长度为13的时间戳
"""
return str(int(time.time() * 1000)) # * 1000获取毫秒级时间戳
def get_lastdate():
"""
从API获得目前API有的最晚的日期。
:return: 长度为8、格式为年月日的字符串,如"20201231"。
"""
url = f'http://huiyan.baidu.com/migration/lastdate.jsonp'
response = requests.get(url)
json_data_match = re.search(r'{.*}', response.text)
if json_data_match:
json_data_str = json_data_match.group()
# Decode JSON string, automatically handling Unicode characters
json_data = json.loads(json_data_str)
return_value = json_data['data']['lastdate']
print(f"成功获取到API最晚日期:{return_value}")
return return_value
else:
print('获取API最晚日期时失败')
def get_historycurve(region: str, move_type: str):
"""
从API获得一个区域(省或市)的迁入或迁出的迁徙规模指数。historycurve API会直接返回所有日期。存储文件到文件名。
:param region: 行政区域编码,如300001。
:param move_type: 'move_in' 或 'move_out'
:return: 成功返回 True,失败返回 False
"""
# 注意:这个文件要覆写,因为每次抓到的可能不一样。不需要判断是否已存在文件。
output_file = f'./data/{translation_table[region]}_{translation_table[move_type]}_{translation_table["historycurve"]}.json'
print(f"正在获取"
f' {translation_table[region]}'
f' {translation_table[move_type]} 的'
f' {translation_table["historycurve"]}')
url = f"http://huiyan.baidu.com/migration/historycurve.jsonp?dt={Types.region[region]}&id={region}&type={move_type}"
response = requests.get(url)
json_data_match = re.search(r'{.*}', response.text)
if json_data_match:
json_data_str = json_data_match.group()
# Decode JSON string, automatically handling Unicode characters
json_data = json.loads(json_data_str)
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(json_data, f, ensure_ascii=False, indent=2)
print('成功')
return True
else:
print('失败')
return False
# http://huiyan.baidu.com/migration/cityrank.jsonp?
# dt=country&id=0&type=move_in&date=20240130&callback=jsonp_1706674779082_8488633
def download_and_convert_jsonp(data_type: str, dt: str, id: str, move_type: str, date: str, callback: str):
"""
从百度慧眼的API上获得数据并存到本地。
:param data_type: 就是你要获取的信息类型,有且仅有下列几个值:
lastdate,返回最后有数据的日期(昨天);
historycurve,返回从20190112到lastdate为止的迁徙规模指数;
cityrank,返回迁入xx来源地(城市级别);
provincerank,返回xx来源地(省份级别)。
:param dt: 级别。可选的值有:country,province,city。
:param id: 哪里的数据(若dt为国家则不需要id)。参照"行政区划乡镇清单201910"表。级别要和上面的id对应。
:param move_type: 迁入还是迁出。可选的值有:move_in, move_out。
:param date: 要的日期,格式:八位数的年月日,如20240131。data_type为lastdate或historycurve时不需要这个参数。
:param callback: 没什么用,只是为了确认返回值是我这次要的数据罢了。给个时间戳就行。
:param output_file: 该文件要存到的文件名。
:return: boolean 成功返回True,失败返回False
"""
# TODO: 如果要爬大量数据,搞多线程
output_file = f'./data/{translation_table[id]}_' \
f'{translation_table[move_type]}_' \
f'{translation_table[data_type]}_' \
f'{date}.json'
if os.path.exists(output_file):
print("文件已存在")
return False
# Construct the full URL with query parameters
url = f'http://huiyan.baidu.com/migration/{data_type}.jsonp?dt={dt}&id={id}&type={move_type}&date={date}&callback={callback}'
response = requests.get(url)
# Extract JSON data part using regular expression
json_data_match = re.search(r'{.*}', response.text)
if json_data_match:
json_data_str = json_data_match.group()
# Decode JSON string, automatically handling Unicode characters
json_data = json.loads(json_data_str)
# 如果要导出 CSV 文件,可以在这里操作json_data。可以修改下面的代码:
# if json_data['errmsg'] == 'SUCCESS':
# data = json_data['data']['list']
# file = pd.DataFrame({id: data}).T
# csv_filename = f'./data/{translation_table[id]}_' \
# f'{translation_table[move_type]}_' \
# f'{translation_table[data_type]}_' \
# f'{date}.csv'
# file.to_csv(csv_filename, encoding='utf-8')
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(json_data, f, ensure_ascii=False, indent=2)
print('成功')
else:
print('失败')
def get_data(region: str, data_type: str, move_type: str, date: str):
timestamp = get_timestamp()
print(f"正在获取"
f' {translation_table[region]}'
f' {translation_table[move_type]} 的'
f' {translation_table[data_type]},日期为{date}')
download_and_convert_jsonp(
data_type=data_type,
dt=Types.region[region],
id=region,
move_type=move_type,
date=date,
callback=timestamp
)
def get_by_date(from_date, to_date, lastdate):
# 将日期字符串转换为datetime对象
from_date_dt = datetime.strptime(from_date, '%Y%m%d')
to_date_dt = datetime.strptime(to_date, '%Y%m%d')
lastdate_dt = datetime.strptime(lastdate, '%Y%m%d')
print("正在检查日期输入合法性")
# 检查from_date是否小于'20190112'
earlydate = datetime(2019, 1, 12)
if from_date_dt < earlydate:
print("你的起始日期比API目前有的最早日期还早。已经将起始日期设置成API的最早日期,并尝试继续运行。")
from_date_dt = earlydate
# 检查to_date是否大于lastdate
if to_date_dt > lastdate_dt:
print("你的结束日期比API目前有的最晚日期还晚。已经将结束日期设置成API的最晚日期,并尝试继续运行。")
to_date_dt = lastdate_dt
# 检查输入合法性
if from_date_dt > to_date_dt:
print("你的起始日期需要比结束日期早。")
return
print("成功")
# 转换为字符串,以便生成日期范围
from_date = from_date_dt.strftime('%Y%m%d')
to_date = to_date_dt.strftime('%Y%m%d')
for region in Types.region.keys():
for move_type in Types.move_type:
get_historycurve(region, move_type) # 获取他的迁徙规模指数
for date in generate_date_range(from_date, to_date): # 日期
for region in Types.region.keys():
for move_type in Types.move_type:
for data_type in Types.data_type:
get_data(region, data_type, move_type, date)
print("完成")
if __name__ == "__main__":
# 检查文件夹是否存在
if not os.path.exists('data'):
# 如果不存在,创建文件夹
os.makedirs('data')
lastdate = get_lastdate() # 接口给到的最晚日期
# 如果你需要修改起始和结束日期,请修改这里
get_by_date('20240131', '20240131', lastdate)