Lua 学习之基础篇九

引言

讲到协程,首先来介绍一下线程和协程的区别

lua协程和多线程

相同之处:拥有自己独立的桟、局部变量和PC计数器,同时又与其他协程共享全局变量和其他大部分东西

不同之处:一个多线程程序可以同时运行几个线程(并发执行、抢占),而协程却需要彼此协作地运行,并非真正的多线程,即一个多协程程序在同一时间只能运行一个协程,并且正在执行的协程只会在其显式地要求挂起(suspend)时,它的执行才会暂停(无抢占、无并发)。

注意:

  • Lua中的协程无法在外部将其停止,有可能导致程序阻塞

  • 运行的是主线程时调用coroutine.yield()会报错LuaException: attempt to yield from outside a coroutine

协程函数的相关函数方法

方法描述返回值
coroutine.create (f)创建一个主体函数为 f 的新协程。 f 必须是一个 Lua 的函数。 返回这个新协程,它是一个类型为 "thread" 的对象,和 resume 配合使用的时候就唤醒函数调用返回它的控制器(一个对象为thread)的对象
coroutine.resume (co [, val1, ···])使协同从挂起变为运行. (1)激活coroutine,也就是让协程函数开始运行;(2)唤醒yield,使挂起的协同接着上次的地方继续运行。该函数可以传入参数运行成功返回true和前一个调用cyiled中传入的参数;反之false+错误信息
coroutine.yield (···)挂起正在调用的协程的执行。 传递给 yield 的参数都会转为 resume 的额外返回值。返回resume中传入的参数
coroutine.status (co)以字符串形式返回协程 co 的状态: 当协程正在运行(它就是调用 status 的那个) ,返回 "running"; 如果协程调用 yield 挂起或是还没有开始运行,返回 "suspended"; 如果协程是活动的,都并不在运行(即它正在延续其它协程),返回 "normal"; 如果协程运行完主体函数或因错误停止,返回 "dead"。running,suspended,normal,dead
coroutine.wrap (f)创建一个主体函数为 f 的新协程。 f 必须是一个 Lua 的函数。 返回一个函数, 每次调用该函数都会延续该协程。 传给这个函数的参数都会作为 resume 的额外参数。 和 resume 返回相同的值, 只是没有第一个布尔量。 如果发生任何错误,抛出这个错误。.
coroutine.running()返回当前正在运行的协程加一个布尔量。 如果当前运行的协程是主线程,其为真。
coroutine.isyieldable ()如果正在运行的协程可以让出,则返回真。不在主线程中或不在一个无法让出的 C 函数中时,当前协程是可让出的。

下面举例来说明协程的简单创建使用,第一个是create,看下例子了解return参数的应用

print("coroutine start!");
 
--没有yield的协程
local newCor1 = coroutine.create(function()
        return 1,"a"
end)
 
local ret1, num1, str1 = coroutine.resume(newCor1)
 
print("1-----", ret1, num1, str1)
 
--包含一个yield的协程,主要看参数相关
local newCor2 = coroutine.create(function(x)
        x = x+10;
        --str和y的值为resume传入的值
        local str, y = coroutine.yield(x);
        return str, y + x
end)
 
local ret2, x = coroutine.resume(newCor2, 50)
print("2-----", x)
local ret3, str2, y = coroutine.resume(newCor2, "sss", 100);
print("2-----", str2, y)

--输出如下:
coroutine start!
1-----  true    1       a
2-----  60
2-----  sss     160

接着可以看看wrap的方式,具体的区别在于,wrap的返回值是一个函数,所以唤起协程的时候不需要调用resume方法,直接调用wrap返回的函数即可。还有就是和resume返回的不同的是,返回值中少了协程是否成功运行的布尔值。

local newCor3 = coroutine.wrap(function(x)
        x = x - 10;
        local y = coroutine.yield(x);
        return y;
end)
 
--不需要resume函数来唤起,直接调用wrap返回的值
local ret4 = newCor3(100);
print("3-----", ret4)
local ret5 = newCor3(10);
print("3-----", ret5)

3-----       90
3-----  10

继续体会一下用法。


co = coroutine.create(
    function(i)
        print(i);
    end
)
 
print(coroutine.status(co)) --suspended
coroutine.resume(co, 1)   -- 1
print(coroutine.status(co))  -- dead
coroutine.resume(co, 1) 
 
