欢迎访问宙启技术站
智能推送

使用ctypes.windll在Python中实现Windows系统进程和线程的管理

发布时间:2024-01-02 12:07:37

在Python中,可以使用ctypes库来实现Windows系统进程和线程的管理。ctypes提供了一种与C代码进行交互的方式,允许我们调用Windows系统的动态链接库函数。

下面是使用ctypes.windll实现Windows系统进程和线程管理的示例代码:

import ctypes

# 获取Windows系统动态链接库
kernel32 = ctypes.windll.kernel32

# 创建进程函数
def create_process(process_path):
    # 定义STARTUPINFO和PROCESS_INFORMATION结构体
    class STARTUPINFO(ctypes.Structure):
        _fields_ = [
            ("cb", ctypes.c_uint),
            ("lpReserved", ctypes.c_char_p),
            ("lpDesktop", ctypes.c_char_p),
            ("lpTitle", ctypes.c_char_p),
            ("dwX", ctypes.c_uint),
            ("dwY", ctypes.c_uint),
            ("dwXSize", ctypes.c_uint),
            ("dwYSize", ctypes.c_uint),
            ("dwXCountChars", ctypes.c_uint),
            ("dwYCountChars", ctypes.c_uint),
            ("dwFillAttribute", ctypes.c_uint),
            ("dwFlags", ctypes.c_uint),
            ("wShowWindow", ctypes.c_ushort),
            ("cbReserved2", ctypes.c_ushort),
            ("lpReserved2", ctypes.c_char_p),
            ("hStdInput", ctypes.c_void_p),
            ("hStdOutput", ctypes.c_void_p),
            ("hStdError", ctypes.c_void_p),
        ]

    class PROCESS_INFORMATION(ctypes.Structure):
        _fields_ = [
            ("hProcess", ctypes.c_void_p),
            ("hThread", ctypes.c_void_p),
            ("dwProcessId", ctypes.c_uint),
            ("dwThreadId", ctypes.c_uint),
        ]

    # 初始化STARTUPINFO和PROCESS_INFORMATION
    startupinfo = STARTUPINFO()
    process_information = PROCESS_INFORMATION()
    startupinfo.cb = ctypes.sizeof(startupinfo)

    # 创建新进程
    result = kernel32.CreateProcessA(
        None,
        ctypes.c_char_p(process_path.encode()),
        None,
        None,
        False,
        0,
        None,
        None,
        ctypes.byref(startupinfo),
        ctypes.byref(process_information)
    )

    if result != 0:
        print("进程创建成功!")
        print("进程ID:", process_information.dwProcessId)
        print("线程ID:", process_information.dwThreadId)
        print("进程句柄:", process_information.hProcess)
        print("线程句柄:", process_information.hThread)
    else:
        print("进程创建失败!")

# 创建线程函数
def create_thread(thread_function):
    thread_id = ctypes.c_long()
    thread_handle = kernel32.CreateThread(
        None,
        0,
        thread_function,
        None,
        0,
        ctypes.byref(thread_id)
    )

    if thread_handle != 0:
        print("线程创建成功!")
        print("线程ID:", thread_id.value)
        print("线程句柄:", thread_handle)
    else:
        print("线程创建失败!")

# 查询系统中所有进程
def get_all_processes():
    # 定义PROCESSENTRY32结构体
    class PROCESSENTRY32(ctypes.Structure):
        _fields_ = [
            ("dwSize", ctypes.c_uint),
            ("cntUsage", ctypes.c_uint),
            ("th32ProcessID", ctypes.c_uint),
            ("th32DefaultHeapID", ctypes.c_void_p),
            ("th32ModuleID", ctypes.c_uint),
            ("cntThreads", ctypes.c_uint),
            ("th32ParentProcessID", ctypes.c_uint),
            ("pcPriClassBase", ctypes.c_long),
            ("dwFlags", ctypes.c_uint),
            ("szExeFile", ctypes.c_char * 260),
        ]

    # 创建快照
    snapshot_handle = kernel32.CreateToolhelp32Snapshot(0x00000002, 0)

    # 获取进程信息
    process_entry = PROCESSENTRY32()
    process_entry.dwSize = ctypes.sizeof(PROCESSENTRY32)
    result = kernel32.Process32First(snapshot_handle, ctypes.byref(process_entry))

    while result:
        # 输出进程信息
        print("进程ID:", process_entry.th32ProcessID)
        print("父进程ID:", process_entry.th32ParentProcessID)
        print("进程名称:", process_entry.szExeFile.decode("gbk"))

        result = kernel32.Process32Next(snapshot_handle, ctypes.byref(process_entry))

    # 关闭快照
    kernel32.CloseHandle(snapshot_handle)

# 查询系统中所有线程
def get_all_threads():
    # 创建快照
    snapshot_handle = kernel32.CreateToolhelp32Snapshot(0x00000004, 0)

    # 定义THREADENTRY32结构体
    class THREADENTRY32(ctypes.Structure):
        _fields_ = [
            ("dwSize", ctypes.c_uint),
            ("cntUsage", ctypes.c_uint),
            ("th32ThreadID", ctypes.c_uint),
            ("th32OwnerProcessID", ctypes.c_uint),
            ("tpBasePri", ctypes.c_long),
            ("tpDeltaPri", ctypes.c_long),
            ("dwFlags", ctypes.c_uint),
        ]

    # 获取线程信息
    thread_entry = THREADENTRY32()
    thread_entry.dwSize = ctypes.sizeof(THREADENTRY32)
    result = kernel32.Thread32First(snapshot_handle, ctypes.byref(thread_entry))

    while result:
        # 输出线程信息
        print("线程ID:", thread_entry.th32ThreadID)
        print("所属进程ID:", thread_entry.th32OwnerProcessID)

        result = kernel32.Thread32Next(snapshot_handle, ctypes.byref(thread_entry))

    # 关闭快照
    kernel32.CloseHandle(snapshot_handle)

# 测试 create_process 函数
create_process("C:\\Windows\\System32\
otepad.exe")

# 测试 create_thread 函数
def thread_function(parameter):
    print("这是一个新线程")

create_thread(thread_function)

# 测试 get_all_processes 函数
get_all_processes()

# 测试 get_all_threads 函数
get_all_threads()

上述代码示例实现了创建新进程、新线程,以及查询系统中所有进程和线程的功能。使用ctypes库与Windows系统动态链接库函数进行交互,可以方便地实现Windows系统进程和线程的管理。