-
Notifications
You must be signed in to change notification settings - Fork 1
/
pe_analyzer.py
146 lines (131 loc) · 4.25 KB
/
pe_analyzer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import os
import shlex
import subprocess
from collections import namedtuple

import pefile
from pwn import u32

from analyzer.config import *
from analyzer import packer1_unpacker_x86
from analyzer import packer1_unpacker_x64
class pe_file:
    """Mutable record bundling a file's name, PE object and strings.

    The three fields are exposed as plain read/write properties and default
    to ``None``.  The ``pe`` field is presumably a parsed ``pefile.PE``
    instance, but nothing in this class enforces that.
    """

    def __init__(self, name=None, pe=None, stirng=None):
        """Store the initial field values.

        Args:
            name: Initial value of the ``name`` property.
            pe: Initial value of the ``pe`` property.
            stirng: Initial value of the ``strings`` property.  The
                misspelled keyword is kept so that existing keyword
                callers keep working.
        """
        # The constructor used to discard its arguments and always store
        # None; honour them instead.
        self._name = name
        self._pe = pe
        self._strings = stirng

    @property
    def name(self):
        """Return the stored name."""
        return self._name

    @name.setter
    def name(self, n):
        self._name = n

    @property
    def pe(self):
        """Return the stored PE object."""
        return self._pe

    @pe.setter
    def pe(self, p):
        self._pe = p

    @property
    def strings(self):
        """Return the stored strings value."""
        return self._strings

    @strings.setter
    def strings(self, s):
        self._strings = s