print("----------")
print(coroutine.status(co)) --dead
print(coroutine.resume(co)) --false     cannot resume dead coroutine
 
co = coroutine.wrap(
    function(i)
        print(i);
    end
)
 
co(1)
 
print("----------")
 
co2 = coroutine.create(
    function()
        for i=1,10 do
            print(i)
            if i == 3 then
                print(coroutine.status(co2))  --running
                print(coroutine.running()) --thread:XXXXXX
            end
            coroutine.yield() --这里被挂起,也就是说for循环只会进行一次
        end
    end
)
 
coroutine.resume(co2) --1
coroutine.resume(co2) --2
coroutine.resume(co2) --3
 
print(coroutine.status(co2))   -- suspended
print(coroutine.running())
 
print("----------")

suspended
1
dead
----------
1
----------
1
2
3
running
thread: 0x7f9c12c07c78  false
suspended
thread: 0x7f9c13001008  true
----------

coroutine.running就可以看出来,coroutine在底层实现就是一个线程。

当create一个coroutine的时候就是在新线程中注册了一个事件。

当使用resume触发事件的时候,create的coroutine函数就被执行了,当遇到yield的时候就代表挂起当前线程,等候再次resume触发事件。

function foo (a)
    print("foo 函数输出", a)
    print("执行次数", 1,os.date())
    return coroutine.yield(2 * a) -- 返回  2*a 的值
end
 
co = coroutine.create(function (a , b)
    print("第一次协同程序执行输出", a, b) -- co-body 1 10
    local r = foo(a + 1)
     
    print("第二次协同程序执行输出", r)
    local r, s = coroutine.yield(a + b, a - b)  -- a,b的值为第一次调用协同程序时传入
     
    print("第三次协同程序执行输出", r, s)
    return b, "结束协同程序"                   -- b的值为第二次调用协同程序时传入
end)
        
print("main", coroutine.resume(co, 1, 10)) -- true, 4
print("--分割线----")
print("main", coroutine.resume(co, "r")) -- true 11 -9
print("---分割线---")
print("main", coroutine.resume(co, "x", "y")) -- true 10 end
print("---分割线---")
print("main", coroutine.resume(co, "x", "y")) -- cannot resume dead coroutine
print("---分割线---")
第一次协同程序执行输出     1       10
foo 函数输出        2
执行次数    1       Tue Nov  5 13:23:41 2019
main    true    4
--分割线----
第二次协同程序执行输出     r
main    true    11      -9
---分割线---
第三次协同程序执行输出     x       y
main    true    10      结束协同程序
---分割线---
main    false   cannot resume dead coroutine
---分割线---

总结:

  • 调用resume,将协同程序唤醒,resume操作成功返回true,否则返回false;
  • 协同程序运行,运行到yield语句;
  • yield挂起协同程序,第一次resume返回;(注意:此处yield返回,参数是resume的参数)
  • 第二次resume,再次唤醒协同程序;(注意:此处resume的参数中,除了第一个参数,剩下的参数将作为yield的参数)
  • yield返回;
  • 协同程序继续运行;
  • 如果使用的协同程序继续运行完成后继续调用 resume方法则输出:cannot resume dead coroutine

resume和yield的配合强大之处在于,resume处于主程中,它将外部状态(数据)传入到协同程序内部;而yield则将内部的状态(数据)返回到主程中。

总体来说有点像断点再执行的样子。yield函数可以将正在运行的coroutine 挂起,并可以在适当的时候再重新被唤醒,然后继续运行,这是协程的精髓

协程执行唯一性

当一个coroutine A在resume另一个coroutine B时,A的状态没有变为suspended,我们不能去resume它;但是它也不是running状态,因为当前正在running的是B。这时A的状态其实就是normal 状态了。

前面已经提到了返回值的case,下面再单独列出来加深理解

  1. main函数中没有yield,调用resume时,多余的参数,都被传递给main函数作为参数,下面的示例,1 2 3分别就是a b c的值了:
co = coroutine.create(function (a,b,c)
print(a,b,c)
end
)
coroutine.resume(co,1,2,3)
1       2       3
  1. main函数中有yield,所有被传递给yield的参数,都被返回。因此resume的返回值,除了标志正确运行的true外,还有传递给yield的参数值:
