用 NodeJS 处理千牛导出的淘宝聊天数据,转换为 JSON,供 Logstash 导入 Elasticsearch 使用。

var rf=require("fs");

// Encoding conversion module (npm install iconv-lite).
var iconv = require('iconv-lite');

// Base name (no extension) of the exported chat log to convert.
var fileName = "2017-03-01~2017-05-31";

// Read the raw bytes of the export file.
var data = rf.readFileSync(fileName + ".txt", "binary");

// Decode from GBK (the export is GBK-encoded, not UTF-8).
// Buffer.from replaces the deprecated `new Buffer(...)` constructor.
var buf = Buffer.from(data, 'binary');
var str = iconv.decode(buf, 'GBK');

// Parse the transcript into newline-delimited JSON and write it out
// under a randomized file name so repeated runs do not collide.
var newData = handleMS(str);
writeFile(fileName + newGuid() + ".json", newData);

console.log("The END");

//解析数据
// Parse a Qianniu chat transcript into newline-delimited JSON records.
//
// Expected input layout (after GBK decoding):
//   line 0                               : the customer-service agent's name
//   lines 1..6                           : header lines (skipped)
//   '----------------------------<name>' : starts a conversation with <name>
//   '<who>(<date>):  <text>'             : one chat message
//
// Returns one JSON object per message, separated by "\r\n", with fields:
//   who  - sender name (text before the first '(')
//   date - message timestamp (serializes as null when unparseable)
//   m    - message text
//   isCC - 1 if the sender is the agent, else 0
function handleMS(data){

    var newData = "";

    // BUG FIX: split the `data` argument; the original read the global `str`.
    var arr = data.split('\r\n');

    // The agent's name is on the first line.
    var callcenter = arr[0];

    var customer = "";
    for (var i = 7; i < arr.length; i++) {

        var item = arr[i];

        if (item == "") {
            continue;
        }

        // A dashed separator line introduces the next customer's conversation.
        var delimiter = '----------------------------';
        if (item.indexOf(delimiter) != -1) {
            customer = item.split(delimiter)[1];
            continue;
        }

        var cc = item.split('(')[0];
        var date = "";
        var message = "";

        // Message lines look like "name(date):  text".
        var preMessage = item.split('):  ');
        if (preMessage.length == 2) {
            message = preMessage[1];
            date = item.split(')')[0].split('(')[1];
        }
        // Note: new Date("") is an Invalid Date and JSON-serializes as null.
        newData += JSON.stringify({who: cc, date: new Date(date), m: message, isCC: cc == callcenter ? 1 : 0}) + "\r\n";
    }
    return newData;
}

//写文件
function writeFile(file, data){  
    // 把中文转换成字节数组  
    var arr = iconv.encode(data, 'utf-8');  
      
    // appendFile,如果文件不存在,会自动创建新文件  
    // 如果用writeFile,那么会删除旧文件,直接写新文件  
    rf.writeFile(file, arr, function(err){  
        if(err)  
            console.log("fail " + err);  
        else  
            console.log("写入文件ok");  
    });  
}  
  
// Build a pseudo-random GUID-shaped string: 32 lowercase hex digits
// grouped 8-4-4-4-12 and joined with dashes. Uses Math.random, so it
// is NOT cryptographically unique — fine for disambiguating file names.
function newGuid()
{
    var groupSizes = [8, 4, 4, 4, 12];
    var parts = [];
    for (var g = 0; g < groupSizes.length; g++) {
        var chunk = "";
        for (var k = 0; k < groupSizes[g]; k++) {
            chunk += Math.floor(Math.random() * 16.0).toString(16);
        }
        parts.push(chunk);
    }
    return parts.join("-");
}

Logstash.conf

input {  
      file {
          path => "D:/logstash-5.2.2/testdata/*.json"
          start_position => "beginning"
          sincedb_path => "D:/logstash-5.2.2/bin/sincedb"
          codec => json {
            charset => "UTF-8"
        }       
      }
}
filter {  
   json{
   source => "message"
   }

   
    mutate
    {
        remove_field => [ "message","path","@version","@timestamp","host","_id","value"]
    }
}
output {  
    elasticsearch {
        action => "index"
        hosts => ["http://172.31.2.9:9200/"] 
        user => "admin"
        password => "123456"
     
        index => "testtbmsdb3"
        document_type => "ms"
        workers => 1
    }
     #stdout {
         #codec => rubydebug
         #codec => json_lines
     #}
}