Ruby 多线程

每个正在系统上运行的程序都是一个进程。每个进程包含一到多个线程。

线程是程序中一个单一的顺序控制流程,在单个程序中同时运行多个线程完成不同的工作,称为多线程。

Ruby 中我们可以通过 Thread 类来创建多线程,Ruby的线程是一个轻量级的,可以以高效的方式来实现并行的代码。

创建 Ruby 线程

要启动一个新的线程,只需要调用 Thread.new 即可:

#线程#1代码部分Thread.new{#线程#2执行代码}#线程#1执行代码

在线示例

以下示例展示了如何在Ruby程序中使用多线程:

在线示例

#!/usr/bin/rubydeffunc1i=0whilei<=2puts"func1at:#{Time.now}"sleep(2)i=i+1endenddeffunc2j=0whilej<=2puts"func2at:#{Time.now}"sleep(1)j=j+1endendputs"StartedAt#{Time.now}"t1=Thread.new{func1()}t2=Thread.new{func2()}t1.joint2.joinputs"Endat#{Time.now}"

以上代码执行结果为:

StartedAtWedMay1408:21:54-07002014func1at:WedMay1408:21:54-07002014func2at:WedMay1408:21:54-07002014func2at:WedMay1408:21:55-07002014func1at:WedMay1408:21:56-07002014func2at:WedMay1408:21:56-07002014func1at:WedMay1408:21:58-07002014EndatWedMay1408:22:00-07002014

线程生命周期

1、线程的创建可以使用Thread.new,同样可以以同样的语法使用Thread.start 或者Thread.fork这三个方法来创建线程。

2、创建线程后无需启动,线程会自动执行。

3、Thread 类定义了一些方法来操控线程。线程执行Thread.new中的代码块。

4、线程代码块中最后一个语句是线程的值,可以通过线程的方法来调用,如果线程执行完毕,则返回线程值,否则不返回值直到线程执行完毕。

5、Thread.current 方法返回表示当前线程的对象。 Thread.main 方法返回主线程。

6、通过 Thread.Join 方法来执行线程,这个方法会挂起主线程,直到当前线程执行完毕。

线程状态

线程有5种状态:

线程状态返回值
可执行run
睡眠Sleeping
退出aborting
正常终止false
发生异常终止nil

线程和异常

当某线程发生异常,且没有被rescue捕捉到时,该线程通常会被无警告地终止。但是,若有其它线程因为Thread#join的关系一直等待该线程的话,则等待的线程同样会被引发相同的异常。

begint=Thread.newdoThread.pass#主线程确实在等joinraise"unhandledexception"endt.joinrescuep$!#=>"unhandledexception"end

使用下列3个方法,就可以让解释器在某个线程因异常而终止时中断运行。

  • 启动脚本时指定-d选项,并以调试模时运行。

  • 用Thread.abort_on_exception设置标志。

  • 使用Thread#abort_on_exception对指定的线程设定标志。

当使用上述3种方法之一后,整个解释器就会被中断。

t=Thread.new{...}t.abort_on_exception=true

线程同步控制

在Ruby中,提供三种实现同步的方式,分别是:

1. 通过Mutex类实现线程同步

2. 监管数据交接的Queue类实现线程同步

3. 使用ConditionVariable实现同步控制

通过Mutex类实现线程同步

通过Mutex类实现线程同步控制,如果在多个线程钟同时需要一个程序变量,可以将这个变量部分使用lock锁定。代码如下:

在线示例

#!/usr/bin/rubyrequire"thread"puts"SynchronizeThread"@num=200@mutex=Mutex.newdefbuyTicket(num)@mutex.lockif@num>=num@num=@num-numputs"youhavesuccessfullybought#{num}tickets"elseputs"sorry,noenoughtickets"end@mutex.unlockendticket1=Thread.new10do10.timesdo|value|ticketNum=15buyTicket(ticketNum)sleep0.01endendticket2=Thread.new10do10.timesdo|value|ticketNum=20buyTicket(ticketNum)sleep0.01endendsleep1ticket1.jointicket2.join

以上代码执行结果为:

SynchronizeThreadyouhavesuccessfullybought15ticketsyouhavesuccessfullybought20ticketsyouhavesuccessfullybought15ticketsyouhavesuccessfullybought20ticketsyouhavesuccessfullybought15ticketsyouhavesuccessfullybought20ticketsyouhavesuccessfullybought15ticketsyouhavesuccessfullybought20ticketsyouhavesuccessfullybought15ticketsyouhavesuccessfullybought20ticketsyouhavesuccessfullybought15ticketssorry,noenoughticketssorry,noenoughticketssorry,noenoughticketssorry,noenoughticketssorry,noenoughticketssorry,noenoughticketssorry,noenoughticketssorry,noenoughticketssorry,noenoughtickets