co = coroutine.create(function(a,b)
coroutine.yield(a+b,a-b)end)
a,b,c = coroutine.resume(co,10,20)
print(a,b,c)
true    30      -10
  1. yield也会把多余的参数返回给对应的resume,如下:
co = coroutine.create(function()
    print("co",coroutine.yield())
    end)
    print(coroutine.status(co))
    coroutine.resume(co)
    print(coroutine.status(co))
    print(coroutine.resume(co,4,5))
suspended
suspended
co      4       5
true

但是这里有一个现象,resume第一次返回竟然是空?没有执行起来,是不是很奇怪?再看一下例子:

coroutineFunc = function (a, b) 
    for i = 1, 10 do
        print(i, a, b)
        coroutine.yield()
    end
end

co2 = coroutine.create(coroutineFunc)        --创建协同程序co2
coroutine.resume(co2, 100, 200)                -- 1 100 200 开启协同,传入参数用于初始化
coroutine.resume(co2)                        -- 2 100 200 
coroutine.resume(co2, 500, 600)                -- 3 100 200 继续协同,传入参数无效

co3 = coroutine.create(coroutineFunc)        --创建协同程序co3
coroutine.resume(co3, 300, 400)                -- 1 300 400 开启协同,传入参数用于初始化
coroutine.resume(co3)                        -- 2 300 400 
coroutine.resume(co3)                        -- 3 300 400

协同中的参数传递形势很灵活,一定要注意区分,在启动coroutine的时候,resume的参数是传给主程序的;在唤醒yield的时候,参数是传递给yield的。看下面这个例子:

co = coroutine.create(function (a, b) print("co", a, b, coroutine.yield()) end)
coroutine.resume(co, 1, 2)        --没输出结果,注意两个数字参数是传递给函数的
coroutine.resume(co, 3, 4, 5)        --co 1 2 3 4 5,这里的两个数字参数由resume传递给yield 

resume-yield来交换数据:

​ (1)resume把参数传给程序(相当于函数的参数调用);

  (2)数据由yield传递给resume;

  (3)resume的参数传递给yield;

  (4)协同代码结束时的返回值,也会传给resume

再废话一步,举个例子解释上面的作用,完美百分百让你理解传递逻辑,

coroutineFunc = function (a, b) 
    key  = a 
    print ("co",a)
    coroutine.yield(b)       
end

co2 = coroutine.create(coroutineFunc)      
print(coroutine.resume(co2, 100, 200))   
--co    100
--true  200

我们都知道resume返回的值;一部分是协同本身return的结果,另一部分执行成功返回true+value(这个value是yield传入的),这样是不是理清楚了.

  1. 当一个coroutine结束的时候,main函数的所有返回值都被返回给resume:

  2. co = coroutine.create(function()
    return 6,7 end)
    print (coroutine.resume(co))
    
    true 6       7
    

总结

我们在同一个coroutine中,很少会将上面介绍的这些功能全都用上,但是所有这些功能都是很useful的。

目前为止,我们已经了解了Lua中coroutine的一些知识了。下面我们需要明确几个概念。Lua提供的是asymmetric coroutine,意思是说,它需要一个函数(yield)来挂起一个coroutine,但需要另一个函数(resume)来唤醒这个被挂起的coroutine。对应的,一些语言提供了symmetric coroutine,用来切换当前coroutine的函数只有一个。

有人想把Lua的coroutine称为semi-coroutine,但是这个词已经被用作别的意义了,用来表示一个被限制了一些功能来实现出来的coroutine,这样的coroutine,只有在一个coroutine的调用堆栈中,没有剩余任何挂起的调用时,才会被挂起,换句话说,就是只有main可以挂起。Python中的generator好像就是这样一个类似的semi-coroutine。

跟asymmetric coroutine和symmetric coroutine的区别不同,coroutine和generator(Python中的)的不同在于,generator并么有coroutine的功能强大,一些用coroutine可实现的有趣的功能,用generator是实现不了的。Lua提供了一个功能完整的coroutine,如果有人喜欢symmetric coroutine,可以自己简单的进行一下封装。

pipes和filters

下面这个例子是resume 和yield很好的实际应用。

function receive (prod)
    local status, value = coroutine.resume(prod)
    return value
end

function send (x)
    coroutine.yield(x)
end

