ctypes
module ctypes
allows you to interact with the data and functions of the C language (that is, a large number of dynamic libraries with C interface). Thus, nothing interferes with intercepting a process function and sending it to Python with a method wrapped in a C-callback using ctypes
.gdb.Command
class. When using a command in GDB, the method invoke(argument, from_tty)
will be called.gdb.Parameter
. In the sample article, it is used to set the name of the file with interception functions.PID
process and loading the module is convenient to do immediately when GDB starts. gdb -ex 'attach PID' -ex 'source pyinject.py' -ex 'set hookfile hook.py'
After this, the debugged process is stopped and the interactive GDB command line is launched, in which a new command “pyinject” will be available. gdb.execute(command, from_tty, to_string)
, which allows you to execute an arbitrary GDB command and get its output as a string. out = gdb.execute("info registers", False, True)
Also useful is gdb.parse_and_eval(expression)
, which evaluates the expression and returns the result in the form gdb.Value
.dlopen
in the context of the target process.call
command in gdb.execute
, or gdb.parse_and_eval
: # pyinject.py gdb.execute('call dlopen("libpython2.7.so", %d)' % RTLD_LAZY) assert long(gdb.history(0)) handle = gdb.parse_and_eval('dlopen("libpython2.7.so", %d)' % RTLD_LAZY) assert long(handle)
# pyinject.py gdb.execute('call PyEval_InitThreads()') gdb.execute('call Py_Initialize()')
The first call creates a GIL (global interpreter lock), the second prepares the Python C-API for use. # pyinject.py fp = gdb.parse_and_eval('fopen("hook.py", "r")') assert long(fp) != 0 pyret = gdb.parse_and_eval('PyRun_AnyFileEx(%u, "hook.py", 1)' % fp)
PyRun_AnyFileEx
executes code from a file in the context of the __main__
module.Py_AddPendingCall
).Hook
class that performs the actual interception.open
function of the standard library, we print its arguments and return the result of calling the original function stored in the orig
field. # hook.py @hook(symbol='open', ctype=CFUNCTYPE(c_int, c_char_p, c_int)) def python_open(fname, oflag): print "open: ", fname, oflag return python_open.orig(fname, oflag)
@hook
decorator takes two parameters:ctypes
specifying the type of function # hook.py def hook(symbol, ctype): def deco(func): Hook.register(symbol, ctype, func) return func return deco
register
method creates an instance of the class and stores it in the all_hooks
dictionary. Thus, after the file has been executed, thanks to the decorators in Hook.all_hooks
will be all the information about the available functions of the interceptors. # hook.py class Hook(object): all_hooks = {} @staticmethod def register(symbol, *args): Hook.all_hooks[symbol] = Hook(symbol, *args)
Hook
class responsible for intercepting # hook.py class Hook(object): @staticmethod def hook(symbol, *args): h = Hook.all_hooks[symbol] if h.active: return h.install(*args)
In *args
, additional information about the function being intercepted is passed here. Which one depends on the method of interception.%rip
register (address of the current command).open
function in GDB: 0x7f6cc8aa83e0 <open64+0>: 83 3d ed 33 2d 00 00 cmpl $0x0,0x2d33ed(%rip) 0x7f6cc8aa83e7 <open64+7>: 75 10 jne 0x7f6cc8aa83f9 <open64+25> 0x7f6cc8aa83e9 <__open_nocancel+0>: b8 02 00 00 00 mov $0x2,%eax 0x7f6cc8aa83ee <__open_nocancel+5>: 0f 05 syscall
cmpl $0x0,0x2d33ed(%rip)
" to another address, then the relative address 0x2d33ed(%rip)
, which now points to 0x7f6cc8d7b7d4
, will point to another place (hello SIGSEGV).0x2d33ed(%rip)
signed 32-bit)%rip
in cmpl
0x7f6cc8aa83e9
already __open_nocancel
. This means that our trampoline must be no farther than 2 GB from the beginning of the open
to allow a 32-bit jump (all 64-bit jumps are longer than 9 bytes). gdb.execute()
), nothing prevents you from correctly implementing a trampoline hook, but for the sake of simplicity, this article will use a simple hook. 0x7f6cc8aa83e0 <open64+0>: e9 1b 6c 55 37 jmp 0x7f6cfffff000
Transition to 0x7f6cc8aa83e0 + 0x37556c1b + 5 = 0x7f6cfffff000
0x00007f6cc8aa83e0 <open64+0>: ff 25 1a 6c 55 37 jmpq *0x37556c1a(%rip)
Here in the 0x7f6cc8aa83e0 + 0x37556c1a + 6 = 0x7f6cfffff000
address of the absolute transition is stored. # hook.py class Hook(object): @staticmethod def get_indlongjmp(srcaddr, proxyaddr): s = struct.pack('=BBl', 0xff, 0x25, proxyaddr - srcaddr - 6) return map(ord, s)
get_indlongjmp
returns the code for jumping from the address srcaddr
to the address stored in QWORD at proxyaddr
Hook
class. The install
method gets the address of the original address
function and the address of the proxyaddr
secondary zone. After that, it rewrites the beginning of the function (after saving it in self.code
) by switching to an interceptor # hook.py def install(self, address, proxyaddr): self.address = address self.proxyaddr = proxyaddr proxymemory = (c_void_p * 1).from_address(self.proxyaddr) proxymemory[0] = Hook.cast_to_void_p(self.cfunc) self.jmp = self.get_indlongjmp(self.address, self.proxyaddr) self.memory = (c_ubyte * len(self.jmp)).from_address(self.address) self.code = list(self.memory) self.patchmem(self.jmp) self.pyfunc.orig = self.origfunc() self.active = True
patchmem
overwrites the beginning of the original function with data from src
# hook.py def patchmem(self, src): for i in range(len(src)): self.memory[i] = src[i]
origfunc
wraps the function call in a code that removes and sets the transition to the interceptor. # hook.py def origfunc(self): ofunc = self.ctype(self.address) def wrap(*args): self.patchmem(self.code) val = ofunc(*args) self.patchmem(self.jmp) return val return wrap
Hook.hook(symbol, address, proxyaddr)
on the Python side of the GDB module.open
" line = gdb.execute('info address %s' % "open", False, True) m = re.match(r'.*?(0x[0-9a-f]+)', line) addr = int(m.group(1), 16)
gdb.execute("thread apply all backtrace")
addr
prot = PROT_READ | PROT_WRITE | PROT_EXEC flags = MAP_PRIVATE | MAP_ANONYMOUS maddr = gdb.parse_and_eval('(void*)mmap(0x%x, %d, %d, %d, -1, 0)\n' % (addr | 0x7FFFFFFF, 4096, prot, flags)) maddr = (long(maddr) & 0x00000000FFFFFFFF) | (addr & 0xFFFFFFFF00000000)
(addr | 0x7FFFFFFF)
uses the undocumented mmap
property to allocate
memory with the address less than the desired one.gdb.execute('info proc mappings', False, True)
, find the hole closest to addr in the address space and call mmap with MAP_FIXED
. And of course it is not necessary to allocate a whole page of memory for each intercepted function. gdb.parse_and_eval('mprotect(0x%x, %u, %d)' % (addr & -0x1000, 4096*2, prot))
Hook.hook
via PyRun_SimpleString
pyret = gdb.parse_and_eval('PyRun_SimpleString("Hook.hook(\\"open\\", 0x%x, 0x%x)")' % (addr, maddr))
open
" in the target process will be intercepted and sent to python_open
# [article text, continued from the previous line] ... from hook.py.
# pyinject.py
"""GDB script that loads libpython into the attached process and installs hooks.

Sourcing this file inside GDB registers the ``hookfile`` parameter and the
``pyinject`` command (sub-commands ``hook`` and ``unhook``).
"""
import re
import os

