,转Python多进程Process、Pool的使用总结

原文:https://www.cnblogs.com/wangdac/p/13892208.html

python中的多线程其实并不是真正的多线程,如果想要充分地使用多核CPU的资源,在python中大部分情况需要使用多进程。Python提供了非常好用的多进程包multiprocessing,只需要定义一个函数,Python会完成其他所有事情。借助这个包,可以轻松完成从单进程到并发执行的转换。

进程池Pool模块接口说明

拆分任务一提高执行效率时,独立多进程apply_async(),通过列表解析添加子任务,是最优选择。见pool部分

join方法的意义

join()方法可以等待子进程结束后再继续往下运行(更准确地说,在当前位置阻塞主进程,带执行join()的进程结束后再继续执行主进程),通常用于进程间的同步。(进一步地解释,哪个子进程调用了join方法,主进程就要等该子进程执行完后才能继续向下执行,具体可见下边的分析图)

在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。

Pool可以提供指定数量的进程,供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

case:使用进程池(非阻塞)
#coding: utf-8
import multiprocessing
import time

def func(msg):
    print "msg:", msg
    time.sleep(3)
    print "end"

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 3)
    for i in xrange(4):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去

    print "Mark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~"
    pool.close()
    pool.join()   #调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
    print "Sub-process(es) done."

'''    
函数解释:

apply_async(func[, args[, kwds[, callback]]]) 它是非阻塞,apply(func[, args[, kwds]])是阻塞的(理解区别,看例1例2结果区别)
close()    关闭pool,使其不在接受新的任务。
terminate()    结束工作进程,不在处理未完成的任务。
join()    主进程阻塞,等待子进程的退出, join方法要在close或terminate之后使用。
执行说明:创建一个进程池pool,并设定进程的数量为3,xrange(4)会相继产生四个对象[0, 1, 2, 4],四个对象被提交到pool中,因pool指定进程数为3,所以0、1、2会直接送到进程中执行,当其中一个执行完事后才空出一个进程处理对象3,所以会出现输出“msg: hello 3”出现在"end"后。因为为非阻塞,主函数会自己执行自个的,不搭理进程的执行,所以运行完for循环后直接输出“mMsg: hark~ Mark~ Mark~~~~~~~~~~~~~~~~~~~~~~”,主程序在pool.join()处等待各个进程的结束。    
pool.apply(func, (msg, ))   #维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
'''

Python多进程pool.map()函数,有两个参数可以传,第一个参数传的是函数,第二个参数传的是数据列表。那么怎么在第二个数据列表,多传几个参数呢,方法是通过对有多个参数的方法进行封装,在进程中运行封装后的方法。

pool.map

# -*- coding:utf-8 -*-

import time
import multiprocessing

def job(x ,y):
        """
        :param x:
        :param y:
        :return:
        """
        return x * y

def job1(z):
        """
        :param z:
        :return:
        """
        return job(z[0], z[1])

if __name__ == "__main__":
        time1=time.time()
        pool = multiprocessing.Pool(2)
        data_list=[(1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9),(10,10)]
        res = pool.map(job1,data_list)
        time2=time.time()
        print(res)
        pool.close()
        pool.join()
        print('总共耗时:' + str(time2 - time1) + 's')

pool.apply_async

case1 : https://github.com/michaelliao/learn-python3/blob/master/samples/multitask/pooled_processing.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
case2: https://blog.csdn.net/jinping_shi/article/details/52433867
import multiprocessing
import time

def func(msg):
    print multiprocessing.current_process().name + '-' + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4) # 创建4个进程
    for i in xrange(10):
        msg = "hello %d" %(i)
        pool.apply_async(func, (msg, ))
    pool.close() # 关闭进程池,表示不能在往进程池中添加进程
    pool.join() # 等待进程池中的所有进程执行完毕,必须在close()之后调用
    print "Sub-process(es) done."
case3:https://blog.csdn.net/weixin_42898819/article/details/81811514
from multiprocessing import Process,Pool  #导入进程池
import time,os

def Foo(i):
    time.sleep(2)
    print('到了2s')
    return i+100
def Bar(arg):
    print('结果:',arg)

if __name__  == '__main__':  
    pool=Pool(processes= 5)  #允许进程池同时放入5个进程

    for i in range(10):  #10个进程都启动 但是一次只能运行5个
        #pool.apply(func= Foo,args=(i,))  #串行执行进程,一次执行1个进程
        pool.apply_async(func= Foo,args=(i,),callback= Bar) #并行执行进程,一次5个,callback回调 Foo执行完就会执行Bar
    print('end')
    pool.close()
    pool.join() #等待进程池中的进程执行完毕  必须先close()  在join()

什么时候用进程池Pool

当我们需要的进程数量不多的时候,我们可以使用multiprocessing的Process类来创建进程。但是如果我们需要的进程特别多的时候,手动创建工作量太大了,所以Python也为我们提供了Pool(池)的方式来创建大量进程。

