Python之多进程&异步并行

由于python的gil,多线程不是cpu密集型最好的选择
多进程可以完全独立的进程环境中运行程序,可以充分的利用多处理器
但是进程本身的隔离带来的数据不共享也是一个问题,而且线程比进程轻量
import multiprocessing
import datetime
def calc(i):
    sum = 0
    for _ in range(1000000000):
        sum+=1
    print(i,sum)

if __name__ == '__main__':
    start = datetime.datetime.now()

    ps = []
    for i in range(5):
        p = multiprocessing.Process(target=calc,args=(i,))
        ps.append(p)
        p.start()
    for p in ps:
        p.join()

    delta = (datetime.datetime.now()-start).total_seconds()
    print(delta)
注意多进程的代码一定要在__name__='__main__'下面执行
pid 进程id
exitcode 进程退出的状态码
terminate() 终止指定的进程
进程间同步:
multiprocessing还提供了共享内存,服务器进程来共享数据,还提供了queue队列,pipe管道用于进程间通信
通讯方式不同:
1多进程就是启动多个解释器进程,进程间通信必须序列化,反序列化
2.数据的线程安全性问题
由于每个进程中没有实现多线程,gil可以说没什么用
multiprocessing.Pool是进程池类
apply(self,func,args=(),kwds={})
阻塞执行,导致主进程执行其他子进程就像一个个执行
apply_async(self,func,args=(),kwds={},callback=None,error_callback=None)
与apply方法用法一致,非阻塞执行,得到结果后执行回调
close()
关闭池,池不能在接受新的任务
terminate()
结束工作进程,不在处理未处理的任务
join()
主进程阻塞等待子进程的退出,join方法要在close或terminate之后使用
import logging
import datetime
import multiprocessing

FORMAT = '%(asctime)s\t %(processName)s %(process)d %(message)s'
logging.basicConfig(level=logging.INFO,format=FORMAT)

def calc(i):
    sum = 0
    for _ in range(1000):
        sum += 1
    return sum

if __name__ == '__main__':
    start = datetime.datetime.now()
    pool = multiprocessing.Pool(5)
    for i in range(5):
        pool.apply_async(calc,args=(i,),callback=lambda x:logging.info('{} in callback'.format(x)))
    pool.close()
    # pool.join()

    delta = (datetime.datetime.now()-start).total_seconds()
    print(delta)
请求/应答模型:web应用中常见的处理模型
master启动多个worker工作进程,一般和cpu数目相同,发挥多核优势
worker工作进程中,往往需要操作网络io和磁盘io,启动多线程,提高并发处理能力,worker处理用户请求,往往需要等待数据,处理完请求还要通过网络io返回响应

2.异步并行 

'''
异步并行任务编程模块,提供一个高级的异步可执行的便利接口
ThreadPoolExecutor 异步调用的线程池的Executor
ProcessPoolExecutor 异步调用的进程池的Executor

ThreadPoolExceutor对象
ThreadPoolExecutor(max_workers=1) 池中至多创建max_workers个线程来同时异步执行,返回Exceutor
submit(fn,*args,**kwargs) 提交执行的函数及其参数,返回Future实例
shutdown(wait=True)    清理池

Future类
done()      如果调用被成功的取消或者执行完成,返回True
cancelled() 如果调用被成功的取消,返回True
running()   如果正在运行而且不能被取消,返回True
cancel()    尝试取消调用,如果已经执行且不能取消返回False,否则返回True
result(timeout=None) 取返回结果,timeout为none,一直等待返回;timeout设置到期,抛出concurrent.futures.timeouterror异常。
exceptiuon(timeout=None) 取返回的异常,timeout为none,一直等待返回;timeout设置到期,抛出concurrent.futures.timeouterror异常。
'''

 

import threading
from concurrent import futures
import logging
import time

FORMAT = '%(asctime)s\t %(processName)s %(process)d %(message)s'
logging.basicConfig(level=logging.INFO,format=FORMAT)

def worker(n):
    logging.info('begin to work {}'.format(n))
    time.sleep(5)
    logging.info('finished {}'.format(n))
#创建线程池,池的容量是3
executor = futures.ThreadPoolExecutor(max_workers=3)
fs = []
for i in range(3):
    future = executor.submit(worker,i)
    fs.append(future)
for i in range(3,6):
    future = executor.submit(worker,i)
    fs.append(future)
while True:
    time.sleep(2)
    logging.info(threading.enumerate())
    flag = True
    for f in fs: #判断是否有未完成的任务
        logging.info(f.done())
        flag = flag and f.done()

    if flag:
        executor.shutdown() #清理池,池中线程全部杀掉
        logging.info(threading.enumerate())
        break
#线程池一旦创建线程,就不需要频繁清楚了

支持上下文:TODO