hadoop怎么处理海量小图片

2019年12月14日 阅读数:51
这篇文章主要向大家介绍hadoop怎么处理海量小图片,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。

1.方法原理:java

系统借鉴Hbase存储的基本原理,提出以“状态标记位”的方法为当前并不能完美支持追加处理的HDFS的Mapfile文件提供了一种有效的解决方法,既解决了HDFS小文件存储的问题,又解决了Mapfile即时修改的问题。node

2.方法介绍:apache

在海量图片背景中,图片的存储形式探讨就成为了保证系统性能的重要部分。HDFS存在广泛的小文件存储的通病,对小文件的读取一般会形成大量从datanode到datanode的seeks和hopping来retrieve文件,而这样是很是的低效的一种访问方式。所以对于大小远小于HDFS的块大小的文件,须要进行处理后再存入HDFS中。几乎全部的图片都远远小于64M(HDFS默认数据块大小),处理这些大量的小图片就须要某种形式的容器来经过某种方式来打包这些file。Hadoop提供了一些选择。主要能够选择的有HARfile、Sequencefile、Mapfile。本系统采用了Mapfile做为小文件的容器存储。同时,若对于全部小于64M的图片均进行打包,则会加大打包文件的过程的资源损耗,所以须要定一个阈值,当文件大小超过该阈值后进行打包操做,不然直接经过namenode进行上传。本系统所定的阈值为2MB。此外,因为Hadoop在最新的版本才支持文件的追加append操做,但对于Mapfile尚未完善的支持。这意味着若用原始处理方法,每一次上传操做将会重写原Mapfile,效率低下。本系统采用了“标记法”对Mapfile打包小文件时的增删改查进行处理,保证了图片存储访问的效率。缓存

3.具体实现:网络

图片基本操做包括图片的增长、删除、修改和查询。因为图片存储在HDFS的特殊环境,所以图片的增删改查操做须要进行特殊的处理。因为mapfile不支持追加写入操做,这样每次进行操做须要对原mapfile文件进行覆盖写入,效率低下。为了实现相应功能,本系统对Hbase中存储的图片元数据增长了一个状态标志位,该状态位可能的取值为“HdfsLargeFile”,“HdfsMapflie”,“LocalSmallFile”以及“Deleted”四种。每次上传操做会进行会进行文件大小判断,并对其进行相应处理,更新标志位。对于mapfile的增长操做,本系统使用了写缓存队列的操做进行支持。每次用户的上传操做以后,会将图片写入到本地队列,标志位为“LocalSmallFile”,当队列到达指定上传阈值后,再启动线程进行打包,而且更新标志位为“HdfsMapfile”。app


4.代码实现ide

在HDFS上存储文件,大量的小文件是很是消耗NameNode内存的,由于每一个文件都会分配一个文件描述符,NameNode须要在启动的时候加载所有文件的描述信息,因此文件越多,对oop

NameNode来讲开销越大。
咱们能够考虑,将小文件压缩之后,再上传到HDFS中,这时只须要一个文件描述符信息,天然大大减轻了NameNode对内存使用的开销。MapReduce计算中,Hadoop内置提供了以下几性能

种压缩格式:spa

  • DEFLATE
  • gzip
  • bzip2
  • LZO

使用压缩文件进行MapReduce计算,它的开销在于解压缩所消耗的时间,在特定的应用场景中这个也是应该考虑的问题。不过对于海量小文件的应用场景,咱们压缩了小文件,却换

来的Locality特性。
假如成百上千的小文件压缩后只有一个Block,那么这个Block必然存在一个DataNode节点上,在计算的时候输入一个InputSplit,没有网络间传输数据的开销,并且是在本地进行

运算。假若直接将小文件上传到HDFS上,成百上千的小Block分布在不一样DataNode节点上,为了计算可能须要“移动数据”以后才能进行计算。文件不多的状况下,除了NameNode内

存使用开销之外,可能感受不到网络传输开销,可是若是小文件达到必定规模就很是明显了。
下面,咱们使用gzip格式压缩小文件,而后上传到HDFS中,实现MapReduce程序进行任务处理。
使用一个类实现了基本的Map任务和Reduce任务,代码以下所示:

01 package org.shirdrn.kodz.inaction.hadoop.smallfiles.compression;
02  
03 import java.io.IOException;
04 import java.util.Iterator;
05  
06 import org.apache.hadoop.conf.Configuration;
07 import org.apache.hadoop.fs.Path;
08 import org.apache.hadoop.io.LongWritable;
09 import org.apache.hadoop.io.Text;
10 import org.apache.hadoop.io.compress.CompressionCodec;
11 import org.apache.hadoop.io.compress.GzipCodec;
12 import org.apache.hadoop.mapreduce.Job;
13 import org.apache.hadoop.mapreduce.Mapper;
14 import org.apache.hadoop.mapreduce.Reducer;
15 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
16 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
17 import org.apache.hadoop.util.GenericOptionsParser;
18  
19 public class GzipFilesMaxCostComputation {
20  
21     public static class GzipFilesMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
22  
23         private final static LongWritable costValue = new LongWritable(0);
24         private Text code = new Text();
25  
26         @Override
27         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
28             // a line, such as 'SG 253654006139495 253654006164392 619850464'
29             String line = value.toString();
30             String[] array = line.split("\\s");
31             if (array.length == 4) {
32                 String countryCode = array[0];
33                 String strCost = array[3];
34                 long cost = 0L;
35                 try {
36                     cost = Long.parseLong(strCost);
37                 catch (NumberFormatException e) {
38                     cost = 0L;
39                 }
40                 if (cost != 0) {
41                     code.set(countryCode);
42                     costValue.set(cost);
43                     context.write(code, costValue);
44                 }
45             }
46         }
47     }
48  
49     public static class GzipFilesReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
50  
51         @Override
52         protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
53             long max = 0L;
54             Iterator<LongWritable> iter = values.iterator();
55             while (iter.hasNext()) {
56                 LongWritable current = iter.next();
57                 if (current.get() > max) {
58                     max = current.get();
59                 }
60             }
61             context.write(key, new LongWritable(max));
62         }
63  
64     }
65  
66     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
67  
68         Configuration conf = new Configuration();
69         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
70         if (otherArgs.length != 2) {
71             System.err.println("Usage: gzipmaxcost <in> <out>");
72             System.exit(2);
73         }
74  
75         Job job = new Job(conf, "gzip maxcost");
76  
77         job.getConfiguration().setBoolean("mapred.output.compress"true);
78         job.getConfiguration().setClass("mapred.output.compression.codec", GzipCodec.class, CompressionCodec.class);
79  
80         job.setJarByClass(GzipFilesMaxCostComputation.class);
81         job.setMapperClass(GzipFilesMapper.class);
82         job.setCombinerClass(GzipFilesReducer.class);
83         job.setReducerClass(GzipFilesReducer.class);
84  
85         job.setMapOutputKeyClass(Text.class);
86         job.setMapOutputValueClass(LongWritable.class);
87         job.setOutputKeyClass(Text.class);
88         job.setOutputValueClass(LongWritable.class);
89  
90         job.setNumReduceTasks(1);
91  
92         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
93         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
94  
95         int exitFlag = job.waitForCompletion(true) ? 0 1;
96         System.exit(exitFlag);
97  
98     }
99 }

上面程序就是计算最大值的问题,实现比较简单,并且使用gzip压缩文件。另外,若是考虑Mapper输出后,须要向Reducer拷贝的数据量比较大,能够考虑在配置Job的时候,指定

压缩选项,详见上面代码中的配置。