nodejs的C++扩展中实现异步回调

在nodejs的官方网站中有关于C++扩展的详细说明,其中包含了从"hello world"到对象封装的一系列示例。其中的“callback”节是关于回调函数的,美中不足的是,这个回调是阻塞的回调。

官方示例的回调函数用JS代码来模拟的话,大致是这个样子:

function syncCallback(callback) {
  // 业务代码
  // 业务代码
  callback();
}

使用C++扩展的一个最大好处就是处理一些CPU密集的业务,因此这部分代码一定是比较耗时的,否则用C++去实现完全没有意义。业务代码中的阻塞操作,例如传统文件读写、密集计算等都会导致nodejs原始线程的阻塞,导致后来的请求无法得到及时响应,严重影响node的并发性能。

有服务器程序开发的朋友肯定已经想到用多线程的方法解决这个问题。是的,我要分享的就是在C++扩展中用多线程的方法处理回调,从而达到解决复杂的业务同时保证node线程的无阻塞特性。

node C++扩展中,可以使用libuv提供的线程方法,非常方便的进行线程调度。

下面是具体代码,详细解释见注释

#include <v8.h>
#include <node.h>
#include <stdlib.h>
#include <errno.h>
#include <stdio.h>
#include <string.h>

using namespace node;
using namespace v8;

//
// 定义线程入参结构体
//
// a: 整型入参1
// b: 整型入参2
// result: 在工作线程里面计算好的a+b
// name: 线程名称,由JS调用代码指定
// callback: 回调函数
struct reqData
{
    int result;
    int a;
    int b;
        char name[128];
    Persistent<Function> callback;
};

//
// 定义工作线程处理函数
//
// 入参为libuv指定的结构体格式
// 没有返回值
// 具体的业务处理函数
void workerFunc(uv_work_t* req)
{
        // 从uv_work_t的结构体中获取我们定义的入参结构
    reqData* request = (reqData*)req->data;

        // 计算结果
    request->result = request->a + request->b;

        // 模拟耗时业务
        for(int i = 0; i < 50; ++i) {
                // 模拟密集运算导致的阻塞
                Sleep(1000);
                // 打印每次循环的内容
                printf("[%s %04d] I am working.\n", request->name, i);
        }
}


// 
// 定义线程的回调函数
// req: 处理后的数据结构体
// status: 线程状态
void afterWorkFunc(uv_work_t* req, int status)
{
    HandleScope scope;

        // 获取我们定义的数据结构
    reqData* request = (reqData*)req->data;
        // 释放请求结构体
    delete req;         

        // 构造JS回调函数的arguments
    Handle<Value> argv[2];

    argv[0] = Undefined();                                              // err
    argv[1] = Integer::New(request->result); // data

        
        // 下面代码相当于JS中的
        // try {
        //   callback.apply(global, arguments);
        // } catch (err) {
        //   throw err;
        // }
    TryCatch try_catch;
    request->callback->Call(Context::GetCurrent()->Global(), 2, argv);
    if (try_catch.HasCaught()) 
    {
        FatalException(try_catch);
    }

        // 回收资源
    request->callback.Dispose();
    delete request;
}


// 定义模块的导出函数
static Handle<Value> test(const Arguments& args)
{
    HandleScope scope;

        // 判断入参是否满足条件
    if ( args.Length() < 3 || !args[0]->IsNumber() || !args[1]->IsNumber() )
    {
        return ThrowException(Exception::TypeError(String::New("Bad argument")));
    }

        // 获取参数,并转化格式
    ssize_t int1 ( args[0]->Int32Value() );
    ssize_t int2 ( args[1]->Int32Value() );
        char nameBuffer[128] = {0};
        args[2]->ToString()->WriteAscii(nameBuffer);

        // 检查回调参数是否为函数
    if ( args[3]->IsFunction() )
    {
                // 构造数据结构
        Local<Function> callback = Local<Function>::Cast(args[3]);

        reqData* request = new reqData;
        request->callback = Persistent<Function>::New(callback);

        request->a = int1;
        request->b = int2;
                strcpy(request->name, nameBuffer);

        uv_work_t* req = new uv_work_t();
        req->data = request;

                // 调用libuv的线程处理函数
        uv_queue_work(uv_default_loop(), req, workerFunc, afterWorkFunc);
    }
    else
    {
        return ThrowException(Exception::TypeError(String::New("Callback missing")));
    }

    return Undefined();
}

extern "C"
{
        // 相当于JS中的
        //
        // exports.test = function Test(){};
        //
    static void init(Handle<Object> target)
    {
        target->Set(String::NewSymbol("test"), FunctionTemplate::New(test)->GetFunction());
    }
}

NODE_MODULE(asyncAddon, init);