在AWS EMR上运行Map Reduce的Java示例程序 及 操作小计

下面的代码中AffairClient类中包含了三个内之类,分别对应于Hadoop Mapreduce程序运行所需的Mapper类,Reducer类,和主类。

AffairClient类中其余方法用于配置和运行EMR程序。

可以修改相关参数来对程序做适当调整。比如:修改map和reduce函数,添加combiner类,或者设置集群大小。

这个样例是一个去重的mapreduce程序,具体见map函数和reduce函数。

我们创建的是一个Maven项目,因为是在AWS EMR上运行hadoop程序,所以需要AWS和hadoop-client的dependency:

<!-- aws -->
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk</artifactId>
    <version>1.10.26</version>
</dependency>
<!-- hadoop-client -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.1</version>
</dependency>

另外一个可能会出现的情况是在运行Java程序的环境下(可能是某台远程服务器)的CLASSPATH没有aws java sdk或hadoop-client对应的jar包,这个时候运行程序可能会出现ClassNotFoundException,所以我们需要在pom.xml中的build->plugins中添加如下代码使得依赖的jar包也能在mvn package的时候被打到jar包里:

<plugin>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
              <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
          <archive>
            <manifest>
                  <mainClass></mainClass>
            </manifest>
          </archive>
    </configuration>
      <executions>
        <execution>
              <id>make-assembly</id>
              <phase>package</phase>
              <goals>
                <goal>single</goal>
              </goals>
           </execution>
      </executions>
</plugin>

运行程序前,我们需要把输入文件放到程序中全局变量INPUT_DIR目录下;还需要把jar包放到JAR_DIR目录下,并且jar包名为JAR_NAME。

可以通过如下指令在server上对maven项目打jar包:

$ mvn clean install

我们假设最终将jar包改名为了affair.jar并上传到了s3上对应的位置,之后再affair.jar包同一目录下输入如下指令便可启动AWS EMR的MapReduce程序:

$ java -cp affair.jar AffairClient

在终端显示的结果类似如下:

log4j:WARN No appenders could be found for logger (com.amazonaws.AmazonWebServiceClient).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
j-61EJHJXPSR2L  STARTING        this job flow runs a mapreduce affair.
        PENDING AffairClient$AffairJob Affair

j-61EJHJXPSR2L  STARTING        this job flow runs a mapreduce affair.
        PENDING AffairClient$AffairJob Affair

j-61EJHJXPSR2L  STARTING        this job flow runs a mapreduce affair.
        PENDING AffairClient$AffairJob Affair

j-61EJHJXPSR2L  STARTING        this job flow runs a mapreduce affair.
        PENDING AffairClient$AffairJob Affair

j-61EJHJXPSR2L  STARTING        this job flow runs a mapreduce affair.
        PENDING AffairClient$AffairJob Affair

j-61EJHJXPSR2L  STARTING        this job flow runs a mapreduce affair.
        PENDING AffairClient$AffairJob Affair

j-61EJHJXPSR2L  RUNNING this job flow runs a mapreduce affair.
        RUNNING AffairClient$AffairJob Affair

j-61EJHJXPSR2L  RUNNING this job flow runs a mapreduce affair.
        RUNNING AffairClient$AffairJob Affair

j-61EJHJXPSR2L  RUNNING this job flow runs a mapreduce affair.
        RUNNING AffairClient$AffairJob Affair

j-61EJHJXPSR2L  TERMINATING     this job flow runs a mapreduce affair.
        COMPLETED       AffairClient$AffairJob Affair

j-61EJHJXPSR2L  TERMINATING     this job flow runs a mapreduce affair.
        COMPLETED       AffairClient$AffairJob Affair

j-61EJHJXPSR2L  TERMINATING     this job flow runs a mapreduce affair.
        COMPLETED       AffairClient$AffairJob Affair

j-61EJHJXPSR2L  TERMINATING     this job flow runs a mapreduce affair.
        COMPLETED       AffairClient$AffairJob Affair

j-61EJHJXPSR2L  TERMINATED      this job flow runs a mapreduce affair.
        COMPLETED       AffairClient$AffairJob Affair

如果出现错误了(比如说有的时候输出目录已存在就会出现错误),可以在AWS Web管理控制台中的EMR中查看对应的syslog。

代码:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.record.compiler.generated.ParseException;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.ec2.model.InstanceType;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.ActionOnFailure;
import com.amazonaws.services.elasticmapreduce.model.Cluster;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterResult;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.ListStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.ListStepsResult;
import com.amazonaws.services.elasticmapreduce.model.PlacementType;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.elasticmapreduce.model.StepSummary;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;

public class AffairClient {
    
    private static AmazonElasticMapReduceClient emr;
    
    private static final long SLEEP_TIME = 1000 * 30;
    private static final String JAR_DIR = "s3://bucketname/affair/";
    private static final String JAR_NAME = "affair.jar";
    private static final String INPUT_DIR = "s3://bucketname/affair/input/";
    private static final String OUTPUT_DIR = "s3://bucketname/affair/output/";
    private static final String LOG_DIR = "s3://bucketname/affair/log/";
    private static final String JOB_FLOW_NAME = "this job flow runs a mapreduce affair.";
    private static final String AWS_ACCESS_KEY = "YOUR_AWS_ACCESS_KEY";
    private static final String AWS_SECRET_KEY = "YOUR_AWS_SECRET_LEY";
    
    public static class AffairMapper 
      extends Mapper<LongWritable, Text, Text, Text> {
        
        public void map(LongWritable key, Text value, Context context) 
                throws IOException, InterruptedException {
            context.write(new Text(value), new Text(""));
        }
    }
    
