第一句子网 - 唯美句子、句子迷、好句子大全
第一句子网 > python并发之concurrent.futures

python并发之concurrent.futures

时间:2019-10-24 06:18:27

相关推荐

python并发之concurrent.futures

concurrent:并发

Python标准库为我们提供了threading和multiprocessing模块编写相应的多线程/多进程代码。从Python3.2开始,标准库为我们提供了concurrent.futures模块,它提供了ThreadPoolExecutorProcessPoolExecutor两个类,实现了对threadingmultiprocessing的更高级的抽象,对编写线程池/进程池提供了直接的支持。

concurrent.futures基础模块是executor和future。

Executor

Executor是一个抽象类,它不能被直接使用。它为具体的异步执行定义了一些基本的方法。ThreadPoolExecutor和ProcessPoolExecutor继承了Executor,分别被用来创建线程池和进程池的代码。

submit方法

Executor中定义了submit()方法,这个方法的作用是提交一个可执行的回调task,并返回一个future实例。future对象代表的就是给定的调用。

我们使用submit方法来往线程池中加入一个task,submit返回一个Future对象,对于Future对象可以简单地理解为一个在未来完成的操作。

map方法

Exectuor还为我们提供了map方法,和内建的map用法类似。映射。

future

Future实例是由Executor.submit()创建的。可以理解为一个在未来完成的操作,这是异步编程的基础。通常情况下,我们执行io操作,访问url时(如下)在等待结果返回之前会产生阻塞,cpu不能做其他事情,而Future的引入帮助我们在等待的这段时间可以完成其他的操作。

示例:

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutorimport os,time,randomdef foo(i):print('%s is running %s'%(os.getpid(),i))time.sleep(random.randint(1, 3))return i**2if __name__ == '__main__':print('cpu_num:',os.cpu_count())executor=ProcessPoolExecutor()print('executor',executor,type(executor))# futures=[]# for i in range(10):#future=executor.submit(foo,i)#futures.append(future)futures=[executor.submit(foo,i) for i in range(10)]executor.shutdown()#程序运行到这里有明显的时间间隔,可见是在shutdown存在的情况下,程序将future全部执行完,才继续往下走的print('主')print(futures)for future in futures:print(future.result())

输出:

cpu_num: 8executor <concurrent.futures.process.ProcessPoolExecutor object at 0x00000276745AA978> <class 'concurrent.futures.process.ProcessPoolExecutor'>11740 is running 03156 is running 19928 is running 22208 is running 32324 is running 413080 is running 51892 is running 62964 is running 72208 is running 82324 is running 9主[<Future at 0x27674900e10 state=finished returned int>, <Future at 0x27674949dd8 state=finished returned int>, <Future at 0x27674949e80 state=finished returned int>, <Future at 0x27674949f28 state=finished returned int>, <Future at 0x27674949fd0 state=finished returned int>, <Future at 0x2767495a0b8 state=finished returned int>, <Future at 0x2767495a198 state=finished returned int>, <Future at 0x2767495a278 state=finished returned int>, <Future at 0x2767495a358 state=finished returned int>, <Future at 0x2767495a438 state=finished returned int>]0149162536496481

利用ThreadProcessExecutor爬虫

from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutorimport requestsdef get(url):r=requests.get(url)return {'url':url,'text':r.text}def parse(future):dic=future.result()#future对象调用result方法取其值、f=open('db.text','a')date='url:%s\n'%len(dic['text'])f.write(date)f.close()if __name__ == '__main__':executor=ThreadPoolExecutor()url_l = ['/', '/wupeiqi/', '/654321cc/','/', '/n1//1012/c1008-29581930.html','/news/shaonianxinzangyou5gedong.html', ]futures=[]for url in url_l:executor.submit(get,url).add_done_callback(parse) #与Pool进程池回调函数接收的是A函数的返回值(对象ApplyResult.get()得到的值)。executor.shutdown() #这里回调函数parse,接收的参数是submit生成的 Future对象。print('主')

输出:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。