Apache Mahout的Taste基于Hadoop实现协同过滤推荐引擎的代码分析

Taste 是 Apache Mahout 提供的一个协同过滤算法的高效实现,它是一个基于Java实现的可扩展的高效的推荐引擎。

该推荐引擎是用<userid,itemid,preference>这样简单的数据格式表达用户对物品的偏好。

以此为输入数据,计算后就可以得到为每个user推荐的items列表。

他提供了方便的单机版的编程接口,也提供了基于hadoop的分布式的实现。

单机版的编程接口主要适用于写demo和做算法的评估,若处理大规模数据,还是需分布式的实现。

以下是对org.apache.mahout.cf.taste.hadoop.item.RecommenderJob的各MapReduce步骤的一个解读。

Taste 实现一个分布式的协同过滤推荐共经历了如下12个MapReduce步骤。

以下分析了各步骤的Mapper和Reducer都做了哪些工作,并有什么格式的数据输出。

代码分析:

1、计算item的itemid_index和最小itemid值

1.1、ItemIDIndexMapper.class, VarIntWritable.class, VarLongWritable.class,

用原始输入,将userid,itemid,pref数据转成itemid_index,itemid

1.2、ItemIDIndexReducer.class, VarIntWritable.class, VarLongWritable.class,

在itemid_index,Iterator<itemid>中找最小的itemid,输出itemid_index,minimum_itemid

此处只是保存一个int型的itemid_index索引和对应的long型的itemid的映射

2、计算各user的item偏好向量,即Vector<item,pref>

2.1、ToItemPrefsMapper.class, VarLongWritable.class, booleanData ? VarLongWritable.class : EntityPrefWritable.class,

用原始输入,读入偏好数据,得到userid,<itemid,pref>

2.2、ToUserVectorReducer.class, VarLongWritable.class, VectorWritable.class,

将userid,Iterator<itemid,pref>中的itemid变成itemid_index,得到userid,Vector<itemid_index,pref>,后者用RandomAccessSparseVector来存。

3、统计数据中有多少个user

3.1、CountUsersMapper.class,CountUsersKeyWritable.class,VarLongWritable.class,

用步骤2的输出,统计独立userid数目,先转换数据为userid,userid

3.2、CountUsersReducer.class,VarIntWritable.class,NullWritable.class,

通过CountUsersPartitioner将所有数据发到一个区,一个Reducer来处理

由于userid都已排序,所以可以用极简单的方式来统计出独立userid数

输出只有一个值,即用户数

4、计算item的user偏好向量,即Vector<userid,pref>,也即拿步骤2的结果做矩阵的修剪和转置

4.1、MaybePruneRowsMapper.class,IntWritable.class,DistributedRowMatrix.MatrixEntryWritable.class,

用步骤2的输出,按指定的maxCooccurrences参数值来修剪Vector的数目,目的是控制计算的规模,减少计算量

然后转为以userid_index为列号、itemid_index为行号、pref为值的矩阵,用MatrixEntryWritable表示矩阵。

输出为itemid_index,Matrix<userid_index,itemid_index,pref>

4.2、ToItemVectorsReducer.class,IntWritable.class,VectorWritable.class,

输出为itemid_index,Vector<userid_index,pref>,相当于对步骤2的结果进行了矩阵的转置,

有了偏好矩阵数据,接下来会调用RowSimilarityJob来计算行的相似度

此处的行是item,所以默认是item-base的CF。

但其实可以通过传入是否转置的参数来对步骤1进行调整,将userid和itemid转换,就可以实现user-base的CF。

此处也可以通过similarityClassname参数来指定用哪种算法来计算相似度。

RowSimilarityJob将通过接下来的3个步骤来实现:

5、用相似度算法给向量赋权

5.1、RowWeightMapper.class,VarIntWritable.class,WeightedOccurrence.class,

用相应的相似度算法来计算步骤4的输出,计算每个itemid_index所对应的Vector<userid_index,pref>的weight。

输出为userid_index,WeightedOccurrence<itemid_index,pref,weight>,WeightedOccurrence是一个简单的数据封装类。

5.2、WeightedOccurrencesPerColumnReducer.class,VarIntWritable.class,WeightedOccurrenceArray.class,

将Iterator<WeightedOccurrence>简单变为WeightedOccurrenceArray,后者只是简单继承了ArrayWritable。

最后输出结果为userid_index,WeightedOccurrenceArray,数组的数据项是WeightedOccurrence<itemid_index,pref,weight>

6、用相似度算法计算相似度,得到相似度矩阵

6.1、CooccurrencesMapper.class,WeightedRowPair.class,Cooccurrence.class,

取出步骤5的结果,将WeightedOccurrenceArray的数据双重循环,拼装如下的KV数据结构

WeightedRowPair<itemid_indexA,itemid_indexB,weightA,weightB>,Cooccurrence<userid_index,prefA,prefB>

6.2、SimilarityReducer.class,SimilarityMatrixEntryKey.class,DistributedRowMatrix.MatrixEntryWritable.class,

此步骤的Map输出,也即Reduce的输入是WeightedRowPair<itemid_indexA,itemid_indexB,weightA,weightB>, Iterator<Cooccurrence<userid_index,prefA,prefB>>

也即itemA和itemB的weight,以及不同user对itemA和itemB的pref。

相应的Similarity实例就可以利用以上数据计算itemA与itemB的相似度评分similarityValue