import gdb  # provided by GDB's embedded interpreter

try:
    long
except NameError:  # GDB built against Python 3 has no ``long``
    long = int

RTLD_LAZY = 1
PROT_READ = 0x1
PROT_WRITE = 0x2
PROT_EXEC = 0x4
MAP_PRIVATE = 0x2
MAP_FIXED = 0x10
MAP_ANONYMOUS = 0x20
PAGE_SIZE = 4096
LIBPYTHON = 'libpython2.7.so'


class ParamHookfile(gdb.Parameter):
    """GDB parameter ``hookfile``: path of the Python file holding the hooks."""

    instance = None

    def __init__(self, default=''):
        super(ParamHookfile, self).__init__("hookfile", gdb.COMMAND_NONE,
                                            gdb.PARAM_FILENAME)
        self.value = default
        # Remember the single instance so CmdHook can read the value.
        ParamHookfile.instance = self

    def get_set_string(self):
        return self.value

    def get_show_string(self, svalue):
        return svalue


class CmdHook(gdb.Command):
    """GDB command ``pyinject <hook|unhook> SYMBOL...``."""

    instance = None

    def __init__(self):
        super(CmdHook, self).__init__("pyinject", gdb.COMMAND_NONE)
        self.initialized = False
        CmdHook.instance = self

    def complete(self, text, word):
        """Return the sub-command names (``cmd_*`` methods) starting with *text*."""
        matching = [s[4:] for s in dir(self)
                    if s.startswith('cmd_') and s[4:].startswith(text)]
        return matching

    def invoke(self, subcmd, from_tty):
        """Dispatch ``pyinject <sub-command> args...`` to ``cmd_<sub-command>``.

        Only the arguments after the sub-command word are forwarded, so the
        word "hook"/"unhook" itself is no longer treated as a symbol name.
        """
        self.dont_repeat()
        argv = gdb.string_to_argv(subcmd)
        handler = getattr(self, 'cmd_' + argv[0], None) if argv else None
        if handler is None:
            gdb.write('unknown sub-command "%s"\n' % subcmd)
            return
        handler(*argv[1:])

    def _load_hookfile(self):
        """Make sure ``Hook`` exists in the target interpreter.

        Returns True when ``Hook`` is already defined or the hook file was
        executed successfully, False (after printing a message) otherwise.
        """
        # The target interpreter is Python 2.7, hence the print statement.
        pyret = gdb.parse_and_eval('PyRun_SimpleString("print Hook")')
        if long(pyret) == 0:
            return True
        hookfile = ParamHookfile.instance.value
        if not hookfile or not os.path.exists(hookfile):
            gdb.write('Use "set hookfile <path>"\n')
            return False
        fp = gdb.parse_and_eval('fopen("%s", "r")' % hookfile)
        if long(fp) == 0:
            gdb.write('Error loading "%s"\n' % hookfile)
            return False
        # closeit=1: PyRun_AnyFileEx closes fp itself.
        pyret = gdb.parse_and_eval('PyRun_AnyFileEx(%u, "%s", 1)'
                                   % (long(fp), hookfile))
        if long(pyret) != 0:
            gdb.write('Error loading "%s"\n' % hookfile)
            return False
        return True

    @staticmethod
    def _symbol_address(symbol):
        """Return the address GDB reports for *symbol*, or None if unknown."""
        try:
            line = gdb.execute('info address %s' % symbol, False, True)
        except gdb.error:
            return None
        m = re.match(r'.*?(0x[0-9a-f]+)', line)
        if not m:
            return None
        return int(m.group(1), 16)

    def cmd_hook(self, *args):
        """Install the hook registered in the hook file for each symbol in *args*."""
        self.initialize()
        if not self.initialized:
            return
        if not self._load_hookfile():
            return
        prot = PROT_READ | PROT_WRITE | PROT_EXEC
        flags = MAP_PRIVATE | MAP_ANONYMOUS  # | MAP_FIXED
        for symbol in args:
            addr = self._symbol_address(symbol)
            if addr is None:
                # Previously a failed match reused the address of the
                # preceding symbol (or raised NameError on the first one).
                continue
            raw = long(gdb.parse_and_eval(
                '(void*)mmap(0x%x, %d, %d, %d, -1, 0)\n'
                % (addr | 0x7FFFFFFF, PAGE_SIZE, prot, flags)))
            low = raw & 0x00000000FFFFFFFF
            if low == 0xFFFFFFFF:
                # MAP_FAILED is (void*)-1; a page-aligned mapping can never
                # have all low bits set, so this is unambiguous.
                gdb.write('mmap failed for "%s"\n' % symbol)
                continue
            # Only the low 32 bits of the result are used; the upper half is
            # taken from addr. NOTE(review): presumably because GDB truncates
            # the return value of a function without debug info -- confirm.
            maddr = low | (addr & 0xFFFFFFFF00000000)
            gdb.write("mmap = 0x%x\n" % maddr)
            if maddr == 0:
                continue
            # Make the page(s) holding the function prologue writable.
            gdb.parse_and_eval('mprotect(0x%x, %u, %d)'
                               % (addr & -0x1000, PAGE_SIZE * 2, prot))
            pyret = gdb.parse_and_eval(
                'PyRun_SimpleString("Hook.hook(\\"%s\\", 0x%x, 0x%x)")'
                % (symbol, addr, maddr))
            if long(pyret) == 0:
                gdb.write('hook "%s" OK\n' % symbol)

    def cmd_unhook(self, *args):
        """Remove the hook for each symbol in *args*."""
        for symbol in args:
            pyret = gdb.parse_and_eval(
                'PyRun_SimpleString("Hook.unhook(\\"%s\\")")' % (symbol))
            if long(pyret) == 0:
                gdb.write('unhook "%s" OK\n' % symbol)

    def initialize(self):
        """Load libpython into the target and initialize its interpreter once."""
        if self.initialized:
            return
        handle = gdb.parse_and_eval('dlopen("%s", %d)' % (LIBPYTHON, RTLD_LAZY))
        if not long(handle):
            gdb.write('Cannot load library %s\n' % LIBPYTHON)
            return
        if not long(gdb.parse_and_eval('Py_IsInitialized()')):
            gdb.execute('call PyEval_InitThreads()')
            gdb.execute('call Py_Initialize()')
        self.initialized = True


