python程序中的线程操作 concurrent模块使用详解
Python是一种高级编程语言。并发编程是高级编程语言中的一个方面。并发编程可以让程序同时执行多个任务。Python中的并发编程可以通过concurrent模块来实现。
concurrent模块提供了一组工具,用于在Python程序中处理并发任务。本文将详细介绍concurrent模块的使用。
1. 线程和进程
首先介绍一下线程和进程。
线程是操作系统(Windows、Linux、macOS等)调度的最小单位。在Python中,线程由threading模块提供。
进程是操作系统调度的次小单位。在Python中,进程由multiprocessing模块提供。
线程和进程都可以实现并发编程。两者的区别在于:
线程是在同一个进程中的不同执行路径,各个线程共享进程的内存空间。
进程是一个执行中的程序,各个进程有各自独立的内存空间,互不干扰。
2. concurrent模块
concurrent模块包含三个子模块:
concurrent.futures:提供了执行异步任务的高级接口。
concurrent.process:提供了需要开启进程的函数。
concurrent.thread:提供了需要开启线程的函数。
本文将只介绍concurrent.futures和concurrent.thread。
3. concurrent.futures
concurrent.futures提供了两个类:ThreadPoolExecutor和ProcessPoolExecutor。
ThreadPoolExecutor和ProcessPoolExecutor都是Executor的子类,它们的主要区别在于:
ThreadPoolExecutor使用线程池来管理线程,可以加快任务的创建和销毁,但有时会出现GIL锁的问题。
ProcessPoolExecutor使用进程池来管理进程,可以避免GIL锁的问题,但开启进程的开销相对较大,适用于需要进行CPU密集型操作的任务。
3.1 ThreadPoolExecutor
ThreadPoolExecutor可以使用submit()方法提交一个可调用对象,然后返回一个Future对象,用于获取结果。
以下是ThreadPoolExecutor的使用范例:
import concurrent.futures
def task(id):
print(f'Task{id} start')
time.sleep(1)
print(f'Task{id} end')
return id
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
results = [executor.submit(task, i) for i in range(4)]
for f in concurrent.futures.as_completed(results):
print(f.result())
结果为:
Task0 start
Task1 start
Task0 end
0
Task2 start
Task1 end
1
Task3 start
Task2 end
2
Task3 end
3
在上面的例子中,初始时开启了2个线程,提交了4个任务,每次只有2个任务并行执行。as_completed方法可以按照完成的顺序迭代所有结果。
3.2 ProcessPoolExecutor
ProcessPoolExecutor与ThreadPoolExecutor使用方法一致,但需要注意的是:
ProcessPoolExecutor提交的可调用对象将在新的进程中执行,需要保证这些可调用对象可以被序列化。
以下是ProcessPoolExecutor的使用范例:
import concurrent.futures
import math
def check_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def get_primes(n):
print(f'Finding primes in range 1..{n}')
return [x for x in range(1, n) if check_prime(x)]
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
for result in executor.map(get_primes, [100, 200, 300, 400]):
print(result)
结果为:
Finding primes in range 1..100
Finding primes in range 1..200
Finding primes in range 1..300
Finding primes in range 1..400
[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97]
[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107,
109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199]
[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107,
109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233,
239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293]
[1, 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107,
109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233,
239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373,
379, 383, 389, 397]
4. concurrent.thread
concurrent.thread提供了以下函数:
concurrent.futures.ThreadPoolExecutor: ThreadPoolExecutor的别名。
concurrent.futures.ProcessPoolExecutor: ProcessPoolExecutor的别名。
concurrent.futures.Future: 这个类表示异步操作的结果。我们可以使用它的result方法获取异步操作的结果。
concurrent.futures.as_completed(fs, timeout=None): 这个函数等待fs中所有的Future对象完成,并按照完成的顺序返回Future对象的迭代器。它默认不会超时。当timeout参数不为None时,函数将在指定的时间之后超时。
concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED): 这个函数等待fs中的所有Future对象完成,如果提供了timeout参数则等待指定的时间。当所有对象完成时,函数返回指定的完成条件:ALL_COMPLETED、FIRST_COMPLETED或FIRST_EXCEPTION。默认情况下,函数会返回ALL_COMPLETED。返回的结果是两个集合:futures和_exceptions。futures参数是所有Future对象的集合,exceptions参数是那些异常对象的集合(如果有的话)。
concurrent.futures.ThreadPoolExecutor的使用范例:
因为Python中的GIL锁机制,多线程操作下,在多次请求一个特定的接口时,无法获得期望的请求次数。这种情况可以使用concurrent.futures模块实现,演示例子如下:
import concurrent.futures
import requests
import time
def download_something(delay):
"""模拟下载任务"""
time.sleep(delay)
return