除了使用lock锁定变量,还可以使用try_lock锁定变量,还可以使用Mutex.synchronize同步对某一个变量的访问。

监管数据交接的Queue类实现线程同步

Queue类就是表示一个支持线程的队列,能够同步对队列末尾进行访问。不同的线程可以使用统一个对类,但是不用担心这个队列中的数据是否能够同步,另外使用SizedQueue类能够限制队列的长度

SizedQueue类能够非常便捷的帮助我们开发线程同步的应用程序,应为只要加入到这个队列中,就不用关心线程的同步问题。

经典的生产者消费者问题:

在线示例

#!/usr/bin/rubyrequire"thread"puts"SizedQueeTest"queue=Queue.newproducer=Thread.newdo10.timesdo|i|sleeprand(i)#让线程睡眠一段时间queue<<iputs"#{i}produced"endendconsumer=Thread.newdo10.timesdo|i|value=queue.popsleeprand(i/2)puts"consumed#{value}"endendconsumer.join

程序的输出:

SizedQueeTest0produced1producedconsumed02producedconsumed1consumed23producedconsumed34producedconsumed45producedconsumed56producedconsumed67producedconsumed78produced9producedconsumed8consumed9

线程变量

线程可以有其私有变量,线程的私有变量在线程创建的时候写入线程。可以被线程范围内使用,但是不能被线程外部进行共享。

但是有时候,线程的局部变量需要别别的线程或者主线程访问怎么办?ruby当中提供了允许通过名字来创建线程变量,类似的把线程看做hash式的散列表。通过[]=写入并通过[]读出数据。我们来看一下下面的代码:

在线示例

#!/usr/bin/rubycount=0arr=[]10.timesdo|i|arr[i]=Thread.new{sleep(rand(0)/10.0)Thread.current["mycount"]=countcount+=1}endarr.each{|t|t.join;printt["mycount"],","}puts"count=#{count}"

以上代码运行输出结果为:

8,0,3,7,2,1,6,5,4,9,count=10

主线程等待子线程执行完成,然后分别输出每个值。。

线程优先级

线程的优先级是影响线程的调度的主要因素。其他因素包括占用CPU的执行时间长短,线程分组调度等等。

可以使用 Thread.priority 方法得到线程的优先级和使用 Thread.priority= 方法来调整线程的优先级。

线程的优先级默认为 0 。 优先级较高的执行的要快。

一个 Thread 可以访问自己作用域内的所有数据,但如果有需要在某个线程内访问其他线程的数据应该怎么做呢? Thread 类提供了线程数据互相访问的方法,你可以简单的把一个线程作为一个 Hash 表,可以在任何线程内使用 []= 写入数据,使用 [] 读出数据。

athr=Thread.new{Thread.current["name"]="ThreadA";Thread.stop}bthr=Thread.new{Thread.current["name"]="ThreadB";Thread.stop}cthr=Thread.new{Thread.current["name"]="ThreadC";Thread.stop}Thread.list.each{|x|puts"#{x.inspect}:#{x["name"]}"}

可以看到,把线程作为一个 Hash 表,使用 [] 和 []= 方法,我们实现了线程之间的数据共享。

线程互斥

Mutex(Mutal Exclusion = 互斥锁)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。

不使用Mutax的示例

在线示例

#!/usr/bin/rubyrequire'thread'count1=count2=0difference=0counter=Thread.newdoloopdocount1+=1count2+=1endendspy=Thread.newdoloopdodifference+=(count1-count2).absendendsleep1puts"count1:#{count1}"puts"count2:#{count2}"puts"difference:#{difference}"

以上示例运行输出结果为:

count1:9712487count2:12501239difference:0

使用 mutex 的示例

在线示例

#!/usr/bin/rubyrequire'thread'mutex=Mutex.newcount1=count2=0difference=0counter=Thread.newdoloopdomutex.synchronizedocount1+=1count2+=1endendendspy=Thread.newdoloopdomutex.synchronizedodifference+=(count1-count2).absendendendsleep1mutex.lockputs"count1:#{count1}"puts"count2:#{count2}"puts"difference:#{difference}"

以上示例运行输出结果为:

count1:1336406count2:1336406difference:0

死锁

两个以上的运算单元,双方都在等待对方停止运行,以获取系统资源,但是没有一方提前退出时,这种状况,就称为死锁。

例如,一个进程 p1占用了显示器,同时又必须使用打印机,而打印机被进程p2占用,p2又必须使用显示器,这样就形成了死锁。

当我们在使用 Mutex 对象时需要注意线程死锁。

在线示例