function producer()
    return coroutine.create(function ()
        while true do
            local x = io.read() -- produce new value
                print("我进来了吗?1")
            send(x)
               print("我进来了吗?2")
        end
    end)
end
        
function consumer (prod)
    while true do
        local x = receive(prod) -- receive from producer
        io.write(x, "\n") -- consume new value
    end
end

p = producer()
consumer(p)

简言之这个程序的执行顺序先调用consumer, 然后recv函数去resume唤醒producer,produce一个值,send给consumer,然后继续等待下一次resume唤醒。

同样下面这个例子是上面的拆分,帮助理解过程。

function send(x) 
    coroutine.yield(x)
end

co = coroutine.create(
function()
    while true do
    local x = io.read()
    send(x) end
    end)
print(coroutine.status(co))
print(coroutine.resume(co))

我们可以继续扩展一下上面的例子,增加一个filter,在producer和consumer之间做一些数据转换啥的。那么filter里都做些什么呢?我们先看一下没加filter之前的逻辑,基本就是producer去send,send to consumer,consumer去recv,recv from producer,可以这么理解吧。加了filter之后呢,因为filter需要对data做一些转换操作,因此这时的逻辑为,producer去send,send tofilter,filter去recv,recv from producer,filter去send,send to consumer,consumer去recv,recv from filter.


function send(x)
    coroutine.yield(x)
end

function receive (prod)
    local status, value = coroutine.resume(prod)
    print("echo 1")
    print("value is",value)
    return value
end

 
function producer()
    return coroutine.create(function ()
        print("echo 3")
        while true do
            local x = io.read()
            send(x)
        end 
    end)
end
 