if __name__ == '__main__':
    ParamHookfile()
    CmdHook()
# hook.py
"""Function hooks installed from inside the target process via ctypes.

Runs under the injected Python 2.7 interpreter; the code is written so that
it is also valid Python 3.
"""
import struct
from ctypes import (CFUNCTYPE, POINTER, c_ubyte, c_int, c_char_p, c_void_p)


class Hook(object):
    """One interceptor: a Python callable patched over a native function.

    ``all_hooks`` maps a symbol name to the Hook registered for it.
    """

    all_hooks = {}

    @staticmethod
    def cast_to_void_p(pointer):
        """Return the address held by a ctypes pointer-like object.

        The value is passed through an identity C callback declared as
        ``void *(*)(void *)``, so ctypes performs the conversion.
        """
        return CFUNCTYPE(c_void_p, c_void_p)(lambda x: x)(pointer)

    @staticmethod
    def register(symbol, *args):
        """Create a Hook for *symbol* and store it in ``all_hooks``."""
        Hook.all_hooks[symbol] = Hook(symbol, *args)

    def __init__(self, symbol, ctype, pyfunc):
        self.symbol = symbol
        self.ctype = ctype
        self.pyfunc = pyfunc
        # C-callable wrapper around pyfunc; kept on the instance so it stays
        # referenced for as long as the hook object exists.
        self.cfunc = self.ctype(self.pyfunc)
        self.address = 0
        self.proxyaddr = 0
        self.jmp = None      # bytes of the jump written over the function
        self.memory = None   # ctypes view of the patched bytes
        self.code = None     # original bytes saved before patching
        self.active = False

    def install(self, address, proxyaddr):
        """Patch the function at *address* to jump through *proxyaddr*.

        The callback address is stored in the pointer-sized slot at
        *proxyaddr*; the first bytes at *address* are saved in ``self.code``
        and overwritten with an indirect jump through that slot.
        """
        print("install: %s" % hex(address))
        self.address = address
        self.proxyaddr = proxyaddr
        proxymemory = (c_void_p * 1).from_address(self.proxyaddr)
        proxymemory[0] = Hook.cast_to_void_p(self.cfunc)
        self.jmp = self.get_indlongjmp(self.address, self.proxyaddr)
        self.memory = (c_ubyte * len(self.jmp)).from_address(self.address)
        self.code = list(self.memory)
        self.patchmem(self.jmp)
        self.pyfunc.orig = self.origfunc()
        self.active = True

    def uninstall(self):
        """Restore the original bytes and mark the hook inactive."""
        self.patchmem(self.code)
        self.active = False

    def origfunc(self):
        """Return a callable that invokes the un-hooked original function."""
        ofunc = self.ctype(self.address)

        def wrap(*args):
            self.patchmem(self.code)
            try:
                return ofunc(*args)
            finally:
                # Re-install the jump even if the call raised; otherwise the
                # hook would silently stay removed.
                self.patchmem(self.jmp)
        return wrap

    def patchmem(self, src):
        """Copy the byte values in *src* over the start of the function."""
        for i, byte in enumerate(src):
            self.memory[i] = byte

    @staticmethod
    def get_indlongjmp(srcaddr, proxyaddr):
        """Return the 6 byte values of ``jmpq *off32(%rip)``.

        The offset is ``proxyaddr - srcaddr - 6`` and must fit in a signed
        32-bit integer.

        Raises:
            struct.error: if the offset does not fit in 32 bits.
        """
        # 64-bit indirect absolute jump (6 + 8 bytes)
        # ff 25 off32    jmpq *off32(%rip)
        try:
            s = struct.pack('=BBl', 0xff, 0x25, proxyaddr - srcaddr - 6)
        except struct.error:
            print("%s %s %s" % (hex(proxyaddr), hex(srcaddr),
                                hex(proxyaddr - srcaddr - 6)))
            raise
        # bytearray yields ints on both Python 2 and Python 3.
        return list(bytearray(s))

    @staticmethod
    def hook(symbol, address, proxyaddr):
        """Install the hook registered for *symbol* unless it is already active."""
        h = Hook.all_hooks[symbol]
        if h.active:
            return
        h.install(address, proxyaddr)

    @staticmethod
    def unhook(symbol):
        """Uninstall the hook registered for *symbol* if it is active."""
        h = Hook.all_hooks[symbol]
        if not h.active:
            return
        h.uninstall()


def hook(symbol, ctype):
    """Decorator: register the decorated function as the interceptor for *symbol*."""
    def deco(func):
        Hook.register(symbol, ctype, func)
        return func
    return deco


# int open (const char *__file, int __oflag, ...)
# NOTE: the declared ctype has two parameters, so the variadic third
# argument of open() is not forwarded to the original function.
@hook(symbol='open', ctype=CFUNCTYPE(c_int, c_char_p, c_int))
def python_open(fname, oflag):
    print("open:  %s %s" % (fname, oflag))
    return python_open.orig(fname, oflag)
gdb -ex 'attach PID' -ex 'source /path/pyinject.py' -ex 'set hookfile /path/hook.py' (gdb) pyinject hook open (gdb) continue
Source: https://habr.com/ru/post/237575/
All Articles