python之路——多进程,进程间通信、进程池

  一个进程,包括了代码、数据和分配给进程的资源(内存),在计算机系统里直观地说一个进程就是一个PID。操作系统保护进程空间不受外部进程干扰,即一个进程不能访问到另一个进程的内存。有时候进程间需要进行通信,这时可以使用操作系统提供进程间通信机制。通常情况下,执行一个可执行文件操作系统会为其创建一个进程以供它运行。但如果该执行文件是基于多进程设计的话,操作系统会在最初的进程上创建出多个进程出来,这些进程间执行的代码是一样,但执行结果可能是一样的,也可能是不一样的。

  为什么需要多进程?最直观的想法是,如果操作系统支持多核的话,那么一个执行文件可以在不同的核心上跑;即使是非多核的,在一个进程在等待I/O操作时另一个进程也可以在CPU上跑,提高CPU利用率、程序的效率。

  在Linux系统上可以通过fork()来在父进程中创建出子进程。一个进程调用fork()后,系统会先给新进程分配资源,例如存储数据和代码空间。然后把原来进程的所有值、状态都复制到新的进程里,只有少数的值与原来的进程不同,以区分不同的进程。fork()函数会返回两次,一次给父进程(返回子进程的pid或者fork失败信息),一次给子进程(返回0)。至此,两个进程分道扬镳,各自运行在系统里。

python调用多进程模块为------->>>multiprocessing

import multiprocessing,os,time

def func(name):
    print("%s is talking.....[father: %s ;own: %s]"%(name,os.getppid(),os.getpid()))
    # time.sleep(1)
def f(name):
    func(name)

if __name__ == "__main__":
    func("main")             #父进程调用func
    for i in range(5):
        P = multiprocessing.Process(target=f,args=(i,))    #子进程调用f
        P.start()

其中:os.getppid() ------->> 输出父进程pid ; os.getpid() ------->> 输出当前子进程pid

进程间通讯

不同进程间内存是不共享的,要想实现两个进程间的数据交换,可以用以下方法:

队列(Queues)-----------> 与线程queues方法类似,但不同

import multiprocessing,time,os,queue

def func(q):
    q.put(["japhi",1,"alex"])

if __name__ == "__main__":
    q = multiprocessing.Queue()     #Queue 能使进程之间进行数据互通
    # q = queue.Queue()       这个方法仅限线程,进程不能用
    q.put("alex")
    p = multiprocessing.Process(target=func,args=(q,))  #实质上是将主进程的q复制了一份给子进程p
    p.start()
    print(q.get())
    print(q.get())

将父进程的队列q传给子进程,实质上是对主进程的q进行了一个复制操作。

管道(pipe)

实例化管道后会产生两个值,一个给父进程,一个给子进程,就可以使数据通过管道在父进程和子进程中交互;类似于scoket通信。

import multiprocessing

def func(con):
    print(con.recv())
    con.send("收到主进程信息")   #子进程收父进程发送的数据
    con.close()

if __name__ == "__main__":
    conn_a,conn_b = multiprocessing.Pipe()    #实例化管道对象,主进程和子进程间可以进行数据通信
    p = multiprocessing.Process(target=func, args=(conn_b,))   #将conn_b给了子进程
    p.start()
    conn_a.send("您好,子进程")  #父进程发送数据
    print(conn_a.recv())

Manager 

Manager()返回的manager对象控制一个包含Python对象的服务器进程,并允许其他进程使用代理来操作它们。

Manager()返回的manager将支持类型list,dict,Namespace,Lock,RLock,Semaphore,BoundedSemaphore,Condition,Event,Barrier,Queue,Value and Array。 例如,

import multiprocessing,os

def mod(d,l):
    d[os.getpid()] =os.getppid()
    l.append(os.getpid())
    print("\033[1;32m%s\033[0m"%l)
    print("\033[1;35m%s\033[0m"%d)

if __name__ == "__main__":
    M = multiprocessing.Manager()
    # with multiprocessing.Manager() as M:    与上面的M = multiprocessing.Manager() 作用一致
    dic = M.dict()   #生成一个字典,可在多个进程间共享和传递
    list1 = M.list(range(2))   #生成一个列表,可在多个进程间共享和传递
    p_list = []
    for i in range(5):
        p = multiprocessing.Process(target=mod,args=(dic,list1))
        p.start()
        p_list.append(p)
    for res in p_list:
        res.join()   #等待结果,不然程序会报错
    print(dic)
    print(list1)

进程同步 

what?

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

进程池

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

  • apply 串行,且没有回调函数
  • apply_async 并行

回调函数:指进程里的函数执行完会自动执行回调函数

import multiprocessing,os,time

def f(name):
    time.sleep(1)
    print("%s from %s"%(name,os.getpid()))

def b(a):
    print("------>>回调")

if __name__ == '__main__':
    # po = multiprocessing.Pool(processes=3)   #进程池中只能同时放入5个进程,可以直接写5
    po = multiprocessing.Pool(3)  #与上面的效果一样
    for i in range(20):
        # po.apply(func = f,args=(i,))    ##串行
        po.apply_async(func=f, args=(i,), callback=b)   #并行,callback是回调,前面f函数执行完执行b
    print("Done")
    po.close()
    po.join()   #进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。