如何使用Nodejs进行批量下载?

0x1 Nodejs登场

Nodejs是一款基于谷人希的V8引擎开发javascript运行环境。在高性能的V8引擎以及事件驱动的单线程异步非阻塞运行模型的支持下,Nodejs实现的web服务可以在没有Nginx的http服务器做反向代理的情况下实现很高的业务并发量(当然了配合Nginx食用风味更佳)。

0x2 准备工作

现在我们假设你的爬虫已经帮你爬到了一堆图片的链接,然后你的nodejs脚本以某种方式(接收post http请求,进程间通信,读写文件或数据库等等。。。)获得了这些链接,这里我用某款大型角色扮演网络游戏的官网上提供的壁纸链接为例子(这里似乎并没有为一款运营10年经久不衰的游戏打广告的意思,仅仅只是情怀溢出。。。)

(function() {
  "use strict";
  const urlList = [
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fall-of-the-lich-king/fall-of-the-lich-king-1920x1080.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/black-temple/black-temple-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/zandalari/zandalari-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/rage-of-the-firelands/rage-of-the-firelands-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fury-of-hellfire/fury-of-hellfire-3840x2160.jpg",
  ];
})();

我们可以对urlList执行一个遍历来依次下载这些图片,确切的说是依次启动下载这些链接的任务。

(function() {
  //略...

  var startDownloadTask = function(imgSrc, dirName, index) {
    //TODO: startDownloadTask
  }

  urlList.forEach(function(item, index, array) {
    startDownloadTask(item, './', index);
  })
})();

startDownloadTask这个函数就是用来下载这些图片的。其中imgSrc是图片的链接,dirName是我们存放下载后的图片的路径,index是图片链接在列表中的序号。我们在这个函数中,会调用Nodejs的系统Apihttp.request来完成下载工作,由于该Api和大多数Nodejs的Api一样是异步非阻塞模式,所以startDownloadTask函数调用该Api后不会等待下载完成,就会立即返回。在下载的过程中,以及完成之后,或者发生异常时,系统会调用http.request的回掉函数来做相应的处理。我们接下来会看到该Api的详细声明和用法,在了解了该Api的使用方法之后,就可以用它来实现startDownloadTask函数。

0x3 http.request的声明和使用方法

我们在Nodejs的官方文档上可以找到http.request的完整声明和各个参数的说明。它的声明如下:

http.request(options[, callback])

其中options可以是带有请求的目的地址的一条字符串,亦可以是一系用于发起请求的列详细参数,用于对请求进行更精确的控制。我们现在暂时不需要这些精确的参数控制,直接传入图片的链接就可以。

至于callback参数就是刚才说到的回调函数,这是个非常重要的函数,图片下载下来后能否存入我们指定的位置可全靠它。这个回调函数会接受一个入参,文档中对这个入参没有详细说明,通过后面的例子我们发现,这个叫res的入参监听了两个事件,分别是dataend事件,并且还有一个setEncoding方法,并且还有statusCodeheaders两个成员属性。熟悉Nodejs Api的同学不难猜出,这个res其实是一个stream.Readable类型的子类的变量,那两个事件监听和setEncoding方法就是继承自这个类型,而那两个成员属性是子类扩展的。这并没有什么意外的,在其他语言的类库中,http请求Api返回一个可读数据流是很常见的做法。仔细阅读文档的其他部分后可以发现,这个res的真实类型是http.IncomingMessage。这里不得不对这种不写明每个参数的类型的文档提出批评,像javascript这种动态弱类型脚本语言,开发者要想知道一个Api各个参数和返回值有可能是什么类型,拿过来怎么处理可全靠文档啊。

介绍完了入参,再来看看http.request会返回什么。文档中说它会返回一个http.ClientRequest类型的变量,这个变量可以接受error事件,来对请求异常的情况进行处理。

刚才说过,这个Api是一个异步接口,调用这个Api之后会立即返回一个http.ClientRequest类型变量,假设变量名为req。但这时候不会马上发起请求。我们这时候可以设置reqerror事件的监听回调,如果是POST请求的话,还可以调用req.write方法来设置请求消息体,然后调用req.end方法来结束此次请求的发送过程。当收到响应时(严格的说是确认接收完响应头时),就会调用callback回调函数,在这个回调函数中,可以通过读取res.statusCoderes.headers获取响应的返回状态码和头部信息,其中头部信息包含了重要的字段content-length,表示响应消息体的总长度。由于响应消息体可能很长,服务端需要把消息体拆分成多个tcp封包来发送,客户端在接收到tcp封包后还要进行消息体的重组,所以这里采用一个数据流对象来对返回的消息体做读取操作,需要注册dataend事件监听,分别处理链路层缓冲区接收了若干字节的消息体封包并且拼接完成回调上层协议处理和tcp连接拆线时的事务。

