第一句子网 - 唯美句子、句子迷、好句子大全
第一句子网 > concurrent.futures模块ThreadPoolExecutor ProcessPoolExecutor讲解及使用实例

concurrent.futures模块ThreadPoolExecutor ProcessPoolExecutor讲解及使用实例

时间:2023-08-24 22:01:50

相关推荐

concurrent.futures模块ThreadPoolExecutor ProcessPoolExecutor讲解及使用实例

导入concurrent.futures.ThreadPoolExecutor

import concurrent.futures

concurrent.futures模块详解

这个模块是python并发执行的标准库,具有线程池和进程池、管理并行编程任务、处理非确定性的执行流程、进程/线程同步等功能。

模块组成

1、concurrent.futures.Executor: 这是一个虚拟基类,提供了异步执行的方法。

2、submit(function, argument): 调度函数(可调用的对象)的执行,将 argument 作为参数传入。

3、map(function, argument): 将 argument 作为参数执行函数,以 异步 的方式。

4、shutdown(Wait=True): 发出让执行者释放所有资源的信号。

5、concurrent.futures.Future: 其中包括函数的异步执行。Future对象是submit任务(即带有参数的functions)到executor的实例。

Executor是抽象类(父类),可以通过子类访问,即线程或进程的 ExecutorPools 。ThreadPoolExecutor是Executor的子类,它使用线程池来异步执行调用。因为,线程或进程的实例是依赖于资源的任务,所以最好以“池”的形式将他们组织在一起,作为可以重用的launcher或executor。

源码函数分析

init

class ThreadPoolExecutor(_base.Executor):# Used to assign unique thread names when thread_name_prefix is not supplied._counter = itertools.count().__next__def __init__(self, max_workers=None, thread_name_prefix='',initializer=None, initargs=()):# max_workers参数为空时,默认为机器处理器个数+4,最大值为32# thread_name_prefix 线程可选名称前缀# initializer 初始化工作线程使,指定的可调用函数# initargs 传给可调用函数的参数元组"""Initializes a new ThreadPoolExecutor instance.Args:max_workers: The maximum number of threads that can be used toexecute the given calls.thread_name_prefix: An optional name prefix to give our threads.initializer: A callable used to initialize worker threads.initargs: A tuple of arguments to pass to the initializer."""if max_workers is None:# ThreadPoolExecutor is often used to:# * CPU bound task which releases GIL# * I/O bound task (which releases GIL, of course)## We use cpu_count + 4 for both types of tasks.# But we limit it to 32 to avoid consuming surprisingly large resource# on many core machine.max_workers = min(32, (os.cpu_count() or 1) + 4)if max_workers <= 0:raise ValueError("max_workers must be greater than 0")if initializer is not None and not callable(initializer):raise TypeError("initializer must be a callable")self._max_workers = max_workersself._work_queue = queue.SimpleQueue()self._idle_semaphore = threading.Semaphore(0)self._threads = set()self._broken = Falseself._shutdown = Falseself._shutdown_lock = threading.Lock()self._thread_name_prefix = (thread_name_prefix or("ThreadPoolExecutor-%d" % self._counter()))self._initializer = initializerself._initargs = initargs

举例

# demo1import concurrent.futuresimport urllib.requestURLS = ['/','/','/','http://www.bbc.co.uk/','http://some-made-up-/']# Retrieve a single page and report the URL and contentsdef load_url(url, timeout):with urllib.request.urlopen(url, timeout=timeout) as conn:return conn.read()# We can use a with statement to ensure threads are cleaned up promptlywith concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:# Start the load operations and mark each future with its URLfuture_to_url = {executor.submit(load_url, url, 60): url for url in URLS}for future in concurrent.futures.as_completed(future_to_url):url = future_to_url[future]try:data = future.result()except Exception as exc:print('%r generated an exception: %s' % (url, exc))else:print('%r page is %d bytes' % (url, len(data)))

# demo2import osimport randomimport threadingimport requests as rqimport timefrom threading import Thread, Lockfrom queue import Queue # 用于多线程之间线程安全的数据通信from concurrent.futures import ThreadPoolExecutor, as_completedpool = ThreadPoolExecutor()'''利用线程池对I/O密集型任务进行优化with ThreadPoolExecutor() as pool:futures = [pool.submit(craw, url) for url in urls]for future in futures:#as_completed后的结果顺序是不固定的print(future.result())html_queue.put(future.result())'''def event_1():print("event_1 started")time.sleep(1)print("event_1 ended")return 1def event_2():print("event_2 started")time.sleep(2)print("event_2 ended")return 2def event_3():print("event_3 started")time.sleep(3)print("event_3 ended")return 3def main():t0 = time.time()res1 = pool.submit(event_1)res2 = pool.submit(event_2)res3 = pool.submit(event_3)print(res1.result())print(res2.result())print(res3.result())t1 = time.time()print(t1 - t0)if __name__ == '__main__':main()

submit

submit(*args, **kwargs)

安排可调用对象 fn 以 fn(*args, **kwargs) 的形式执行,并返回 Future 对象来表示它的执行。

with ThreadPoolExecutor(max_workers=1) as executor:future = executor.submit(pow, 323, 1235)print(future.result())

map

map(func, *iterables, timeout=None, chunksize=1)

类似内置函数 map(func, *iterables),但是有两点不同:

1、立即获取 iterables 而不会惰性获取;

2、异步执行 func,并支持多次并发调用。

它返回一个迭代器。

从调用 Executor.map() 开始的 timeout 秒之后,如果在迭代器上调用了next() 并且无可用结果的话,迭代器会抛出 concurrent.futures.TimeoutError 异常。

timeout 秒数可以是浮点数或者整数,如果设置为 None 或者不指定,则不限制等待时间。

如果 func 调用抛出了异常,那么该异常会在从迭代器获取值的时候抛出。

当使用 ProcessPoolExecutor 的时候,这个方法会把 iterables 划分成多个块,作为独立的任务提交到进程池。这些块的近似大小可以通过给 chunksize 指定一个正整数。对于很长的 iterables,使用较大的 chunksize 而不是采用默认值 1,可以显著提高性能。对于 ThreadPoolExecutor,chunksize 不起作用。

不管并发任务的执行次序如何,map 总是基于输入顺序来返回值。map 返回的迭代器,在主程序迭代的时候,会等待每一项的响应。

def xiaotu_receive_thread(self, *phone):for i in phone:res = self.number_parameter(i)userid, nickname, token, sign = resprint(res)s = f"{token['S']}"t = f"{token['T']}"c_list = (s, t, userid)cooke_list = [c_list] * 10with concurrent.futures.ThreadPoolExecutor() as pool:pool.map(AccountNumber().xiaotu_receive, cooke_list)

shutdown(wait=True)

告诉执行器 executor 在当前所有等待的 future 对象运行完毕后,应该释放执行器用到的所有资源。在 shutdown 之后再调用 Executor.submit() 和 Executor.map() 会报运行时错误 RuntimeError。如果 wait 为 True,那么这个方法会在所有等待的 future 都执行完毕,并且属于执行器 executor 的资源都释放完之后才会返回。如果 wait 为 False,本方法会立即返回。属于执行器的资源会在所有等待的 future 执行完毕之后释放。不管 wait 取值如何,整个 Python 程序在等待的 future 执行完毕之前不会退出。你可以通过 with 语句来避免显式调用本方法。with 语句会用 wait=True 的默认参数调用 Executor.shutdown() 方法。

执行器类 Executor 实现了上下文协议,可以用做上下文管理器。它能并发执行任务,等待它们全部完成。当上下文管理器退出时,自动调用 shutdown() 方法。

import shutilwith ThreadPoolExecutor(max_workers=4) as e:e.submit(shutil.copy, 'src1.txt', 'dest1.txt')e.submit(shutil.copy, 'src2.txt', 'dest2.txt')e.submit(shutil.copy, 'src3.txt', 'dest3.txt')e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

ProcessPoolExecutor 进程池执行器

ProcessPoolExecutor 使用了 multiprocessing 模块,这允许它可以规避 Global Interpreter Lock,但是也意味着只能执行和返回可序列化的(picklable)对象。

main模块必须被 worker 子进程导入,这意味着 ProcessPoolExecutor 在交互解释器中无法工作。

在已经被提交到 ProcessPoolExecutor 中的可调用对象内使用 Executor 或者 Future 方法会导致死锁。

concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

1、这个 Executor 子类最多用 max_workers 个进程来异步执行调用。

2、如果不指定 max_workers 或者为 None,它默认为本机的处理器数量。

3、如果 max_workers 小于等于 0,会抛出 ValueError 异常。

4、mp_context 是多进程上下文(multiprocessing context)或者 None,它会被用来启动 workers。

5、如果不指定 mp_context 或者为 None,会使用默认的多进程上下文环境。

initializer 是一个可选的可调用对象,会在每个 worker 进程启动之前调用。

initargs 是传递给 initializer 的参数元组。

如果 initializer 抛出了异常,那么当前所有等待的任务都会抛出 BrokenProcessPool 异常,继续提交 submit 任务也会抛出此异常。

ProcessPoolExecutor使用实例

import concurrent.futuresimport mathPRIMES = [112272535095293,112582705942171,112272535095293,115280095190773,115797848077099,1099726899285419]def is_prime(n):if n % 2 == 0:return Falsesqrt_n = int(math.floor(math.sqrt(n)))for i in range(3, sqrt_n + 1, 2):if n % i == 0:return Falsereturn Truedef main():with concurrent.futures.ProcessPoolExecutor() as executor:for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):print('%d is prime: %s' % (number, prime))if __name__ == '__main__':main()

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。