Python进程、线程、协程详解

2019年12月06日 阅读数:59
这篇文章主要向大家介绍Python进程、线程、协程详解,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

进程与线程的历史

咱们都知道计算机是由硬件和软件组成的。硬件中的CPU是计算机的核心,它承担计算机的全部任务。 操做系统是运行在硬件之上的软件,是计算机的管理者,它负责资源的管理和分配、任务的调度。 程序是运行在系统上的具备某种功能的软件,好比说浏览器,音乐播放器等。 每次执行程序的时候,都会完成必定的功能,好比说浏览器帮咱们打开网页,为了保证其独立性,就须要一个专门的管理和控制执行程序的数据结构——进程控制块。 进程就是一个程序在一个数据集上的一次动态执行过程。 进程通常由程序、数据集、进程控制块三部分组成。咱们编写的程序用来描述进程要完成哪些功能以及如何完成;数据集则是程序在执行过程当中所须要使用的资源;进程控制块用来记录进程的外部特征,描述进程的执行变化过程,系统能够利用它来控制和管理进程,它是系统感知进程存在的惟一标志。python


在早期的操做系统里,计算机只有一个核心,进程执行程序的最小单位,任务调度采用时间片轮转的抢占式方式进行进程调度。每一个进程都有各自的一块独立的内存,保证进程彼此间的内存地址空间的隔离。 随着计算机技术的发展,进程出现了不少弊端,一是进程的建立、撤销和切换的开销比较大,二是因为对称多处理机(对称多处理机(SymmetricalMulti-Processing)又叫SMP,是指在一个计算机上聚集了一组处理器(多CPU),各CPU之间共享内存子系统以及总线结构)的出现,能够知足多个运行单位,而多进程并行开销过大。 这个时候就引入了线程的概念。 线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程当中的最小单元,由线程ID、程序计数器、寄存器集合 和堆栈共同组成。线程的引入减少了程序并发执行时的开销,提升了操做系统的并发性能。 线程没有本身的系统资源,只拥有在运行时必不可少的资源。但线程能够与同属与同一进程的其余线程共享进程所拥有的其余资源。git


进程与线程之间的关系

线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的线程都会被强制退出并清除。线程可与属于同一进程的其它线程共享进程所拥有的所有资源,可是其自己基本上不拥有系统资源,只拥有一点在运行中必不可少的信息(如程序计数器、一组寄存器和栈)。程序员


python 线程github

Threading用于提供线程相关的操做,线程是应用程序中工做的最小单元。算法


一、threading模块

threading 模块创建在 _thread 模块之上。thread 模块以低级、原始的方式来处理和控制线程,而 threading 模块经过对 thread 进行二次封装,提供了更方便的 api 来处理线程。api

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import  threading
import  time
   
def  worker(num):
     """
     thread worker function
     :return:
     """
     time.sleep( 1 )
     print ( "The num is  %d"  %  num)
     return
   
for  in  range ( 20 ):
     =  threading.Thread(target = worker,args = (i,),name = “t. % d”  %  i)
     t.start()

上述代码建立了20个“前台”线程,而后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令。浏览器


Thread方法说明安全


t.start() : 激活线程,bash


t.getName() : 获取线程的名称网络


t.setName() : 设置线程的名称 


t.name : 获取或设置线程的名称


t.is_alive() : 判断线程是否为激活状态


t.isAlive() :判断线程是否为激活状态


t.setDaemon() 设置为后台线程或前台线程(默认:False);经过一个布尔值设置线程是否为守护线程,必须在执行start()方法以后才可使用。若是是后台线程,主线程执行过程当中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均中止;若是是前台线程,主线程执行过程当中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序中止


t.isDaemon() : 判断是否为守护线程


t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法以后该属性才有效,不然它只返回None。


t.join() :逐个执行每一个线程,执行完毕后继续往下执行,该方法使得多线程变得无心义


t.run() :线程被cpu调度后自动执行线程对象的run方法


二、线程锁threading.RLock和threading.Lock


因为线程之间是进行随机调度,而且每一个线程可能只执行n条执行以后,CPU接着执行其余线程。为了保证数据的准确性,引入了锁的概念。因此,可能出现以下问题:


