第一句子网 - 唯美句子、句子迷、好句子大全
第一句子网 > 第17章:使用 concurrent.futures 模块处理并发-使用 futures.as_completed 函数

第17章:使用 concurrent.futures 模块处理并发-使用 futures.as_completed 函数

时间:2019-07-12 08:45:05

相关推荐

第17章:使用 concurrent.futures 模块处理并发-使用 futures.as_completed 函数

前面的章节我们使用了Executor.map 函数,这个函数易于使用,不过有个特性可能有用,也可能没用, 具体情况取决于需求:这个函数返回结果的顺序与调用开始的顺序一致。

我们知道map 返回的结果是一个生成器 results,当我们用 for 循环取值时, for 会隐式调用 next(results) 返回第一个任务,然后又会在第一个任务的 Future实例上调用 .result() 方法。而 result() 方法会阻塞,直到第一个Future 运行结束;如果第一个调用生成结果用时 10 秒,而其他调用只用 1 秒,代码会阻塞 10 秒,获取 map 方法返回的生成器产出的第一个结果。在此之后,获取后续结果时不会阻塞,因为后续的调用已经结束。

如果必须等到获取所有结果后再处理,这种行为没问题,不过,通常更可取的方式是,不管提交的顺序,只要有结果就获取。

为此,要把 Executor.submit 方法和 futures.as_completed 函数结合起来使用。这个组合比 executor.map 更灵活,因为 submit 方法能处理不同的可调用对象和参数,而 executor.map 只能处理参数不同的同一个可调用对象。此外,传给 futures.as_completed 函数的Future 集合可以来自多个 Executor 实例,例如一些由 ThreadPoolExecutor 实例创建,另一些由 ProcessPoolExecutor 实例创建。

使用 futures.as_completed 函数

如果我们想使用不阻塞的方法立刻拿到线程执行结果,可以将Executor.submit 方法和 futures.as_completed 函数结合起来使用,替代 executor.map方法,实际上 Executor.map的源码中也是循环调用了 Executor.submit方法,只不过可能会被 result() 方法阻塞。而futures.as_completed 函数则是接收一个 Future组成的可迭代对象,然后返回一个迭代器;迭代器在每个 Future 完成时立刻产出结果,此方法不会被阻塞。

示例 17-5-1使用Executor.submit 方法和 futures.as_completed 函数改造代码,使其不阻塞立刻产出国旗下载结果:

下面代码中只修改了batch_downloads 函数,为了方便对比结果,还需要在 main 函数中将结果打印出来,记得download_flag 函数中的 print(cc, end=' ') 和 sys.stdout.flush()。

...from concurrent import futures...# 最下线程数MAX_WORKERS = 20def download_flag(cc):...def batch_downloads():"""多线程下载"""# 确定线程池数量workers = min(MAX_WORKERS, len(POP20_CC))# 启动线程池with futures.ThreadPoolExecutor(workers) as executor:# Future 任务列表future_tasks = []for cc in sorted(POP20_CC):# 使用 submit 方法启动线程,并将 Future 实例添加进列表future = executor.submit(download_flag, cc)future_tasks.append(future)# 传入 Future 列表,在每个 Future 完成时产出结果,此方法不阻塞。task_iter = futures.as_completed(future_tasks)# 完成列表done_list = []for future in task_iter:# 从 task_iter 中获取下载结果并添加到完成列表done_list.append(future.result())return done_listdef main():...print(flags)if __name__ == '__main__':main()# 结果输出1:# ['FR', 'PH', 'JP', 'CD', 'IN', 'DE', 'TR', 'BR', 'MX', 'IR', 'CN', 'ID', 'PK', 'VN', 'US', 'NG', 'BD', 'ET', 'RU', 'EG']# 20 flags downloaded in 1.20s# 结果输出2:# ['PH', 'FR', 'BR', 'ET', 'JP', 'BD', 'IN', 'EG', 'DE', 'MX', 'ID', 'RU', 'CD', 'PK', 'VN', 'NG', 'TR', 'CN', 'IR', 'US']# 20 flags downloaded in 0.80s# 结果输出3:# ['ET', 'BD', 'NG', 'DE', 'FR', 'IR', 'MX', 'BR', 'IN', 'EG', 'CD', 'CN', 'US', 'RU', 'ID', 'TR', 'VN', 'JP', 'PH', 'PK']# 20 flags downloaded in 0.57s

从上方示例看到,修改后的代码的 3 次执行结果顺序都不一样,这是因为每个线程的完成顺序都不一样。

然后我们再看下之前的 17-1-2 的示例的执行结果:

从下面的结果上看,每次返回的结果顺序都是一样的,其实它的线程完成顺序也是不一致的,但是因为在获取结果时会因为 result 的方法阻塞问题,导致最终的结果顺序是一致的。

# 结果输出1:# ['BD', 'BR', 'CD', 'CN', 'DE', 'EG', 'ET', 'FR', 'ID', 'IN', 'IR', 'JP', 'MX', 'NG', 'PH', 'PK', 'RU', 'TR', 'US', 'VN']# 20 flags downloaded in 0.41s# 结果输出2:# ['BD', 'BR', 'CD', 'CN', 'DE', 'EG', 'ET', 'FR', 'ID', 'IN', 'IR', 'JP', 'MX', 'NG', 'PH', 'PK', 'RU', 'TR', 'US', 'VN']# 20 flags downloaded in 0.19s# 结果输出3:# ['BD', 'BR', 'CD', 'CN', 'DE', 'EG', 'ET', 'FR', 'ID', 'IN', 'IR', 'JP', 'MX', 'NG', 'PH', 'PK', 'RU', 'TR', 'US', 'VN']# 20 flags downloaded in 0.61s

第17章:使用 concurrent.futures 模块处理并发-使用 futures.as_completed 函数立刻获取多线程任务执行结果

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。