MapReduce global sorting and top-N for big data: a Python implementation

The files a.txt and b.txt are as follows:

a.txt

1       hadoop
3       hadoop
5       hadoop
7       hadoop
9       hadoop
11      hadoop
13      hadoop
15      hadoop
17      hadoop
19      hadoop
21      hadoop
23      hadoop
25      hadoop
27      hadoop
29      hadoop
31      hadoop
33      hadoop
35      hadoop
37      hadoop
39      hadoop
41      hadoop
43      hadoop
45      hadoop
47      hadoop
49      hadoop
51      hadoop
53      hadoop
55      hadoop
57      hadoop

b.txt

0       java
2       java
4       java
6       java
8       java
10      java
12      java
14      java
16      java
18      java
20      java
22      java
24      java
26      java
28      java
30      java
32      java
34      java
36      java
38      java
40      java
42      java
44      java
46      java
48      java
50      java
52      java
54      java
56      java
58      java

Upload a.txt and b.txt to the HDFS directory /mapreduce/allsort:

hadoop fs -put a.txt b.txt  /mapreduce/allsort

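Experiment 1: the first approach to global sorting uses the numeric column as the key and the rest of the line as the value, runs a single reducer, and lets the shuffle phase do the sorting. Note that the shuffle sorts keys as strings by default, which is why map.py replaces each key with base_count minus the key: the new keys all have the same number of digits, so string order matches numeric order, and a larger original key sorts first.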
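
The effect of string sorting can be checked locally without a cluster. The following is a minimal sketch in Python 3, not part of the original scripts; the key list is a small hypothetical sample and BASE_COUNT mirrors base_count from map.py. It assumes the shuffle's default string comparison behaves like Python's comparison of ASCII digit strings.

```python
# A few keys of the kind found in a.txt and b.txt (hypothetical subset).
keys = [1, 3, 11, 21, 9, 58]

# Sorting the keys as text compares character by character, so "11" < "3".
as_text = sorted(str(k) for k in keys)
print(as_text)

BASE_COUNT = 9000000000  # same value as base_count in map.py

# BASE_COUNT - key is 10 digits wide for all of these keys, so text order
# equals numeric order of the new key, i.e. descending order of the original.
new_keys = sorted(str(BASE_COUNT - k) for k in keys)
restored = [BASE_COUNT - int(s) for s in new_keys]
print(restored)
```

The first list comes out in dictionary order rather than numeric order, while the second list is the original keys in descending numeric order.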

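map.py:

#!/usr/bin/python
import sys

# Large constant used to turn each key into a fixed-width, reversed sort key.
base_count = 9000000000

for line in sys.stdin:
    ss = line.strip().split('\t')
    key, val = ss
    # A larger original key gives a smaller new key, so it sorts first.
    new_key = base_count - int(key)
    print("%s\t%s" % (new_key, val))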

red.py:

#!/usr/bin/python
import sys

base_count = 9000000000

for line in sys.stdin:
    ss = line.strip().split()
    # Undo the mapper's transformation to recover the original key.
    key = base_count - int(ss[0])
    print("%s\t%s" % (key, ss[1]))

run.sh:

HADOOP=/usr/local/src/hadoop-1.2.1/bin/hadoop
HADOOP_STREAMING=/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar
INPUT_PATH=/mapreduce/allsort
OUT_PATH=/mapreduce/allsort/out
$HADOOP jar $HADOOP_STREAMING \
    -input $INPUT_PATH \
    -output $OUT_PATH \
    -mapper "python map.py" \
    -reducer "python red.py" \
    -file "./map.py" \
    -file "./red.py"

Because the number of reducers is not set, it defaults to 1, so the result is written to a single file, sorted in descending order.
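
The whole single-reducer job can be approximated locally as map, sort, reduce. The sketch below is an illustration in Python 3 under stated assumptions, not the scripts above: mapper, shuffle and reducer are hypothetical helper names, shuffle is a plain string sort standing in for Hadoop's shuffle, and the sample records are a handful of lines in the format of a.txt and b.txt. Taking the first N lines of the sorted output is one simple way to get a top-N result; N = 3 is chosen only for the example.

```python
BASE_COUNT = 9000000000  # same value as base_count in map.py and red.py


def mapper(lines):
    """Emit 'new_key<TAB>value' with new_key = BASE_COUNT - key."""
    for line in lines:
        key, val = line.strip().split("\t")
        yield "%s\t%s" % (BASE_COUNT - int(key), val)


def shuffle(records):
    """Stand-in for the shuffle phase: sort records by key, compared as text."""
    return sorted(records, key=lambda rec: rec.split("\t", 1)[0])


def reducer(records):
    """Restore the original key from the transformed one."""
    for rec in records:
        new_key, val = rec.split("\t")
        yield "%s\t%s" % (BASE_COUNT - int(new_key), val)


sample = ["1\thadoop", "57\thadoop", "0\tjava", "58\tjava", "10\tjava"]
result = list(reducer(shuffle(mapper(sample))))

# Top-N: the output is already in descending order, so keep the first N rows.
top_n = result[:3]
for row in top_n:
    print(row)
```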

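Experiment 2: use 3 reducers so that the output is sorted within each reducer and is also ordered across the 3 reducers. Idea: use 3 buckets. Keys from 40 to 60 go to bucket 0, keys from 20 to 39 go to bucket 1, and keys below 20 go to bucket 2. Sorting inside each bucket relies on the shuffle.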

map.py

#!/usr/bin/python
import sys

base_count = 9000000000

for line in sys.stdin:
    ss = line.strip().split('\t')
    key, val = ss
    new_key = base_count - int(key)
    # The first output field is the bucket id used for partitioning.
    if int(key) >= 40:
        print("%s\t%s\t%s" % ("0", new_key, val))
    elif int(key) >= 20:
        print("%s\t%s\t%s" % ("1", new_key, val))
    else:
        print("%s\t%s\t%s" % ("2", new_key, val))

red.py

#!/usr/bin/python
import sys

base_count = 9000000000

for line in sys.stdin:
    # Input fields: bucket id, transformed key, value.
    ss = line.strip().split()
    key = base_count - int(ss[1])
    print("%s\t%s" % (key, ss[2]))

run.sh

HADOOP="/usr/local/src/hadoop-1.2.1/bin/hadoop"
HADOOP_STREAMING="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_PATH="/mapreduce/allsort"
OUT_PATH="/mapreduce/allsort/out"
$HADOOP fs -rmr $OUT_PATH
# stream.num.map.output.key.fields=2: the first 2 fields form the key
# num.key.fields.for.partition=1: partition on the first field only
$HADOOP jar $HADOOP_STREAMING \
        -input $INPUT_PATH \
        -output $OUT_PATH \
        -mapper "python map.py" \
        -reducer "python red.py" \
        -file "./map.py" \
        -file "./red.py" \
        -jobconf "mapred.reduce.tasks=3" \
        -jobconf "stream.num.map.output.key.fields=2" \
        -jobconf "num.key.fields.for.partition=1" \
        -partitioner "org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner"
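
The bucket idea can be sketched locally as follows. This is an illustrative Python 3 model, not the job itself: bucket_of and run_job are hypothetical names, the key list is made up, and the sketch assumes each bucket lands on its own reducer with bucket 0 first. On a real cluster the partitioner's hash of the first field decides which reducer receives which bucket.

```python
BASE_COUNT = 9000000000  # same value as base_count in map.py and red.py


def bucket_of(key):
    """Same thresholds as map.py: >= 40 -> '0', >= 20 -> '1', else '2'."""
    if key >= 40:
        return "0"
    if key >= 20:
        return "1"
    return "2"


def run_job(keys):
    """Partition by bucket id, sort each partition as text, restore the keys."""
    partitions = {"0": [], "1": [], "2": []}
    for k in keys:
        # Two-field key: (bucket id, transformed key), as emitted by map.py.
        partitions[bucket_of(k)].append((bucket_of(k), str(BASE_COUNT - k)))
    outputs = []
    for bucket in sorted(partitions):
        ordered = sorted(partitions[bucket])  # stand-in for the shuffle sort
        outputs.append([BASE_COUNT - int(nk) for _, nk in ordered])
    return outputs


parts = run_job([5, 41, 20, 58, 39, 0, 19, 40])
for index, part in enumerate(parts):
    print(index, part)

# Reading the per-reducer outputs in bucket order gives one global order.
merged = [k for part in parts for k in part]
print(merged)
```

Each list is sorted in descending order on its own, and because the buckets cover disjoint, ordered key ranges, concatenating them in bucket order is globally sorted as well.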

Experiment 3: control sorting through job parameters:

The data is as follows:

aaa.txt

d.1.5.23
e.9.4.5
e.5.9.22
e.5.1.45
e.5.1.23
a.7.2.6
f.8.3.3

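Purpose: in streaming mode, Hadoop by default treats the part of each map output line before the first field separator as the key and the rest as the value. Here "." is used as the field separator instead of Hadoop's default "\t", to match the format of this data.

The first 3 fields form the key used for sorting, and the rest of the line is the value.

The 2nd and 3rd fields are used for partitioning.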
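
How one line is divided under these settings can be sketched in a few lines of Python 3. This is only an illustration of the field arithmetic, not Hadoop code: split_record and its parameter names are hypothetical, and the partition range is read as 1-indexed and inclusive (fields 2 through 3).

```python
def split_record(line, sep=".", num_key_fields=3, part_start=2, part_end=3):
    """Split one line into (key, value, partition_key).

    num_key_fields: how many leading fields form the sort key.
    part_start, part_end: 1-indexed, inclusive range of fields used to
    choose the partition.
    """
    fields = line.split(sep)
    key = sep.join(fields[:num_key_fields])
    value = sep.join(fields[num_key_fields:])
    partition_key = sep.join(fields[part_start - 1:part_end])
    return key, value, partition_key


key, value, partition_key = split_record("e.5.9.22")
print(key, value, partition_key)
```

For the line e.5.9.22 the key is e.5.9, the value is 22, and the partition fields are 5.9, so lines that agree on their 2nd and 3rd fields are sent to the same reducer.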

run.sh:

HADOOP="/usr/local/src/hadoop-1.2.1/bin/hadoop"
HADOOP_STREAMING="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"
INPUT_PATH="/mapreduce/allsort/aaa.txt"
OUT_PATH="/mapreduce/allsort/out"
$HADOOP fs -rmr $OUT_PATH
$HADOOP jar $HADOOP_STREAMING \
        -input $INPUT_PATH \
        -output $OUT_PATH \
        -mapper "cat" \
        -reducer "cat" \
        -jobconf stream.num.map.output.key.fields=3 \
        -jobconf stream.map.output.field.separator=. \
        -jobconf map.output.key.field.separator=. \
        -jobconf mapred.text.key.partitioner.options=-k2,3 \
        -jobconf mapred.reduce.tasks=3 \
        -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner