python的系统底层操作6——硬断点 ∞
python前一篇分析了软断点的原理和它的实现。对于软断点,有时候会不起作用。因为有一些软件会检测自己在内存中运行的代码的CRC校验值,一旦检测失败就会进行“自我了断”,而软断点就是要修改内存中运行的代码,这对于这种软件是行不通的,只能用“硬断点”和内存断点。本文主要分析硬断点。
寄存器中有几个专门用来调试的寄存器,DR0–DR7,前面四个DR0到DR3用于存储所设硬件断点的地址,所以你最多只能设置4个硬件断点。DR4和DR5是保留不用的寄存器,DR6是调试状态寄存器,它记录了一次断点触发时相关的调试事件类型的信息,DR7则是一个开关,它用来控制DR0到DR3的激活状况,这个比较重要。硬件断点有三个触发条件:
当某一特定的内存地址被 1、读写时。2、写入时。3、执行时。
DR7的结构如下图:
前8位分别用来标识DR0到DR3是否激活,每个寄存器对应两位,分别表示L域和G域,即局部和全局断点,这两个其中只要有一个被设成1,断点就会起作用。8—15位比较神秘,我也不知道用来干嘛。16到31位,分成4个4位,每个4位中的前两位表示断点的类型,就是上面讲的三个触发条件,后两位表示所要设的断点指定的内存中的长度,这个一般设置1个字节长就可以了。
如上所述,三种触发条件是用两位来表示的,而它分道扬镳值如下:00表示执行,01表示写入,11表示读写。
后两们的长度,值对应关系为:00表示1字节,01表示2字节,11表示4字节。
硬件断点使用的是INT1,1号中断,该中断事件一般是用于硬件调试和单步调试事件。
代码实现部分:
其实代码很多部分都和前面的一样,硬件断点的事件也是在fire_debug_event这个函数里面触发的。它的ExceptionCode是EXCEPTION_SINGLE_STEP。
而当EXCEPTION_SINGLE_STEP触发时,也不一定是硬件断点,这个要根据Dr6寄存器来判断,如果Dr6的前四位有一个为1,且它对应的Dr0到Dr3也被激活,那么它就是硬件断点触发的,这时我们才让它继续执行。下面的代码中,我只让断点停一次,然后就把断点清掉了。
设置断点要分几个步骤来,首先判断Dr0到Dr3是否有可用的,有就把内存地址传给第一个可用的,然后就是设置Dr7,先把低8位中相关的位设1,激活,然后再把后16位中,相应的断点类型和内存长度设置上。而且对于要调试进程的每一个线程都要进行相同的这些设置。去掉断点的时候,也要把上面这些步骤都回滚一遍。
主要实现的文件:debugger.py
# -*- coding: utf-8 -*-
from ctypes import *
from debug_define import *
kernel32 = windll.kernel32
class debugger():
def __init__(self):
self.h_process = None
self.pid = None
self.debugger_active = False
self.h_process = None
self.context = None
self.breakpoints = {} # 所有的断点数
self.first_breakpoint = True
self.hw_breakpoints = {} # 所有的硬件断点数
def loadExe(self, exe_path):
creation_flags = DEBUG_PROCESS
startupinfo = STARTUPINFO() #相当于c的alloc这么大一个空间,初始化为0或NULL
processinfo = PROCESS_INFORMATION() #同上
startupinfo.dwFlags = STARTF_USESHOWWINDOW
startupinfo.wShowWindow = SW_HIDE
startupinfo.cb = sizeof(startupinfo)
# 开始创建
if kernel32.CreateProcessA(exe_path, #lpApplicationName
None, #lpCommandLine
None, #lpProcessAttributes
None, #lpThreadAttributes
None, #bInheritHandles
creation_flags, #dwCreationFlags
None, #lpEnvironment
None, #lpCurrentDirectory
byref(startupinfo), #lpStartupInfo
byref(processinfo)): #lpProcessInformation
print "进程ID:%d" % processinfo.dwProcessId
else:
print "进程创建失败。"
# 打开这个进程
self.h_process = self.open_process(processinfo.dwProcessId)
# 由id取得进程
def open_process(self, pid):
h_progress = kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid)
return h_progress
# 枚举被调试进程的线程
def enumerate_threads(self):
thread_entry = THREADENTRY32()
thread_list = []
snapshot = kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD,
self.pid)
# 上面这个函数要注意help第一个字母是小写CreateToolhelp32Snapshot
if snapshot is not None:
thread_entry.dwSize = sizeof(thread_entry)
# 遍历
# 先取得第一个线程
next = kernel32.Thread32First(snapshot, byref(thread_entry))
# 循环获取属于本进程的线程
while next:
# print "------>%d" % thread_entry.th32OwnerProcessID
if thread_entry.th32OwnerProcessID == self.pid:
thread_list.append(thread_entry.th32ThreadID)
next = kernel32.Thread32Next(snapshot, # 获取下一个线程,参数一样
byref(thread_entry))
kernel32.CloseHandle(snapshot)
return thread_list
else:
return False
# 附加进程
def attach(self, pid):
self.h_process = self.open_process(pid)
# 调用DebugActiveProcess,附加到pid进程
if kernel32.DebugActiveProcess(pid):
self.debugger_active = True # 可以开始调试了
self.pid = int(pid)
# self.run()
else:
print "进程附加失败!"
def run(self):
while self.debugger_active == True:
self.fire_debug_event()
# 可以通过Dr6中的BS标志位来判断这个单步事件的触发原
def del_hw_breakpoint(self, slot):
# 为调试进程的所有线程都移除断点
for tid in self.enumerate_threads():
context = self.get_thread_context(tid)
# 把Dr0——Dr3中相应的Dr清空
if slot == 0:
context.Dr0 = 0
elif slot == 1:
context.Dr1 = 0
elif slot == 2:
context.Dr2 = 0
elif slot == 3:
context.Dr3 = 0
# 把Dr7中相应的标志去掉
context.Dr7 &= ~(1 << (slot*2)) # 把低8位中相应的值清空
context.Dr7 &= ~(3 << (16+slot*4)) # 把Dr7中高16中相应的触发条件标志去掉
context.Dr7 &= ~(3 << (16+slot*4)) # 把Dr7中高16中相应的长度标志去掉
# 改完,写回
h_thread = self.open_thread(tid)
kernel32.SetThreadContext(h_thread, byref(context))
# 上面已对所有相关的线程进行处理了,接着把它清掉
del self.hw_breakpoints[slot]
return True
def exception_handle_single_step(self):
# 如果是硬件断点触发的话,就会命中下面的一个if
if self.context.Dr6 & 0x1 and self.hw_breakpoints.has_key(0):
slot = 0
elif self.context.Dr6 & 0x2 and self.hw_breakpoints.has_key(1):
slot = 1
elif self.context.Dr6 & 0x4 and self.hw_breakpoints.has_key(2):
slot = 2
elif self.context.Dr6 & 0x8 and self.hw_breakpoints.has_key(3):
slot = 3
else: # 不是硬件断点的单步调试事件
continue_status = DBG_EXCEPTION_NOT_HANDLED
# 打印寄存器中的所有信息
print "slot:%d" % slot
self.print_cur_context()
# 触发一次就移除这个断点
if self.del_hw_breakpoint(slot):
continue_status = DBG_CONTINUE # 继续
print "硬件断点已移除"
return continue_status
# 等待触发调试事件。。
def fire_debug_event(self):
debug_event = DEBUG_EVENT()
continue_status = DBG_CONTINUE
# 接下去就是等待调试的事件被触发
if kernel32.WaitForDebugEvent(byref(debug_event), INFINITE): #一直等
# 到这里,说明调试的条件被触发了
# 。。。(在这里做些调试相关的事情)
self.debug_event = debug_event
self.h_thread = self.open_thread(debug_event.dwThreadId)
self.context = self.get_thread_context(h_thread=self.h_thread)
# print "被调试的进程:%d,线程:%d" % (debug_event.dwProcessId,debug_event.dwThreadId)
# print "0x%08x" % debug_event.dwDebugEventCode
if debug_event.dwDebugEventCode == EXCEPTION_DEBUG_EVENT:
# 获取异常的信息
exception = debug_event.u.Exception.ExceptionRecord.ExceptionCode
self.exception_addr = debug_event.u.Exception.ExceptionRecord.ExceptionAddress
print "异常代码:%d,地址0x%08x" % (exception,self.exception_addr)
if exception == EXCEPTION_ACCESS_VIOLATION:
print "EXCEPTION_ACCESS_VIOLATION"
elif exception == EXCEPTION_BREAKPOINT:
print "EXCEPTION_BREAKPOINT"
elif exception == EXCEPTION_GUARD_PAGE:
print "EXCEPTION_GUARD_PAGE"
elif exception == EXCEPTION_SINGLE_STEP:
self.exception_handle_single_step()
#print "EXCEPTION_SINGLE_STEP"
# 这个调试事件处理结束,继续
# self.debugger_active = False
kernel32.ContinueDebugEvent(debug_event.dwProcessId,
debug_event.dwThreadId,
continue_status)
# 解除对进程的附加
def detach(self):
if kernel32.DebugActiveProcessStop(self.pid):
print "完成调试,退出..."
return True
else:
print "调试过程中出错..."
return False
# 打开线程
def open_thread(self, thread_id):
h_thread = kernel32.OpenThread(THREAD_ALL_ACCESS,
None,
thread_id)
if h_thread is not None:
return h_thread
else:
print "获取线程ID时出错!"
return False
# 得到线程的CONTEXT信息
def get_thread_context(self, thread_id=None, h_thread=None):
context = CONTEXT()
context.ContextFlags = CONTEXT_FULL | CONTEXT_DEBUG_REGISTERS
# 获得线程句柄
if h_thread is None:
self.h_thread = self.open_thread(thread_id)
if kernel32.GetThreadContext(self.h_thread, byref(context)):
# kernel32.CloseHandle(h_thread)
return context
else:
return False
# 打印寄存器信息(CONTEXT)
def print_cur_context(self):
thread_context = self.context
if thread_context:
# 打印信息
print "====================================="
print "[**] 线程ID: %d" % self.h_process
print "[**] EAX: 0x%08x" % thread_context.Eax
print "[**] EBX: 0x%08x" % thread_context.Ebx
print "[**] ECX: 0x%08x" % thread_context.Ecx
print "[**] EDX: 0x%08x" % thread_context.Edx
print "===="
print "[**] ESI: 0x%08x" % thread_context.Esi
print "[**] EDI: 0x%08x" % thread_context.Edi
print "===="
print "[**] EBP: 0x%08x" % thread_context.Ebp
print "[**] ESP: 0x%08x" % thread_context.Esp
print "[**] EIP: 0x%08x" % thread_context.Eip
print "====================================="
# 读取线程内存中的数据
def read_process_memory(self, addr, length):
data = ""
read_buf = create_string_buffer(length)
cnt = c_ulong(0)
if kernel32.ReadProcessMemory(self.h_process,
addr,
read_buf,
length,
byref(cnt)):
data += read_buf.raw
return data
else:
return False
# 往线程内存中写数据
def write_process_memory(self, addr, data):
cnt = c_ulong(0)
length = len(data)
c_data = c_char_p(data[cnt.value])
if kernel32.WriteProcessMemory(self.h_process,
addr,
c_data,
length,
byref(cnt)):
return True
else:
return False
# 设置断点
def set_hw_breakpoint(self, addr, length, condition):
if length not in(1, 2, 4): # 长度,减1后表示:0是1字节,1是2字节,3是4字节
return False
length -= 1;
# 检查触发的条件是否有效
if condition not in(HW_ACCESS, HW_EXECUTE, HW_WRITE):
return False
# 找到还没被用的硬件断点
if not self.hw_breakpoints.has_key(0): # Dr0
available = 0
elif not self.hw_breakpoints.has_key(1): # Dr1
available = 1
elif not self.hw_breakpoints.has_key(2): # Dr2
available = 2
elif not self.hw_breakpoints.has_key(3): # Dr3
available = 3
else:
return False
# 在调试进程的每个线程里面都进行R7的设置
for tid in self.enumerate_threads():
context = self.get_thread_context(tid)
# Dr0--Dr3其中一个中写入地址
if available == 0:
context.Dr0 = addr
elif available == 1:
context.Dr1 = addr
elif available == 2:
context.Dr2 = addr
elif available == 3:
context.Dr3 = addr
# 设置低8位,使Dr0--Dr3其中的一个有效(激活)
context.Dr7 |= 1 << (available*2) # 设置一位就可以
# 设置高16位中相应的触发条件信息
context.Dr7 |= condition << (16+available*4)
# 设置高16位中相应的长度
context.Dr7 |= length << (18+available*4)
# 到这里,context已经改完了,把它写回寄存器
h_thread = self.open_thread(tid)
kernel32.SetThreadContext(h_thread, byref(context))
# 把这个硬件记录到hw_breakpoints里面,字段为本函数的参数
self.hw_breakpoints[available] = (addr, length, condition)
return True
# 要下断点,得找一个地址,一般是一个函数的首地址
# 获取函数首地址的方法:
def get_func_addr(self, dllName, # dllName函数所在dll模块的名称
funcName): # funcName函数名称
handle = kernel32.GetModuleHandleA(dllName)
addr = kernel32.GetProcAddress(handle, funcName)
kernel32.CloseHandle(handle)
print "得到的地址:0x%08x" % addr
return addr
定义的文件:debug_define.py
# -*- coding: utf-8 -*-
from ctypes import *
# 定义window编程中常用的类型
WORD = c_ushort
DWORD = c_ulong
LPBYTE = POINTER(c_ubyte)
LPTSTR = POINTER(c_char)
HANDLE = c_void_p
PVOID = c_void_p
LPVOID = c_void_p
ULONG_PTR = c_ulong
SIZE_T = c_ulong
BYTE = c_ubyte
# 常量
DEBUG_PROCESS = 0x0001
CREATE_NEW_CONSOLE = 0x0010
PROCESS_ALL_ACCESS = 0x001F0FFF
THREAD_ALL_ACCESS = 0x001F03FF
INFINITE = 0xFFFFFFFF
DBG_CONTINUE = 0x00010002
STARTF_USESHOWWINDOW = 0x0001
SW_HIDE = 0
TH32CS_SNAPTHREAD = 0x00000004
CONTEXT_FULL = 0x00010007 # 得到所有的CPU寄存器
CONTEXT_DEBUG_REGISTERS = 0x00010010 # CPU的调试寄存器
# Exception
EXCEPTION_DEBUG_EVENT = 0x0001
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004
DBG_EXCEPTION_NOT_HANDLED = 0x80010001
# 自定义的硬件断点类型
HW_ACCESS = 0x0011
HW_EXECUTE = 0x0000
HW_WRITE = 0x0001
# CreateProcessA函数所需结构体
class STARTUPINFO(Structure):
_fields_ = [
("cb",DWORD),
("lpReserved",LPTSTR),
("lpDesktop",LPTSTR),
("lpTitle",LPTSTR),
("dwX",DWORD),
("dwY",DWORD),
("dwXSize",DWORD),
("dwYSize",DWORD),
("dwXCountChars",DWORD),
("dwYCountChars",DWORD),
("dwFillAttribute",DWORD),
("dwFlags",DWORD),
("wShowWindow",WORD),
("cbReserved2",WORD),
("lpReserved2",LPBYTE),
("hStdInput",HANDLE),
("hStdOutput",HANDLE),
("hStdError",HANDLE)
]
class PROCESS_INFORMATION(Structure):
_fields_ = [
("hProcess",HANDLE),
("hThread",HANDLE),
("dwProcessId",DWORD),
("dwThreadId",DWORD)
]
class EXCEPTION_RECORD(Structure):
pass
EXCEPTION_RECORD._fields_ = [
("ExceptionCode",DWORD),
("ExceptionFlags",DWORD),
("ExceptionRecord",POINTER(EXCEPTION_RECORD)),
("ExceptionAddress",PVOID),
("NumberParameters",DWORD),
("ExceptionInformation",ULONG_PTR*15) # 最多允许15条错误信息
]
class EXCEPTION_DEBUG_INFO (Structure):
_fields_ = [
("ExceptionRecord", EXCEPTION_RECORD),
("dwFirstChance",DWORD)
]
class DEBUG_EVENT_UNION(Union):
_fields_ = [
("Exception",EXCEPTION_DEBUG_INFO), #只用到这一个,下面的不用定义了
# ("CreateThread",CREATE_THREAD_DEBUG_INFO),
# ("CreateProcessInfo",CREATE_PROCESS_DEBUG_INFO),
# ("ExitThread",EXIT_THREAD_DEBUG_INFO),
# ("ExitProcess",EXIT_PROCESS_DEBUG_INFO),
# ("LoadDll",LOAD_DLL_DEBUG_INFO),
# ("UnloadDll",UNLOAD_DLL_DEBUG_INFO),
# ("DebugString",OUTPUT_DEBUG_STRING_INFO),
# ("RipInfo",RIP_INFO)
]
class DEBUG_EVENT(Structure):
_fields_ = [
("dwDebugEventCode",DWORD),
("dwProcessId",DWORD),
("dwThreadId",DWORD),
("u",DEBUG_EVENT_UNION)
]
class THREADENTRY32(Structure):
_fields_ = [
("dwSize",DWORD),
("cntUsage",DWORD),
("th32ThreadID",DWORD),
("th32OwnerProcessID",DWORD),
("tpBasePri",DWORD), # 本来是LONG,它和DWORD一样的位数
("tpDeltaPri",DWORD), # LONG
("dwFlags",DWORD)
]
# Used by the CONTEXT structure
class FLOATING_SAVE_AREA(Structure):
_fields_ = [
("ControlWord", DWORD),
("StatusWord", DWORD),
("TagWord", DWORD),
("ErrorOffset", DWORD),
("ErrorSelector", DWORD),
("DataOffset", DWORD),
("DataSelector", DWORD),
("RegisterArea", BYTE * 80),
("Cr0NpxState", DWORD),
]
class CONTEXT(Structure):
_fields_ = [
("ContextFlags", DWORD),
("Dr0", DWORD),
("Dr1", DWORD),
("Dr2", DWORD),
("Dr3", DWORD),
("Dr6", DWORD),
("Dr7", DWORD),
("FloatSave", FLOATING_SAVE_AREA),
("SegGs", DWORD),
("SegFs", DWORD),
("SegEs", DWORD),
("SegDs", DWORD),
("Edi", DWORD),
("Esi", DWORD),
("Ebx", DWORD),
("Edx", DWORD),
("Ecx", DWORD),
("Eax", DWORD),
("Ebp", DWORD),
("Eip", DWORD),
("SegCs", DWORD),
("EFlags", DWORD),
("Esp", DWORD),
("SegSs", DWORD),
("ExtendedRegisters", BYTE * 512)
]
测试文件,testMyDebugger.py
# -*- coding: utf-8 -*-
import debugger.debugger as debugger
from debugger.debug_define import HW_EXECUTE
deb = debugger.debugger()
pid = raw_input("请输入要调试(附加)的进程ID:")
deb.attach(int(pid))
printf_addr = deb.get_func_addr("msvcrt.dll", "printf")
deb.set_hw_breakpoint(printf_addr, 1, HW_EXECUTE)
deb.run()
deb.detach()