    public static class AffairReducer
      extends Reducer<Text, Text, Text, Text> {
        
        public void reduce(Text key, Iterable<Text>values, Context context)
                throws IOException, InterruptedException {
            context.write(key, new Text(""));
        }
    }
    
    public static class AffairJob extends Configured implements Tool {

        public int run(String[] arg0) throws Exception {
            Configuration conf = getConf();
            conf.set("mapred.reduce.tasks", "" + 1);
            
            Job job = new Job(conf, "Affair MR job");
            job.setJarByClass(AffairJob.class);
            job.setMapperClass(AffairMapper.class);
            job.setReducerClass(AffairReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            //job.setNumReduceTasks(1);
            
            FileInputFormat.addInputPath(job, new Path(INPUT_DIR));
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_DIR));
            
            job.waitForCompletion(true);
            return 0;
        }
        
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new AffairJob(), args);
            System.exit(exitCode);
        }
        
    }
    
    
    public static void main(String[] args) throws ParseException {
        
        // emr jobflow
        try {
            String mainClass = AffairJob.class.getName();
            String stepName = mainClass + " Affair";
            runStep(mainClass, JAR_NAME, stepName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    private static void runStep(String mainClass, String jarName, String stepName) 
            throws InterruptedException {
        
        String jarPath = JAR_DIR + JAR_NAME;
        HadoopJarStepConfig hadoopJarStep = new HadoopJarStepConfig(jarPath);
        hadoopJarStep.setMainClass(mainClass);
        hadoopJarStep.setArgs(null);
        
        StepConfig step = new StepConfig().withName(stepName)
                .withActionOnFailure(ActionOnFailure.TERMINATE_JOB_FLOW)
                .withHadoopJarStep(hadoopJarStep);
        
        String logUri = LOG_DIR;
        JobFlowInstancesConfig instances = createInstances();
        
        List<StepConfig> steps = new ArrayList<StepConfig>();
        steps.add(step);
        String jobFlowId = CreateJobFlow(JOB_FLOW_NAME, logUri, instances, steps);
        
        terminateJobFlow(jobFlowId);
    }
    
    private static void terminateJobFlow(String jobFlowId) {
        TerminateJobFlowsRequest request = new TerminateJobFlowsRequest().withJobFlowIds(jobFlowId);
        emr.terminateJobFlows(request);
    }

    private static String CreateJobFlow(String jobFlowName, String logUri,
            JobFlowInstancesConfig instances, List<StepConfig> steps) 
            throws InterruptedException {
        
        AWSCredentials credentials = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY);
        emr = new AmazonElasticMapReduceClient(credentials);
        
        // run job flow
        RunJobFlowRequest request = new RunJobFlowRequest().withName(jobFlowName)
                                            .withLogUri(logUri)
                                            .withSteps(steps)
                                            .withInstances(instances);
        RunJobFlowResult result = emr.runJobFlow(request);
        
        // get job flow details
        String jobFlowId = result.getJobFlowId();
        boolean runing = true;
        while(runing) {
            Thread.sleep(SLEEP_TIME);
            
            List<String> jobFlowIdList = new ArrayList<String>();
            jobFlowIdList.add(jobFlowId);
            
            System.out.println(getJobFlowStatus(jobFlowIdList));
            
            for(String clusterId : jobFlowIdList) {
                DescribeClusterRequest describeClusterRequest = new DescribeClusterRequest().withClusterId(clusterId);
                DescribeClusterResult describeClusterResult = emr.describeCluster(describeClusterRequest);
                Cluster cluster = describeClusterResult.getCluster();
                
                if(cluster.getStatus().getState().contains("FAILED") ||
                        cluster.getStatus().getState().contains("COMPLETED") ||
                        cluster.getStatus().getState().contains("TERMINATED") ||
                        cluster.getStatus().getState().contains("SHUTTING_DOWN") ||
                        cluster.getStatus().getState().contains("WAITING"))
                    runing = false;
                break;
            }
        }
        return jobFlowId;
    }
    
    private static String getJobFlowStatus(List<String> jobFlowIdList) {
        
        String info = new String();
        
        for(String clusterId : jobFlowIdList) {
            DescribeClusterRequest describeClusterRequest = new DescribeClusterRequest().withClusterId(clusterId);
            DescribeClusterResult describeClusterResult = emr.describeCluster(describeClusterRequest);
            Cluster cluster = describeClusterResult.getCluster();
            
            info += cluster.getId() + "\t" + cluster.getStatus().getState() + "\t" + cluster.getName() + "\n";
            ListStepsRequest listStepsRequest = new ListStepsRequest().withClusterId(clusterId);
            ListStepsResult listStepsResult = emr.listSteps(listStepsRequest);
            for(StepSummary step : listStepsResult.getSteps()) {
                info += "\t" + step.getStatus().getState() + "\t" + step.getName() + "\n";
            }
        }
        return info;
    }

    private static JobFlowInstancesConfig createInstances() {
        JobFlowInstancesConfig instances = new JobFlowInstancesConfig()
                                .withHadoopVersion("1.0.3")
                                .withInstanceCount(5)
                                .withKeepJobFlowAliveWhenNoSteps(false)
                                .withMasterInstanceType(InstanceType.M1Large.toString())
                                .withSlaveInstanceType(InstanceType.M1Large.toString())
                                .withPlacement(new PlacementType("us-east-1a"));
        return instances;
    }
    
}