-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathacme-zerossl.py
1237 lines (953 loc) · 47.8 KB
/
acme-zerossl.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import hashlib
import hmac
import re
import time
import binascii
import base64
import logging
import os
import sys
import requests
from cryptography import x509
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec, utils, rsa
from cryptography.hazmat.backends import default_backend
from cryptography.x509.oid import NameOID
from cryptography.x509.oid import ExtendedKeyUsageOID
# 忽略 warnings.warn
requests.packages.urllib3.disable_warnings()
# 日志
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] [%(funcName)-24s]# %(message)s",
handlers=[logging.StreamHandler(sys.stdout)],
)
logger = logging.getLogger(__name__)
# TODO add RSA2048
DOMAIN_KEY_RSA = False
DOMAIN_KEY_RSA_LEN = '2048'
DEFAULT_CA = 'zerossl'
DEFAULT_TYPE = 'dns'
DEFAULT_ACCOUNT_KEY_LENGTH = 'ec-256'
DEFAULT_DOMAIN_KEY_LENGTH = 'ec-256'
ECC_NAME = 'prime256v1'
ECC_KEY_LEN = '256'
CA_ZEROSSL = 'https://acme.zerossl.com/v2/DV90'
ZERO_EAB_ENDPOINT = 'https://api.zerossl.com/acme/eab-credentials-email'
CA_DIR = './ca/acme.zerossl.com/v2/DV90'
# 创建ca目录
if not os.path.exists(CA_DIR):
os.makedirs(CA_DIR)
# account.key
ACCOUNT_KEY = os.path.abspath(CA_DIR+'/'+'account.key')
# account.json
ACCOUNT_JSON = os.path.abspath(CA_DIR+'/'+'account.json')
# ca.conf
CA_EMAIL = 'thankyou@' + os.urandom(8).hex() + '.com' # 如果不输入 email=youremail,则随机生成
CA_CONF = os.path.abspath(CA_DIR+'/'+'ca.conf')
ACCOUNT_URL = '' # 在申请证书时要用
CA_EAB_KEY_ID = ''
CA_EAB_HMAC_KEY = ''
# 输入 domain=MAIN_DOMAIN
MAIN_DOMAIN = '' # 输入的第一个域名
ALT_DOMAINS = '' # 输入的其他域名
# 域名证书申请相关文件
CERT_KEY_PATH = '' # ./domain/domain.key
CSR_PATH = '' # ./domain/domain.csr
CSR_CONF_PATH = '' # ./domain/domain.csr.conf#acme.sh -> DOMAIN_SSL_CONF
DOMAIN_CONF_PATH = '' # ./domain/domain.conf"
CA_CERT_PATH = '' # ./domain/ca.cer
DOMAIN_CER_PATH = '' # ./domain/domain.cer#acme.sh -> CERT_PATH
FULLCHAIN_CER_PATH = '' # ./domain/fullchain.cer#acme.sh -> CERT_FULLCHAIN_PATH
# zerosslAcmeApi
NEW_NONCE = 'https://acme.zerossl.com/v2/DV90/newNonce'
NEW_ACCOUNT = 'https://acme.zerossl.com/v2/DV90/newAccount'
NEW_ORDER = 'https://acme.zerossl.com/v2/DV90/newOrder'
REVOKE_CERT = 'https://acme.zerossl.com/v2/DV90/revokeCert'
KEY_CHANGE = 'https://acme.zerossl.com/v2/DV90/keyChange'
# jwk信息不变
JWK = ''
# JWK_HEADER = String.format("\"{\"alg\": \"ES%s\", \"jwk\": %s}\"",ECC_KEY_LEN,jwk)
# 域名验证时共享
SHARED_NONCE = ''
# 域名txt记录
challs_for_domain = {}
txt_for_domain = {}
authrs_for_domain = {}
# thumbprint 信息不变
thumbprint = ''
###################################################################################
def new_nonce():
global SHARED_NONCE
if SHARED_NONCE:
return SHARED_NONCE
headers = {
'user-agent': 'acme.sh/3.0.8 (https://github.com/acmesh-official/acme.sh)',
'accept': '*/*',
'content-type': 'application/jose+json'}
payload = None
response = requests.request("HEAD", NEW_NONCE, headers=headers, data=payload, verify=False)
SHARED_NONCE = response.headers['replay-nonce'] # Replay-Nonce
logger.info(f'new_nonce: {SHARED_NONCE}')
logger.info(f'http_header: {response.headers}')
return SHARED_NONCE
def sign(data, private_key_path):
logger.info(f'数据签名: {data}')
# echo -n "$data" | openssl dgst -sign account.key -sha256 | openssl asn1parse -inform DER
# 从PEM文件加载ECC私钥
with open(private_key_path, "rb") as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=None,
backend=default_backend()
)
# 使用私钥和SHA256对数据进行签名
signature = private_key.sign(
data.encode(),
ec.ECDSA(hashes.SHA256())
)
# 将签名转换为ASN.1 DER编码
# 在cryptography中,签名结果已经是DER编码,无需额外转换
# 但是,为了与OpenSSL的asn1parse命令输出相匹配,我们可以解析DER编码的签名结构
der_signature_r, der_signature_s = utils.decode_dss_signature(signature)
der_signature_hex = binascii.hexlify(
der_signature_r.to_bytes(32, byteorder='big') + der_signature_s.to_bytes(32, byteorder='big'))
logger.info(f'签名结果: {der_signature_hex}')
return der_signature_hex
def sign_base64_url_replace(data, private_key_path):
signature = sign(data, private_key_path)
s64 = base64.urlsafe_b64encode(bytes.fromhex(signature.decode('ascii'))).decode('ascii').replace('=', '')
logger.info(f'签名结果: {s64}')
return s64
def read_config(key, conf_path):
"""
从指定配置文件中读取给定键对应的值。
:param key: 要查找的键
:param conf_path: 配置文件的路径
:return: 键对应的值,如果未找到则返回None
"""
logger.info(f'读取配置文件 {conf_path} 中的 {key}')
try:
with open(conf_path, 'r') as file:
for line in file:
# 去除行尾的换行符并分割字符串
parts = line.strip().split('=')
if len(parts) == 2 and parts[0].strip() == key:
# 假设等号右侧的值被单引号或双引号包围,去除引号
value = parts[1].strip("'\"")
return value
# 如果遍历完文件都没有找到键,则返回None
return None
except FileNotFoundError:
logger.error(f"!!! 文件 {conf_path} 未找到。")
return None
except Exception as e:
logger.error(f"!!! 读取配置文件时发生错误: {e}")
return None
def calc_thumbprint(jwk):
"""
acme.sh: openssl -> echo -n '$JWK' | tr -d ' ' | openssl dgst -sha256 -binary | openssl base64 -e -A
:param jwk:
:return:
"""
# 方法同account_key_hash
global thumbprint
if thumbprint:
return thumbprint
b_hash = hashlib.sha256(jwk.replace(' ', '').encode("utf8")).digest()
thumbprint = base64.urlsafe_b64encode(b_hash).decode().replace('=', '')
logger.info(thumbprint)
return thumbprint
def calc_txt_value(key_auth):
b_hash = hashlib.sha256(key_auth.replace(' ', '').encode("utf8")).digest()
txt_value = base64.urlsafe_b64encode(b_hash).decode().replace('=', '')
logger.info(txt_value)
return txt_value
# 判断域名格式是都正确 "^(?:[_a-z0-9](?:[_a-z0-9-]{0,61}[a-z0-9])?\\.)+(?:[a-z](?:[a-z0-9-]{0,61}[a-z0-9])?)?$"
def check_domain(domain):
# 替换通配符,方便判断域名格式是否正确
domain = domain.replace('*.', '_all_all_all_all')
return re.match("^(?:[_a-z0-9](?:[_a-z0-9-]{0,61}[a-z0-9])?\\.)+(?:[a-z](?:[a-z0-9-]{0,61}[a-z0-9])?)?$", domain)
def check_email(email):
return re.match(r'^[a-zA-Z0-9_!#$%&\'*+/=?`{|}~^.-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email)
###################################################################################
# ***********************************************************************************
# 初始化 zerossl api
def init_zerossl_api():
logger.info('>>> 初始化 zerossl api')
headers = {
'user-agent': 'acme.sh/3.0.8 (https://github.com/acmesh-official/acme.sh)',
'accept': '*/*'
}
payload = None
response = requests.request("GET", CA_ZEROSSL, headers=headers, data=payload, verify=False)
json_body = response.json()
logger.info(f'http_header: {response.headers}')
logger.info(f'json_body: {json_body}')
global NEW_NONCE, NEW_ACCOUNT, NEW_ORDER, REVOKE_CERT, KEY_CHANGE
NEW_NONCE = json_body['newNonce']
NEW_ACCOUNT = json_body['newAccount']
NEW_ORDER = json_body['newOrder']
REVOKE_CERT = json_body['revokeCert']
KEY_CHANGE = json_body['keyChange']
def init_account_info(args):
logger.info('>>> 初始化账户信息')
global CA_EMAIL
# 从 args 中获取邮箱
for arg in args:
if arg.startswith('--email'):
email = arg.split('=')[1]
# 检查域名格式
if check_email(email):
logger.info(f'邮箱: {email}')
CA_EMAIL = email
else:
logger.error(f'!!! 邮箱格式错误: {email}')
# sys.exit(1)
# 抛出异常
raise Exception(f'邮箱格式错误: {email}')
if not CA_EMAIL:
logger.error('!!! 请提供邮箱')
# sys.exit(1)
# 抛出异常
raise Exception('请提供邮箱')
return CA_EMAIL
# 创建账户私钥
def create_account_key(account_key_path):
logger.info('>>> 创建账户私钥')
if os.path.exists(account_key_path) and os.path.getsize(account_key_path) > 0:
logger.info(f'使用已有账户私钥: {account_key_path}')
return
# 生成一个P-256曲线的ECC私钥
private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
logger.info(f'account.key: {pem}')
# 将PEM格式的私钥保存到文件
with open(account_key_path, "wb") as key_file:
key_file.write(pem)
if os.path.exists(account_key_path) and os.path.getsize(account_key_path) > 0:
logger.info(f'账户私钥保存成功: {account_key_path}')
else:
logger.error('!!! 账户私钥保存失败')
# sys.exit(1)
raise Exception('账户私钥保存失败')
# 计算 jwk
def calc_jwk(account_key_path):
global JWK
if JWK:
return JWK
logger.info('>>> 计算 jwk')
# 读取account_key
with open(account_key_path, 'r') as f:
account_key = f.read()
# 将account_key字符串转换为字节
pem_key = account_key.encode()
# 从PEM格式加载椭圆曲线私钥
private_key = serialization.load_pem_private_key(
pem_key, # PEM格式的密钥字节
password=None, # 如果密钥未加密,密码为None
backend=default_backend() # 默认的backend
)
# 确保加载的私钥是ECC类型的
if isinstance(private_key, ec.EllipticCurvePrivateKey):
# 获取公钥
public_key = private_key.public_key()
# 获取x和y坐标
public_numbers = public_key.public_numbers()
x = public_numbers.x
y = public_numbers.y
logger.info(f'x: {x}, y: {y}')
# 将x和y坐标转换为字节
x_bytes = x.to_bytes(32, 'big')
y_bytes = y.to_bytes(32, 'big')
# 将x和y坐标转换为Base64编码 urlsafe_b64encode 去掉=号
x64 = base64.urlsafe_b64encode(x_bytes).decode('utf-8').rstrip('=')
y64 = base64.urlsafe_b64encode(y_bytes).decode('utf-8').rstrip('=')
JWK = f'{{"crv": "P-256", "kty": "EC", "x": "{x64}", "y": "{y64}"}}'
# JWK = json.dumps({"crv": "P-256", "kty": "EC", "x": x64, "y": y64})
logger.info(f'JWK: {JWK}')
return JWK
# 计算账户私钥的哈希值
def calc_accout_key_hash(account_key_path):
logger.info('>>> 计算账户私钥的哈希值')
# 创建SHA-256哈希对象
hash_object = hashlib.sha256()
# 正确的
# 打开文件并读取二进制内容
with open(account_key_path, 'rb') as file:
# 逐块读取文件内容,以避免一次性加载大文件到内存
for chunk in iter(lambda: file.read(4096), b""):
hash_object.update(chunk)
# 获取二进制的散列值
binary_hash = hash_object.digest()
# 将二进制散列值编码为Base64
base64_hash = base64.b64encode(binary_hash).decode()
logger.info(base64_hash)
return base64_hash
# 获取账户的kid
def get_eab_kid():
logger.info('>>> 获取 eab_kid')
global CA_EMAIL, CA_EAB_KEY_ID, CA_EAB_HMAC_KEY
if os.path.exists(CA_CONF) and os.path.getsize(CA_CONF) > 0:
logger.info('使用已有 ca.conf 配置文件')
CA_EMAIL = read_config('CA_EMAIL', CA_CONF)
CA_EAB_KEY_ID = read_config('CA_EAB_KEY_ID', CA_CONF)
CA_EAB_HMAC_KEY = read_config('CA_EAB_HMAC_KEY', CA_CONF)
return read_config('CA_EAB_KEY_ID', CA_CONF)
headers = {
'user-agent': 'acme.sh/3.0.8 (https://github.com/acmesh-official/acme.sh)',
'accept': '*/*',
'content-type': 'application/x-www-form-urlencoded'
}
data_body = f'email={CA_EMAIL}'
response = requests.request("POST", "https://api.zerossl.com/acme/eab-credentials-email", headers=headers,
data=data_body, verify=False)
http_header = response.headers
json_body = response.json()
logger.info(f'http_header: {http_header} ')
logger.info(f'json_body: {json_body}')
CA_EAB_KEY_ID = json_body['eab_kid']
CA_EAB_HMAC_KEY = json_body['eab_hmac_key']
# 保存到 ca.conf
with open(CA_CONF, 'a') as f:
f.write(f"CA_EMAIL='{CA_EMAIL}'\n")
f.write(f"CA_EAB_KEY_ID='{CA_EAB_KEY_ID}'\n")
f.write(f"CA_EAB_HMAC_KEY='{CA_EAB_HMAC_KEY}'\n")
# 注册账户
def reg_account():
logger.info('>>> 注册账户')
if os.path.exists(ACCOUNT_JSON) and os.path.getsize(ACCOUNT_JSON) > 0:
logger.info(f'使用已有账户: {ACCOUNT_JSON}')
return
headers = {
'user-agent': 'acme.sh/3.0.8 (https://github.com/acmesh-official/acme.sh)',
'accept': '*/*',
'content-type': 'application/jose+json',
}
# inner protected & payload
eab_protected = f'{{"alg":"HS256","kid":"{CA_EAB_KEY_ID}","url":"{NEW_ACCOUNT}"}}'
eab_payload = ''
eab_protected64 = base64.urlsafe_b64encode(eab_protected.encode('utf-8')).decode('utf-8').replace('=', '')
eab_payload64 = base64.urlsafe_b64encode(calc_jwk(ACCOUNT_KEY).encode('utf-8')).decode('utf-8').replace('=', '')
eab_sign_t = f'{eab_protected64}.{eab_payload64}'
# keyhex = base64.urlsafe_b64decode(CA_EAB_HMAC_KEY + '==').hex()
keyhex = base64.urlsafe_b64decode(CA_EAB_HMAC_KEY + '==')
# 签名
eab_signature = base64.urlsafe_b64encode(
hmac.new(keyhex, eab_sign_t.encode('utf-8'), hashlib.sha256).digest()).decode(
'utf-8').replace('=', '')
external_binding = f',"externalAccountBinding":{{"protected":"{eab_protected64}", "payload":"{eab_payload64}", "signature":"{eab_signature}"}}'
email_sg = f'"contact": ["mailto:{CA_EMAIL}"], '
regjson = "{%s\"termsOfServiceAgreed\": true%s}" % (email_sg, external_binding)
logger.info(f'regjson: {regjson}')
# outer protected & payload
payload64 = base64.urlsafe_b64encode(regjson.encode('utf-8')).decode('utf-8').replace('=', '')
# {"nonce": "NvEMIkVbwc9nrWYvN7KWUmF6HwbXsG1ACYz7sE_lztw", "url": "https://acme.zerossl.com/v2/DV90/newAccount", "alg": "ES256", "jwk": {"crv": "P-256", "kty": "EC", "x": "degDMApXKlWtEvsigjAFuhqSuUCqMzE2M5pQmwxJDbk", "y": "Znb6T-XQ2J9RL-GdHui8_zf6MEeh-sW_aaTqRyKOTxk"}}
protected = f'{{"nonce": "{new_nonce()}", "url": "{NEW_ACCOUNT}", "alg": "ES{ECC_KEY_LEN}", "jwk": {calc_jwk(ACCOUNT_KEY)}}}'
protected64 = base64.urlsafe_b64encode(protected.encode('utf-8')).decode('utf-8').replace('=', '')
# 签名
signature = sign_base64_url_replace(f'{protected64}.{payload64}', ACCOUNT_KEY)
data_body = f'{{"protected":"{protected64}", "payload":"{payload64}", "signature":"{signature}"}}'
logger.info(f'data_body: {data_body}')
response = requests.request("POST", NEW_ACCOUNT, headers=headers, data=data_body, verify=False)
http_header = response.headers
json_body = response.text
global SHARED_NONCE
SHARED_NONCE = http_header['Replay-Nonce']
if (response.status_code > 300) or ('location' not in http_header):
logger.error(f'!!! 账户注册失败: {json_body}')
# 退出
# sys.exit(1)
raise Exception(f'账户注册失败: {json_body}')
# 保存到 account.json
with open(ACCOUNT_JSON, 'w') as f:
f.write(json_body)
# 保存到 ca.conf
with open(CA_CONF, 'a') as f:
account_url = http_header["Location"] if 'Location' in http_header else http_header["location"]
account_key_hash = calc_accout_key_hash(ACCOUNT_KEY)
f.write(f"ACCOUNT_URL='{account_url}'\n")
f.write(f"CA_KEY_HASH='{account_key_hash}'\n")
logger.info(f'账户注册成功: {json_body}')
logger.info(f'账户注册成功: {ACCOUNT_JSON}')
# 初始化域名信息
def init_domain_info(args):
logger.info('>>> 初始化域名信息')
# domain = str(domain.encode('idna'), 'utf-8')
# 从 args 中获取域名
# 遍历 args,获取域名
domains = []
for arg in args:
if arg.startswith('--domain') or arg.startswith('-d'):
d = arg.split('=')[1]
d = str(d.encode('idna'), 'utf-8')
# 检查域名格式
if check_domain(d):
domains.append(d)
else:
logger.error(f'!!! 域名格式错误: {d}')
# sys.exit(1)
raise Exception(f'域名格式错误: {d}')
if len(domains) == 0:
logger.error('!!! 未指定域名')
# sys.exit(1)
raise Exception('未指定域名')
# 第一个域名作为主域名
global MAIN_DOMAIN
MAIN_DOMAIN = domains[0]
# 其余域名拼接到 ALT_DOMAINS
global ALT_DOMAINS
ALT_DOMAINS = ','.join(domains[1:])
logger.info(f'主域名: {MAIN_DOMAIN}')
logger.info(f'多域名: {ALT_DOMAINS}')
# TODO add rsa
global DOMAIN_KEY_RSA
for arg in args:
if arg.startswith('--rsa'):
DOMAIN_KEY_RSA = True
logger.info('使用 RSA 密钥')
break
global CERT_KEY_PATH, CSR_PATH, CSR_CONF_PATH, DOMAIN_CONF_PATH, CA_CERT_PATH, DOMAIN_CER_PATH, FULLCHAIN_CER_PATH
# 创建目录,初始化路径
domain_root = MAIN_DOMAIN.replace('*.', '')
domain_home = os.path.abspath(domain_root)+('_rsa' if DOMAIN_KEY_RSA else '') # TODO add rsa
if not os.path.exists(domain_home):
os.makedirs(domain_home)
CERT_KEY_PATH = os.path.abspath(f'{domain_home}/{domain_root}.key')
CSR_PATH = os.path.abspath(f'{domain_home}/{domain_root}.csr')
CSR_CONF_PATH = os.path.abspath(f'{domain_home}/{domain_root}.csr.conf')
DOMAIN_CONF_PATH = os.path.abspath(f'{domain_home}/{domain_root}.conf')
CA_CERT_PATH = os.path.abspath(f'{domain_home}/ca.cer')
DOMAIN_CER_PATH = os.path.abspath(f'{domain_home}/{domain_root}.cer')
FULLCHAIN_CER_PATH = os.path.abspath(f'{domain_home}/fullchian.cer')
# 创建域名私钥
def create_domain_key(domain_key_path):
logger.info('>>> 创建域名私钥')
if os.path.exists(domain_key_path) and os.path.getsize(domain_key_path) > 0:
logger.info(f'使用已有域名私钥: {domain_key_path}')
return
# 生成一个P-256曲线的ECC私钥
private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
logger.info(f'domain.key: {pem}')
# 将PEM格式的私钥保存到文件
with open(domain_key_path, "wb") as key_file:
key_file.write(pem)
if os.path.exists(domain_key_path) and os.path.getsize(domain_key_path) > 0:
logger.info(f'域名私钥保存成功: {domain_key_path}')
else:
logger.error(f'!!! 域名私钥保存失败')
# sys.exit(1)
raise Exception('域名私钥保存失败')
# 创建域名私钥 RSA 2048
def create_domain_key_rsa(domain_key_path):
logger.info('>>> 创建域名私钥 RSA 2048')
if os.path.exists(domain_key_path) and os.path.getsize(domain_key_path) > 0:
logger.info(f'使用已有域名私钥: {domain_key_path}')
return
# 生成私钥
private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=2048,
backend=default_backend()
)
# 私钥序列化
pem = private_key.private_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL,
encryption_algorithm=serialization.NoEncryption()
)
logger.info(f'domain.key: {pem}')
# 将PEM格式的私钥保存到文件
with open(domain_key_path, "wb") as key_file:
key_file.write(pem)
if os.path.exists(domain_key_path) and os.path.getsize(domain_key_path) > 0:
logger.info(f'域名私钥保存成功: {domain_key_path}')
else:
logger.error(f'!!! 域名私钥保存失败')
# sys.exit(1)
raise Exception('域名私钥保存失败')
# 保存域名配置信息
def save_domain_conf(key, value):
with open(DOMAIN_CONF_PATH, 'a') as f:
f.write(f"{key}='{value}'\n")
logger.info(f"保存域名配置信息: {key}='{value}'")
# 申请证书前的操作
def on_before_issue():
if os.path.exists(DOMAIN_CONF_PATH) and os.path.getsize(DOMAIN_CONF_PATH) > 0:
logger.info(f'使用已有domain.conf: {DOMAIN_CONF_PATH}')
return
le_domain = MAIN_DOMAIN
le_alt = ALT_DOMAINS
le_webroot = 'dns'
le_api = CA_ZEROSSL
le_keylength = DOMAIN_KEY_RSA_LEN if DOMAIN_KEY_RSA else DEFAULT_DOMAIN_KEY_LENGTH # TODO add rsa
save_domain_conf('Le_Domain', le_domain)
save_domain_conf('Le_Alt', le_alt)
save_domain_conf('Le_Webroot', le_webroot)
save_domain_conf('Le_API', le_api)
save_domain_conf('Le_Keylength', le_keylength)
def send_new_order():
logger.info('>>> 发送新订单')
# 生成订单
# {"nonce": "QSUQafWTdd_mPDRczjKr9jSBxJ5dlsAtGlPveqERlMk", "url": "https://acme.zerossl.com/v2/DV90/newOrder", "alg": "ES256", "kid": "https://acme.zerossl.com/v2/DV90/account/7p*****GhreJrg"}
protected = f'{{"nonce": "{new_nonce()}", "url": "{NEW_ORDER}", "alg": "ES{ECC_KEY_LEN}", "kid": "{read_config("ACCOUNT_URL", CA_CONF)}"}}'
protected64 = base64.urlsafe_b64encode(protected.encode()).decode().replace('=', '')
# {"identifiers": [{"type":"dns","value":"yy.test.domain.com"},{"type":"dns","value":"*.yy.test.domain.com"},{"type":"dns","value":"zz.test.domain.com"},{"type":"dns","value":"*.zz.test.domain.com"}]}
# 从 ALT_DOMAINS 拼接多域名 {"type":"dns","value":"yy.test.domain.com"}
if ALT_DOMAINS:
domains = '{"type":"dns","value":"'+(MAIN_DOMAIN+','+ALT_DOMAINS).replace(',', '"},{"type":"dns","value":"')+'"}'
else:
domains = '{"type":"dns","value":"'+MAIN_DOMAIN+'"}'
payload = f'{{"identifiers": [{domains}]}}'
payload64 = base64.urlsafe_b64encode(payload.encode()).decode().replace('=', '')
# 签名
signature = sign_base64_url_replace(f'{protected64}.{payload64}', ACCOUNT_KEY)
data_body = f'{{"protected":"{protected64}", "payload":"{payload64}", "signature":"{signature}"}}'
logger.info(f'data_body: {data_body}')
headers = {
'user-agent': 'acme.sh/3.0.8 (https://github.com/acmesh-official/acme.sh)',
'accept': '*/*',
'content-type': 'application/jose+json',
}
response = requests.request("POST", NEW_ORDER, headers=headers, data=data_body, verify=False)
http_header = response.headers
json_body = response.json()
logger.info(f'http_header: {http_header}')
logger.info(f'json_body: {json_body}')
global SHARED_NONCE
SHARED_NONCE = http_header['Replay-Nonce']
# 保存到 domain.conf
Le_OrderFinalize = json_body['finalize']
save_domain_conf('Le_OrderFinalize', Le_OrderFinalize)
Le_LinkOrder = http_header['location']
save_domain_conf('Le_LinkOrder', Le_LinkOrder)
# 保存 authorizations
# {"status":"pending","expires":"2024-10-24T11:22:14Z","identifiers":[{"type":"dns","value":"yy.test.domain.com"},{"type":"dns","value":"*.yy.test.domain.com"},{"type":"dns","value":"zz.test.domain.com"},{"type":"dns","value":"*.zz.test.domain.com"}],"authorizations":["https://acme.zerossl.com/v2/DV90/authz/rgp_d5FDn1haYmATDZ2eSw","https://acme.zerossl.com/v2/DV90/authz/PIFj9tVWsBesDQ3x1wTJlw","https://acme.zerossl.com/v2/DV90/authz/8KqEh55E1PU7kpMLLWGqPQ","https://acme.zerossl.com/v2/DV90/authz/M-HvXbB91BKZf8zVZne_vg"],"finalize":"https://acme.zerossl.com/v2/DV90/order/gVgxWlURMo_AxeohKkVl5g/finalize"}
global authrs_for_domain
for i in range(len(json_body['authorizations'])):
domain = json_body['identifiers'][i]['value']
authorization = json_body['authorizations'][i]
authrs_for_domain[domain] = authorization
logger.info(f"authorization: {domain+'='+authorization}")
def get_each_authorization():
logger.info('>>> 获取每个授权')
Le_Vlist = ''
for domain, authorization in authrs_for_domain.items():
# {"nonce": "QjObzzTr2ag0klh9etfewPHyu5bM67xbtrdkzQDbez4", "url": "https://acme.zerossl.com/v2/DV90/authz/rgp_d5FDn1haYmATDZ2eSw", "alg": "ES256", "kid": "https://acme.zerossl.com/v2/DV90/account/7p*****GhreJrg"}
protected = f'{{"nonce": "{new_nonce()}", "url": "{authorization}", "alg": "ES{ECC_KEY_LEN}", "kid": "{read_config("ACCOUNT_URL", CA_CONF)}"}}'
protected64 = base64.urlsafe_b64encode(protected.encode()).decode().replace('=', '')
payload64 = ''
signature = sign_base64_url_replace(f'{protected64}.{payload64}', ACCOUNT_KEY)
data_body = f'{{"protected":"{protected64}", "payload":"{payload64}", "signature":"{signature}"}}'
logger.info(f'data_body: {data_body}')
headers = {
'user-agent': 'acme.sh/3.0.8 (https://github.com/acmesh-official/acme.sh)',
'accept': '*/*',
'content-type': 'application/jose+json'
}
response = requests.request("POST", authorization, headers=headers, data=data_body, verify=False)
http_header = response.headers
json_body = response.json()
logger.info(f'http_header: {http_header}')
logger.info(f'json_body: {json_body}')
global SHARED_NONCE
SHARED_NONCE = http_header['Replay-Nonce']
# 此处默认 status 为 pending(待定),不考虑其他情况
# {"identifier":{"type":"dns","value":"yy.test.domain.com"},"status":"pending","expires":"2024-08-25T11:22:13Z","challenges":[{"type":"http-01","url":"https://acme.zerossl.com/v2/DV90/chall/64plHUY3FyVgCCbCAmLKWQ","status":"pending","token":"zA1BLfiiF-B_EwdABf4ruGLIU8ynMi6PcuKkBmMmM6U"},{"type":"dns-01","url":"https://acme.zerossl.com/v2/DV90/chall/pshmanYqXNGZ3PeQE_CA9g","status":"pending","token":"B126WehfSpgJFBNOLX9lIH-2ofJQSRw9XLkTFlyICN8"}]}
# dns 的 token 和 challenge
token = chall = ''
for i in (json_body['challenges']):
if i['type'] == 'dns-01':
token = i['token']
chall = i['url']
logger.info(f"token, chall: {token}, {chall}")
break
# KEY_AUTHORIZATION
KEY_AUTHORIZATION = token + "." + calc_thumbprint(calc_jwk(ACCOUNT_KEY)) # token + "." + thumbprint
logger.info(f"KEY_AUTHORIZATION: {KEY_AUTHORIZATION}")
global txt_for_domain
txt_for_domain[domain] = calc_txt_value(KEY_AUTHORIZATION)
# Le_Vlist dvlist="$d$sep$keyauthorization$sep$uri$sep$vtype$sep$_currentRoot$sep$_authz_url"
# vlist='pc.test.domain.com#4iQ9TL3Kthwwl9frKMu2mb2IWLIQ5UPS9ZWcCSBE66s.WGpZWjxY_PWYqYH0lWnrgFEFynFF_VSzd4eL7MmK-4Y#https://acme.zerossl.com/v2/DV90/chall/9i-4dZO1uDtOivQEHLMdIQ#dns-01#dns#https://acme.zerossl.com/v2/DV90/authz/hr3DiLmg7uK87Kopf3Xd0A,'
sep = '#'
dvsep = ','
Le_Vlist += f'{domain}{sep}{KEY_AUTHORIZATION}{sep}{chall}{sep}dns-01{sep}dns{sep}{authorization}{dvsep}'
logger.info(f'Le_Vlist: {Le_Vlist}')
# 保存到 domain.conf
save_domain_conf('Le_Vlist', Le_Vlist)
# txt记录
logger.info('############################################################################')
logger.info('#### 请添加以下TXT记录到域名DNS解析中:')
for domain, txt in txt_for_domain.items():
logger.info(f'#### {"_acme-challenge."+domain.replace("*.","")}: {txt}')
logger.info('############################################################################')
def continue_verify(domain_conf_path):
logger.info('>>> 继续验证')
# 1 从 domain.conf 文件中获取 challenges authrs 信息
vlist = read_config('Le_Vlist', domain_conf_path)
logger.info(f'vlist: {vlist}')
domain_infos = vlist.split(',')[:-1]
for domain_info in domain_infos:
info = domain_info.split('#')
logger.info(f'info: {info}')
authrs_for_domain[info[0]] = info[5]
challs_for_domain[info[0]] = info[2]
logger.info(f'authrs_for_domain: {authrs_for_domain}')
logger.info(f'challs_for_domain: {challs_for_domain}')
# 2 challenge
for domain, chall_url in challs_for_domain.items():
logger.info(f'>>> >>> 域名 {domain} 的 challenge')
protected = f'{{"nonce": "{new_nonce()}", "url": "{chall_url}", "alg": "ES{ECC_KEY_LEN}", "kid": "{read_config("ACCOUNT_URL", CA_CONF)}"}}' #.format("new_nonce()", chall_url, ECC_KEY_LEN, read_config("ACCOUNT_URL", CA_CONF))
protected64 = base64.urlsafe_b64encode(protected.encode()).decode().rstrip("=")
payload64 = base64.urlsafe_b64encode('{}'.encode()).decode().rstrip("=")
signature = sign_base64_url_replace(protected64 + '.' + payload64, ACCOUNT_KEY)
data_body = f'{{"protected": "{protected64}", "payload": "{payload64}", "signature": "{signature}"}}' #.format(protected64, payload64, signature)
logger.info(f'data_body: {data_body}')
headers = {
'user-agent': 'acme.sh/3.0.8 (https://github.com/acmesh-official/acme.sh)',
'accept': '*/*',
'content-type': 'application/jose+json'
}
response = requests.request("POST", chall_url,
headers=headers, data=data_body, verify=False)
http_header = response.headers
json_body = response.json()
logger.info(f'http_header: {http_header}')
logger.info(f'json_body: {json_body}')
global SHARED_NONCE
SHARED_NONCE = http_header['Replay-Nonce']
status = json_body['status']
# status: processing, invalid, valid
if status == 'invalid':
logger.error(f'!!! challenge 失败: {json_body}')
raise Exception(f'challenge 失败: {json_body}')
elif status == 'valid':
logger.info(f'域名 {domain} 验证成功')
continue
elif status == 'processing':
# 3 验证 dns 记录
logger.info('>>> >>> >>> 验证 DNS 记录')
while not status == 'valid':
auth_url = authrs_for_domain[domain]
protected = f'{{"nonce": "{new_nonce()}", "url": "{auth_url}", "alg": "ES{ECC_KEY_LEN}", "kid": "{read_config("ACCOUNT_URL", CA_CONF)}"}}'
protected64 = base64.urlsafe_b64encode(protected.encode()).decode().rstrip("=")
payload64 = ''
signature = sign_base64_url_replace(protected64 + '.' + payload64, ACCOUNT_KEY)
data_body = f'{{"protected": "{protected64}", "payload": "{payload64}", "signature": "{signature}"}}'
headers = {
'user-agent': 'acme.sh/3.0.8 (https://github.com/acmesh-official/acme.sh)',
'accept': '*/*',
'content-type': 'application/jose+json'
}
response = requests.request("POST", auth_url,
headers=headers, data=data_body, verify=False)
http_header = response.headers
json_body = response.json()
logger.info(f'http_header: {http_header}')
logger.info(f'json_body: {json_body}')
SHARED_NONCE = http_header['Replay-Nonce']
status = json_body['status']
if status == 'invalid':
logger.error(f'!!! 验证失败: {json_body}')
raise Exception(f'验证失败: {json_body}')
logger.info('>>> >>> >>> >>> 等待 10s...')
time.sleep(10)
else:
logger.error(f'!!! challenge未知状态: {status}')
raise Exception(f'challenge未知状态: {status}')
def create_csr(domain_key_path,domain_conf_path, csr_conf_path, csr_path):
logger.info('>>> 生成 CSR')
# 1 创建 domain.csr.conf
if os.path.exists(csr_path) and os.path.getsize(csr_path) > 0:
logger.info(f'使用已有 domain.csr: {csr_path}')
return
# 新建 domain.csr 空文件
with open(csr_conf_path, 'w') as f:
f.write('')
# # 读取 domain.conf
MAIN_DOMAIN = read_config('Le_Domain', domain_conf_path)
ALT_DOMAINS = read_config('Le_Alt', domain_conf_path)
if ALT_DOMAINS:
domains = f'{MAIN_DOMAIN},{ALT_DOMAINS}'
# subjectAltName=DNS:yy.test.domain.com,DNS:*.yy.test.domain.com,DNS:zz.test.domain.com,DNS:*.zz.test.domain.com
subject_alt_name = ','.join([f'DNS:{d}' for d in domains.split(',')])
else:
domains = MAIN_DOMAIN
subject_alt_name = f'DNS:{MAIN_DOMAIN}'
# 写入 domain.csr.conf
with open(csr_conf_path, 'w') as f:
f.write("[ req_distinguished_name ]\n[ req ]\n" +
"distinguished_name = req_distinguished_name\n" +
"req_extensions = v3_req\n" +
"[ v3_req ]\n" +
"extendedKeyUsage=serverAuth,clientAuth\n" +
f"\nsubjectAltName={subject_alt_name}"
)
# 2 生成 domain.csr
with open(domain_key_path, "rb") as key_file:
private_key = serialization.load_pem_private_key(
key_file.read(),
password=None,
backend=default_backend()
)
# 设置CSR的主题(Distinguished Name)
subject = x509.Name([
x509.NameAttribute(NameOID.COMMON_NAME, MAIN_DOMAIN),
])
# 设置CSR的扩展(这里只设置subjectAltName)
# 添加多个DNS名称
# alt_names = [x509.DNSName("yy.test.domain.com"), x509.DNSName("*.yy.test.domain.com"), x509.DNSName("zz.test.domain.com"), x509.DNSName("*.zz.test.domain.com")]
alt_names = [x509.DNSName(d) for d in domains.split(',')]
# 扩展
san = x509.SubjectAlternativeName(alt_names)
eku_extension = x509.ExtendedKeyUsage([
ExtendedKeyUsageOID.SERVER_AUTH, # TLS Web Server Authentication
ExtendedKeyUsageOID.CLIENT_AUTH, # TLS Web Client Authentication
])
# 创建CSR
csr = (
x509.CertificateSigningRequestBuilder()
.subject_name(subject)
.add_extension(eku_extension, critical=False)
.add_extension(san, critical=False)
.sign(private_key, hashes.SHA256(), default_backend())
)
# 将CSR保存到文件
logger.info(csr.public_bytes(serialization.Encoding.PEM).decode())
with open(csr_path, "wb") as f:
f.write(csr.public_bytes(serialization.Encoding.PEM))
if os.path.exists(csr_path) and os.path.getsize(csr_path) > 0:
logger.info(f'生成 CSR 成功: {csr_path}')
def finalize_order(domain_conf_path, csr_path):
logger.info('>>> 发送 csr')
order_finalize_url = read_config('Le_OrderFinalize', domain_conf_path)
link_order_url = read_config('Le_LinkOrder', domain_conf_path)
# 1 读取 csr
with open(csr_path, 'r') as f:
csr = f.read()
# 替换开头和结尾,使用正则替换
csr = csr.replace('-----BEGIN CERTIFICATE REQUEST-----', '').replace('-----END CERTIFICATE REQUEST-----', '').replace('\r ', '').replace('\n', '').replace(' ', '')
der = base64.urlsafe_b64encode(base64.b64decode(csr)).decode().rstrip('=')
# 2 发送 csr
logger.info(f'>>> >>> 发送 csr: {der}')
protected = f'{{"nonce": "{new_nonce()}", "url": "{order_finalize_url}", "alg": "ES{ECC_KEY_LEN}", "kid": "{read_config("ACCOUNT_URL", CA_CONF)}"}}'
payload = f'{{"csr": "{der}"}}'
protected64 = base64.urlsafe_b64encode(protected.encode()).decode().rstrip('=')
payload64 = base64.urlsafe_b64encode(payload.encode()).decode().rstrip('=')
signature = sign_base64_url_replace(f'{protected64}.{payload64}', ACCOUNT_KEY)
data_body = f'{{"protected": "{protected64}", "payload": "{payload64}", "signature": "{signature}"}}'
headers = {
'user-agent': 'acme.sh/3.0.8 (https://github.com/acmesh-official/acme.sh)',
'accept': '*/*',
'content-type': 'application/jose+json'
}
response = requests.request("POST", order_finalize_url, headers=headers, data=data_body, verify=False)
http_header = response.headers
json_body = response.json()
logger.info(f'http_header: {http_header}')
logger.info(f'json_body: {json_body}')
global SHARED_NONCE
SHARED_NONCE = http_header['Replay-Nonce']
status = json_body['status']
if status == 'processing':
link_order_url = http_header['Location']
elif status == 'valid':
link_cert = json_body['certificate']
save_domain_conf('Le_LinkCert', link_cert)
logger.info('\n######################################################\n#### 申请成功,开始下发证书 ####\n######################################################')
return # letenc.. 可能不需要下面的流程 send link order
else:
logger.error(f'超出本程序处理范围: {status},忽略错误,尝试下一步')
# raise Exception(f'超出本程序处理范围: {status}')
# 3 send link order, util status=valid
while not status == 'valid':
logger.info(f'>>> >>> >>> send link order')
protected = f'{{"nonce": "{new_nonce()}", "url": "{link_order_url}", "alg": "ES{ECC_KEY_LEN}", "kid": "{read_config("ACCOUNT_URL", CA_CONF)}"}}'
protected64 = base64.urlsafe_b64encode(protected.encode()).decode().replace('=', '')
payload64 = ''