用户定义的java计数器

mapreduce 计数器用来做某个信息的统计。

计数器是全局的。mapreduce 框架将跨所有map和reduce聚集这些计数器,并且作业结束时产生一个最终的结果。

语法像 java 的 enum 类型。

需求: 统计某个目录下所有文件中一共出现的行数,以及出现单词的总数。

思路: 定义一个计数器。

package com.mapreduce.count;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class CountDerived {
    
    // Global counters visible to every map and reduce task; the framework
    // aggregates them across all tasks and reports the totals when the job ends.
    enum COUNT{
        LINES_COUNT,
        WORDS_COUNT
    }
    
    /**
     * Driver: configures and submits a MapReduce job that counts the total
     * number of lines and words under the input directory via global counters.
     */
    public static void main(String[] args) throws Exception {
        
        // 1. Load the Hadoop configuration.
        Configuration configuration = new Configuration();
        
        // 2. Create the job instance.
        Job job = Job.getInstance(configuration);
    
        // 3. Jar that contains the job classes.
        job.setJarByClass(CountDerived.class);
        
        // 4. Mapper class. No reducer is set, so the identity Reducer is used
        //    and map outputs pass through unchanged.
        job.setMapperClass(CounterMap.class);
        
        // 5. Map output key/value types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        
        // 6. Final (reducer) output key/value types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        
        // 7. Input/output format classes; the input format may determine
        //    how the input is split.
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
    
        // 8. Input and output paths.
        FileInputFormat.setInputPaths(job, new Path("d:/input"));
        FileOutputFormat.setOutputPath(job, new Path("d:/output1"));

        // 9. Submit the job and wait for completion.
        //    Bug fix: the boolean result was previously discarded, so the
        //    driver exited 0 even when the job failed; propagate it instead.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
package com.mapreduce.count;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapreduce.Mapper;
/*
 *  job 那边定义 全局计数器  count { lineCount, wordsCount }
 */

import com.mapreduce.count.CountDerived.COUNT;


public class CounterMap extends Mapper<LongWritable, Text, Text, IntWritable>{
    
    Text k = new Text();
    IntWritable v = new IntWritable();
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        
        // 动态获取计数器
        Counter line_counter =  (Counter) context.getCounter(COUNT.LINES_COUNT);
        //将计数器 + 1
        line_counter.increment(1);
        
        String line = value.toString();
        String[] words = line.split(" ");
        v.set(1);
        for(String w:words){
            // 同理
            context.getCounter(COUNT.WORDS_COUNT).increment(1);
            k.set(w);
            context.write(k, v);
        }
    }

}