Python-分布式进程

分布式进程是将Process进程分布到多台服务器中,利用多台机器的性能完成复杂的任务。可以应用到分布式爬虫的开发中。
分布式进程在Python中依然要用到multiprocess模块。它不但支持多进程,其中managers子模块还支持吧多进程分不到多台机器上,可以写一个服务进程作为调度者,将任务分不到其他多个进程中,依靠网络通信进行管理。
例:
抓取某网站的所有图片,多进程的方法是一个进程负责抓取图片的url,然后放到Queue中,另外的进程负责从Queue中读取url进行下载和存储到本地。
分布式:
当把上述过程做成分布式,一台机器上负责抓取url链接,其他机器负责下载和存储。
主要问题就是讲Queue暴露在网络中,让其他机器的进程都可以访问,分布式进程就是将这一过程进行了封装,可以将之成为本队列的网络化。
创建分布式进程需要的六个步骤:
1、建立队列Queue,用来进程之间的通信:
服务进程创建任务队列task_queue,用来作为传递惹怒给任务进程的通道。在分布式多进程环境下必须通过由Queuemanager
获得Queue接口来添加任务。
2、把第一步中建立的队列在网络上注册,暴露给其他进程(主机),注册后获得网络队列,相当于本地队列的映像。
3、建立一个对象(Queuemanager(BaseManage))实例manager,绑定端口和验证口令。
4、启动第三部中建立的实例,即启动管理manager,监管通信通道。
5、通过管理实例的方法获得通过网络访问Queue对象,即再把网络队列实例化成可已使用的本地队列。
6、创建人物到"本地"队列中,自动上传人物到网络队列中,分配给任务进程进行处理。
Python版本:3.5
系统:Mac OS
taskManager.py


#!/usr/bin/env python
# -*- coding: utf-8 -*-

__author__ = 'Fade Zhao'

# 服务进程(taskManager.py)
# 导入随机,时间,队列
from multiprocessing.managers import BaseManager

import time,random,queue
# #实现第一步:建立task_queue和result_queue,用来存放任务和结果
task_queue = queue.Queue()
result_queue = queue.Queue()
class Queuemanager(BaseManager):
    pass
#实现第二步:把创建的两个队列注册在网络上,利用register方法,callable参数关联了Queue对象,
# 将Queue对象在网络中暴露
Queuemanager.register('get_task_queue',callable=lambda:task_queue)
Queuemanager.register('get_result_queue',callable=lambda:result_queue)
#实现第三步:绑定端口8001,设置验证口令b‘abc’(是b格式)。这个相当于对象的初始化
manager=Queuemanager(address=('127.0.0.1',1234),authkey=b'abc',)
#实现第四步:启动管理,监听信息通道
manager.start()
#实现第五步:通过管理实例的方法获得通过网络访问的Queue对象
task=manager.get_task_queue()
result=manager.get_result_queue()
#实现第六步:添加任务
for url in ["ImageUrl_"+str(i) for i in range(10)]:
    print('put task %s ...' %url)
    task.put(url)
#获取返回结果
print('try get result...')
for i in range(10):
    print('result is %s' %result.get(timeout=10))
#关闭管理
manager.shutdown()

taskWorker.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

__author__ = 'Fade Zhao'


import time
from multiprocessing.managers import BaseManager
# 创建类似的QueueManager:
class QueueManager(BaseManager):
    pass
# 实现第一步:使用QueueManager注册获取Queue的方法名称
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 实现第二步:连接到服务器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证口令注意保持与服务进程设置的完全一致:
m = QueueManager(address=(server_addr, 1234), authkey=b'abc')
# 从网络连接:
m.connect()
# 实现第三步:获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 实现第四步:从task队列取任务,并把结果写入result队列:
while(not task.empty()):
        image_url = task.get(True,timeout=5)
        print('run task download %s...' % image_url)
        time.sleep(1)
        result.put('%s--->success'%image_url)

结果:

taskManager>>>
Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...
taskWorker>>>
Connect to server 127.0.0.1...
run task download ImageUrl_0...
run task download ImageUrl_1...
run task download ImageUrl_2...
run task download ImageUrl_3...
run task download ImageUrl_4...
run task download ImageUrl_5...
run task download ImageUrl_6...
run task download ImageUrl_7...
run task download ImageUrl_8...
run task download ImageUrl_9...

  注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。

  这个简单的Manager/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如换成发送邮件,就实现了邮件队列的异步发送。

  其中的Queue是在taskManage进程中被创建注册到网络中,而taskWorker通过网络获取Queue。

  authkey 为了保证两台机器正常通信,不被其他机器恶意干扰。如果taskworker.py的authkey和taskmanager.py的authkey不一致,肯定连接不上。