欢迎访问宙启技术站
智能推送

python程序中的线程操作 concurrent模块使用详解

发布时间:2023-05-16 08:51:20

Python是一种高级编程语言。并发编程是高级编程语言中的一个方面。并发编程可以让程序同时执行多个任务。Python中的并发编程可以通过concurrent模块来实现。

concurrent模块提供了一组工具,用于在Python程序中处理并发任务。本文将详细介绍concurrent模块的使用。

1. 线程和进程

首先介绍一下线程和进程。

线程是操作系统(Windows、Linux、macOS等)调度的最小单位。在Python中,线程由threading模块提供。

进程是操作系统调度的次小单位。在Python中,进程由multiprocessing模块提供。

线程和进程都可以实现并发编程。两者的区别在于:

线程是在同一个进程中的不同执行路径,各个线程共享进程的内存空间。

进程是一个执行中的程序,各个进程有各自独立的内存空间,互不干扰。

2. concurrent模块

concurrent模块包含三个子模块:

concurrent.futures:提供了执行异步任务的高级接口。

concurrent.process:提供了需要开启进程的函数。

concurrent.thread:提供了需要开启线程的函数。

本文将只介绍concurrent.futures和concurrent.thread。

3. concurrent.futures

concurrent.futures提供了两个类:ThreadPoolExecutor和ProcessPoolExecutor。

ThreadPoolExecutor和ProcessPoolExecutor都是Executor的子类,它们的主要区别在于:

ThreadPoolExecutor使用线程池来管理线程,可以加快任务的创建和销毁,但有时会出现GIL锁的问题。

ProcessPoolExecutor使用进程池来管理进程,可以避免GIL锁的问题,但开启进程的开销相对较大,适用于需要进行CPU密集型操作的任务。

3.1 ThreadPoolExecutor

ThreadPoolExecutor可以使用submit()方法提交一个可调用对象,然后返回一个Future对象,用于获取结果。

以下是ThreadPoolExecutor的使用范例:

import concurrent.futures

def task(id):

    print(f'Task{id} start')

    time.sleep(1)

    print(f'Task{id} end')

    return id

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:

    results = [executor.submit(task, i) for i in range(4)]

    for f in concurrent.futures.as_completed(results):

        print(f.result())

结果为:

Task0 start

Task1 start

Task0 end

0

Task2 start

Task1 end

1

Task3 start

Task2 end

2

Task3 end

3

在上面的例子中,初始时开启了2个线程,提交了4个任务,每次只有2个任务并行执行。as_completed方法可以按照完成的顺序迭代所有结果。

3.2 ProcessPoolExecutor

ProcessPoolExecutor与ThreadPoolExecutor使用方法一致,但需要注意的是:

ProcessPoolExecutor提交的可调用对象将在新的进程中执行,需要保证这些可调用对象可以被序列化。

以下是ProcessPoolExecutor的使用范例:

import concurrent.futures

import math

def check_prime(n):

    if n % 2 == 0:

        return False

    sqrt_n = int(math.floor(math.sqrt(n)))

    for i in range(3, sqrt_n + 1, 2):

        if n % i == 0:

            return False

    return True

def get_primes(n):

    print(f'Finding primes in range 1..{n}')

    return [x for x in range(1, n) if check_prime(x)]

with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:

    for result in executor.map(get_primes, [100, 200, 300, 400]):

        print(result)

结果为:

Finding primes in range 1..100

Finding primes in range 1..200

Finding primes in range 1..300

Finding primes in range 1..400

[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]

[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107,

109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199]

[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107,

109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233,

239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293]

[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107,

109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233,

239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373,

379, 383, 389, 397]

4. concurrent.thread

concurrent.thread提供了以下函数:

concurrent.futures.ThreadPoolExecutor: ThreadPoolExecutor的别名。

concurrent.futures.ProcessPoolExecutor: ProcessPoolExecutor的别名。

concurrent.futures.Future: 这个类表示异步操作的结果。我们可以使用它的result方法获取异步操作的结果。

concurrent.futures.as_completed(fs, timeout=None): 这个函数等待fs中所有的Future对象完成,并按照完成的顺序返回Future对象的迭代器。它默认不会超时。当timeout参数不为None时,函数将在指定的时间之后超时。

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED): 这个函数等待fs中的所有Future对象完成,如果提供了timeout参数则等待指定的时间。当所有对象完成时,函数返回指定的完成条件:ALL_COMPLETED、FIRST_COMPLETED或FIRST_EXCEPTION。默认情况下,函数会返回ALL_COMPLETED。返回的结果是两个集合:futures和_exceptions。futures参数是所有Future对象的集合,exceptions参数是那些异常对象的集合(如果有的话)。

concurrent.futures.ThreadPoolExecutor的使用范例:

因为Python中的GIL锁机制,多线程操作下,在多次请求一个特定的接口时,无法获得期望的请求次数。这种情况可以使用concurrent.futures模块实现,演示例子如下:

import concurrent.futures

import requests

import time

def download_something(delay):

    """模拟下载任务"""

    time.sleep(delay)

    return