function consumer(prod)
    
    while true do
        print("why ?")
        local x = receive(prod)
        if x then
            print("echo 2")
            io.write(x, \'\n\')
        else
            break
        end 
    end 
end
 
function filter(prod)                                                                                                              
    return coroutine.create(function ()
        for line = 1, math.huge do
            print("echo ")
            local x = receive(prod)
            x = string.format(\'%5d %s\', line, x)
            send(x)
        end 
    end)
end
 
p = producer()
f = filter(p)
consumer(f)

印有字串辅助看懂整个sequence,可以看到, consumer执行,透过receive resume叫起filter的协程,同时filter进来后又在透过receive的resume再叫起producer的协程。

讲到这里,你是否想起了unix中的pipe?coroutine怎么说也是multithreading的一种。使用pipe,每个task得以在各自的process里执行,而是用coroutine,每个task在各自的coroutine中执行。pipe在writer(producer)和reader(consumer)之间提供了一个buffer,因此相对的运行速度还是相当可以的。这个是pipe很重要的一个特性,因为process间通信,代价还是有点大的。使用coroutine,不同task之间的切换成本更小,基本上也就是一个函数调用,因此,writer和reader几乎可以说是齐头并进了啊。

用coroutine实现迭代器

我们可以把迭代器 循环看成是一个特殊的producer-consumer例子:迭代器produce,循环体consume。下面我们就看一下coroutine为我们提供的强大的功能,用coroutine来实现迭代器。

我们来遍历一个数组的全排列。先看一下普通的loop实现,代码如下:

function printResult(a)
    for i = 1, #a do
        io.write(a[i], \' \')
    end 
    io.write(\'\n\')
end
 
function permgen(a, n)                                                                                                             
    n = n or #a
    if n <= 1 then
        printResult(a)
    else
        for i = 1, n do
            a[n], a[i] = a[i], a[n]
            permgen(a, n-1)
            a[n], a[i] = a[i], a[n]
        end 
    end 
end
 
permgen({1,2,3})
2 3 1 
3 2 1 
3 1 2 
1 3 2 
2 1 3 
1 2 3 

采用协程实现如下:


function printResult(a)
    for i = 1, #a do
        io.write(a[i], \' \')
    end 
    io.write(\'\n\')
end  
        
function permgen(a, n)
    n = n or #a
    if n <= 1 then
       coroutine.yield(a) 
    else
        for i = 1, n do
            a[n], a[i] = a[i], a[n]
            permgen(a, n-1)
            a[n], a[i] = a[i], a[n]
        end 
    end 
end  
        
function permutations(a)
    local co = coroutine.create(function () permgen(a) end)                                                                        
    return function ()
        local code, res = coroutine.resume(co)
        return res 
    end 
end  
        
for p in permutations({"a", "b", "c"}) do
    printResult(p)
end 
b c a 
c b a 
c a b 
a c b 
b a c 
a b c 

permutations 函数使用了一个Lua中的常规模式,将在函数中去resume一个对应的coroutine进行封装。Lua对这种模式提供了一个函数coroutine.wap 。跟create 一样,wrap 创建一个新的coroutine ,但是并不返回给coroutine,而是返回一个函数,调用这个函数,对应的coroutine就被唤醒去运行。跟原来的resume 不同的是,该函数不会返回errcode作为第一个返回值,一旦有error发生,就退出了(类似C语言的assert)。使用wrap, permutations可以如下实现:

function permutations (a)
    return coroutine.wrap(function () permgen(a) end)
end

wrap 比create 跟简单,它实在的返回了我们最需要的东西:一个可以唤醒对应coroutine的函数。 但是不够灵活。没有办法去检查wrap 创建的coroutine的status, 也不能检查runtime-error(没有返回errcode,而是直接assert)

非抢占式多线程

我们知道,coroutine运行一系列的协作的多线程。每个coroutine相当于一个thread。一个yield-resume对可以在不同的thread之间切换控制权。但是,跟常规的multithr不同,coroutine是非抢占式的。一个coroutine在运行的时候,不可能被其他的coroutine从外部将其挂起,只有由其本身显式地调用yield才会挂起,并交出控制权。对一些程序来说,这没有任何问题,相反,因为非抢占式的缘故,程序变得更加简单。我们不需要担心同步问题的bug,因为在threads之间的同步都是显式的。我们只需要保证在对的时刻调用yield就可以了。

但是,使用非抢占式multithreading,不管哪个thread调用了一个阻塞的操作,那么整个程序都会被阻塞,这是不能容忍的。由于这个原因,很多程序员并不认为coroutine可以替代传统的multithread,但是,下面我们可以看到一个有趣的解决办法。

一个很典型的multithreading场景:通过http下载多个remote files。我们先来看下如何下载一个文件,这需要使用LuaSocket库,

local socket = require("socket")

require("socket")

host = "www.w3.org"
file = "/standards/xml/schema"

c = assert(socket.connect(host, 80))
c:send("GET " .. file .. " HTTP/1.0\r\n\r\n") -- 注意GET后和HTTP前面的空格

while true do
    local s, status, partial = c:receive(2^10)
    io.write(s or partial)
    if status == "closed" then
        break
    end
end

c:close()

现在我们就知道怎么下载一个文件了。现在回到前面说的下载多个remote files的问题。当我们接收一个remote file的时候,程序花费了大多数时间去等待数据的到来,也就是在receive函数的调用是阻塞。因此,如果能够同时下载所有的files,那么程序的运行速度会快很多。下面我们看一下如何用coroutine来模拟这个实现。我们为每一个下载任务创建一个thread,在一个thread没有数据可用的时候,就调用yield 将程序控制权交给一个简单的dispatcher,由dispatcher来唤醒另一个thread。下面我们先把之前的代码写成一个函数,但是有少许改动,不再将file的内容输出到stdout了,而只是间的的输出filesize。

function download(host, file)
    local c = assert(socket.connect(host, 80))
    local count = 0  --  counts number of bytes read
    c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
    while true do
        local s, status, partial = receive(c)
        count = count + #(s or partial)
        if status == "closed" then
            break
        end 
    end 
    c:close()
    print(file, count)
end
function receive (connection)
    return connection:receive(2^10)
end

但是,如果要同时下载多文件的话,这个函数必须非阻塞地接收数据。在没有数据接收的时候,就调用yield挂起,交出控制权。实现应该如下:

function receive(connection)   
    connection:settimeout(0)  -- do not block          
    local s, status, partial = connection:receive(2^10)
    if status == "timeout" then
        coroutine.yield(connection)
    end                        
    return s or partial, status
end

settimeout(0)将这个连接设为非阻塞模式。当status变为“timeout”时,意味着该操作还没完成就返回了,这种情况下,该thread就yield。传递给yield的non-false参数,告诉dispatcher该线程仍然在运行。注意,即使timeout了,该连接还是会返回它已经收到的东西,存在partial变量中。

下面的代码展示了一个简单的dispatcher。表threads保存了一系列的运行中的thread。函数get 确保每个下载任务都单独一个thread。dispatcher本身是一个循环,不断的遍历所有的thread,一个一个的去resume。如果一个下载任务已经完成,一定要将该thread从表thread中删除。当没有thread在运行的时候,循环就停止了。

最后,程序创建它需要的threads,并调用dispatcher。例如,从w3c网站下载四个文档,程序如下所示:

local socket = require("socket")

function receive(connection)
    connection:settimeout(0)  -- do not block
    local s, status, partial = connection:receive(2^10)
    if status == "timeout" then
        coroutine.yield(connection)
    end
    return s or partial, status
end

function download(host, file)
    local c = assert(socket.connect(host, 80))
    local count = 0  --  counts number of bytes read
    c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
    while true do
        local s, status, partial = receive(c)
        count = count + #(s or partial)
        if status == "closed" then
            break
        end
    end
    c:close()
    print(file, count)
end

threads = {}  -- list of all live threads

function get(host, file)
    -- create coroutine
    local co = coroutine.create(function ()
        download(host, file)
    end)
    -- intert it in the list
    table.insert(threads, co)
end

function dispatch()
    local i = 1
    while true do
        if threads[i] == nil then  -- no more threads?
            if threads[1] == nil then -- list is empty?
                break
            end
            i = 1  -- restart the loop
        end
        local status, res = coroutine.resume(threads[i])
        if not res then   -- thread finished its task?
            table.remove(threads, i)
        else
            i = i + 1
        end
    end
end

host = "www.w3.org"
get(host, "/TR/html401/html40.txt")
get(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
get(host, "/TR/REC-html32.html")
get(host, "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")
dispatch() -- main loop
/TR/html401/html40.txt       629
/TR/REC-html32.html     606
/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt    229699
/TR/2002/REC-xhtml1-20020801/xhtml1.pdf 115777

程序运行了10s左右,4个文件已经下载完成

又重新用阻塞式的顺序下载重试了一下,需要时间本地测试时变长,不知道是不是网路问题,阻塞的多文件下载代码如下,其实就是上面几段代码放在一块了

local socket = require("socket")

function receive (connection)
    return connection:receive(2^10)
end

function download(host, file)
    local c = assert(socket.connect(host, 80))
    local count = 0  --  counts number of bytes read
    c:send("GET " .. file .. " HTTP/1.0\r\n\r\n")
    while true do
        local s, status, partial = receive(c)
        count = count + #(s or partial)
        if status == "closed" then
            break
        end 
    end 
    c:close()
    print(file, count)
end



host = "www.w3.org"

download(host, "/TR/html401/html40.txt")
download(host, "/TR/2002/REC-xhtml1-20020801/xhtml1.pdf")
download(host, "/TR/REC-html32.html")
download(host, "/TR/2000/REC-DOM-Level-2-Core-20001113/DOM2-Core.txt")

除了速度以外,其他有没有优化空间呢,答案是,有。当没有thread有数据接收时,dispatcher遍历了每一个thread去看它有没有数据过来,结果这个过程比阻塞式的版本多耗费了30倍的cpu。

为了避免这个情况,我们使用LuaSocket提供的select函数。它运行程序在等待一组sockets状态改变时阻塞。代码改动比较少,在循环中,收集timeout的连接到表connections 中,当所有的连接都timeout了,dispatcher调用select 来等待这些连接改变状态。该版本的程序,在博主开发环境测试,只需7s不到,就下载完成4个文件,除此之外,对cpu的消耗也小了很多,只比阻塞版本多一点点而已。新的dispatch代码如下:

function dispatch()
    local i = 1 
    local connections = {}
    while true do
        if threads[i] == nil then  -- no more threads?
            if threads[1] == nil then -- list is empty?
                break
            end 
            i = 1  -- restart the loop
            connections = {}
        end       
        local status, res = coroutine.resume(threads[i])
        if not res then   -- thread finished its task?
            table.remove(threads, i)
        else   
            i = i + 1 
            connections[#connections + 1] = res 
            if #connections == #threads then   -- all threads blocked?
                socket.select(connections)
            end                                                                                                                    
        end       
    end           
end

协程讲述完毕,上面的例子很好的阐述了其实现,供各位参考。