Api声明后面附带了一个例子,比较简单不难看懂,这里就不详细说了。

0x4 实现startDownloadTask

了解了http.request的基本使用方法,以及看过例子之后,我们很快就能写出一个简单的下载过程了:

(function() {
  "use strict";
  const http = require("http");

  //略...

  function getHttpReqCallback(imgSrc, dirName, index) {
    var callback = function(res) {
      // TODO: callback回调函数实现
    };

    return callback;
  }

  var startDownloadTask = function(imgSrc, dirName, index) {
    var req = http.request(imgSrc, getHttpReqCallback(imgSrc, dirName, index));
    req.on('error', function(e){});
    req.end();
  }

  //略
})();

我暂且先忽略了请求的错误处理。这里需要讲解的是函数getHttpReqCallback,这个函数本身不是回调函数,在调用http.request时会先调用它,它返回了一个闭包callback,作为http.request的回调函数。我很快会解释为什么需要这样写。

接下来我们来实现这个回调函数:

(function() {
  "use strict";
  const http = require("http");
  const fs = require("fs");
  const path = require("path");
  //略...

  function getHttpReqCallback(imgSrc, dirName, index) {
    var fileName = index + "-" + path.basename(imgSrc);
    var callback = function(res) {      
      var fileBuff = [];
      res.on('data', function (chunk) {
        var buffer = new Buffer(chunk);
        fileBuff.push(buffer);
      });
      res.on('end', function() {
        var totalBuff = Buffer.concat(fileBuff);      
        fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
      });        
    };

    return callback;
  }

  //略
})();

这里的callback函数的逻辑目前为止还不是很复杂,resdata事件的回调函数中,chunk参数是从可读数据流中读出的数据,将其转换为Buffer对象后插入fillBuff数组以待后用。

resend事件意味着链路层链接拆除,数据接收完毕,在该事件的回调中,我们通过Buffer.concat函数,将fileBuff中的所有Buffer对象依次重组为一个新的Buffer对象totalBuff,该对象既是接收到的完整的数据。之后通过fs.appendFile函数将totalBuff存入磁盘,存放路径为dirName + "/" + fileName

于是我们就有了一个完整的勉强可以工作的脚本,完整的脚本代码如下:

(function() {
  "use strict";
  const fs = require("fs");
  const http = require("http");
  const path = require("path");

  const urlList = [
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fall-of-the-lich-king/fall-of-the-lich-king-1920x1080.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/black-temple/black-temple-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/zandalari/zandalari-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/rage-of-the-firelands/rage-of-the-firelands-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fury-of-hellfire/fury-of-hellfire-3840x2160.jpg",
  ];

  function getHttpReqCallback(imgSrc, dirName, index) {
    var fileName = index + "-" + path.basename(imgSrc);
    var callback = function(res) {      
      var fileBuff = [];
      res.on('data', function (chunk) {
        var buffer = new Buffer(chunk);
        fileBuff.push(buffer);
      });
      res.on('end', function() {
        var totalBuff = Buffer.concat(fileBuff);
        fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
      });
    };
    return callback;
  }

  var startDownloadTask = function(imgSrc, dirName, index) {
    var req = http.request(imgSrc, getHttpReqCallback(imgSrc, dirName, index));
    req.on('error', function(e){});
    req.end();
  }

  urlList.forEach(function(item, index, array) {
    startDownloadTask(item, './', index);
  })
})();

之所以说它勉强可工作,是因为它完全没有做错误处理,程序的健壮性几乎为0,甚至连打印日志都没有了,下载过程中一旦出现任何意外情况,那就自求多福吧。

但即使这样一个漏洞百出的代码,也还是有几点需要特殊说明。

为什么要采用闭包?

因为实际上作为http.request的回调函数callback,它的声明原型决定的它只可以接受唯一一个参数res,但是在callback函数中我们需要明确知道下载下来的数据在硬盘上存放的路径,这个路径取决于startDownloadTask的入参dirNameindex。所以函数getHttpReqCallback就是用于创建一个闭包,将dirNameindex的值写入这个闭包中。

其实我们原本并不需要getHttpReqCallback这个函数来显示的返回一个闭包,而是可以直接使用内联匿名函数的方法实现http.requestcallback,代码大概会写成这样:

var startDownloadTask = function(imgSrc, dirName, index) {
  var req = http.request(imgSrc, function(res) {
    var fileName = index + "-" + path.basename(imgSrc);
    var fileBuff = [];
    res.on('data', function (chunk) {
      var buffer = new Buffer(chunk);
      fileBuff.push(buffer);
    });
    res.on('end', function() {
      var totalBuff = Buffer.concat(fileBuff);
      fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
    });
  });
  req.on('error', function(e){});
  req.end();
}

