-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathmycrypt.py
130 lines (108 loc) · 4.8 KB
/
mycrypt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
"""On-the-fly AES256 CTR encryption with file-like interface."""
from Crypto.Cipher import AES
from Crypto.Util import Counter
import hashlib, os
def sha1(s: object) -> bytes:
    """Return the raw SHA-1 digest of a string or a bytes-like object.

    Text input is encoded as UTF-8 before hashing; any other input is
    handed to hashlib unchanged.
    """
    payload = s.encode('utf8') if isinstance(s, str) else s
    return hashlib.sha1(payload).digest()
# Courtesy of Tom Gardiner at Teppen.io
# https://teppen.io/2018/10/23/aws_s3_verify_etags/
def calc_etag(infile, partsize=8):
    """Calculate the AWS S3 ETag of a binary stream for a given part size.

    Args:
        infile: Binary file-like object. It is consumed from its current
            position until ``read(partsize)`` returns ``b''``.
        partsize (int): Upload part size. Values below 2**16 are taken
            as MiB (multiplied by 2**20), larger values as bytes.

    Returns:
        str: Hex MD5 of the data when it fits in a single part (this
        includes empty input). Otherwise the hex MD5 of the concatenated
        per-part MD5 digests, followed by ``-`` and the number of parts.
    """
    if partsize < 2**16:
        partsize *= 2**20  # small numbers are a MiB count, not bytes
    md5_digests = [hashlib.md5(chunk).digest()
                   for chunk in iter(lambda: infile.read(partsize), b'')]
    if not md5_digests:
        # Empty input produces no chunks at all; report the plain MD5 of
        # zero bytes instead of a bogus multipart tag ending in "-0".
        return hashlib.md5(b'').hexdigest()
    if len(md5_digests) == 1:
        return md5_digests[0].hex()
    combined = hashlib.md5(b''.join(md5_digests)).hexdigest()
    return combined + '-' + str(len(md5_digests))
class AESFile:
    """On-the-fly AES encryption (on read) and decryption (on write).

    Uses CTR mode with a 16 byte initial value (iv). When reading,
    returns the iv first, then the encrypted payload. On writing, the
    first 16 bytes are assumed to contain the iv.

    Does the bare minimum, you may get errors if not careful (for
    example when calling read() on a file opened with 'wb'). See
    Python's :class:`io.IOBase` for details on most methods.

    Args:
        filename (str): File to open for reading (encrypt on the fly)
            or writing (decrypt on the fly)
        mode (str): Either 'rb' or 'wb', just like with :func:`io.open`
        key (bytes): Encryption/decryption key (32 bytes for AES256)
        iv (bytes): Initial value (16 bytes), if not set uses os.urandom.
            Only used in 'rb' mode; in 'wb' mode the iv comes from the
            written data.

    Returns:
        AESFile: File-like object

    Raises:
        RuntimeError: If mode is neither 'rb' nor 'wb'
        ValueError: If an iv is given in 'rb' mode and is not 16 bytes
    """

    def __initAES(self) -> None:
        """(Re)create the CTR cipher object from self.key and self.iv."""
        counter = Counter.new(
            128, initial_value=int.from_bytes(self.iv, byteorder='big'))
        self.obj = AES.new(self.key, AES.MODE_CTR, counter=counter)

    def __init__(self, filename: str, mode: str, key: bytes, iv: bytes=None) -> None:
        """Init the class. Documented in class docstring."""
        if mode not in ('wb', 'rb'):
            raise RuntimeError('Only rb and wb modes supported!')
        self._pos = 0
        self.key = key
        self.mode = mode
        if mode == 'rb':
            self.iv = bytes(iv) if iv else os.urandom(16)
            if len(self.iv) != 16:
                # The write side always absorbs exactly 16 bytes as iv, so
                # any other length would yield an undecryptable stream.
                raise ValueError('iv must be exactly 16 bytes long')
            # Build the cipher before opening the file so that a bad key
            # cannot leave an open file handle behind.
            self.__initAES()
        else:
            # Filled in by write() from the first 16 bytes received.
            self.iv = bytearray(16)
        self.fp = open(filename, mode)

    def __enter__(self) -> 'AESFile':
        """Return the object itself for use in a with statement."""
        return self

    def __exit__(self, type, value, traceback) -> None:
        """Close the underlying file when leaving a with statement."""
        self.close()

    def write(self, data: bytes) -> int:
        """Write data and decrypt on the fly. First 16 bytes absorbed as iv.

        Returns:
            int: Number of bytes consumed, always ``len(data)``
        """
        datalen = len(data)
        if self._pos < 16:
            ivlen = min(16 - self._pos, datalen)
            self.iv[self._pos:self._pos + ivlen] = data[:ivlen]
            self._pos += ivlen
            if self._pos == 16:
                self.__initAES()  # iv complete, ready to init now
            data = data[ivlen:]
        if data:
            self._pos += self.fp.write(self.obj.decrypt(data))
        return datalen

    def read(self, size: int=-1) -> bytes:
        """Read data and encrypt on the fly. First 16 bytes returned are iv.

        Args:
            size (int): Maximum number of bytes to return, iv bytes
                included. None or a negative value reads everything
                that is left.

        Returns:
            bytes: Remaining iv bytes (if any) followed by encrypted
            payload; empty at end of file
        """
        read_all = size is None or size < 0
        ivpart = b''
        if self._pos < 16:
            # Hand out only the part of the iv not yet returned, also
            # when an unbounded read follows a partial one.
            iv_end = 16 if read_all else min(16, self._pos + size)
            ivpart = bytes(self.iv[self._pos:iv_end])
            if not read_all:
                size -= len(ivpart)
        if read_all:
            enpart = self.obj.encrypt(self.fp.read(-1))
        elif size:
            enpart = self.obj.encrypt(self.fp.read(size))
        else:
            enpart = b''  # request fully satisfied by iv bytes (or size 0)
        self._pos += len(ivpart) + len(enpart)
        return ivpart + enpart

    def tell(self) -> int:
        """Tell the current position.

        Note that when reading, goes 16 bytes further than the file
        being read, due to the fact that iv is injected to start.
        """
        return self._pos

    # only in read mode (encrypting)
    def seek(self, offset: int, whence: int=0) -> int:
        """Seek to given position.

        Only offset 0 is supported (relative to start, current position
        or end depending on whence parameter). Otherwise dummy-encrypting
        stuff might get really slow. Seeking to the current absolute
        position is accepted as a no-op.

        Args:
            offset (int): Offset, has to be 0
            whence (int): 0,1,2 for absolute,relative,end-based

        Returns:
            int: The position after seeking, same value as tell()

        Raises:
            RuntimeError: If offset is nonzero
        """
        if whence == 0 and offset == self._pos:
            return self._pos  # nop
        if offset != 0:
            raise RuntimeError('Only seek(0, whence) supported')
        self.fp.seek(0, whence)  # offset=0 works for all whences
        if whence == 0:  # absolute positioning, offset=0
            self._pos = 0
            self.__initAES()  # restart the keystream from the beginning
        elif whence == 2:  # relative to file end, offset=0
            self._pos = 16 + self.fp.tell()
        return self._pos

    def close(self) -> None:
        """Close the file stream."""
        self.fp.close()