例:假设列表A的全部元素都为0,当一个线程从前向后打印列表的全部元素,另一个线程则从后向前修改列表的元素为1,那么输出的时候,列表的元素就会一部分为0,一部分为1,这就致使了数据的不一致。锁的出现解决了这个问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import  threading
import  time
   
globals_num  =  0
   
lock  =  threading.RLock()
   
def  Func():
     lock.acquire()   # 得到锁
     global  globals_num
     globals_num  + =  1
     time.sleep( 1 )
     print (globals_num)
     lock.release()   # 释放锁
   
for  in  range ( 10 ):
     =  threading.Thread(target = Func)
     t.start()


三、threading.RLock和threading.Lock 的区别

RLock容许在同一线程中被屡次acquire。而Lock却不容许这种状况。 若是使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐。

1
2
3
4
5
6
7
8
9
10
11
12
import  threading
lock  =  threading.Lock()     #Lock对象
lock.acquire()
lock.acquire()   #产生了死琐。
lock.release()
lock.release() 
import  threading
rLock  =  threading.RLock()   #RLock对象
rLock.acquire()
rLock.acquire()     #在同一线程内,程序不会堵塞。
rLock.release()
rLock.release()


四、threading.Event


python线程的事件用于主线程控制其余线程的执行,事件主要提供了三个方法 set、wait、clear。


事件处理的机制:全局定义了一个“Flag”,若是“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,若是“Flag”值为True,那么event.wait 方法时便再也不阻塞。


clear:将“Flag”设置为False

set:将“Flag”设置为True

Event.isSet() :判断标识位是否为Ture。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import  threading
   
def  do(event):
     print ( 'start' )
     event.wait()
     print ( 'execute' )
   
event_obj  =  threading.Event()
for  in  range ( 10 ):
     =  threading.Thread(target = do, args = (event_obj,))
     t.start()
   
event_obj.clear()
inp  =  input ( 'input:' )
if  inp  = =  'true' :
     event_obj. set ()

当线程执行的时候,若是flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。


五、threading.Condition


一个condition变量老是与某些类型的锁相联系,这个可使用默认的状况或建立一个,当几个condition变量必须共享和同一个锁的时候,是颇有用的。锁是conditon对象的一部分:没有必要分别跟踪。


condition变量服从上下文管理协议:with语句块封闭以前能够获取与锁的联系。 acquire() 和 release() 会调用与锁相关联的相应的方法。


其余和锁关联的方法必须被调用,wait()方法会释放锁,当另一个线程使用 notify() or notify_all()唤醒它以前会一直阻塞。一旦被唤醒,wait()会从新得到锁并返回,


Condition类实现了一个conditon变量。 这个conditiaon变量容许一个或多个线程等待,直到他们被另外一个线程通知。 若是lock参数,被给定一个非空的值,,那么他必须是一个lock或者Rlock对象,它用来作底层锁。不然,会建立一个新的Rlock对象,用来作底层锁。


wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,若是调用它的线程没有获得锁,那么会抛出一个RuntimeError 异常。 wati()释放锁之后,在被调用相同条件的另外一个进程用notify() or notify_all() 叫醒以前 会一直阻塞。wait() 还能够指定一个超时时间。

若是有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒全部在等待conditon变量的线程。


注意: notify()和notify_all()不会释放锁,也就是说,线程被唤醒后不会马上返回他们的wait() 调用。除非线程调用notify()和notify_all()以后放弃了锁的全部权。


在典型的设计风格里,利用condition变量用锁去通许访问一些共享状态,线程在获取到它想获得的状态前,会反复调用wait()。修改状态的线程在他们状态改变时调用 notify() or notify_all(),用这种方式,线程会尽量的获取到想要的一个等待者状态。 例子: 生产者-消费者模型,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import  threading
import  time
def  consumer(cond):
     with cond:
         print ( "consumer before wait" )
         cond.wait()
         print ( "consumer after wait" )
   
def  producer(cond):
     with cond:
         print ( "producer before notifyAll" )
         cond.notifyAll()
         print ( "producer after notifyAll" )
   
condition  =  threading.Condition()
c1  =  threading.Thread(name = "c1" , target = consumer, args = (condition,))
c2  =  threading.Thread(name = "c2" , target = consumer, args = (condition,))
   
=  threading.Thread(name = "p" , target = producer, args = (condition,))
   
c1.start()
time.sleep( 2 )
c2.start()
time.sleep( 2 )
p.start()


六、queue模块


Queue 就是对队列,它是线程安全的


举例来讲,咱们去麦当劳吃饭。饭店里面有厨师职位,前台负责把厨房作好的饭卖给顾客,顾客则去前台领取作好的饭。这里的前台就至关于咱们的队列。造成管道样,厨师作好饭经过前台传送给顾客,所谓单向队列


这个模型也叫生产者-消费者模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import  queue
  
=  queue.Queue(maxsize = 0 )   # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。
  
q.join()     # 等到队列为kong的时候,在执行别的操做
q.qsize()    # 返回队列的大小 (不可靠)
q.empty()    # 当队列为空的时候,返回True 不然返回False (不可靠)
q.full()     # 当队列满的时候,返回True,不然返回False (不可靠)
q.put(item, block = True , timeout = None #  将item放入Queue尾部,item必须存在,能够参数block默认为True,表示当队列满时,会等待队列给出可用位置,
                         为 False 时为非阻塞,此时若是队列已满,会引起queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,事后,
                          若是队列没法给出放入item的位置,则引起 queue.Full 异常
q.get(block = True , timeout = None #   移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,若是队列为空,则阻塞,为False时,不阻塞,
                      若此时队列为空,则引起 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,事后,若是队列为空,则引起Empty异常。
q.put_nowait(item)  #   等效于 put(item,block=False)
q.get_nowait()  #    等效于 get(item,block=False)

代码以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#!/usr/bin/env python
import  Queue
import  threading
message  =  Queue.Queue( 10 )
  
  
def  producer(i):
     while  True :
         message.put(i)
  
  
def  consumer(i):
     while  True :
         msg  =  message.get()
  
  
for  in  range ( 12 ):
     =  threading.Thread(target = producer, args = (i,))
     t.start()
  
for  in  range ( 10 ):
     =  threading.Thread(target = consumer, args = (i,))
     t.start()

那就本身作个线程池吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 简单往队列中传输线程数
import  threading
import  time
import  queue
class  Threadingpool():
     def  __init__( self ,max_num  =  10 ):
         self .queue  =  queue.Queue(max_num)
         for  in  range (max_num):
             self .queue.put(threading.Thread)
     def  getthreading( self ):
         return  self .queue.get()
     def  addthreading( self ):
         self .queue.put(threading.Thread)
def  func(p,i):
     time.sleep( 1 )
     print (i)
     p.addthreading()
if  __name__  = =  "__main__" :
     =  Threadingpool()
     for  in  range ( 20 ):
         thread  =  p.getthreading()
         =  thread(target  =  func, args  =  (p,i))
         t.start()


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#往队列中无限添加任务
import  queue
import  threading
import  contextlib
import  time
StopEvent  =  object ()
class  ThreadPool( object ):
     def  __init__( self , max_num):
         self .q  =  queue.Queue()
         self .max_num  =  max_num
         self .terminal  =  False
         self .generate_list  =  []
         self .free_list  =  []
     def  run( self , func, args, callback = None ):
         """
         线程池执行一个任务
         :param func: 任务函数
         :param args: 任务函数所需参数
         :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数一、任务函数执行状态;二、任务函数返回值(默认为None,即:不执行回调函数)
         :return: 若是线程池已经终止,则返回True不然None
         """
         if  len ( self .free_list)  = =  0  and  len ( self .generate_list) <  self .max_num:
             self .generate_thread()
         =  (func, args, callback,)
         self .q.put(w)
     def  generate_thread( self ):
         """
         建立一个线程
         """
         =  threading.Thread(target = self .call)
         t.start()
     def  call( self ):
         """
         循环去获取任务函数并执行任务函数
         """
         current_thread  =  threading.currentThread
         self .generate_list.append(current_thread)
         event  =  self .q.get()   # 获取线程
         while  event ! =  StopEvent:    # 判断获取的线程数不等于全局变量
             func, arguments, callback  =  event    # 拆分元祖,得到执行函数,参数,回调函数
             try :
                 result  =  func( * arguments)    # 执行函数
                 status  =  True
             except  Exception as e:     # 函数执行失败
                 status  =  False
                 result  =  e
             if  callback  is  not  None :
                 try :
                     callback(status, result)
                 except  Exception as e:
                     pass
             # self.free_list.append(current_thread)
             # event = self.q.get()
             # self.free_list.remove(current_thread)
             with  self .work_state():
                 event  =  self .q.get()
         else :
             self .generate_list.remove(current_thread)
     def  close( self ):
         """
         关闭线程,给传输全局非元祖的变量来进行关闭
         :return:
         """
         for  in  range ( len ( self .generate_list)):
             self .q.put(StopEvent)
     def  terminate( self ):
         """
         忽然关闭线程
         :return:
         """
         self .terminal  =  True
         while  self .generate_list:
             self .q.put(StopEvent)
         self .q.empty()
     @contextlib.contextmanager
     def  work_state( self ):
         self .free_list.append(threading.currentThread)
         try :
             yield
         finally :
             self .free_list.remove(threading.currentThread)
def  work(i):
     print (i)
     return  + 1  # 返回给回调函数
def  callback(ret):
     print (ret)
pool  =  ThreadPool( 10 )
for  item  in  range ( 50 ):
     pool.run(func = work, args = (item,),callback = callback)
pool.terminate()
# pool.close()


python 进程

multiprocessing是python的多进程管理包,和threading.Thread相似。


一、multiprocessing模块


直接从侧面用subprocesses替换线程使用GIL的方式,因为这一点,multiprocessing模块可让程序员在给定的机器上充分的利用CPU。在multiprocessing中,经过建立Process对象生成进程,而后调用它的start()方法,

1
2
3
4
5
6
7
8
9
10
from  multiprocessing  import  Process
  
def  func(name):
     print ( 'hello' , name)
  
  
if  __name__  = =  "__main__" :
     =  Process(target = func,args = ( 'zhangyanlin' ,))
     p.start()
     p.join()   # 等待进程执行完毕

在使用并发设计的时候最好尽量的避免共享数据,尤为是在使用多进程的时候。 若是你真有须要 要共享数据, multiprocessing提供了两种方式。


(1)multiprocessing,Array,Value


数据能够用Value或Array存储在一个共享内存地图里,以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
from  multiprocessing  import  Array,Value,Process
  
def  func(a,b):
     a.value  =  3.333333333333333
     for  in  range ( len (b)):
         b[i]  =  - b[i]
  
  
if  __name__  = =  "__main__" :
     num  =  Value( 'd' , 0.0 )
     arr  =  Array( 'i' , range ( 11 ))
  
  
     =  Process(target = func,args = (num,arr))
     d =  Process(target = func,args = (num,arr))
     c.start()
     d.start()
     c.join()
     d.join()
  
     print (num.value)
     for  in  arr:
         print (i)

输出:

1
2
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

建立num和arr时,“d”和“i”参数由Array模块使用的typecodes建立:“d”表示一个双精度的浮点数,“i”表示一个有符号的整数,这些共享对象将被线程安全的处理。


Array(‘i’, range(10))中的‘i’参数:


‘c’: ctypes.c_char     ‘u’: ctypes.c_wchar    ‘b’: ctypes.c_byte     ‘B’: ctypes.c_ubyte

‘h’: ctypes.c_short     ‘H’: ctypes.c_ushort    ‘i’: ctypes.c_int      ‘I’: ctypes.c_uint

‘l’: ctypes.c_long,    ‘L’: ctypes.c_ulong    ‘f’: ctypes.c_float    ‘d’: ctypes.c_double


(2)multiprocessing,Manager


由Manager()返回的manager提供list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value and Array类型的支持。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
from  multiprocessing  import  Process,Manager
def  f(d,l):
     d[ "name" =  "zhangyanlin"
     d[ "age" =  18
     d[ "Job" =  "pythoner"
     l.reverse()
  
if  __name__  = =  "__main__" :
     with Manager() as man:
         =  man. dict ()
         =  man. list ( range ( 10 ))
  
         =  Process(target = f,args = (d,l))
         p.start()
         p.join()
  
         print (d)
         print (l)



输出:

1
2
{0.25: None, 1:  '1' '2' : 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Server process manager比 shared memory 更灵活,由于它能够支持任意的对象类型。另外,一个单独的manager能够经过进程在网络上不一样的计算机之间共享,不过他比shared memory要慢。


二、进程池(Using a pool of workers)


Pool类描述了一个工做进程池,他有几种不一样的方法让任务卸载工做进程。


进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,若是进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。


咱们能够用Pool类建立一个进程池, 展开提交的任务给进程池。 例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#apply
from   multiprocessing  import  Pool
import  time
  
def  f1(i):
     time.sleep( 0.5 )
     print (i)
     return  +  100
  
if  __name__  = =  "__main__" :
     pool  =  Pool( 5 )
     for  in  range ( 1 , 31 ):
         pool. apply (func = f1,args = (i,))
  
#apply_async
def  f1(i):
     time.sleep( 0.5 )
     print (i)
     return  +  100
def  f2(arg):
     print (arg)
  
if  __name__  = =  "__main__" :
     pool  =  Pool( 5 )
     for  in  range ( 1 , 31 ):
         pool.apply_async(func = f1,args = (i,),callback = f2)
     pool.close()
     pool.join()

一个进程池对象能够控制工做进程池的哪些工做能够被提交,它支持超时和回调的异步结果,有一个相似map的实现。


processes :使用的工做进程的数量,若是processes是None那么使用 os.cpu_count()返回的数量。

initializer: 若是initializer是None,那么每个工做进程在开始的时候会调用initializer(*initargs)。

maxtasksperchild:工做进程退出以前能够完成的任务数,完成后用一个心的工做进程来替代原进程,来让闲置的资源被释放。maxtasksperchild默认是None,意味着只要Pool存在工做进程就会一直存活。

context: 用在制定工做进程启动时的上下文,通常使用 multiprocessing.Pool() 或者一个context对象的Pool()方法来建立一个池,两种方法都适当的设置了context

注意:Pool对象的方法只能够被建立pool的进程所调用。


New in version 3.2: maxtasksperchild

New in version 3.4: context


进程池的方法

apply(func[, args[, kwds]]) :使用arg和kwds参数调用func函数,结果返回前会一直阻塞,因为这个缘由,apply_async()更适合并发执行,另外,func函数仅被pool中的一个进程运行。


apply_async(func[, args[, kwds[, callback[, error_callback]]]]) : apply()方法的一个变体,会返回一个结果对象。若是callback被指定,那么callback能够接收一个参数而后被调用,当结果准备好回调时会调用callback,调用失败时,则用error_callback替换callback。 Callbacks应被当即完成,不然处理结果的线程会被阻塞。


close() : 阻止更多的任务提交到pool,待任务完成后,工做进程会退出。


terminate() : 无论任务是否完成,当即中止工做进程。在对pool对象进程垃圾回收的时候,会当即调用terminate()。


join() : wait工做线程的退出,在调用join()前,必须调用close() or terminate()。这样是由于被终止的进程须要被父进程调用wait(join等价与wait),不然进程会成为僵尸进程。


map(func, iterable[, chunksize])¶


map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶


imap(func, iterable[, chunksize])¶


imap_unordered(func, iterable[, chunksize])


starmap(func, iterable[, chunksize])¶


starmap_async(func, iterable[, chunksize[, callback[, error_back]]])


python 协程


线程和进程的操做是由程序触发系统接口,最后的执行者是系统;协程的操做则是程序员。


协程存在的意义:对于多线程应用,CPU经过切片的方式来切换线程间的执行,线程切换时须要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。


协程的适用场景:当程序中存在大量不须要CPU的操做时(IO),适用于协程;


event loop是协程执行的控制点, 若是你但愿执行协程, 就须要用到它们。


event loop提供了以下的特性:


注册、执行、取消延时调用(异步函数)

建立用于通讯的client和server协议(工具)

建立和别的程序通讯的子进程和协议(工具)

把函数调用送入线程池中

协程示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
import  asyncio
   
async  def  cor1():
     print ( "COR1 start" )
     await cor2()
     print ( "COR1 end"