这样也可以工作,http.requestcallback直接访问外层作用域的变量,即函数startDownloadTask的入参dirNameindex,这也是一个闭包。这样写的问题在于,一段异步代码强行插入原本连贯的同步代码中,也许现在你觉得这也没什么,这是因为目前callback里还没有处理任何的异常情况,所以逻辑比较简单,这样看起来也不算很混乱,但是我需要说的是,一旦后面加入了异常处理的代码,这一块看起来就会非常糟糕了。

为什么在data事件中要使用一个列表缓存接收到的所有数据,然后在end中一次性写入硬盘?

首先要说的是,这里并不是出于通过减少写磁盘次数达到提高性能或者延长磁盘寿命的目的,虽然可能确实有这样的效果。根本原因在于,如果不采用一次性写入,在nodejs的异步非阻塞运行机制下,这样存入磁盘的数据会混乱,导致不堪入目的后果,比较直观的情况见附录。

在同步阻塞运行模型的语言中(java, c, python),确实存在将远程连接传输过来的数据先缓存在内存里,待接收完整或或缓存了一定长度的数据之后再一次性写入硬盘的做法,以达到减少写磁盘操作次数的目的。但是如果在每一次从远程连接接中读取到数据之后立即将数据写入硬盘,也不会有什么问题(tcp协议已经帮我们将数据包排好序),这是因为在同步阻塞运行模型中,读tcp连接和写磁盘这两个动作必然不可能同时执行,而是读tcp -> 写磁盘 -> 读tcp -> 写磁盘...这样的串行执行,在上一个操作完成之后,下一个操作才会开始。这样的执行方式也许效率会比较低,但是写入的磁盘的数据并不会混乱。

现在回到我们的异步非阻塞世界中来,在这个世界中,远程读取的操作是通过事件回调的方式发生的,resdata事件任何一个时间片内都可能触发,你无法预知,无法控制,甚至触发频率都和你无关,那取决于本次连接的带宽。而我们的写磁盘操作fs.appendFile和Nodejs的大部分Api一样是一个异步非阻塞的调用,它会非常快的返回,但是它所执行的写文件操作,则会慢的多,而进程不会阻塞在那里等待这个操作完成。在常识里,远程连接的下载速度比本地硬盘的写入速度要慢,但这并不是绝对的,随着网速的提高,现在一块高速网卡,在高速的网络中带来的下载速度超过一块老旧的机械硬盘的写入速度并非不可能发生。除此之外,即使在较长的一段时间内,网络的平均连接速度并没有快的那么夸张,但是我们知道在tcp/ip协议栈中,链路层下层的网络层中前后两个ip报文的到达时间间隔也是完全无法确定的,有可能它们会在很短的时间间隔内到达,被tcp协议重组之后上抛给应用层协议,在我们的运行环境中以很短的间隔两次触发data事件,而这个间隔并不足够磁盘将前一段数据写入。

我画个草图来解释到底发生什么事情:

|data1
|       |data2
|-----------------------------|  //<- write data1
|       |-----------------------------|  //<- write data2
|       |
|----------------------------------------------------------> time

此时要想写入的数据保持有序不混乱,只能寄希望于机械硬盘的一面只有一个磁头来从物理层面保证原子操作了。但是很可惜我们知道现代机械硬盘每一面至少都有两个磁头。

有着很多java或者c++编程经验的你也许会想在这里加一个同步锁,不过Nodejs作为一个表面宣称的单线程环境(底层的V8引擎肯定还是有多线程甚至多进程调度机制实现的),在语法和Api层面并没有锁这个概念。

所以为了保证最终写入磁盘的数据不混乱,在data事件的回调中不可以再用异步的方式处理数据了,于是有了现在这种先写入缓存列表中,在数据接收完整后再一次性写文件的做法。由于new Buffer(chunk)fileBuff.push(buffer)都是同步操作,并且执行的速度非常快;即使下一个data事件到来的比这两个操作还要快,由于单线程运行模型的限制,也必须等待这两个操作完成后才会开始第二次回调。所以能保证数据有序的缓存到内存中,再有序的写入硬盘。

0x5 异常处理

刚才说到,目前为止我们的脚本虽然能够正常工作,但是没有异常处理,程序非常脆弱。由于异常处理是一个程序非常重要的部分,所以在这里我有义务要完成这部分代码。

首先我们从最简单的做起,打印一些日志来帮助调试程序。

(function() {
  //略。。
  function getHttpReqCallback(imgSrc, dirName, index) {
    var callback = function(res) {
      console.log("request: " + imgSrc + " return status: " + res.statusCode);
      //略。。
      res.on('end', function() {
        console.log("end downloading " + imgSrc);
        //略。。
      });
    };
    return callback;
  }

  var startDownloadTask = function(imgSrc, dirName, index) {
    console.log("start downloading " + imgSrc);
    //略。。。
  }
})

