Nodejs中流的操作,实现简单的pipe

fs与流都可以处理文件,为什么还要用流:

  fs模块处理文件的缺点:将文件的数据全读到内存中,在把数据写到文件内,会大量占用内存

流:

流(stream)是 Node.js 中处理流式数据的抽象接口,是一组有序的,有起点和终点的字节数据传输手段。可以实现将数据从一个地方流动到另一个地方,其边读取边写入的特点有别于fs模块的文件处理,并且可以做到控制读取文件和写入文件的速度,从而减少内存的占用

nodeJS中提供了许多种流的对象,像用http模块创建的服务器的回调内,req就是一个可读流,res就是可写流

流的特点:

  1、边读 边写,是边读边写的,读取一段文件,就将它写入

  2、流是基于事件的,所有的流对象都用 on绑定事件,并触发

Node.js 中有四种基本的流类型:

可读流:

let fs = require('fs')
//参数1:要读取的文件
//参数2:配置项,有highWaterMark  每次能读取多少,默认是64k,一次读取64k 不需要更改
let rs = fs.createReadStream('1.txt', {     // 返回了一个可读流的实例
    flags: 'r'        //对文件进行何种操作
        encoding: 'utf-8'        //设置之后,读取的是字符串,不然默认为buffer
        start:3                    //从索引3开始读
        end:7                    // 读到索引为7的  包括结束
        highWaterMark: 1
})       

// 默认是不会把读取的文件给你 需要监听事件,数据到来的事件  rs.emit('data',数据);
// 所以把那个绑定这个事件
fs.on('data',function(chunk){
    console,log(chunk)
})
// 默认这个data事件不停的触发,直到文件中的数据全部读完
rs.on('end', function () {
    
})

如果每次读取多少需要自己设置highWaterMark ,读取文件的时候为了不乱码,需要用到buffer拼接转化

let rs = fs,createReadStream('1.txt', {highWaterMark: 1})
let arr = []
rs.on('data',function(chunk){
    arr.push(chunk)            //读取文件时,是buffer类型,将每次读取的buffer拼到一个数组内
    
})
// 当文件全部读完,触发end
res.end('end',function(){
    let filesData = Buffer.concat(arr).toString()
})

//报错是  触发err        文件不存在,会触发这个事件
rs.on('err', function(err){

})

如果想要控制读取的速度,可以用rs.pause() 暂停data事件的触发, rs.resume() 恢复data事件的触发

let arr = []
rs.on('data',function(chunk){
    arr.push(chunk)
    rs.pause()    //暂停
    setTimeout(()=>{
        rs.resume       //一秒恢复一次  data事件的触发,直到数据读完
    },1000)
})

rs.on('end', ()=>{
    console.log(Buffer.concat(arr).toString())
})

可写流:

1、当往可写流里写数据的时候,不会立刻写入文件,先写入缓存区,缓存区的大小就是highWaterMark,默认为16k

等缓存区满了之后,再次真正的写入文件里

2、通过判断ws.write的返回值判断缓存区是否已经满了。为true是,还没满,有空间,可以继续写,为false时,表示满了

3、按理说如果返回false,就不能再往里写了,但是如果写了,不会丢失,会先缓存在内存中,等缓冲区写完清空之后,在从内存中读取写入。

所以一般在读取文件写入的时候。当缓存区满了,一般会暂停可读流的读取 data事件,等写完之后再次出发可读流的data读取文件,所以不会占用太多的内存

const fs = require('fs')
//第一个参数,写入的文件位置  名称
// 第二参数,配置项
let ws = fs.createWriteStream('./a.txt', {
      flags:'w'
      highWaterMark:2 
})
var flag = ws.write('1', function(){})        //flag为true
//write   写的内容,必须是 字符串或者buffer,
// 会返回一个布尔值,提示是否还有空间写入, 与highWaterMark对应,例如为写入1时,第一次写 返回一个true,表示还有空间写入
//write是异步的 有回调函数,但是不常用
var flag = ws.write('2', function(){})       //flag为false
var flag = ws.write('3', function(){}) 
// 当数据写入文件后,又有空间继续写入时,触发drain事件
ws.on('drain',function(){
    
})
// 当所有文件写完之后,触发end,也可以在end时,在写入最后的数据
ws.end('结束写入')

利用可读流可写流,实现一个pipe,边读边写,控制读取可写入的速度:

读取30b的文件,每次只能读5b,每次只能写入1b

1、可读流第一次读取5b

2、可写流写入读取流读取的数据,拿到5b开始写,因为设置了highWaterMark为1,写了1b之后,ws.write()就返回fasle。

表示没有空间在写入了,剩下的4b放到内存中,开始1b 1b的写入

3、ws.write()返回false后,暂停可读流 的 data事件,等待5b全部写入

4、 5b写完后,触发ws.on('drain',function(){rs.resume()}), 恢复可读流的 data事件,再次读取了5b

5、直到文件读完,写完

const fs = require('fs')

function pipe(readFile,writeFileu){
    let rs = fs.createReadStream(readFile,{
        highWaterMark:5
    })
    let ws = fs.createWriteStream(writeFileu,{
        highWaterMark:1
    })
    rs.on('data',function(chunk){
        console.log('读取')
        // 当ws.write() 返回false时,表示没有空间继续写入了,暂停读取
        if(ws.write(chunk) == false){
            rs.pause() // 暂停rs的data事件
        }
    })
    // 当触发可写流的drain,表示有空间继续写入了,继续读取文件
    ws.on('drain',function(){
        rs.resume() // 恢复rs的data事件        // 把当前读入的内容都写到文件中了,继续调用读写
    })
    // 当读取流触发end方法,表示读取完毕,这时关闭可写流的写入
    rs.on('end',function(){
        ws.end()
    })
}
pipe('1.txt','./2.txt')