输出结果为SimilarityMatrixEntryKey<itemid_indexA,similarityValue>,Matrix<itemid_indexA,itemid_indexB,similarityValue>

也就是不同item和itemA的俩俩相似度,得到一个相似度矩阵

7、将相似度矩阵转为向量存储

7.1、Mapper.class,SimilarityMatrixEntryKey.class,DistributedRowMatrix.MatrixEntryWritable.class,

将步骤6的结果简单读入,item相似度矩阵

7.2、EntriesToVectorsReducer.class,IntWritable.class,VectorWritable.class,

输出为itemid_indexA,Vector<itemid_indexX,similarityValue>,Vector用SequentialAccessSparseVector存储。

也就是输出为不同的其他item与itemA之间的相似度值

8、PartialMultiply的预处理1,填充vector部分的数据

8.1、SimilarityMatrixRowWrapperMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,

用步骤7的相似度数据,输出itemid_index,VectorOrPrefWritable(vector,null,null)

8.2、Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,

默认Reducer,直接输出Mapper的输出

9、PartialMultiply的预处理2,填充userid和pref部分的数据

9.1、UserVectorSplitterMapper.class, VarIntWritable.class, VectorOrPrefWritable.class,

如果提供了一个userid列表文件,Mapper初始化时会先读入该文件到FastIDSet<userid>中

如果userid不在这个Set中,则会直接return,也就是只会为该列表中的user做推荐

用步骤2的用户对各item的偏好数据,输出itemid_index,VectorOrPrefWritable(null,userid,pref)

9.2、Reducer.class, VarIntWritable.class, VectorOrPrefWritable.class,

默认Reducer,直接输出Mapper的输出

10、拼装两个PartialMultiply预处理的数据

10.1、Mapper.class, VarIntWritable.class, VectorOrPrefWritable.class,

用FileInputFormat.setInputPaths指定多个路径,将步骤8和9的输出同时作为输入

10.2、ToVectorAndPrefReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,

将VectorOrPrefWritable(vector,null,null)和VectorOrPrefWritable(null,userid,pref)

变为VectorAndPrefsWritable(vector,List<userid>,List<pref>)

最后的输出是itemid_index,VectorAndPrefsWritable(vector,List<userid>,List<pref>)

11、如果设置了item过滤文件则读取,作为黑名单

11.1、ItemFilterMapper.class, VarLongWritable.class, VarLongWritable.class,

简单读入item过滤文件,输出为itemid,userid,这相当于“黑”名单,用于后面推荐结果的过滤。

11.2、ItemFilterAsVectorAndPrefsReducer.class, VarIntWritable.class, VectorAndPrefsWritable.class,

输出为itemid_index,VectorAndPrefsWritable(vector,List<userid>,List<pref>)

其中vector的值为vector(itemid_index,Double.NaN),pref的值都用1.0f来填充。

注意,vector的第二项数据,也即similarityValue被设置为Double.NaN,后面将会用这个来判断这是否是黑名单。

12、用相似度矩阵的PartialMultiply做推荐计算

12.1、PartialMultiplyMapper.class, VarLongWritable.class, PrefAndSimilarityColumnWritable.class,

如果步骤11存在,则用FileInputFormat.setInputPaths指定多个路径,将步骤10和11的输出同时作为输入

也即输入为itemid_index,VectorAndPrefsWritable(vector,List<userid>,List<pref>),其中vector的值为Vector<itemid_index,similarityValue>

输出为userid,PrefAndSimilarityColumnWritable(pref,vector<itemid_index,similarityValue>)

12.2、AggregateAndRecommendReducer.class, VarLongWritable.class, RecommendedItemsWritable.class,

初始化时,会读入步骤1的结果,是一个HashMap<itemid_index,itemid>,也即index和itemid的映射

若设置了item白名单文件,则初始化时也会读入文件到FastIDSet<itemid>,推荐结果必须在这里边。和步骤11的黑名单相反。

Reducer在处理时会区分是否是booleanData而用不同的处理逻辑,此处我们主要讨论非booleanData,也即有实际pref数据的情况而不是默认用1.0f来填充的pref。

Reducer中进行PartialMultiply,按乘积得到的推荐度的大小取出最大的几个item。

处理的过程中需要将itemid_index通过HashMap转换回itemid,并且用“黑”“白”名单进行过滤。

白名单很容易理解,用集合是否为空和集合的contains();

黑名单是判断Float.isNaN(similarityValue),因为此前在步骤11的输出时黑名单的similarityValue被设置为了Double.NaN。

对于非booleanData,是用pref和相似度矩阵的PartialMultiply得到推荐度的值来进行排序。

而booleanData的pref值都是1.0f,所以去计算矩阵相乘的过程没有意义,直接累加相似度的值即可。

用这个数据排序就可得到推荐结果。

输出为userid,RecommendedItemsWritable,后者实际是List<RecommendedItem<itemid,pref>>,

这里的pref是相似度矩阵的PartialMultiply或是相似度累加计算出来的值而非实际值。

后注:

以上提到的FastIDSet,SequentialAccessSparseVector,RandomAccessSparseVector等等数据结构,

是Mahout提供的一些大数据量存储和处理的一些高效实现,

针对数据的特点而做的有针对性的优化,同时解决性能和空间的问题。

在Mahout in Action的讨论CF和Cluster等的“数据的表达”章节中有专门的阐述,此处不再详细解释。