接下来我们在reqerror事件中,进行重新下载尝试的操作:

  var startDownloadTask = function(imgSrc, dirName, index) {
    //略。。
    req.on('error', function(e){
      console.log("request " + imgSrc + " error, try again");
      startDownloadTask(imgSrc, dirName, index);
    });
  }

这样一旦在请求阶段出现异常,会自动重新发起请求。你也可以在这里自行添加重试次数上限。

下面的代码给请求设置了一个一分钟的超时时间:

  var startDownloadTask = function(imgSrc, dirName, index) {
    //略。。
    req.setTimeout(60 * 1000, function() {
      console.log("reqeust " + imgSrc " timeout, abort this reqeust");
      req.abort();
    })
  }

一旦在一分钟之内下载还没有完成,则会强制终止此次请求,这会立即触发resend事件。

req的异常处理大致就是这些,接下来是对res的异常处理。

我们首先需要获取包体的总长度,该值在响应头的content-length字段中:

function getHttpReqCallback(imgSrc, dirName, index) {
  var callback = function(res) {
    var contentLength = parseInt(res.headers['content-length']);
    //略。。
  }
}

end事件的回调中,用接收到的数据总长度和响应头中的包体长度进行比较,验证响应信息是否接收完全:

res.on('end', function() {
  console.log("end downloading " + imgSrc);
  if (isNaN(contentLength)) {
    console.log(imgSrc + " content length error");
    return;
  }
  var totalBuff = Buffer.concat(fileBuff);
  console.log("totalBuff.length = " + totalBuff.length + " " + "contentLength = " + contentLength);
  if (totalBuff.length < contentLength) {
    console.log(imgSrc + " download error, try again");
    startDownloadTask(imgSrc, dirName, index);
    return;
  }
  fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
}

0x6 结束

本文上面全部示例代码如下:

(function() {
  "use strict";
  const http = require("http");
  const fs = require("fs");
  const path = require("path");

  const urlList = [
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fall-of-the-lich-king/fall-of-the-lich-king-1920x1080.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/black-temple/black-temple-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/zandalari/zandalari-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/rage-of-the-firelands/rage-of-the-firelands-1920x1200.jpg",
    "http://content.battlenet.com.cn/wow/media/wallpapers/patch/fury-of-hellfire/fury-of-hellfire-3840x2160.jpg",
  ];

  function getHttpReqCallback(imgSrc, dirName, index) {
    var fileName = index + "-" + path.basename(imgSrc);
    var callback = function(res) {
      console.log("request: " + imgSrc + " return status: " + res.statusCode);
      var contentLength = parseInt(res.headers['content-length']);
      var fileBuff = [];
      res.on('data', function (chunk) {
        var buffer = new Buffer(chunk);
        fileBuff.push(buffer);
      });
      res.on('end', function() {
        console.log("end downloading " + imgSrc);
        if (isNaN(contentLength)) {
          console.log(imgSrc + " content length error");
          return;
        }
        var totalBuff = Buffer.concat(fileBuff);
        console.log("totalBuff.length = " + totalBuff.length + " " + "contentLength = " + contentLength);
        if (totalBuff.length < contentLength) {
          console.log(imgSrc + " download error, try again");
          startDownloadTask(imgSrc, dirName, index);
          return;
        }
        fs.appendFile(dirName + "/" + fileName, totalBuff, function(err){});
      });
    };

    return callback;
  }

  var startDownloadTask = function(imgSrc, dirName, index) {
    console.log("start downloading " + imgSrc);
    var req = http.request(imgSrc, getHttpReqCallback(imgSrc, dirName, index));
    req.on('error', function(e){
      console.log("request " + imgSrc + " error, try again");
      startDownloadTask(imgSrc, dirName, index);
    });
    req.end();
  }

  urlList.forEach(function(item, index, array) {
    startDownloadTask(item, './', index);
  })
})();

本人在Nodejs方面也是完全的新手,没有太深入的研究Nodejs内部的运行机制,只是网上读过几篇文章,用Nodejs写过一些简短的脚本,在这个过程中掉过一些坑,本文就是一次印象深刻的爬坑过程的整理和总结。总的来说,Nodejs是一个非常强大且有趣的工具,但是由于其独特的运行模型,以及javascript自身也有不少的历史遗留问题需要解决,所以对于长期以来习惯了java, c/c++, python一类思维方式的猿们刚刚接触它的时候产生不少疑惑,希望本文能帮助大家理解Nodejs中的一些不同于其他语言的和运行环境的地方。