from multiprocessing import Pool
import os,time

def run(msg):
    print("开始一个子线程运行了……")
    time.sleep(1)
    print("开始一个子线程运行结束了……")

if __name__ == "__main__":
    pool = Pool(3)  # 表示初始化一个进程池,最大进程数为5
    for x in range(10):
        pool.apply_async(run, args=("hello pool",))
    print("------start----")
    pool.close() # 关闭池
    pool.join() # 等待所有的子进程完成,必须放在close后面
    print("-------end------")
    
'''
注意:一般我们使用apply_async这个方法,表示非阻塞的运行,一旦使用了apply方法表示阻塞式执行任务,此时就是单任务执行了(一般不会使用,特殊场景才会使用)
'''    

Pool.map多参数任务

map的多参数解决办法

#也可以用List将多个参数拼接成一个argList元组,然后多个argList再组合为pool.map要求的可迭代对象

def job(x ,y):
        return x * y

def job1(z):
    return job(z[0], z[1])

if __name__ == "__main__":
        pool = multiprocessing.Pool()
        res = pool.map(job1, [(2, 3), (3, 4)])
        print res
# 将多个输入变量打包到一个参数
x = [1,2,3,4,5,6]
y = [1,1,1,1,1,1]
x_y = zip(x, y)
results = pool.map(work, x_y)

#使用pathos包下的multiprocessing
这个包是使用dill的multiprocessing的一个fork,允许多参数输入:

from pathos.multiprocessing import ProcessingPoll as Pool
pool = Pool(4)
results = pool.map(work, x, y)

Pool.apply_async()有序输出多个迭代结果

在使用apply_async()方法接收多个参数的方法时,在任务方法中正常定义多个参数,参数以元组形式传入即可

但是给apply_async()方法传入多个值获取多个迭代结果时就会报错,因为该方法只能接收一个值,所以可以将该方法放入一个列表生成式中,如下

def job(x):
    return x * x

if __name__ == "__main__":
    pool multiprocessing.Pool()
    res = [pool.apply_async(target=job, (i,)) for i in range(3)]
    print [r.get() for r in res]

进程池 apply_async, map方法示例

进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。就是固定有几个进程可以使用。

进程池中有两个方法:

apply:同步,一般不使用

apply_async:异步

from  multiprocessing import Process,Pool
import os, time, random

def fun1(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    pool = Pool(5) #创建一个5个进程的进程池

    for i in range(10):
        pool.apply_async(func=fun1, args=(i,))

    pool.close()
    pool.join()
    print('结束测试')

结果

Run task 0 (37476)...
Run task 1 (4044)...
Task 0 runs 0.03 seconds.
Run task 2 (37476)...
Run task 3 (17252)...
Run task 4 (16448)...
Run task 5 (24804)...
Task 2 runs 0.27 seconds.
Run task 6 (37476)...
Task 1 runs 0.58 seconds.
Run task 7 (4044)...
Task 3 runs 0.98 seconds.
Run task 8 (17252)...
Task 5 runs 1.13 seconds.
Run task 9 (24804)...
Task 6 runs 1.46 seconds.
Task 4 runs 2.73 seconds.
Task 8 runs 2.18 seconds.
Task 7 runs 2.93 seconds.
Task 9 runs 2.93 seconds.
结束测试

Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。

进程池map方法

因为网上看到这个例子觉得不错,所以这里就不自己写案例,这个案例比较有说服力

import os 
import PIL 

from multiprocessing import Pool 
from PIL import Image

SIZE = (75,75)
SAVE_DIRECTORY = \'thumbs\'

def get_image_paths(folder):
    return (os.path.join(folder, f) 
            for f in os.listdir(folder) 
            if \'jpeg\' in f)

def create_thumbnail(filename): 
    im = Image.open(filename)
    im.thumbnail(SIZE, Image.ANTIALIAS)
    base, fname = os.path.split(filename) 
    save_path = os.path.join(base, SAVE_DIRECTORY, fname)
    im.save(save_path)

if __name__ == \'__main__\':
    folder = os.path.abspath(
        \'11_18_2013_R000_IQM_Big_Sur_Mon__e10d1958e7b766c3e840\')
    os.mkdir(os.path.join(folder, SAVE_DIRECTORY))

    images = get_image_paths(folder)

    pool = Pool()
    pool.map(creat_thumbnail, images) #关键点,images是一个可迭代对象
    pool.close()
    pool.join()

复制

上边这段代码的主要工作就是将遍历传入的文件夹中的图片文件,一一生成缩略图,并将这些缩略图保存到特定文件夹中。这我的机器上,用这一程序处理 6000 张图片需要花费 27.9 秒。 map 函数并不支持手动线程管理,反而使得相关的 debug 工作也变得异常简单。

map在爬虫的领域里也可以使用,比如多个URL的内容爬取,可以把URL放入元祖里,然后传给执行函数。