笔记-python-standard library-17.2 multiprocessing

1. multiprocessing

source code:Lib/multiprocessing/

multiprocessing 是一个用于生成子进程的包,其接口与 threading 类似。

它还提供了 threading 模块所没有的功能,典型的是进程池 Pool。

1.1. process class

与 threading 类似,通过创建一个 Process 对象并调用它的 start() 方法来生成进程。

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

上面的例子并不能直观地显示进程的创建过程,下面是一个会打印相关进程信息的例子:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

1.2. contexts and start methods

depending on the platform, multiprocessing supports three ways to start a process.

  1. spawn:

父进程启动一个新的Python解释器进程。子进程仅继承对运行run()方法必需的资源,特别是不会继承不需要的文件描述符和句柄。

这种启动方式比fork or forkserver都慢。

可以在windows and unix中使用,在windows中是默认选项。

  2. fork

父进程使用 os.fork() 产生一个 Python 解释器的分叉。子进程在开始时与父进程完全相同,父进程的所有资源都被子进程继承。

可以在unix中使用,也是unix环境下的默认选项。

  3. forkserver

当使用 forkserver 时,会先创建一个服务器进程。从那时起,每当需要创建新进程,父进程就连接到该服务器进程并请求它 fork 一个新进程。

the forkserver process is single threaded so it is safe for it to use os.fork().

no unnecessary resources are inherited.

在unix平台上可用,但平台要支持通过管道传递文件描述符。

在版本3.4中更改:在所有unix平台上添加了spawn,并为某些unix平台添加了forkserver。子进程不再继承Windows上的所有父可继承句柄。

要选择启动方法可以使用set_start_method():

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    # mp.set_start_method('spawn')   # 再次调用会抛出 RuntimeError
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

在同一程序中 set_start_method() 最多只能被调用一次。

如果像上例中被注释掉的那一行那样再次调用,会报错:RuntimeError: context has already been set

或者,可以使用 get_context() 获取上下文对象。上下文对象与 multiprocessing 模块具有相同的 API,并允许在同一程序中使用多种启动方法。

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

1.3. exchanging objects between processes

该模块支持两种进程间的通讯通道。

  1. queues

The Queue class is a near clone of queue.Queue.

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Queues are thread and process safe.

  2. pipes

the Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way).

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())    # prints "[42, None, 'hello']"
    p.join()

Pipe()返回的两个连接对象表示管道的两端。

每个连接对象都有send() and recv()方法。

值得注意的是,如果两个进程(或线程)同时读取或写入管道的同一端,数据可能损坏;但同时使用管道的两个不同端并没有任何风险。

1.4. synchronization between processes

multiprocessing 包含 threading 中所有的同步原语。

# lock
from multiprocessing import Process, Lock
from time import sleep

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
        sleep(2)
        print('555666')
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        Process(target=f, args=(lock, num)).start()

注意:在 IDLE 下运行可能看不到输出,原因可能是子进程的标准输出并没有指向 IDLE 的窗口。

1.5. sharing state between processes

在并发编程时,尽可能不要使用状态共享,在多进程编程时尤其如此。

如果确实需要共享数据,Multiprocessing也有一些方法。

  1. shared memory

使用 Value 或 Array 可以把共享数据存放在共享内存中。

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])

will print

3.1415927

[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

创建 num 和 arr 时使用的 'd' 和 'i' 是 array 模块所用的类型码:'d' 表示双精度浮点数,'i' 表示有符号整数。

  2. server process

由Manager()创建的管理对象控制一个服务进程,该进程保存python对象并允许其它进程使用代理访问它。

上文提到的管理对象支持的类型:list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value、Array。

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))
        p = Process(target=f, args=(d, l))
        p.start()
        p.join()
        print(d)
        print(l)

will print

{0.25: None, 1: '1', '2': 2}

[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

服务进程管理器比共享内存支持更多的对象类型,而且单个管理器可以被网络上不同计算机的进程共享;但它比共享内存慢。

1.6. pool

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))       # runs in *only* one process
        print(res.get(timeout=1))              # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ())  # runs in *only* one process
        print(res.get(timeout=1))              # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

2. 包内容参考

multiprocessing 包的 API 与 threading 模块大体相同。

2.1. process and exceptions

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

进程类实现进程的操作,该类具有的方法与threading.Thread相同。

If a subclass overrides the constructor, it must make sure it invokes the base class constructor (Process.__init__()) before doing anything else to the process.
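下面是一个子类化 Process 的简单示意(MyProcess 为笔记中假设的类名),重写 run() 并先调用基类构造函数:

from multiprocessing import Process

class MyProcess(Process):
    def __init__(self, who):
        super().__init__()        # 子类必须先调用基类的构造函数
        self.who = who

    def run(self):
        print('hello', self.who)

if __name__ == '__main__':
    p = MyProcess('bob')
    p.start()
    p.join()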

与Thread相同方法不再列出,下面列出一些进程类的独特内容:

  1. pid:进程 ID(process ID)
  2. exitcode

子进程的退出码。如果进程尚未结束,该值为 None;如果为 -N,表示子进程被信号 N 终止。

  3. authkey:进程的身份验证密钥(一个字节串)。
  4. sentinel:一个数字句柄,进程结束时会变为就绪状态,可传给 multiprocessing.connection.wait() 以同时等待多个事件。
  5. terminate()

terminate the process. On Unix this is done using the SIGTERM signal; on Windows, TerminateProcess() is used.

Note that exit handlers and finally clauses, etc., will not be executed.

注意,该进程的后代进程不会被一并终止,它们会变成孤儿进程。

Note that the start(), join(), is_alive(), terminate() and exitcode methods should only be called by the process that created the process object.
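下面的示意演示 pid、terminate() 和 exitcode 的配合使用(退出码数值因平台而异,仅供参考):

import time
from multiprocessing import Process

if __name__ == '__main__':
    p = Process(target=time.sleep, args=(60,))
    p.start()
    print('pid:', p.pid, 'alive:', p.is_alive())
    p.terminate()                    # Unix 上发送 SIGTERM,Windows 上调用 TerminateProcess()
    p.join()                         # 等待进程真正退出
    print('exitcode:', p.exitcode)   # Unix 上通常为 -15,即被 SIGTERM 终止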

异常:

  1. exception multiprocessing.ProcessError

进程相关异常的基类。

  2. exception multiprocessing.BufferTooShort

Exception raised by Connection.recv_bytes_into() when the supplied buffer object is too small for the message read.

If e is an instance of BufferTooShort then e.args[0] will give the message as a byte string.

  3. exception multiprocessing.AuthenticationError

Raised when there is an authentication error.

  4. exception multiprocessing.TimeoutError

Raised by methods with a timeout when the timeout expires.

Raised by methods with a timeout when the timeout expires.

2.2. pipe and queues

在使用多进程时,通常使用消息传递来进行进程间通信,从而避免使用锁之类的同步原语。

消息传递可以使用 Pipe()(用于两个进程间的连接)或队列(允许多个生产者和消费者)。

The Queue, SimpleQueue and JoinableQueue types are multi-producer, multi-consumer FIFO queues modelled on the queue.Queue class in the standard library. They differ in that Queue lacks the task_done() and join() methods introduced into Python 2.5's queue.Queue class.

If you use JoinableQueue then you must call JoinableQueue.task_done() for each task removed from the queue or else the semaphore used to count the number of unfinished tasks may eventually overflow, raising an exception.

Note that one can also create a shared queue by using a manager object – see Managers.

2.2.1. pipe

multiprocessing.Pipe([duplex])

return a pair (conn1, conn2) of Connection objects representing the two ends of a pipe.

duplex 为 True(默认)时管道是双向的;为 False 时管道是单向的:conn1 只能用于接收消息,conn2 只能用于发送消息。
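单向管道的最小示意(在同一进程内演示收发方向):

from multiprocessing import Pipe

if __name__ == '__main__':
    conn1, conn2 = Pipe(duplex=False)
    conn2.send('one way only')     # conn2 只能发送
    print(conn1.recv())            # conn1 只能接收,打印 'one way only'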

class multiprocessing.Queue([maxsize])

返回一个使用管道和少量锁/信号量(locks/semaphores)实现的进程间共享队列。

当一个进程第一次将对象放入队列时,会启动一个 feeder 线程,负责把对象从缓冲区传送到管道中。

方法列表:

  1. qsize()

返回队列的大致大小。由于多线程/多进程的语义,这个数字并不可靠。

  2. empty()

如果队列为空返回 True,否则返回 False。由于多线程/多进程的语义,结果同样不可靠。

  3. full():如果队列已满返回 True,否则返回 False。同样不可靠。
  4. put(obj[, block[, timeout]])

将对象放入队列。

  5. put_nowait(obj):等效于 put(obj, False)
  6. get([block[, timeout]])
  7. get_nowait():等效于 get(False)
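上述方法的一个简单示意(在单个进程内演示,参数值仅为示例):

import queue                       # 超时异常 queue.Empty 定义在标准库 queue 模块中
from multiprocessing import Queue

if __name__ == '__main__':
    q = Queue(maxsize=2)
    q.put('a')
    q.put_nowait('b')              # 等效于 put('b', False)
    print(q.full())                # True(该结果在语义上并不可靠,仅作参考)
    print(q.get())                 # 'a'
    print(q.get())                 # 'b'
    try:
        q.get(timeout=0.1)         # 队列已空,超时后抛出 queue.Empty
    except queue.Empty:
        print('queue is empty now')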

2.2.2. simplequeue

class multiprocessing.SimpleQueue

It is a simplified Queue type, very close to a locked Pipe.

empty()

Return True if the queue is empty, False otherwise.

get()

Remove and return an item from the queue.

put(item)

Put item into the queue.

2.2.3. joinablequeue

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, a Queue subclass, is a queue which additionally has task_done() and join() methods.

task_done()

Indicate that a formerly enqueued task is complete. Used by queue consumers. For each get() used to fetch a task, a subsequent call to task_done() tells the queue that the processing on the task is complete.

If a join() is currently blocking, it will resume when all items have been processed (meaning that a task_done() call was received for every item that had been put() into the queue).

Raises a ValueError if called more times than there were items placed in the queue.

join()

Block until all items in the queue have been gotten and processed.

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
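一个生产者/消费者的简单示意(以 None 作为约定的结束标记,这只是示例约定而非模块要求):

from multiprocessing import Process, JoinableQueue

def worker(q):
    while True:
        item = q.get()
        if item is None:           # 收到结束标记后退出
            q.task_done()
            break
        print('processing', item)
        q.task_done()              # 每取出一项都要调用一次 task_done()

if __name__ == '__main__':
    q = JoinableQueue()
    p = Process(target=worker, args=(q,))
    p.start()
    for i in range(5):
        q.put(i)
    q.put(None)
    q.join()                       # 阻塞直到每个 put 的项都被 task_done()
    p.join()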

2.3. 杂项

  1. multiprocessing.active_children()

Return list of all live children of the current process.

Calling this has the side effect of “joining” any processes which have already finished.

  2. multiprocessing.cpu_count()

返回系统的CPU数量。

This number is not equivalent to the number of CPUs the current process can use. The number of usable CPUs can be obtained with len(os.sched_getaffinity(0))

May raise NotImplementedError.

  3. multiprocessing.current_process()

Return the Process object corresponding to the current process.

类似于 threading.current_thread().

  4. multiprocessing.freeze_support()

Add support for when a program which uses multiprocessing has been frozen to produce a Windows executable. (Has been tested with py2exe, PyInstaller and cx_Freeze.)

One needs to call this function straight after the if __name__ == '__main__' line of the main module.

For example:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

If the freeze_support() line is omitted then trying to run the frozen executable will raise RuntimeError.

Calling freeze_support() has no effect when invoked on any operating system other than Windows. In addition, if the module is being run normally by the Python interpreter on Windows (the program has not been frozen), then freeze_support() has no effect.

  5. multiprocessing.get_all_start_methods()

返回支持启动方法的列表,列表第一项是创建进程时的默认方式。 The possible start methods are 'fork', 'spawn' and 'forkserver'. On Windows only 'spawn'is available. On Unix 'fork' and 'spawn' are always supported, with 'fork' being the default.

  6. multiprocessing.get_context(method=None)

Return a context object which has the same attributes as the multiprocessing module.

If method is None then the default context is returned. Otherwise method should be 'fork', 'spawn', 'forkserver'. ValueError is raised if the specified start method is not available.

  7. multiprocessing.get_start_method(allow_none=False)

Return the name of start method used for starting processes.

If the start method has not been fixed and allow_none is false, then the start method is fixed to the default and the name is returned. If the start method has not been fixed and allow_none is true then None is returned.

The return value can be 'fork', 'spawn', 'forkserver' or None. 'fork' is the default on Unix, while 'spawn' is the default on Windows.
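一个查看当前平台启动方式的示意片段(输出因平台而异):

import multiprocessing as mp

if __name__ == '__main__':
    print(mp.get_all_start_methods())   # 例如 Unix 上为 ['fork', 'spawn', 'forkserver']
    print(mp.get_start_method())        # 未显式设置时返回并固定为平台默认值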

  8. multiprocessing.set_executable()

Sets the path of the Python interpreter to use when starting a child process. (By default sys.executable is used). Embedders will probably need to do something like

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

before they can create child processes.

  9. multiprocessing.set_start_method(method)

Set the method which should be used to start child processes. method can be 'fork', 'spawn' or 'forkserver'.

Note that this should be called at most once, and it should be protected inside the if __name__ == '__main__' clause of the main module.

2.4. connection objects

连接对象用于发送和接收可序列化(picklable)的对象或字符串,可以把它想象成面向消息的 socket 连接。

通常使用Pipe()创建连接对象。

class multiprocessing.Connection

方法:

  1. send(obj)

发送的对象必须是可序列化(picklable)的,过大的对象(约 32 MiB 以上,具体取决于操作系统)可能抛出 ValueError。

  2. recv()

接收另一端用 send() 发送的对象。如果没有可接收的内容且另一端已关闭,抛出 EOFError。

  3. fileno()

return the file descriptor or handle used by the connection.

  4. close()
  5. poll([timeout])

返回是否有数据可读。

不指定超时则立即返回结果;指定超时则最多阻塞 timeout 秒;如果 timeout 为 None 则无限期阻塞。

  6. send_bytes(buffer[, offset[, size]])

发送 bytes-like object 的完整数据。bytes-like object 包括 bytes、bytearray、array.array 等。

  7. recv_bytes([maxlength])
  8. recv_bytes_into(buffer[, offset])

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
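poll() 的补充示意(接上例中的连接对象 a、b):

>>> a.poll()              # 此时另一端没有新数据
False
>>> b.send('ping')
>>> a.poll(timeout=1)     # 最多阻塞 1 秒
True
>>> a.recv()
'ping'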

2.5. 同步

与threading基本一样。
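例如 Event 在进程间的用法与 threading.Event 相同(简单示意):

from multiprocessing import Process, Event

def waiter(e):
    e.wait()                  # 阻塞直到事件被设置
    print('event is set')

if __name__ == '__main__':
    e = Event()
    p = Process(target=waiter, args=(e,))
    p.start()
    e.set()                   # 在父进程中设置事件,子进程随即解除阻塞
    p.join()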

2.6. process pools

2.6.1. class Pool

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

参数释义:

  1. processes

进程池中的工作进程数,如果为 None 则使用 os.cpu_count()。

  2. initializer

如果该参数不为 None,则每个工作进程在启动时会调用 initializer(*initargs)。

  3. maxtasksperchild

每个工作进程在完成该参数指定数量的任务后退出,并由一个新的工作进程替代,主要目的是释放资源。默认为 None,即工作进程的生存期与 Pool 一样长。

  4. context

可用于指定启动工作进程所使用的上下文。

methods:

  1. apply(func[, args[, kwds]])

在池中的某一个工作进程里执行 func,阻塞直到结果返回。

  2. apply_async(func[, args[, kwds[, callback[, error_callback]]]])

apply() 方法的异步变体,返回一个结果对象(AsyncResult)。

  3. map(func, iterable[, chunksize])
  4. map_async(func, iterable[, chunksize[, callback[, error_callback]]])
  5. imap(func, iterable[, chunksize])
  6. imap_unordered(func, iterable[, chunksize])
  7. starmap(func, iterable[, chunksize])
  8. starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])
  9. close()

禁止新的任务提交到池中,所有已提交的任务完成后工作进程退出。

  10. terminate()

立即停止工作进程,不等待未完成的任务。

  11. join()

等待工作进程退出,调用前必须先调用 close() 或 terminate()。
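不使用 with 语句时需要显式调用 close() 和 join()(简单示意,square 为假设的示例函数):

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(processes=2)
    results = pool.map(square, range(5))
    pool.close()      # 不再接受新任务
    pool.join()       # 等待工作进程退出,必须在 close() 或 terminate() 之后调用
    print(results)    # [0, 1, 4, 9, 16]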

2.6.2. class multiprocessing.pool.AsyncResult

class multiprocessing.pool.AsyncResult

方法:

  1. get([timeout]):返回结果。如果在 timeout 秒内结果尚未到达则抛出 multiprocessing.TimeoutError;如果远程调用抛出了异常,get() 会重新抛出该异常。
  2. wait([timeout]):等待结果可用或超时返回。
  3. ready():返回调用是否已经完成。
  4. successful():返回调用是否在未抛出异常的情况下完成;若结果尚未就绪,调用它会出错。

2.6.3. pool使用示例代码

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:           # start 4 worker processes
        result = pool.apply_async(f, (10,))   # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))          # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))         # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                       # prints "0"
        print(next(it))                       # prints "1"
        print(it.next(timeout=1))             # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))          # raises multiprocessing.TimeoutError

2.7. listeners and clients

通常进程之间的消息传递使用队列或者Pipe() 生成的connection对象。

multiprocessing.connection模块可以提供更灵活一些的操作功能。

It basically gives a high level message oriented API for dealing with sockets or Windows named pipes. It also has support for digest authentication using the hmac module, and for polling multiple connections at the same time.

下面列出了相关的类及属性。

2.7.1. deliver_challenge()

multiprocessing.connection.deliver_challenge(connection, authkey)

Send a randomly generated message to the other end of the connection and wait for a reply.

If the reply matches the digest of the message using authkey as the key then a welcome message is sent to the other end of the connection. Otherwise AuthenticationError is raised.

multiprocessing.connection.answer_challenge(connection, authkey)

Receive a message, calculate the digest of the message using authkey as the key, and then send the digest back.

If a welcome message is not received, then AuthenticationError is raised.

multiprocessing.connection.Client(address[, family[, authkey]])

Attempt to set up a connection to the listener which is using address address, returning a Connection.

The type of the connection is determined by family argument, but this can generally be omitted since it can usually be inferred from the format of address. (See Address Formats)

If authkey is given and not None, it should be a byte string and will be used as the secret key for an HMAC-based authentication challenge. No authentication is done if authkey is None. AuthenticationError is raised if authentication fails. See Authentication keys.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

A wrapper for a bound socket or Windows named pipe which is ‘listening’ for connections.

address is the address to be used by the bound socket or named pipe of the listener object.

方法及属性

  1. accept()

Accept a connection on the bound socket or named pipe of the listener object and return a Connection object. If authentication is attempted and fails, then AuthenticationError is raised.

  2. close()

Close the bound socket or named pipe of the listener object. This is called automatically when the listener is garbage collected. However it is advisable to call it explicitly.

Listener objects have the following read-only properties:

  1. address

The address which is being used by the Listener object.

  2. last_accepted

The address from which the last accepted connection came. If this is unavailable then it is None.
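Listener 与 Client 配合使用的简单示意(地址和 authkey 均为假设的示例值,这里用一个子进程充当客户端):

from multiprocessing import Process
from multiprocessing.connection import Listener, Client

ADDRESS = ('localhost', 6000)      # 示例地址,端口为假设值
AUTHKEY = b'secret password'       # 示例密钥

def client_task():
    with Client(ADDRESS, authkey=AUTHKEY) as conn:
        conn.send(['hello', 42])

if __name__ == '__main__':
    with Listener(ADDRESS, authkey=AUTHKEY) as listener:
        p = Process(target=client_task)
        p.start()
        with listener.accept() as conn:              # 阻塞直到客户端连入
            print('connected from', listener.last_accepted)
            print(conn.recv())                       # ['hello', 42]
        p.join()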


multiprocessing.connection.wait(object_list, timeout=None)

Wait till an object in object_list is ready. Returns the list of those objects in object_list which are ready. If timeout is a float then the call blocks for at most that many seconds. If timeout is None then it will block for an unlimited period. A negative timeout is equivalent to a zero timeout.

For both Unix and Windows, an object can appear in object_list if it is

  1. a readable Connection object;
  2. a connected and readable socket.socket object; or
  3. the sentinel attribute of a Process object.

A connection or socket object is ready when there is data available to be read from it, or the other end has been closed.
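使用进程 sentinel 配合 wait() 等待多个子进程结束的简单示意:

import random
import time
from multiprocessing import Process
from multiprocessing.connection import wait

def work(i):
    time.sleep(random.random())
    print('worker', i, 'done')

if __name__ == '__main__':
    procs = [Process(target=work, args=(i,)) for i in range(3)]
    for p in procs:
        p.start()
    sentinels = [p.sentinel for p in procs]
    while sentinels:
        for s in wait(sentinels):      # 阻塞直到至少一个进程结束
            sentinels.remove(s)
    print('all workers finished')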

2.8. logging

multiprocessing.get_logger()

返回 multiprocessing 专用的 logger。

首次创建时,该 logger 的级别为 logging.NOTSET 且没有默认的 handler。默认情况下,发送到此 logger 的消息不会传播到根 logger。

multiprocessing.log_to_stderr()

在 get_logger() 的基础上额外添加一个将日志输出到 sys.stderr 的 handler,并返回该 logger。
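日志用法的一个简单示意:

import logging
import multiprocessing as mp

if __name__ == '__main__':
    logger = mp.log_to_stderr()
    logger.setLevel(logging.INFO)
    logger.info('doing some work')     # 输出形如 [INFO/MainProcess] doing some work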

注:有部分内容并未做笔记。