class pe_analyzer:
    """Repair, unpack and scan PE samples for an AES constant or string patterns.

    The methods rely on module-level names that are not defined in this
    class (``PACKERS``, ``TMP``, ``RET``, ``aes_const`` and ``patterns``,
    expected to come from the ``analyzer.config`` star import) and on the
    external ``strings`` command-line utility.
    """

    def __init__(self):
        """Create an analyzer with empty ``files`` and ``result`` mappings."""
        self.files = {}
        self.result = {}

    @staticmethod
    def repair_one_file(path):
        """Read *path* and force the ``MZ`` and ``PE`` signatures back in.

        Args:
            path: Path of the file to read.

        Returns:
            A ``bytearray`` with the file content.  Bytes 0-1 are overwritten
            with ``b"MZ"`` and the two bytes at the offset stored at 0x3c
            are overwritten with ``b"PE"``.  A file too short to contain the
            0x3c field is returned unmodified, and the ``PE`` patch is
            skipped when the stored offset points outside the file.
        """
        with open(path, "rb") as f:
            data = bytearray(f.read())

        # The 4-byte little-endian offset of the PE header lives at 0x3c;
        # without it there is nothing meaningful to repair.
        if len(data) < 0x3c + 4:
            return data

        data[0:2] = b"MZ"
        pe_addr = int.from_bytes(data[0x3c:0x3c + 4], "little")
        # Slice assignment past the end of a bytearray would silently grow
        # the buffer, so only patch a signature that lies inside the file.
        if pe_addr + 2 <= len(data):
            data[pe_addr:pe_addr + 2] = b"PE"
        return data

    @staticmethod
    def packer_scanner(data):
        """Detect a known packer from the section names of a PE image.

        Args:
            data: Raw bytes of the (repaired) file.

        Returns:
            ``(packer_id, pe)`` where ``packer_id`` is the ``PACKERS`` value
            of the first key found in a section name and ``pe`` is the
            parsed ``pefile.PE`` object, or ``(0, None)`` when nothing
            matches.

        Raises:
            Whatever ``pefile.PE`` raises when *data* cannot be parsed.
        """
        markers = [(tag.encode(), packer_id) for tag, packer_id in PACKERS.items()]

        # Cheap pre-filter: skip PE parsing when no marker occurs anywhere
        # in the raw bytes.
        if not any(marker in data for marker, _ in markers):
            return 0, None

        pe = pefile.PE(data=data)
        for section in pe.sections:
            for marker, packer_id in markers:
                if marker in section.Name:
                    return packer_id, pe
        return 0, None

    @staticmethod
    def unpack(name, pe, packer, rawdata):
        """Unpack a packed sample into ``TMP`` and return the output path.

        Args:
            name: File name used for the output file inside ``TMP``.
            pe: Parsed ``pefile.PE`` object of the sample.
            packer: Packer id as returned by ``packer_scanner``.
            rawdata: Raw bytes of the sample, handed to the emulating
                unpackers.

        Returns:
            Path of the file that holds the unpacked image.

        Raises:
            ValueError: If *packer* is the UPX id but no section name
                contains ``UPX``.
        """
        out_path = os.path.join(TMP, name)

        if packer == PACKERS["UPX"]:
            last_upx_section_addr = None
            for section in pe.sections:
                if b"UPX" in section.Name:
                    last_upx_section_addr = section.VirtualAddress
            if last_upx_section_addr is None:
                # The original message referenced an undefined name and was
                # followed by an `assert False`, which is stripped under -O.
                raise ValueError("section not found: %s" % name)
            # Point data directory entry 1 (the import directory in the PE
            # format) at the last UPX section before writing the image out.
            entry = pe.NT_HEADERS.OPTIONAL_HEADER.DATA_DIRECTORY[1]
            entry.VirtualAddress = last_upx_section_addr
            entry.Size = 0xb4
        elif packer == PACKERS["MPRESS"]:
            # No header fix-up; the image is written and passed to RET as is.
            pass
        else:
            # Any other packer id is handled by the emulating unpackers.
            print("PACKER1: " + name)
            # 0x8664 is the PE machine type for x64 images.
            if pe.FILE_HEADER.Machine == 0x8664:
                unpack_data = packer1_unpacker_x64.emu_unpacker(rawdata)
            else:
                unpack_data = packer1_unpacker_x86.emu_unpacker(rawdata)
            # NOTE(review): emu_unpacker may be able to return None, in
            # which case the write below raises TypeError; confirm.
            with open(out_path, "wb") as f:
                f.write(unpack_data)
            return out_path

        pe.write(out_path)
        # RET is still interpreted by the shell exactly as before; only the
        # file-derived path is quoted so that spaces or shell metacharacters
        # in a sample name cannot alter the command.
        quoted = shlex.quote(out_path)
        os.system("%s %s -o %s" % (RET, quoted, quoted))
        return out_path

    def parse_one_file(self, path):
        """Analyse one sample and report whether it matches.

        The file is repaired, unpacked when a packer is detected, and then
        checked for the AES constant.  If that is absent, the output of the
        ``strings`` utility is matched against the configured patterns.

        Args:
            path: Path of the sample to analyse.

        Returns:
            ``(name, 1)`` when the AES constant or a string pattern is
            found, otherwise ``(name, 0)``; ``name`` is the last path
            component of *path*.
        """
        name = os.path.basename(path)
        data = self.repair_one_file(path)

        try:
            packer, pe = self.packer_scanner(data)
        except Exception:
            # Best effort: a sample that cannot be parsed is treated as
            # not packed.
            packer, pe = None, None

        if packer:
            tmp = self.unpack(name, pe, packer, data)
            with open(tmp, "rb") as f:
                data = f.read()
        else:
            tmp = os.path.join(TMP, name)
            with open(tmp, "wb") as f:
                f.write(data)

        try:
            if self.check_aes(data):
                return (name, 1)
            # 16-bit little-endian strings first, then the default encoding.
            s = subprocess.check_output(['strings', '-el', tmp])
            s += subprocess.check_output(['strings', tmp])
        finally:
            # Always drop the temporary file, even if `strings` fails.
            os.remove(tmp)

        return (name, 1) if self.check_strings(s) else (name, 0)

    @staticmethod
    def check_aes(data):
        """Return 1 if ``aes_const`` occurs in *data*, otherwise 0."""
        return 1 if aes_const in data else 0

    @staticmethod
    def check_strings(strings):
        """Return 1 if any configured pattern matches, otherwise 0.

        Args:
            strings: Bytes to search, e.g. the output of the ``strings``
                utility.

        The keys of ``patterns`` are expected to be compiled regular
        expressions.
        """
        # Decode once instead of once per pattern; undecodable bytes are
        # replaced rather than aborting the whole scan.
        text = strings.decode(errors="replace")
        return 1 if any(pattern.search(text) for pattern in patterns) else 0