#!/usr/bin/rubyrequire'thread'mutex=Mutex.newcv=ConditionVariable.newa=Thread.new{mutex.synchronize{puts"A:Ihavecriticalsection,butwillwaitforcv"cv.wait(mutex)puts"A:Ihavecriticalsectionagain!Irule!"}}puts"(Later,backattheranch...)"b=Thread.new{mutex.synchronize{puts"B:NowIamcritical,butamdonewithcv"cv.signalputs"B:Iamstillcritical,finishingup"}}a.joinb.join

以上示例输出结果为:

A:Ihavecriticalsection,butwillwaitforcv(Later,backattheranch...)B:NowIamcritical,butamdonewithcvB:Iamstillcritical,finishingupA:Ihavecriticalsectionagain!Irule!

线程类方法

完整的 Thread(线程) 类方法如下:

序号方法描述
1Thread.abort_on_exception
若其值为真的话,一旦某线程因异常而终止时,整个解释器就会被中断。它的默认值是假,也就是说,在通常情况下,若某线程发生异常且该异常未被Thread#join等检测到时,该线程会被无警告地终止。
2Thread.abort_on_exception=
如果设置为 true, 一旦某线程因异常而终止时,整个解释器就会被中断。返回新的状态
3Thread.critical
返回布尔值。
4Thread.critical=
当其值为true时,将不会进行线程切换。若当前线程挂起(stop)或有信号(signal)干预时,其值将自动变为false。
5Thread.current
返回当前运行中的线程(当前线程)。
6Thread.exit
终止当前线程的运行。返回当前线程。若当前线程是唯一的一个线程时,将使用exit(0)来终止它的运行。
7Thread.fork { block }
与 Thread.new 一样生成线程。
8Thread.kill( aThread )
终止线程的运行.
9Thread.list
返回处于运行状态或挂起状态的活线程的数组。
10Thread.main
返回主线程。
11Thread.new( [ arg ]* ) {| args | block }
生成线程,并开始执行。数会被原封不动地传递给块. 这就可以在启动线程的同时,将值传递给该线程所固有的局部变量。
12Thread.pass
将运行权交给其他线程. 它不会改变运行中的线程的状态,而是将控制权交给其他可运行的线程(显式的线程调度)。
13Thread.start( [ args ]* ) {| args | block }
生成线程,并开始执行。数会被原封不动地传递给块. 这就可以在启动线程的同时,将值传递给该线程所固有的局部变量。
14Thread.stop
将当前线程挂起,直到其他线程使用run方法再次唤醒该线程。

线程示例化方法

以下示例调用了线程示例化方法 join:

在线示例

#!/usr/bin/rubythr=Thread.newdo#示例化puts"Insecondthread"raise"Raiseexception"endthr.join#调用示例化方法join

以下是完整示例化方法列表:

序号方法描述
1thr[ name ]
取出线程内与name相对应的固有数据。 name可以是字符串或符号。若没有与name相对应的数据时, 返回nil。
2thr[ name ] =
设置线程内name相对应的固有数据的值, name可以是字符串或符号。 若设为nil时, 将删除该线程内对应数据。
3thr.abort_on_exception
返回布尔值。
4thr.abort_on_exception=
若其值为true的话,一旦某线程因异常而终止时,整个解释器就会被中断。
5thr.alive?
若线程是"活"的,就返回true。
6thr.exit
终止线程的运行。返回self。
7thr.join
挂起当前线程,直到self线程终止运行为止. 若self因异常而终止时, 将会当前线程引发同样的异常。
8thr.key?
若与name相对应的线程固有数据已经被定义的话,就返回true
9thr.kill
类似于 Thread.exit
10thr.priority
返回线程的优先度. 优先度的默认值为0. 该值越大则优先度越高.
11thr.priority=
设定线程的优先度. 也可以将其设定为负数.
12thr.raise( anException )
在该线程内强行引发异常.
13thr.run
重新启动被挂起(stop)的线程. 与wakeup不同的是,它将立即进行线程的切换. 若对死进程使用该方法时, 将引发ThreadError异常.
14thr.safe_level
返回self 的安全等级. 当前线程的safe_level与$SAFE相同.
15thr.status
使用字符串"run"、"sleep"或"aborting" 来表示活线程的状态. 若某线程是正常终止的话,就返回false. 若因异常而终止的话,就返回nil。
16thr.stop?
若线程处于终止状态(dead)或被挂起(stop)时,返回true.
17thr.value
一直等到self线程终止运行(等同于join)后,返回该线程的块的返回值. 若在线程的运行过程中发生了异常, 就会再次引发该异常.
18thr.wakeup
把被挂起(stop)的线程的状态改为可执行状态(run), 若对死线程执行该方法时,将会引发ThreadError异常。
编辑于2024-05-20 14:47