• Posts tagged "Mahout"

Blog Archives

Mahout构建图书推荐系统

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mahout-recommend-book/

mahout-recommendation-book

前言

本文是Mahout实现推荐系统的又一案例,用Mahout构建图书推荐系统。与之前的两篇文章,思路上面类似,侧重点在于图书的属性如何利用。本文的数据在自于Amazon网站,由爬虫抓取获得。

目录

  1. 项目背景
  2. 需求分析
  3. 数据说明
  4. 算法模型
  5. 程序开发

1. 项目背景

Amazon是最早的电子商务网站之一,以网上图书起家,最后发展成为音像,电子消费品,游戏,生活用品等的综合性电子商务平台。Amazon的推荐系统,是互联网上最早的商品推荐系统,它为Amazon带来了至少30%的流量,和可观的销售利润。

如今推荐系统已经成为电子商务网站的标配,如果还没有推荐系统都不好意思,说自己是做电商的。

2. 需求分析

推荐系统如此重要,我们应该如果理解?

打开Amazon的Mahout In Action图书页面:
http://www.amazon.com/Mahout-Action-Sean-Owen/dp/1935182684/ref=pd_sim_b_1?ie=UTF8&refRID=0H4H2NSSR8F34R76E2TP

网页上的元素:

  • 广告位:广告商投放广告的位置,网站可以靠网络广告赚钱,一般是网页最好的位置。
  • 平均分:用户对图书的打分
  • 关联规则:通过关联规则,推荐位
  • 协同过滤:通过基于物品的协同过滤算法的,推荐位
  • 图书属性:包括页数,出版社,ISBN,语言等
  • 作者介绍:有关作者的介绍,和作者的其他著作
  • 用户评分:用户评分行为
  • 用户评论:用户评论的内容

amazon-book

在网页上,其他的推荐位:

amazon-book-2

结合上面2张截图,我们不难发现,推荐对于Amazon的重要性。除了最明显的广告位给了能直接带来利润的广告商,网页中有4处推荐位,分别从不同的维度,用不同的推荐算法,猜用户喜欢的商品。

3. 数据说明

2个数据文件:

  • rating.csv :用户评分行为数据
  • users.csv :用户属性数据

1). book-ratings.csv

  • 3列数据:用户ID,图书ID, 用户对图书的评分
  • 记录数: 4000次的图书评分
  • 用户数: 200个
  • 图书数: 1000个
  • 评分:1-10

数据示例


1,565,3
1,807,2
1,201,1
1,557,9
1,987,10
1,59,5
1,305,6
1,153,3
1,139,7
1,875,5
1,722,10
2,977,4
2,806,3
2,654,8
2,21,8
2,662,5
2,437,6
2,576,3
2,141,8
2,311,4
2,101,3
2,540,9
2,87,3
2,65,8
2,501,6
2,710,5
2,331,9
2,542,4
2,757,9
2,590,7

2). users.csv

  • 3列数据:用户ID,用户性别,用户年龄
  • 用户数: 200个
  • 用户性别: M为男性,F为女性
  • 用户年龄: 11-80岁之间

数据示例


1,M,40
2,M,27
3,M,41
4,F,43
5,F,16
6,M,36
7,F,36
8,F,46
9,M,50
10,M,21
11,F,11
12,M,42
13,F,40
14,F,28
15,M,25
16,M,68
17,M,53
18,F,69
19,F,48
20,F,56
21,F,36

4. 算法模型

本文主要介绍Mahout的基于物品的协同过滤模型,其他的算法模型将不再这里解释。

针对上面的数据,我将用7种算法组合进行测试:有关Mahout算法组合的详细解释,请参考文章:从源代码剖析Mahout推荐引擎

7种算法组合

  • userCF1: EuclideanSimilarity+ NearestNUserNeighborhood+ GenericUserBasedRecommender
  • userCF2: LogLikelihoodSimilarity+ NearestNUserNeighborhood+ GenericUserBasedRecommender
  • userCF3: EuclideanSimilarity+ NearestNUserNeighborhood+ GenericBooleanPrefUserBasedRecommender
  • itemCF1: EuclideanSimilarity + GenericItemBasedRecommender
  • itemCF2: LogLikelihoodSimilarity + GenericItemBasedRecommender
  • itemCF3: EuclideanSimilarity + GenericBooleanPrefItemBasedRecommender
  • slopeOne:SlopeOneRecommender

对上面的算法进行算法评估,有关于算法评估的详细解释,请参考文章:Mahout推荐算法API详解

  • 查准率:
  • 召回率(查全率):

5. 程序开发

系统架构:Mahout中推荐过滤算法支持单机算法和分步式算法两种。

  • 单机算法: 在单机内存计算,支持多种算法推荐算法,部署运行简单,修正处理数据量有限
  • 分步式算法: 基于Hadoop集群运行,支持有限的几种推荐算法,部署运行复杂,支持海量数据

mahout-recommend-job-architect

开发环境

  • Win7 64bit
  • Java 1.6.0_45
  • Maven3
  • Eclipse Juno Service Release 2
  • Mahout-0.8
  • Hadoop-1.1.2

开发环境mahout版本为0.8。 请参考文章:用Maven构建Mahout项目

新建Java类:

  • BookEvaluator.java, 选出“评估推荐器”验证得分较高的算法
  • BookResult.java, 对指定数量的结果人工比较
  • BookFilterGenderResult.java,只保留男性用户的图书列表

1). BookEvaluator.java, 选出“评估推荐器”验证得分较高的算法

源代码


package org.conan.mymahout.recommendation.book;

import java.io.IOException;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.similarity.ItemSimilarity;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class BookEvaluator {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/book/rating.csv";
        DataModel dataModel = RecommendFactory.buildDataModel(file);
        userEuclidean(dataModel);
        userLoglikelihood(dataModel);
        userEuclideanNoPref(dataModel);
        itemEuclidean(dataModel);
        itemLoglikelihood(dataModel);
        itemEuclideanNoPref(dataModel);
        slopeOne(dataModel);
    }

    public static RecommenderBuilder userEuclidean(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userEuclidean");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
    
    public static RecommenderBuilder userLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userLoglikelihood");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
    
    public static RecommenderBuilder userEuclideanNoPref(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userEuclideanNoPref");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemEuclidean(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemEuclidean");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemLoglikelihood");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
    
    public static RecommenderBuilder itemEuclideanNoPref(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemEuclideanNoPref");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder slopeOne(DataModel dataModel) throws TasteException, IOException {
        System.out.println("slopeOne");
        RecommenderBuilder recommenderBuilder = RecommendFactory.slopeOneRecommender();

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
}

控制台输出:


userEuclidean
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.33333325386047363
Recommender IR Evaluator: [Precision:0.3010752688172043,Recall:0.08542713567839195]
userLoglikelihood
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.5245869159698486
Recommender IR Evaluator: [Precision:0.11764705882352945,Recall:0.017587939698492466]
userEuclideanNoPref
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:4.288461538461536
Recommender IR Evaluator: [Precision:0.09045226130653267,Recall:0.09296482412060306]
itemEuclidean
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:1.408880928305655
Recommender IR Evaluator: [Precision:0.0,Recall:0.0]
itemLoglikelihood
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.448554412835434
Recommender IR Evaluator: [Precision:0.0,Recall:0.0]
itemEuclideanNoPref
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.5665197873957957
Recommender IR Evaluator: [Precision:0.6005025125628134,Recall:0.6055276381909548]
slopeOne
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.6893078179405814
Recommender IR Evaluator: [Precision:0.0,Recall:0.0]

可视化“评估推荐器”输出:

推荐的结果的平均距离

difference

推荐器的评分

evaluator

只有itemEuclideanNoPref算法评估的结果是非常好的,其他算法的结果都不太好。

2). BookResult.java, 对指定数量的结果人工比较

为得到差异化结果,我们分别取4个算法:userEuclidean,itemEuclidean,userEuclideanNoPref,itemEuclideanNoPref,对推荐结果人工比较。

源代码


package org.conan.mymahout.recommendation.book;

import java.io.IOException;
import java.util.List;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;

public class BookResult {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/book/rating.csv";
        DataModel dataModel = RecommendFactory.buildDataModel(file);
        RecommenderBuilder rb1 = BookEvaluator.userEuclidean(dataModel);
        RecommenderBuilder rb2 = BookEvaluator.itemEuclidean(dataModel);
        RecommenderBuilder rb3 = BookEvaluator.userEuclideanNoPref(dataModel);
        RecommenderBuilder rb4 = BookEvaluator.itemEuclideanNoPref(dataModel);
        
        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            System.out.print("userEuclidean       =>");
            result(uid, rb1, dataModel);
            System.out.print("itemEuclidean       =>");
            result(uid, rb2, dataModel);
            System.out.print("userEuclideanNoPref =>");
            result(uid, rb3, dataModel);
            System.out.print("itemEuclideanNoPref =>");
            result(uid, rb4, dataModel);
        }
    }

    public static void result(long uid, RecommenderBuilder recommenderBuilder, DataModel dataModel) throws TasteException {
        List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
        RecommendFactory.showItems(uid, list, false);
    }
}

控制台输出:只截取部分结果


...
userEuclidean       =>uid:63,
itemEuclidean       =>uid:63,(984,9.000000)(690,9.000000)(943,8.875000)
userEuclideanNoPref =>uid:63,(4,1.000000)(723,1.000000)(300,1.000000)
itemEuclideanNoPref =>uid:63,(867,3.791667)(947,3.083333)(28,2.750000)
userEuclidean       =>uid:64,
itemEuclidean       =>uid:64,(368,8.615385)(714,8.200000)(290,8.142858)
userEuclideanNoPref =>uid:64,(860,1.000000)(490,1.000000)(64,1.000000)
itemEuclideanNoPref =>uid:64,(409,3.950000)(715,3.830627)(901,3.444048)
userEuclidean       =>uid:65,(939,7.000000)
itemEuclidean       =>uid:65,(550,9.000000)(334,9.000000)(469,9.000000)
userEuclideanNoPref =>uid:65,(939,2.000000)(185,1.000000)(736,1.000000)
itemEuclideanNoPref =>uid:65,(666,4.166667)(96,3.093931)(345,2.958333)
userEuclidean       =>uid:66,
itemEuclidean       =>uid:66,(971,9.900000)(656,9.600000)(918,9.577709)
userEuclideanNoPref =>uid:66,(6,1.000000)(492,1.000000)(676,1.000000)
itemEuclideanNoPref =>uid:66,(185,3.650000)(533,3.617307)(172,3.500000)
userEuclidean       =>uid:67,
itemEuclidean       =>uid:67,(663,9.700000)(987,9.625000)(486,9.600000)
userEuclideanNoPref =>uid:67,(732,1.000000)(828,1.000000)(113,1.000000)
itemEuclideanNoPref =>uid:67,(724,3.000000)(279,2.950000)(890,2.750000)
...

我们查看uid=65的用户推荐信息:

查看user.csv数据集


> user[65,]
userid gender age
65     65      M  14

用户65,男性,14岁。

以itemEuclideanNoPref的算法的推荐结果,查看bookid=666的图书评分情况


> rating[which(rating$bookid==666),]
userid bookid pref
646      44    666   10
1327     89    666    7
2470    165    666    3
2697    179    666    7

发现有4个用户对666的图书评分,查看这4个用户的属性数据


> user[c(44,89,165,179),]
userid gender age
44      44      F  76
89      89      M  40
165    165      F  59
179    179      F  68

这4个用户,3女1男。

我们假设男性和男性有相同的图书兴趣,女性和女性有相同的图书偏好。因为用户65是男性,所以我们接下来排除女性的评分者,只保留男性评分者的评分记录。

3). BookFilterGenderResult.java,只保留男性用户的图书列表

源代码


package org.conan.mymahout.recommendation.book;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.IDRescorer;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;

public class BookFilterGenderResult {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/book/rating.csv";
        DataModel dataModel = RecommendFactory.buildDataModel(file);
        RecommenderBuilder rb1 = BookEvaluator.userEuclidean(dataModel);
        RecommenderBuilder rb2 = BookEvaluator.itemEuclidean(dataModel);
        RecommenderBuilder rb3 = BookEvaluator.userEuclideanNoPref(dataModel);
        RecommenderBuilder rb4 = BookEvaluator.itemEuclideanNoPref(dataModel);
        
        long uid = 65;
        System.out.print("userEuclidean       =>");
        filterGender(uid, rb1, dataModel);
        System.out.print("itemEuclidean       =>");
        filterGender(uid, rb2, dataModel);
        System.out.print("userEuclideanNoPref =>");
        filterGender(uid, rb3, dataModel);
        System.out.print("itemEuclideanNoPref =>");
        filterGender(uid, rb4, dataModel);
    }

    /**
     * 对用户性别进行过滤
     */
    public static void filterGender(long uid, RecommenderBuilder recommenderBuilder, DataModel dataModel) throws TasteException, IOException {
        Set userids = getMale("datafile/book/user.csv");

        //计算男性用户打分过的图书
        Set bookids = new HashSet();
        for (long uids : userids) {
            LongPrimitiveIterator iter = dataModel.getItemIDsFromUser(uids).iterator();
            while (iter.hasNext()) {
                long bookid = iter.next();
                bookids.add(bookid);
            }
        }

        IDRescorer rescorer = new FilterRescorer(bookids);
        List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM, rescorer);
        RecommendFactory.showItems(uid, list, false);
    }

    /**
     * 获得男性用户ID
     */
    public static Set getMale(String file) throws IOException {
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));
        Set userids = new HashSet();
        String s = null;
        while ((s = br.readLine()) != null) {
            String[] cols = s.split(",");
            if (cols[1].equals("M")) {// 判断男性用户
                userids.add(Long.parseLong(cols[0]));
            }
        }
        br.close();
        return userids;
    }

}

/**
 * 对结果重计算
 */
class FilterRescorer implements IDRescorer {
    final private Set userids;

    public FilterRescorer(Set userids) {
        this.userids = userids;
    }

    @Override
    public double rescore(long id, double originalScore) {
        return isFiltered(id) ? Double.NaN : originalScore;
    }

    @Override
    public boolean isFiltered(long id) {
        return userids.contains(id);
    }
}

控制台输出:


userEuclidean       =>uid:65,
itemEuclidean       =>uid:65,(784,8.090909)(276,8.000000)(476,7.666667)
userEuclideanNoPref =>uid:65,
itemEuclideanNoPref =>uid:65,(887,2.250000)(356,2.166667)(430,1.866667)

我们发现,由于只保留男性的评分记录,数据量就变得比较少了,基于用户的协同过滤算法,已经没有输出的结果了。基于物品的协同过滤算法,结果集也有所变化。

对于itemEuclideanNoPref算法,输出排名第一条为ID为887的图书。

我再进一步向下追踪:查询哪些用户对图书887进行了打分。


> rating[which(rating$bookid==887),]
userid bookid pref
1280     85    887    2
1743    119    887    8
2757    184    887    4
2791    186    887    5

有4个用户对图书887评分,再分别查看这个用户的属性


> user[c(85,119,184,186),]
userid gender age
85      85      F  31
119    119      F  49
184    184      M  27
186    186      M  35

其中2男,2女。由于我们的算法,已经排除了女性的评分,我们可以推断图书887的推荐应该来自于2个男性的评分者的推荐。

分别计算用户65,与用户184和用户186的评分的图书交集。


rat65<-rating[which(rating$userid==65),]
rat184<-rating[which(rating$userid==184),]
rat186<-rating[which(rating$userid==186),]

> intersect(rat65$bookid ,rat184$bookid)
integer(0)
> intersect(rat65$bookid ,rat186$bookid)
[1]  65 375

最后发现,用户65与用户186都给图书65和图书375打过分。我们再打分出用户186的评分记录。


> rat186
userid bookid pref
2790    186     65    7
2791    186    887    5
2792    186    529    3
2793    186    375    6
2794    186    566    7
2795    186    169    4
2796    186    907    1
2797    186    821    2
2798    186    720    5
2799    186    642    5
2800    186    137    3
2801    186    744    1
2802    186    896    2
2803    186    156    6
2804    186    392    3
2805    186    386    3
2806    186    901    7
2807    186     69    6
2808    186    845    6
2809    186    998    3

用户186,还给图书887打过分,所以对于给65用户推荐图书887,是合理的。

我们通过一个实际的图书推荐的案例,更进一步地了解了如何用Mahout构建推荐系统。

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-mahout-recommend-book/

打赏作者

用Mahout构建职位推荐引擎

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mahout-recommend-job/

mahout-recommender-job

前言

随着大数据思想实施的落地,推荐系统也开始倍受关注。不光是电商,各种互联网应用都开始应用推荐系统,像搜索,社交网络,音乐,餐饮,地图服务等等。

在以前,我们没有使用推荐算法的时候,我们是通过设置各种约束条件,匹配数据的自然属性呈现给用户,这种就是基于规则的系统。比如,用户购买了一个商品,我们会推荐同类别的其他商品,通过类别属性作为推荐的规则。后来问题就出现了,当用户一次性买了多种类别的不同商品的时候,前一条规则就失败了,我们要进一步设计规则,IT类别优先推荐,价格高的产品优先推荐…..几个回合下来,我们要不停的增加规则,以至于规则有可能的会前后冲突,增加一条新的规则会让推荐结果越来越不好,而且还无法解释是为什么。

推荐算法从另一角度入手,解决了基于规则设置的问题。下面将用Mahout来构建一个职位推荐算法引擎。

目录

  1. Mahout推荐框架概述
  2. 需求分析:职位推荐引擎指标设计
  3. 算法模型:推荐算法
  4. 架构设计:职位推荐引擎系统架构
  5. 程序开发:基于Mahout的推荐算法实现

1. Mahout推荐系统框架概述

Mahout框架包含了一套完整的推荐系统引擎,标准化的数据结构,多样的算法实现,简单的开发流程。Mahout推荐的推荐系统引擎是模块化的,分为5个主要部分组成:数据模型,相似度算法,近邻算法,推荐算法,算法评分器。

更详细的介绍,请参考文章:从源代码剖析Mahout推荐引擎

2. 需求分析:职位推荐引擎指标设计

下面我们将从一个公司案例出发来全面的解释,如何进行职位推荐引擎指标设计。

案例介绍:
互联网某职业社交网站,主要产品包括 个人简历展示页,人脉圈,微博及分享链接,职位发布,职位申请,教育培训等。

用户在完成注册后,需要完善自己的个人信息,包括教育背景,工作经历,项目经历,技能专长等等信息。然后,你要告诉网站,你是否想找工作!!当你选择“是”(求职中),网站会从数据库中为你推荐你可能感兴趣的职位。

通过简短的描述,我们可以粗略地看出,这家职业社交网站的定位和主营业务。核心点有2个:

  • 用户:尽可能多的保存有效完整的用户资料
  • 服务:帮助用户找到工作,帮助猎头和企业找到员工

因此,职位推荐引擎 将成为这个网站的核心功能。

KPI指标设计

  • 通过推荐带来的职位浏览量: 职位网页的PV
  • 通过推荐带来的职位申请量: 职位网页的有效转化

3. 算法模型:推荐算法

2个测试数据集:

  • pv.csv: 职位被浏览的信息,包括用户ID,职位ID
  • job.csv: 职位基本信息,包括职位ID,发布时间,工资标准

1). pv.csv

  • 2列数据:用户ID,职位ID(userid,jobid)
  • 浏览记录:2500条
  • 用户数:1000个,用户ID:1-1000
  • 职位数:200个,职位ID:1-200

部分数据:

1,11
2,136
2,187
3,165
3,1
3,24
4,8
4,199
5,32
5,100
6,14
7,59
7,147
8,92
9,165
9,80
9,171
10,45
10,31
10,1
10,152

2). job.csv

  • 3列数据:职位ID,发布时间,工资标准(jobid,create_date,salary)
  • 职位数:200个,职位ID:1-200

部分数据:

1,2013-01-24,5600
2,2011-03-02,5400
3,2011-03-14,8100
4,2012-10-05,2200
5,2011-09-03,14100
6,2011-03-05,6500
7,2012-06-06,37000
8,2013-02-18,5500
9,2010-07-05,7500
10,2010-01-23,6700
11,2011-09-19,5200
12,2010-01-19,29700
13,2013-09-28,6000
14,2013-10-23,3300
15,2010-10-09,2700
16,2010-07-14,5100
17,2010-05-13,29000
18,2010-01-16,21800
19,2013-05-23,5700
20,2011-04-24,5900

为了完成KPI的指标,我们把问题用“技术”语言转化一下:我们需要让职位的推荐结果更准确,从而增加用户的点击。

  • 1. 组合使用推荐算法,选出“评估推荐器”验证得分较高的算法
  • 2. 人工验证推荐结果
  • 3. 职位有时效性,推荐的结果应该是发布半年内的职位
  • 4. 工资的标准,应不低于用户浏览职位工资的平均值的80%

我们选择UserCF,ItemCF,SlopeOne的 3种推荐算法,进行7种组合的测试。

  • userCF1: LogLikelihoodSimilarity + NearestNUserNeighborhood + GenericBooleanPrefUserBasedRecommender
  • userCF2: CityBlockSimilarity+ NearestNUserNeighborhood + GenericBooleanPrefUserBasedRecommender
  • userCF3: UserTanimoto + NearestNUserNeighborhood + GenericBooleanPrefUserBasedRecommender
  • itemCF1: LogLikelihoodSimilarity + GenericBooleanPrefItemBasedRecommender
  • itemCF2: CityBlockSimilarity+ GenericBooleanPrefItemBasedRecommender
  • itemCF3: ItemTanimoto + GenericBooleanPrefItemBasedRecommender
  • slopeOne:SlopeOneRecommender

关于的推荐算法的详细介绍,请参考文章:Mahout推荐算法API详解

关于算法的组合的详细介绍,请参考文章:从源代码剖析Mahout推荐引擎

4. 架构设计:职位推荐引擎系统架构

mahout-recommend-job-architect

上图中,左边是Application业务系统,右边是Mahout,下边是Hadoop集群。

  • 1. 当数据量不太大时,并且算法复杂,直接选择用Mahout读取CSV或者Database数据,在单机内存中进行计算。Mahout是多线程的应用,会并行使用单机所有系统资源。
  • 2. 当数据量很大时,选择并行化算法(ItemCF),先业务系统的数据导入到Hadoop的HDFS中,然后用Mahout访问HDFS实现算法,这时算法的性能与整个Hadoop集群有关。
  • 3. 计算后的结果,保存到数据库中,方便查询

5. 程序开发:基于Mahout的推荐算法实现

开发环境mahout版本为0.8。 ,请参考文章:用Maven构建Mahout项目

新建Java类:

  • RecommenderEvaluator.java, 选出“评估推荐器”验证得分较高的算法
  • RecommenderResult.java, 对指定数量的结果人工比较
  • RecommenderFilterOutdateResult.java,排除过期职位
  • RecommenderFilterSalaryResult.java,排除工资过低的职位

1). RecommenderEvaluator.java, 选出“评估推荐器”验证得分较高的算
源代码:


public class RecommenderEvaluator {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/job/pv.csv";
        DataModel dataModel = RecommendFactory.buildDataModelNoPref(file);
        userLoglikelihood(dataModel);
        userCityBlock(dataModel);
        userTanimoto(dataModel);
        itemLoglikelihood(dataModel);
        itemCityBlock(dataModel);
        itemTanimoto(dataModel);
        slopeOne(dataModel);
    }

    public static RecommenderBuilder userLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userLoglikelihood");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder userCityBlock(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userCityBlock");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.CITYBLOCK, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder userTanimoto(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userTanimoto");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.TANIMOTO, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemLoglikelihood");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemCityBlock(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemCityBlock");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.CITYBLOCK, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemTanimoto(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemTanimoto");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.TANIMOTO, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder slopeOne(DataModel dataModel) throws TasteException, IOException {
        System.out.println("slopeOne");
        RecommenderBuilder recommenderBuilder = RecommendFactory.slopeOneRecommender();

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder knnLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("knnLoglikelihood");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemKNNRecommender(itemSimilarity, new NonNegativeQuadraticOptimizer(), 10);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        return recommenderBuilder;
    }

    public static RecommenderBuilder knnTanimoto(DataModel dataModel) throws TasteException, IOException {
        System.out.println("knnTanimoto");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.TANIMOTO, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemKNNRecommender(itemSimilarity, new NonNegativeQuadraticOptimizer(), 10);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        return recommenderBuilder;
    }

    public static RecommenderBuilder knnCityBlock(DataModel dataModel) throws TasteException, IOException {
        System.out.println("knnCityBlock");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.CITYBLOCK, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemKNNRecommender(itemSimilarity, new NonNegativeQuadraticOptimizer(), 10);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        return recommenderBuilder;
    }

    public static RecommenderBuilder svd(DataModel dataModel) throws TasteException {
        System.out.println("svd");
        RecommenderBuilder recommenderBuilder = RecommendFactory.svdRecommender(new ALSWRFactorizer(dataModel, 5, 0.05, 10));

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        return recommenderBuilder;
    }

    public static RecommenderBuilder treeClusterLoglikelihood(DataModel dataModel) throws TasteException {
        System.out.println("treeClusterLoglikelihood");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        ClusterSimilarity clusterSimilarity = RecommendFactory.clusterSimilarity(RecommendFactory.SIMILARITY.FARTHEST_NEIGHBOR_CLUSTER, userSimilarity);
        RecommenderBuilder recommenderBuilder = RecommendFactory.treeClusterRecommender(clusterSimilarity, 3);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        return recommenderBuilder;
    }
}

运行结果,控制台输出:


userLoglikelihood
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.2741487771272658
Recommender IR Evaluator: [Precision:0.6424242424242422,Recall:0.4098360655737705]
userCityBlock
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.575306732961736
Recommender IR Evaluator: [Precision:0.919580419580419,Recall:0.4371584699453552]
userTanimoto
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.5546485136181523
Recommender IR Evaluator: [Precision:0.6625766871165644,Recall:0.41803278688524603]
itemLoglikelihood
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.5398332608612343
Recommender IR Evaluator: [Precision:0.26229508196721296,Recall:0.26229508196721296]
itemCityBlock
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.9251437840891661
Recommender IR Evaluator: [Precision:0.02185792349726776,Recall:0.02185792349726776]
itemTanimoto
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.9176432856689655
Recommender IR Evaluator: [Precision:0.26229508196721296,Recall:0.26229508196721296]
slopeOne
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.0
Recommender IR Evaluator: [Precision:0.01912568306010929,Recall:0.01912568306010929]

可视化“评估推荐器”输出:

difference

evaluator

UserCityBlock算法评估的结果是最好的,基于UserCF的算法比ItemCF都要好,SlopeOne算法几乎没有得分。

2). RecommenderResult.java, 对指定数量的结果人工比较
为得到差异化结果,我们分别取UserCityBlock,itemLoglikelihood,对推荐结果人工比较。

源代码:


public class RecommenderResult {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/job/pv.csv";
        DataModel dataModel = RecommendFactory.buildDataModelNoPref(file);
        RecommenderBuilder rb1 = RecommenderEvaluator.userCityBlock(dataModel);
        RecommenderBuilder rb2 = RecommenderEvaluator.itemLoglikelihood(dataModel);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            System.out.print("userCityBlock    =>");
            result(uid, rb1, dataModel);
            System.out.print("itemLoglikelihood=>");
            result(uid, rb2, dataModel);
        }
    }

    public static void result(long uid, RecommenderBuilder recommenderBuilder, DataModel dataModel) throws TasteException {
        List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
        RecommendFactory.showItems(uid, list, false);
    }

}

控制台输出:只截取部分结果


...
userCityBlock    =>uid:968,(61,0.333333)
itemLoglikelihood=>uid:968,(121,1.429362)(153,1.239939)(198,1.207726)
userCityBlock    =>uid:969,
itemLoglikelihood=>uid:969,(75,1.326499)(30,0.873100)(85,0.763344)
userCityBlock    =>uid:970,
itemLoglikelihood=>uid:970,(13,0.748417)(156,0.748417)(122,0.748417)
userCityBlock    =>uid:971,
itemLoglikelihood=>uid:971,(38,2.060951)(104,1.951208)(83,1.941735)
userCityBlock    =>uid:972,
itemLoglikelihood=>uid:972,(131,1.378395)(4,1.349386)(87,0.881816)
userCityBlock    =>uid:973,
itemLoglikelihood=>uid:973,(196,1.432040)(140,1.398066)(130,1.380335)
userCityBlock    =>uid:974,(19,0.200000)
itemLoglikelihood=>uid:974,(145,1.994049)(121,1.794289)(98,1.738027)
...

我们查看uid=974的用户推荐信息:

搜索pv.csv:


> pv[which(pv$userid==974),]
     userid jobid
2426    974   106
2427    974   173
2428    974    82
2429    974   188
2430    974    78

搜索job.csv:


> job[job$jobid %in% c(145,121,98,19),]
    jobid create_date salary
19     19  2013-05-23   5700
98     98  2010-01-15   2900
121   121  2010-06-19   5300
145   145  2013-08-02   6800

上面两种算法,推荐的结果都是2010年的职位,这些结果并不是太好,接下来我们要排除过期职位,只保留2013年的职位。

3).RecommenderFilterOutdateResult.java,排除过期职位
源代码:



public class RecommenderFilterOutdateResult {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/job/pv.csv";
        DataModel dataModel = RecommendFactory.buildDataModelNoPref(file);
        RecommenderBuilder rb1 = RecommenderEvaluator.userCityBlock(dataModel);
        RecommenderBuilder rb2 = RecommenderEvaluator.itemLoglikelihood(dataModel);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            System.out.print("userCityBlock    =>");
            filterOutdate(uid, rb1, dataModel);
            System.out.print("itemLoglikelihood=>");
            filterOutdate(uid, rb2, dataModel);
        }
    }

    public static void filterOutdate(long uid, RecommenderBuilder recommenderBuilder, DataModel dataModel) throws TasteException, IOException {
        Set jobids = getOutdateJobID("datafile/job/job.csv");
        IDRescorer rescorer = new JobRescorer(jobids);
        List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM, rescorer);
        RecommendFactory.showItems(uid, list, true);
    }

    public static Set getOutdateJobID(String file) throws IOException {
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));
        Set jobids = new HashSet();
        String s = null;
        while ((s = br.readLine()) != null) {
            String[] cols = s.split(",");
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
            Date date = null;
            try {
                date = df.parse(cols[1]);
                if (date.getTime() < df.parse("2013-01-01").getTime()) {
                    jobids.add(Long.parseLong(cols[0]));
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }

        }
        br.close();
        return jobids;
    }

}

class JobRescorer implements IDRescorer {
    final private Set jobids;

    public JobRescorer(Set jobs) {
        this.jobids = jobs;
    }

    @Override
    public double rescore(long id, double originalScore) {
        return isFiltered(id) ? Double.NaN : originalScore;
    }

    @Override
    public boolean isFiltered(long id) {
        return jobids.contains(id);
    }
}

控制台输出:只截取部分结果


...
itemLoglikelihood=>uid:965,(200,0.829600)(122,0.748417)(170,0.736340)
userCityBlock    =>uid:966,(114,0.250000)
itemLoglikelihood=>uid:966,(114,1.516898)(101,0.864536)(99,0.856057)
userCityBlock    =>uid:967,
itemLoglikelihood=>uid:967,(105,0.873100)(114,0.725016)(168,0.707119)
userCityBlock    =>uid:968,
itemLoglikelihood=>uid:968,(174,0.735004)(39,0.696716)(185,0.696171)
userCityBlock    =>uid:969,
itemLoglikelihood=>uid:969,(197,0.723203)(81,0.710230)(167,0.668358)
userCityBlock    =>uid:970,
itemLoglikelihood=>uid:970,(13,0.748417)(122,0.748417)(28,0.736340)
userCityBlock    =>uid:971,
itemLoglikelihood=>uid:971,(28,1.540753)(174,1.511881)(39,1.435575)
userCityBlock    =>uid:972,
itemLoglikelihood=>uid:972,(14,0.800605)(60,0.794088)(163,0.710230)
userCityBlock    =>uid:973,
itemLoglikelihood=>uid:973,(56,0.795529)(13,0.712680)(120,0.701026)
userCityBlock    =>uid:974,(19,0.200000)
itemLoglikelihood=>uid:974,(145,1.994049)(89,1.578694)(19,1.435193)
...

我们查看uid=994的用户推荐信息:
搜索pv.csv:


> pv[which(pv$userid==974),]
     userid jobid
2426    974   106
2427    974   173
2428    974    82
2429    974   188
2430    974    78

搜索job.csv:


> job[job$jobid %in% c(19,145,89),]
    jobid create_date salary
19     19  2013-05-23   5700
89     89  2013-06-15   8400
145   145  2013-08-02   6800

排除过期的职位比较,我们发现userCityBlock结果都是19,itemLoglikelihood的第2,3的结果被替换为了得分更低的89和19。

4).RecommenderFilterSalaryResult.java,排除工资过低的职位

我们查看uid=994的用户,浏览过的职位。


> job[job$jobid %in% c(106,173,82,188,78),]
    jobid create_date salary
78     78  2012-01-29   6800
82     82  2010-07-05   7500
106   106  2011-04-25   5200
173   173  2013-09-13   5200
188   188  2010-07-14   6000

平均工资为=6140,我们觉得用户的浏览职位的行为,一般不会看比自己现在工资低的职位,因此设计算法,排除工资低于平均工资80%的职位,即排除工资小于4912的推荐职位(6140*0.8=4912)

大家可以参考上文中RecommenderFilterOutdateResult.java,自行实现。

这样,我们就完成用Mahout构建职位推荐引擎的算法。如果没有Mahout,我们自己写这个算法引擎估计还要花个小半年的时间,善加利用开源技术会帮助我们飞一样的成长!!

原代码下载:
https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.8/src/main/java/org/conan/mymahout/recommendation/job

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-mahout-recommend-job/

打赏作者

从源代码剖析Mahout推荐引擎

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/mahout-recommend-engine/

mahout-recommender-engine

前言

Mahout框架中cf.taste包实现了推荐算法引擎,它提供了一套完整的推荐算法工具集,同时规范了数据结构,并标准化了程序开发过程。应用推荐算法时,代码也就7-8行,简单地有点像R了。为了使用简单的目标,Mahout推荐引擎必然要做到精巧的程序设计。

本文将介绍Mahout推荐引擎的程序设计。

目录

  1. Mahout推荐引擎概况
  2. 标准化的程序开发过程
  3. 数据模型
  4. 相似度算法工具集
  5. 近邻算法工具集
  6. 推荐算法工具集
  7. 创建自己的推荐引擎构造器

1. Mahout推荐引擎概况

Mahout的推荐引擎,要从org.apache.mahout.cf.taste包说起。

mahout-core-class

packages的说明:

  • common: 公共类包括,异常,数据刷新接口,权重常量
  • eval: 定义构造器接口,类似于工厂模式
  • model: 定义数据模型接口
  • neighborhood: 定义近邻算法的接口
  • recommender: 定义推荐算法的接口
  • similarity: 定义相似度算法的接口
  • transforms: 定义数据转换的接口
  • hadoop: 基于hadoop的分步式算法的实现类
  • impl: 单机内存算法实现类

从上面的package情况,我可以粗略地看出推荐引擎分为5个主要部分组成:数据模型,相似度算法,近邻算法,推荐算法,算法评分器。

从数据处理能力上,算法可以分为:单机内存算法,基于hadoop的分步式算法。

下面我们将基于单机内存算法,研究Mahout的推荐引擎的结构。

2. 标准化的程序开发过程

以UserCF的推荐算法为例,官方建议我们的开发过程:

mahout_recommendation-process

图片摘自Mahout in Action

从上图中我们可以看到,算法是被模块化的,通过1,2,3,4的过程进行方法调用。

程序代码:


public class UserCF {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws IOException, TasteException {
        String file = "datafile/item.csv";
        DataModel model = new FileDataModel(new File(file));
        UserSimilarity user = new EuclideanDistanceSimilarity(model);
        NearestNUserNeighborhood neighbor = new NearestNUserNeighborhood(NEIGHBORHOOD_NUM, user, model);
        Recommender r = new GenericUserBasedRecommender(model, neighbor, user);
        LongPrimitiveIterator iter = model.getUserIDs();

        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = r.recommend(uid, RECOMMENDER_NUM);
            System.out.printf("uid:%s", uid);
            for (RecommendedItem ritem : list) {
                System.out.printf("(%s,%f)", ritem.getItemID(), ritem.getValue());
            }
            System.out.println();
        }
    }
}

我们调用算法的程序,要用到4个对象:DataModel, UserSimilarity, NearestNUserNeighborhood, Recommender。

3. 数据模型

Mahout的推荐引擎的数据模型,以DataModel接口为父类。

mahout-datamodel

通过“策略模式”匹配不同的数据源,支持File, JDBC(MySQL, PostgreSQL), NoSQL(Cassandra, HBase, MongoDB)。

注:NoSQL的实现在mahout-integration-0.8.jar中。

数据格式支持2种:

  • GenericDataModel: 用户ID,物品ID,用户对物品的打分(UserID,ItemID,PreferenceValue)
  • GenericBooleanPrefDataModel: 用户ID,物品ID (UserID,ItemID),这种方式表达用户是否浏览过该物品,但并未对物品进行打分。

mahout-pref

4. 相似度算法工具集

相似度算法分为2种

  • 基于用户(UserCF)的相似度算法
  • 基于物品(ItemCF)的相似度算法

1). 基于用户(UserCF)的相似度算法

mahout-UserSimilarity

计算用户的相似矩阵,可以通过上图中几种算法。

2). 基于物品(ItemCF)的相似度算法

mahout-ItemSimilarity

计算物品的相似矩阵,可以通过上图中几种算法。

关于相似度距离的说明:

  • EuclideanDistanceSimilarity: 欧氏距离相似度

    image003

    原理:利用欧式距离d定义的相似度s,s=1 / (1+d)。

    范围:[0,1],值越大,说明d越小,也就是距离越近,则相似度越大。

    说明:同皮尔森相似度一样,该相似度也没有考虑重叠数对结果的影响,同样地,Mahout通过增加一个枚举类型(Weighting)的参数来使得重叠数也成为计算相似度的影响因子。

  • PearsonCorrelationSimilarity: 皮尔森相似度

    image004

    原理:用来反映两个变量线性相关程度的统计量

    范围:[-1,1],绝对值越大,说明相关性越强,负相关对于推荐的意义小。

    说明:1、 不考虑重叠的数量;2、 如果只有一项重叠,无法计算相似性(计算过程被除数有n-1);3、 如果重叠的值都相等,也无法计算相似性(标准差为0,做除数)。

    该相似度并不是最好的选择,也不是最坏的选择,只是因为其容易理解,在早期研究中经常被提起。使用Pearson线性相关系数必须假设数据是成对地从正态分布中取得的,并且数据至少在逻辑范畴内必须是等间距的数据。Mahout中,为皮尔森相关计算提供了一个扩展,通过增加一个枚举类型(Weighting)的参数来使得重叠数也成为计算相似度的影响因子。

  • UncenteredCosineSimilarity: 余弦相似度

    image005

    原理:多维空间两点与所设定的点形成夹角的余弦值。

    范围:[-1,1],值越大,说明夹角越大,两点相距就越远,相似度就越小。

    说明:在数学表达中,如果对两个项的属性进行了数据中心化,计算出来的余弦相似度和皮尔森相似度是一样的,在mahout中,实现了数据中心化的过程,所以皮尔森相似度值也是数据中心化后的余弦相似度。另外在新版本中,Mahout提供了UncenteredCosineSimilarity类作为计算非中心化数据的余弦相似度。

  • SpearmanCorrelationSimilarity: Spearman秩相关系数相似度

    原理:Spearman秩相关系数通常被认为是排列后的变量之间的Pearson线性相关系数。

    范围:{-1.0,1.0},当一致时为1.0,不一致时为-1.0。

    说明:计算非常慢,有大量排序。针对推荐系统中的数据集来讲,用Spearman秩相关系数作为相似度量是不合适的。

  • CityBlockSimilarity: 曼哈顿距离相似度

    原理:曼哈顿距离的实现,同欧式距离相似,都是用于多维数据空间距离的测度

    范围:[0,1],同欧式距离一致,值越小,说明距离值越大,相似度越大。

    说明:比欧式距离计算量少,性能相对高。

  • LogLikelihoodSimilarity: 对数似然相似度

    原理:重叠的个数,不重叠的个数,都没有的个数

    范围:具体可去百度文库中查找论文《Accurate Methods for the Statistics of Surprise and Coincidence》

    说明:处理无打分的偏好数据,比Tanimoto系数的计算方法更为智能。

  • TanimotoCoefficientSimilarity: Tanimoto系数相似度

    image006

    原理:又名广义Jaccard系数,是对Jaccard系数的扩展,等式为

    范围:[0,1],完全重叠时为1,无重叠项时为0,越接近1说明越相似。

    说明:处理无打分的偏好数据。

相似度算法介绍,摘自:http://www.cnblogs.com/dlts26/archive/2012/06/20/2555772.html

5. 近邻算法工具集

近邻算法只对于UserCF适用,通过近邻算法给相似的用户进行排序,选出前N个最相似的,作为最终推荐的参考的用户。

mahout-UserNeighborhood

近邻算法分为2种:

  • NearestNUserNeighborhood:指定N的个数,比如,选出前10最相似的用户。
  • ThresholdUserNeighborhood:指定比例,比如,选择前10%最相似的用户。

mahout-Neighborhood

6. 推荐算法工具集

推荐算法是以Recommender作为基础的父类,关于推荐算法的详细介绍,请参考文章:Mahout推荐算法API详解

mahout-Recommender

7. 创建自己的推荐引擎构造器

有了上面的知识,我就清楚地知道了Mahout推荐引擎的原理和使用,我们就可以写一个自己的构造器,通过“策略模式”实现,算法的组合。

新建文件:org.conan.mymahout.recommendation.job.RecommendFactory.java


public final class RecommendFactory {
...
}

1). 构造数据模型


    public static DataModel buildDataModel(String file) throws TasteException, IOException {
        return new FileDataModel(new File(file));
    }

    public static DataModel buildDataModelNoPref(String file) throws TasteException, IOException {
        return new GenericBooleanPrefDataModel(GenericBooleanPrefDataModel.toDataMap(new FileDataModel(new File(file))));
    }

    public static DataModelBuilder buildDataModelNoPrefBuilder() {
        return new DataModelBuilder() {
            @Override
            public DataModel buildDataModel(FastByIDMap trainingData) {
                return new GenericBooleanPrefDataModel(GenericBooleanPrefDataModel.toDataMap(trainingData));
            }
        };
    }

2). 构造相似度算法模型


public enum SIMILARITY {
        PEARSON, EUCLIDEAN, COSINE, TANIMOTO, LOGLIKELIHOOD, FARTHEST_NEIGHBOR_CLUSTER, NEAREST_NEIGHBOR_CLUSTER
    }

    public static UserSimilarity userSimilarity(SIMILARITY type, DataModel m) throws TasteException {
        switch (type) {
        case PEARSON:
            return new PearsonCorrelationSimilarity(m);
        case COSINE:
            return new UncenteredCosineSimilarity(m);
        case TANIMOTO:
            return new TanimotoCoefficientSimilarity(m);
        case LOGLIKELIHOOD:
            return new LogLikelihoodSimilarity(m);
        case EUCLIDEAN:
        default:
            return new EuclideanDistanceSimilarity(m);
        }
    }

    public static ItemSimilarity itemSimilarity(SIMILARITY type, DataModel m) throws TasteException {
        switch (type) {
        case LOGLIKELIHOOD:
            return new LogLikelihoodSimilarity(m);
        case TANIMOTO:
        default:
            return new TanimotoCoefficientSimilarity(m);
        }
    }

    public static ClusterSimilarity clusterSimilarity(SIMILARITY type, UserSimilarity us) throws TasteException {
        switch (type) {
        case NEAREST_NEIGHBOR_CLUSTER:
            return new NearestNeighborClusterSimilarity(us);
        case FARTHEST_NEIGHBOR_CLUSTER:
        default:
            return new FarthestNeighborClusterSimilarity(us);
        }
    }

3). 构造近邻算法模型


  public enum NEIGHBORHOOD {
        NEAREST, THRESHOLD
    }

    public static UserNeighborhood userNeighborhood(NEIGHBORHOOD type, UserSimilarity s, DataModel m, double num) throws TasteException {
        switch (type) {
        case NEAREST:
            return new NearestNUserNeighborhood((int) num, s, m);
        case THRESHOLD:
        default:
            return new ThresholdUserNeighborhood(num, s, m);
        }
    }

4). 构造推荐算法模型


 public enum RECOMMENDER {
        USER, ITEM
    }

    public static RecommenderBuilder userRecommender(final UserSimilarity us, final UserNeighborhood un, boolean pref) throws TasteException {
        return pref ? new RecommenderBuilder() {
            @Override
            public Recommender buildRecommender(DataModel model) throws TasteException {
                return new GenericUserBasedRecommender(model, un, us);
            }
        } : new RecommenderBuilder() {
            @Override
            public Recommender buildRecommender(DataModel model) throws TasteException {
                return new GenericBooleanPrefUserBasedRecommender(model, un, us);
            }
        };
    }

    public static RecommenderBuilder itemRecommender(final ItemSimilarity is, boolean pref) throws TasteException {
        return pref ? new RecommenderBuilder() {
            @Override
            public Recommender buildRecommender(DataModel model) throws TasteException {
                return new GenericItemBasedRecommender(model, is);
            }
        } : new RecommenderBuilder() {
            @Override
            public Recommender buildRecommender(DataModel model) throws TasteException {
                return new GenericBooleanPrefItemBasedRecommender(model, is);
            }
        };
    }

    public static RecommenderBuilder slopeOneRecommender() throws TasteException {
        return new RecommenderBuilder() {
            @Override
            public Recommender buildRecommender(DataModel dataModel) throws TasteException {
                return new SlopeOneRecommender(dataModel);
            }

        };
    }

    public static RecommenderBuilder itemKNNRecommender(final ItemSimilarity is, final Optimizer op, final int n) throws TasteException {
        return new RecommenderBuilder() {
            @Override
            public Recommender buildRecommender(DataModel dataModel) throws TasteException {
                return new KnnItemBasedRecommender(dataModel, is, op, n);
            }
        };
    }

    public static RecommenderBuilder svdRecommender(final Factorizer factorizer) throws TasteException {
        return new RecommenderBuilder() {
            @Override
            public Recommender buildRecommender(DataModel dataModel) throws TasteException {
                return new SVDRecommender(dataModel, factorizer);
            }
        };
    }

    public static RecommenderBuilder treeClusterRecommender(final ClusterSimilarity cs, final int n) throws TasteException {
        return new RecommenderBuilder() {
            @Override
            public Recommender buildRecommender(DataModel dataModel) throws TasteException {
                return new TreeClusteringRecommender(dataModel, cs, n);
            }
        };
    }

5). 构造算法评估模型


 public enum EVALUATOR {
        AVERAGE_ABSOLUTE_DIFFERENCE, RMS
    }

    public static RecommenderEvaluator buildEvaluator(EVALUATOR type) {
        switch (type) {
        case RMS:
            return new RMSRecommenderEvaluator();
        case AVERAGE_ABSOLUTE_DIFFERENCE:
        default:
            return new AverageAbsoluteDifferenceRecommenderEvaluator();
        }
    }

    public static void evaluate(EVALUATOR type, RecommenderBuilder rb, DataModelBuilder mb, DataModel dm, double trainPt) throws TasteException {
        System.out.printf("%s Evaluater Score:%s\n", type.toString(), buildEvaluator(type).evaluate(rb, mb, dm, trainPt, 1.0));
    }

    public static void evaluate(RecommenderEvaluator re, RecommenderBuilder rb, DataModelBuilder mb, DataModel dm, double trainPt) throws TasteException {
        System.out.printf("Evaluater Score:%s\n", re.evaluate(rb, mb, dm, trainPt, 1.0));
    }

    /**
     * statsEvaluator
     */
    public static void statsEvaluator(RecommenderBuilder rb, DataModelBuilder mb, DataModel m, int topn) throws TasteException {
        RecommenderIRStatsEvaluator evaluator = new GenericRecommenderIRStatsEvaluator();
        IRStatistics stats = evaluator.evaluate(rb, mb, m, null, topn, GenericRecommenderIRStatsEvaluator.CHOOSE_THRESHOLD, 1.0);
        // System.out.printf("Recommender IR Evaluator: %s\n", stats);
        System.out.printf("Recommender IR Evaluator: [Precision:%s,Recall:%s]\n", stats.getPrecision(), stats.getRecall());
    }

6). 推荐结果输出


    public static void showItems(long uid, List recommendations, boolean skip) {
        if (!skip || recommendations.size() > 0) {
            System.out.printf("uid:%s,", uid);
            for (RecommendedItem recommendation : recommendations) {
                System.out.printf("(%s,%f)", recommendation.getItemID(), recommendation.getValue());
            }
            System.out.println();
        }
    }

7). 完整源代码文件及使用样例:
https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.8/src/main/java/org/conan/mymahout/recommendation/job

转载请注明出处:
http://blog.fens.me/mahout-recommend-engine/

打赏作者

Mahout推荐算法API详解

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/mahout-recommendation-api

mahout-Recommendation

前言

用Mahout来构建推荐系统,是一件既简单又困难的事情。简单是因为Mahout完整地封装了“协同过滤”算法,并实现了并行化,提供非常简单的API接口;困难是因为我们不了解算法细节,很难去根据业务的场景进行算法配置和调优。

本文将深入算法API去解释Mahout推荐算法底层的一些事。

目录

  1. Mahout推荐算法介绍
  2. 算法评判标准:召回率与准确率
  3. Recommender.java的API接口
  4. 测试程序:RecommenderTest.java
  5. 基于用户的协同过滤算法UserCF
  6. 基于物品的协同过滤算法ItemCF
  7. SlopeOne算法
  8. KNN Linear interpolation item–based推荐算法
  9. SVD推荐算法
  10. Tree Cluster-based 推荐算法
  11. Mahout推荐算法总结

1. Mahout推荐算法介绍

Mahoutt推荐算法,从数据处理能力上,可以划分为2类:

  • 单机内存算法实现
  • 基于Hadoop的分步式算法实现

1). 单机内存算法实现

单机内存算法实现:就是在单机下运行的算法,是由cf.taste项目实现的,像我的们熟悉的UserCF,ItemCF都支持单机内存运行,并且参数可以灵活配置。单机算法的基本实例,请参考文章:用Maven构建Mahout项目

单机内存算法的问题在于,受限于单机的资源。对于中等规模的数据,像1G,10G的数据量,有能力进行计算,但是超过100G的数据量,对于单机来说是不可能完成的任务。

2). 基于Hadoop的分步式算法实现

基于Hadoop的分步式算法实现:就是把单机内存算法并行化,把任务分散到多台计算机一起运行。Mahout提供了ItemCF基于Hadoop并行化算法实现。基于Hadoop的分步式算法实现,请参考文章:
Mahout分步式程序开发 基于物品的协同过滤ItemCF

分步式并行算法的问题在于,如何让单机算法并行化。在单机算法中,我们只需要考虑算法,数据结构,内存,CPU就够了,但是分步式算法还要额外考虑很多的情况,比如多节点的数据合并,数据排序,网路通信的效率,节点宕机重算,数据分步式存储等等的很多问题。

2. 算法评判标准:召回率(recall)与查准率(precision)

Mahout提供了2个评估推荐器的指标,查准率和召回率(查全率),这两个指标是搜索引擎中经典的度量方法。

precision_recall


         相关 不相关
检索到     A    C
未检索到   B    D
  • A:检索到的,相关的 (搜到的也想要的)
  • B:未检索到的,但是相关的 (没搜到,然而实际上想要的)
  • C:检索到的,但是不相关的 (搜到的但没用的)
  • D:未检索到的,也不相关的 (没搜到也没用的)

被检索到的越多越好,这是追求“查全率”,即A/(A+B),越大越好。
被检索到的,越相关的越多越好,不相关的越少越好,这是追求“查准率”,即A/(A+C),越大越好。

在大规模数据集合中,这两个指标是相互制约的。当希望索引出更多的数据的时候,查准率就会下降,当希望索引更准确的时候,会索引更少的数据。

3. Recommender的API接口

1). 系统环境:

  • Win7 64bit
  • Java 1.6.0_45
  • Maven 3
  • Eclipse Juno Service Release 2
  • Mahout 0.8
  • Hadoop 1.1.2

2). Recommender接口文件:
org.apache.mahout.cf.taste.recommender.Recommender.java

mahout-Recommender-class

接口中方法的解释:

  • recommend(long userID, int howMany): 获得推荐结果,给userID推荐howMany个Item
  • recommend(long userID, int howMany, IDRescorer rescorer): 获得推荐结果,给userID推荐howMany个Item,可以根据rescorer对结构重新排序。
  • estimatePreference(long userID, long itemID): 当打分为空,估计用户对物品的打分
  • setPreference(long userID, long itemID, float value): 赋值用户,物品,打分
  • removePreference(long userID, long itemID): 删除用户对物品的打分
  • getDataModel(): 提取推荐数据

通过Recommender接口,我可以猜出核心算法,应该会在子类的estimatePreference()方法中进行实现。

3). 通过继承关系到Recommender接口的子类:

mahout-Recommender-hierarchy

推荐算法实现类:

  • GenericUserBasedRecommender: 基于用户的推荐算法
  • GenericItemBasedRecommender: 基于物品的推荐算法
  • KnnItemBasedRecommender: 基于物品的KNN推荐算法
  • SlopeOneRecommender: Slope推荐算法
  • SVDRecommender: SVD推荐算法
  • TreeClusteringRecommender:TreeCluster推荐算法

下面将分别介绍每种算法的实现。

4. 测试程序:RecommenderTest.java

测试数据集:item.csv


1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

测试程序:org.conan.mymahout.recommendation.job.RecommenderTest.java


package org.conan.mymahout.recommendation.job;

import java.io.IOException;
import java.util.List;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.common.RandomUtils;

public class RecommenderTest {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        RandomUtils.useTestSeed();
        String file = "datafile/item.csv";
        DataModel dataModel = RecommendFactory.buildDataModel(file);
        slopeOne(dataModel);
    }

    public static void userCF(DataModel dataModel) throws TasteException{}
    public static void itemCF(DataModel dataModel) throws TasteException{}
    public static void slopeOne(DataModel dataModel) throws TasteException{}

    ...

每种算法都一个单独的方法进行算法测试,如userCF(),itemCF(),slopeOne()….

5. 基于用户的协同过滤算法UserCF

基于用户的协同过滤,通过不同用户对物品的评分来评测用户之间的相似性,基于用户之间的相似性做出推荐。简单来讲就是:给用户推荐和他兴趣相似的其他用户喜欢的物品。

举例说明:

image015

基于用户的 CF 的基本思想相当简单,基于用户对物品的偏好找到相邻邻居用户,然后将邻居用户喜欢的推荐给当前用户。计算上,就是将一个用户对所有物品的偏好作为一个向量来计算用户之间的相似度,找到 K 邻居后,根据邻居的相似度权重以及他们对物品的偏好,预测当前用户没有偏好的未涉及物品,计算得到一个排序的物品列表作为推荐。图 2 给出了一个例子,对于用户 A,根据用户的历史偏好,这里只计算得到一个邻居 – 用户 C,然后将用户 C 喜欢的物品 D 推荐给用户 A。

上文中图片和解释文字,摘自: https://www.ibm.com/developerworks/cn/web/1103_zhaoct_recommstudy2/

算法API: org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender


  @Override
  public float estimatePreference(long userID, long itemID) throws TasteException {
    DataModel model = getDataModel();
    Float actualPref = model.getPreferenceValue(userID, itemID);
    if (actualPref != null) {
      return actualPref;
    }
    long[] theNeighborhood = neighborhood.getUserNeighborhood(userID);
    return doEstimatePreference(userID, theNeighborhood, itemID);
  }

 protected float doEstimatePreference(long theUserID, long[] theNeighborhood, long itemID) throws TasteException {
    if (theNeighborhood.length == 0) {
      return Float.NaN;
    }
    DataModel dataModel = getDataModel();
    double preference = 0.0;
    double totalSimilarity = 0.0;
    int count = 0;
    for (long userID : theNeighborhood) {
      if (userID != theUserID) {
        // See GenericItemBasedRecommender.doEstimatePreference() too
        Float pref = dataModel.getPreferenceValue(userID, itemID);
        if (pref != null) {
          double theSimilarity = similarity.userSimilarity(theUserID, userID);
          if (!Double.isNaN(theSimilarity)) {
            preference += theSimilarity * pref;
            totalSimilarity += theSimilarity;
            count++;
          }
        }
      }
    }
    // Throw out the estimate if it was based on no data points, of course, but also if based on
    // just one. This is a bit of a band-aid on the 'stock' item-based algorithm for the moment.
    // The reason is that in this case the estimate is, simply, the user's rating for one item
    // that happened to have a defined similarity. The similarity score doesn't matter, and that
    // seems like a bad situation.
    if (count <= 1) {
      return Float.NaN;
    }
    float estimate = (float) (preference / totalSimilarity);
    if (capper != null) {
      estimate = capper.capEstimate(estimate);
    }
    return estimate;
  }

测试程序:


    public static void userCF(DataModel dataModel) throws TasteException {
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:1.0
Recommender IR Evaluator: [Precision:0.5,Recall:0.5]
uid:1,(104,4.333333)(106,4.000000)
uid:2,(105,4.049678)
uid:3,(103,3.512787)(102,2.747869)
uid:4,(102,3.000000)

用R语言重写UserCF的实现,请参考文章:用R解析Mahout用户推荐协同过滤算法(UserCF)

6. 基于物品的协同过滤算法ItemCF

基于item的协同过滤,通过用户对不同item的评分来评测item之间的相似性,基于item之间的相似性做出推荐。简单来讲就是:给用户推荐和他之前喜欢的物品相似的物品。

举例说明:

image017

基于物品的 CF 的原理和基于用户的 CF 类似,只是在计算邻居时采用物品本身,而不是从用户的角度,即基于用户对物品的偏好找到相似的物品,然后根据用户的历史偏好,推荐相似的物品给他。从计算的角度看,就是将所有用户对某个物品的偏好作为一个向量来计算物品之间的相似度,得到物品的相似物品后,根据用户历史的偏好预测当前用户还没有表示偏好的物品,计算得到一个排序的物品列表作为推荐。图 3 给出了一个例子,对于物品 A,根据所有用户的历史偏好,喜欢物品 A 的用户都喜欢物品 C,得出物品 A 和物品 C 比较相似,而用户 C 喜欢物品 A,那么可以推断出用户 C 可能也喜欢物品 C。

上文中图片和解释文字,摘自: https://www.ibm.com/developerworks/cn/web/1103_zhaoct_recommstudy2/

算法API: org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender


  @Override
  public float estimatePreference(long userID, long itemID) throws TasteException {
    PreferenceArray preferencesFromUser = getDataModel().getPreferencesFromUser(userID);
    Float actualPref = getPreferenceForItem(preferencesFromUser, itemID);
    if (actualPref != null) {
      return actualPref;
    }
    return doEstimatePreference(userID, preferencesFromUser, itemID);
  }

protected float doEstimatePreference(long userID, PreferenceArray preferencesFromUser, long itemID)
    throws TasteException {
    double preference = 0.0;
    double totalSimilarity = 0.0;
    int count = 0;
    double[] similarities = similarity.itemSimilarities(itemID, preferencesFromUser.getIDs());
    for (int i = 0; i < similarities.length; i++) {
      double theSimilarity = similarities[i];
      if (!Double.isNaN(theSimilarity)) {
        // Weights can be negative!
        preference += theSimilarity * preferencesFromUser.getValue(i);
        totalSimilarity += theSimilarity;
        count++;
      }
    }
    // Throw out the estimate if it was based on no data points, of course, but also if based on
    // just one. This is a bit of a band-aid on the 'stock' item-based algorithm for the moment.
    // The reason is that in this case the estimate is, simply, the user's rating for one item
    // that happened to have a defined similarity. The similarity score doesn't matter, and that
    // seems like a bad situation.
    if (count <= 1) {
      return Float.NaN;
    }
    float estimate = (float) (preference / totalSimilarity);
    if (capper != null) {
      estimate = capper.capEstimate(estimate);
    }
    return estimate;
  }

测试程序:


    public static void itemCF(DataModel dataModel) throws TasteException {
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.8676552772521973
Recommender IR Evaluator: [Precision:0.5,Recall:1.0]
uid:1,(105,3.823529)(104,3.722222)(106,3.478261)
uid:2,(106,2.984848)(105,2.537037)(107,2.000000)
uid:3,(106,3.648649)(102,3.380000)(103,3.312500)
uid:4,(107,4.722222)(105,4.313953)(102,4.025000)
uid:5,(107,3.736842)

7. SlopeOne算法

这个算法在mahout-0.8版本中,已经被@Deprecated。

SlopeOne是一种简单高效的协同过滤算法。通过均差计算进行评分。SlopeOne论文下载(PDF)

1). 举例说明:
用户X,Y,Z,对于物品A,B进行打分,如下表,求Z对B的打分是多少?

slopeone

Slope one算法认为:平均值可以代替某两个未知个体之间的打分差异,事物A对事物B的平均差是:((5 - 4) + (4 - 2)) / 2 = 1.5,就得到Z对B的打分是,3-1.5 = 1.5。

Slope one算法将用户的评分之间的关系看作简单的线性关系:

Y = mX + b

2). 平均加权计算:
用户X,Y,Z,对于物品A,B,C进行打分,如下表,求Z对A的打分是多少?

slopeone2

  • 1. 计算A和B的平均差, ((5-3)+(3-4))/2=0.5
  • 2. 计算A和C的平均差, (5-2)/1=3
  • 3. Z对A的评分,通过AB得到, 2+0.5=2.5
  • 4. Z对A的评分,通过AC得到,5+3=8
  • 5. 通过加权平均计算Z对A的评分:A和B都有评价的用户数为2,A和C都有评价的用户数为1,权重为别是2和1, (2*2.5+1*8)/(2+1)=13/3=4.33

通过这种简单的方式,我们可以快速计算出一个评分项,完成推荐过程!

算法API: org.apache.mahout.cf.taste.impl.recommender.slopeone.SlopeOneRecommender


@Override
  public float estimatePreference(long userID, long itemID) throws TasteException {
    DataModel model = getDataModel();
    Float actualPref = model.getPreferenceValue(userID, itemID);
    if (actualPref != null) {
      return actualPref;
    }
    return doEstimatePreference(userID, itemID);
  }
  
  private float doEstimatePreference(long userID, long itemID) throws TasteException {
    double count = 0.0;
    double totalPreference = 0.0;
    PreferenceArray prefs = getDataModel().getPreferencesFromUser(userID);
    RunningAverage[] averages = diffStorage.getDiffs(userID, itemID, prefs);
    int size = prefs.length();
    for (int i = 0; i < size; i++) {
      RunningAverage averageDiff = averages[i];
      if (averageDiff != null) {
        double averageDiffValue = averageDiff.getAverage();
        if (weighted) {
          double weight = averageDiff.getCount();
          if (stdDevWeighted) {
            double stdev = ((RunningAverageAndStdDev) averageDiff).getStandardDeviation();
            if (!Double.isNaN(stdev)) {
              weight /= 1.0 + stdev;
            }
            // If stdev is NaN, then it is because count is 1. Because we're weighting by count,
            // the weight is already relatively low. We effectively assume stdev is 0.0 here and
            // that is reasonable enough. Otherwise, dividing by NaN would yield a weight of NaN
            // and disqualify this pref entirely
            // (Thanks Daemmon)
          }
          totalPreference += weight * (prefs.getValue(i) + averageDiffValue);
          count += weight;
        } else {
          totalPreference += prefs.getValue(i) + averageDiffValue;
          count += 1.0;
        }
      }
    }
    if (count <= 0.0) {
      RunningAverage itemAverage = diffStorage.getAverageItemPref(itemID);
      return itemAverage == null ? Float.NaN : (float) itemAverage.getAverage();
    } else {
      return (float) (totalPreference / count);
    }
  }

测试程序:


    public static void slopeOne(DataModel dataModel) throws TasteException {
        RecommenderBuilder recommenderBuilder = RecommendFactory.slopeOneRecommender();

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:1.3333333333333333
Recommender IR Evaluator: [Precision:0.25,Recall:0.5]
uid:1,(105,5.750000)(104,5.250000)(106,4.500000)
uid:2,(105,2.286115)(106,1.500000)
uid:3,(106,2.000000)(102,1.666667)(103,1.625000)
uid:4,(105,4.976859)(102,3.509071)

8. KNN Linear interpolation item–based推荐算法

这个算法在mahout-0.8版本中,已经被@Deprecated。

算法来自论文:
This algorithm is based in the paper of Robert M. Bell and Yehuda Koren in ICDM '07.

(TODO未完)

算法API: org.apache.mahout.cf.taste.impl.recommender.knn.KnnItemBasedRecommender


@Override
  protected float doEstimatePreference(long theUserID, PreferenceArray preferencesFromUser, long itemID)
    throws TasteException {
    
    DataModel dataModel = getDataModel();
    int size = preferencesFromUser.length();
    FastIDSet possibleItemIDs = new FastIDSet(size);
    for (int i = 0; i < size; i++) {
      possibleItemIDs.add(preferencesFromUser.getItemID(i));
    }
    possibleItemIDs.remove(itemID);
    
    List mostSimilar = mostSimilarItems(itemID, possibleItemIDs.iterator(),
      neighborhoodSize, null);
    long[] theNeighborhood = new long[mostSimilar.size() + 1];
    theNeighborhood[0] = -1;
  
    List usersRatedNeighborhood = Lists.newArrayList();
    int nOffset = 0;
    for (RecommendedItem rec : mostSimilar) {
      theNeighborhood[nOffset++] = rec.getItemID();
    }
    
    if (!mostSimilar.isEmpty()) {
      theNeighborhood[mostSimilar.size()] = itemID;
      for (int i = 0; i < theNeighborhood.length; i++) {
        PreferenceArray usersNeighborhood = dataModel.getPreferencesForItem(theNeighborhood[i]);
        int size1 = usersRatedNeighborhood.isEmpty() ? usersNeighborhood.length() : usersRatedNeighborhood.size();
        for (int j = 0; j < size1; j++) {
          if (i == 0) {
            usersRatedNeighborhood.add(usersNeighborhood.getUserID(j));
          } else {
            if (j >= usersRatedNeighborhood.size()) {
              break;
            }
            long index = usersRatedNeighborhood.get(j);
            if (!usersNeighborhood.hasPrefWithUserID(index) || index == theUserID) {
              usersRatedNeighborhood.remove(index);
              j--;
            }
          }
        }
      }
    }

    double[] weights = null;
    if (!mostSimilar.isEmpty()) {
      weights = getInterpolations(itemID, theNeighborhood, usersRatedNeighborhood);
    }
    
    int i = 0;
    double preference = 0.0;
    double totalSimilarity = 0.0;
    for (long jitem : theNeighborhood) {
      
      Float pref = dataModel.getPreferenceValue(theUserID, jitem);
      
      if (pref != null) {
        double weight = weights[i];
        preference += pref * weight;
        totalSimilarity += weight;
      }
      i++;
      
    }
    return totalSimilarity == 0.0 ? Float.NaN : (float) (preference / totalSimilarity);
  }
  
}

测试程序:


    public static void itemKNN(DataModel dataModel) throws TasteException {
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemKNNRecommender(itemSimilarity, new NonNegativeQuadraticOptimizer(), 10);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:1.5
Recommender IR Evaluator: [Precision:0.5,Recall:1.0]
uid:1,(107,5.000000)(104,3.501168)(106,3.498198)
uid:2,(105,2.878995)(106,2.878086)(107,2.000000)
uid:3,(103,3.667444)(102,3.667161)(106,3.667019)
uid:4,(107,4.750247)(102,4.122755)(105,4.122709)
uid:5,(107,3.833621)

9. SVD推荐算法

(TODO未完)

算法API: org.apache.mahout.cf.taste.impl.recommender.svd.SVDRecommender


@Override
  public float estimatePreference(long userID, long itemID) throws TasteException {
    double[] userFeatures = factorization.getUserFeatures(userID);
    double[] itemFeatures = factorization.getItemFeatures(itemID);
    double estimate = 0;
    for (int feature = 0; feature < userFeatures.length; feature++) {
      estimate += userFeatures[feature] * itemFeatures[feature];
    }
    return (float) estimate;
  }

测试程序:


    public static void svd(DataModel dataModel) throws TasteException {
        RecommenderBuilder recommenderBuilder = RecommendFactory.svdRecommender(new ALSWRFactorizer(dataModel, 10, 0.05, 10));

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.09990564982096355
Recommender IR Evaluator: [Precision:0.5,Recall:1.0]
uid:1,(104,4.032909)(105,3.390885)(107,1.858541)
uid:2,(105,3.761718)(106,2.951908)(107,1.561116)
uid:3,(103,5.593422)(102,2.458930)(106,-0.091259)
uid:4,(105,4.068329)(102,3.534025)(107,0.206257)
uid:5,(107,0.105169)

10. Tree Cluster-based 推荐算法

这个算法在mahout-0.8版本中,已经被@Deprecated。

(TODO未完)

算法API: org.apache.mahout.cf.taste.impl.recommender.TreeClusteringRecommender


  @Override
  public float estimatePreference(long userID, long itemID) throws TasteException {
    DataModel model = getDataModel();
    Float actualPref = model.getPreferenceValue(userID, itemID);
    if (actualPref != null) {
      return actualPref;
    }
    buildClusters();
    List topRecsForUser = topRecsByUserID.get(userID);
    if (topRecsForUser != null) {
      for (RecommendedItem item : topRecsForUser) {
        if (itemID == item.getItemID()) {
          return item.getValue();
        }
      }
    }
    // Hmm, we have no idea. The item is not in the user's cluster
    return Float.NaN;
  }

测试程序:


    public static void treeCluster(DataModel dataModel) throws TasteException {
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        ClusterSimilarity clusterSimilarity = RecommendFactory.clusterSimilarity(RecommendFactory.SIMILARITY.FARTHEST_NEIGHBOR_CLUSTER, userSimilarity);
        RecommenderBuilder recommenderBuilder = RecommendFactory.treeClusterRecommender(clusterSimilarity, 10);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:NaN
Recommender IR Evaluator: [Precision:NaN,Recall:0.0]

11. Mahout推荐算法总结

算法及适用场景:

recommender-intro

算法评分的结果:

recommender-score

通过对上面几种算法的一平分比较:itemCF,itemKNN,SVD的Rrecision,Recall的评分值是最好的,并且itemCF和SVD的AVERAGE_ABSOLUTE_DIFFERENCE是最低的,所以,从算法的角度知道了,哪个算法是更准确的或者会索引到更多的数据集。

另外的一些因素:

  • 1. 这3个指标,并不能直接决定计算结果一定itemCF,SVD好
  • 2. 各种算法的参数我们并没有调优
  • 3. 数据量和数据分布,是影响算法的评分

程序源代码下载

https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.8/src/main/java/org/conan/mymahout/recommendation/job

转载请注明出处:
http://blog.fens.me/mahout-recommendation-api

打赏作者

Mahout分步式程序开发 聚类Kmeans

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mahout-kmeans/

mahout-kmeans

前言

Mahout是基于Hadoop用于机器学习的程序开发框架,Mahout封装了3大类的机器学习算法,其中包括聚类算法。kmeans是我们经常会提到用到的聚类算法之一,特别处理未知数据集的时,都会先聚类一下,看看数据集会有一些什么样的规则。

本文主要讲解,基于Mahout程序开发,实现分步式的kmeans算法。

目录

  1. 聚类算法kmeans
  2. Mahout开发环境介绍
  3. 用Mahout实现聚类算法kmeans
  4. 用R语言可视化结果
  5. 模板项目上传github

1. 聚类算法kmeans

聚类分析是数据挖掘及机器学习领域内的重点问题之一,在数据挖掘、模式识别、决策支持、机器学习及图像分割等领域有广泛的应用,是最重要的数据分析方法之一。聚类是在给定的数据集合中寻找同类的数据子集合,每一个子集合形成一个类簇,同类簇中的数据具有更大的相似性。聚类算法大体上可分为基于划分的方法、基于层次的方法、基于密度的方法、基于网格的方法以及基于模型的方法。

k-means algorithm算法是一种得到最广泛使用的基于划分的聚类算法,把n个对象分为k个簇,以使簇内具有较高的相似度。相似度的计算根据一个簇中对象的平均值来进行。它与处理混合正态分布的最大期望算法很相似,因为他们都试图找到数据中自然聚类的中心。

算法首先随机地选择k个对象,每个对象初始地代表了一个簇的平均值或中心。对剩余的每个对象根据其与各个簇中心的距离,将它赋给最近的簇,然后重新计算每个簇的平均值。这个过程不断重复,直到准则函数收敛。

kmeans介绍摘自:http://zh.wikipedia.org/wiki/K平均算法

2. Mahout开发环境介绍

接上一篇文章:Mahout分步式程序开发 基于物品的协同过滤ItemCF

所有环境变量 和 系统配置 与上文一致!

3. 用Mahout实现聚类算法kmeans

实现步骤:

  • 1. 准备数据文件: randomData.csv
  • 2. Java程序:KmeansHadoop.java
  • 3. 运行程序
  • 4. 聚类结果解读
  • 5. HDFS产生的目录

1). 准备数据文件: randomData.csv
数据文件randomData.csv,由R语言通过“随机正太分布函数”程序生成,单机内存实验请参考文章:
用Maven构建Mahout项目

原始数据文件:这里只截取了一部分数据。


~ vi datafile/randomData.csv

-0.883033363823402 -3.31967192630249
-2.39312626419456 3.34726861118871
2.66976353341256 1.85144276077058
-1.09922906899594 -6.06261735207489
-4.36361936997216 1.90509905380532
-0.00351835125495037 -0.610105996559153
-2.9962958796338 -3.60959839525735
-3.27529418132066 0.0230099799641799
2.17665594420569 6.77290756817957
-2.47862038335637 2.53431833167278
5.53654901906814 2.65089785582474
5.66257474538338 6.86783609641077
-0.558946883114376 1.22332819416237
5.11728525486132 3.74663871584768
1.91240516693351 2.95874731384062
-2.49747101306535 2.05006504756875
3.98781883213459 1.00780938946366
5.47470532716682 5.35084411045171

注:由于Mahout中kmeans算法,默认的分融符是” “(空格),因些我把逗号分隔的数据文件,改成以空格分隔。

2). Java程序:KmeansHadoop.java

kmeans的算法实现,请查看Mahout in Action。

mahout-kmeans-process


package org.conan.mymahout.cluster08;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.mahout.clustering.conversion.InputDriver;
import org.apache.mahout.clustering.kmeans.KMeansDriver;
import org.apache.mahout.clustering.kmeans.RandomSeedGenerator;
import org.apache.mahout.common.distance.DistanceMeasure;
import org.apache.mahout.common.distance.EuclideanDistanceMeasure;
import org.apache.mahout.utils.clustering.ClusterDumper;
import org.conan.mymahout.hdfs.HdfsDAO;
import org.conan.mymahout.recommendation.ItemCFHadoop;

public class KmeansHadoop {
    private static final String HDFS = "hdfs://192.168.1.210:9000";

    public static void main(String[] args) throws Exception {
        String localFile = "datafile/randomData.csv";
        String inPath = HDFS + "/user/hdfs/mix_data";
        String seqFile = inPath + "/seqfile";
        String seeds = inPath + "/seeds";
        String outPath = inPath + "/result/";
        String clusteredPoints = outPath + "/clusteredPoints";

        JobConf conf = config();
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(inPath);
        hdfs.mkdirs(inPath);
        hdfs.copyFile(localFile, inPath);
        hdfs.ls(inPath);

        InputDriver.runJob(new Path(inPath), new Path(seqFile), "org.apache.mahout.math.RandomAccessSparseVector");

        int k = 3;
        Path seqFilePath = new Path(seqFile);
        Path clustersSeeds = new Path(seeds);
        DistanceMeasure measure = new EuclideanDistanceMeasure();
        clustersSeeds = RandomSeedGenerator.buildRandom(conf, seqFilePath, clustersSeeds, k, measure);
        KMeansDriver.run(conf, seqFilePath, clustersSeeds, new Path(outPath), measure, 0.01, 10, true, 0.01, false);

        Path outGlobPath = new Path(outPath, "clusters-*-final");
        Path clusteredPointsPath = new Path(clusteredPoints);
        System.out.printf("Dumping out clusters from clusters: %s and clusteredPoints: %s\n", outGlobPath, clusteredPointsPath);

        ClusterDumper clusterDumper = new ClusterDumper(outGlobPath, clusteredPointsPath);
        clusterDumper.printClusters(null);
    }
    
    public static JobConf config() {
        JobConf conf = new JobConf(ItemCFHadoop.class);
        conf.setJobName("ItemCFHadoop");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

}

3). 运行程序
控制台输出:


Delete: hdfs://192.168.1.210:9000/user/hdfs/mix_data
Create: hdfs://192.168.1.210:9000/user/hdfs/mix_data
copy from: datafile/randomData.csv to hdfs://192.168.1.210:9000/user/hdfs/mix_data
ls: hdfs://192.168.1.210:9000/user/hdfs/mix_data
==========================================================
name: hdfs://192.168.1.210:9000/user/hdfs/mix_data/randomData.csv, folder: false, size: 36655
==========================================================
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2013-10-14 15:39:31 org.apache.hadoop.util.NativeCodeLoader 
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-10-14 15:39:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:31 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:31 org.apache.hadoop.io.compress.snappy.LoadSnappy 
警告: Snappy native library not loaded
2013-10-14 15:39:31 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
2013-10-14 15:39:31 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:31 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:31 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:31 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_m_000000_0 is allowed to commit now
2013-10-14 15:39:31 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_m_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/seqfile
2013-10-14 15:39:31 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:31 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2013-10-14 15:39:32 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2013-10-14 15:39:32 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息: Counters: 11
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=31390
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=36655
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=475910
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=36655
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=506350
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=68045
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=0
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=188284928
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=124
2013-10-14 15:39:32 org.apache.hadoop.mapred.Counters log
信息:     Map output records=1000
2013-10-14 15:39:32 org.apache.hadoop.io.compress.CodecPool getCompressor
信息: Got brand-new compressor
2013-10-14 15:39:32 org.apache.hadoop.io.compress.CodecPool getDecompressor
信息: Got brand-new decompressor
2013-10-14 15:39:32 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:32 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:32 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0002
2013-10-14 15:39:32 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:32 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 15:39:32 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 15:39:32 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 15:39:33 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 15:39:33 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 15:39:33 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0002_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:33 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:33 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0002_m_000000_0' done.
2013-10-14 15:39:33 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:33 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:33 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 15:39:33 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 623 bytes
2013-10-14 15:39:33 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:33 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0002_r_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:33 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:33 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0002_r_000000_0 is allowed to commit now
2013-10-14 15:39:33 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0002_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-1
2013-10-14 15:39:33 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 15:39:33 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0002_r_000000_0' done.
2013-10-14 15:39:33 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 15:39:33 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0002
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=695
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=4239303
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=203963
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=4457168
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=140321
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=31390
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=627
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=6
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=612
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=376569856
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=130
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=3
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=3
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=3
2013-10-14 15:39:33 org.apache.hadoop.mapred.Counters log
信息:     Map output records=3
2013-10-14 15:39:34 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:34 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:34 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0003
2013-10-14 15:39:34 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 15:39:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 15:39:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 15:39:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 15:39:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 15:39:34 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:34 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:34 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0003_m_000000_0' done.
2013-10-14 15:39:34 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:34 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:34 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 15:39:34 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 677 bytes
2013-10-14 15:39:34 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:34 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0003_r_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:34 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:34 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0003_r_000000_0 is allowed to commit now
2013-10-14 15:39:34 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0003_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-2
2013-10-14 15:39:34 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 15:39:34 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0003_r_000000_0' done.
2013-10-14 15:39:35 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 15:39:35 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0003
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=695
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=7527467
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=271193
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=7901744
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=142099
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=31390
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=681
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=6
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=666
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=575930368
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=130
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=3
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=3
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=3
2013-10-14 15:39:35 org.apache.hadoop.mapred.Counters log
信息:     Map output records=3
2013-10-14 15:39:35 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:35 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:35 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0004
2013-10-14 15:39:35 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:35 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 15:39:35 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 15:39:35 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 15:39:35 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 15:39:35 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 15:39:35 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0004_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:35 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:35 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0004_m_000000_0' done.
2013-10-14 15:39:35 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:35 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:35 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 15:39:35 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 677 bytes
2013-10-14 15:39:35 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:35 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0004_r_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:35 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:35 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0004_r_000000_0 is allowed to commit now
2013-10-14 15:39:35 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0004_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-3
2013-10-14 15:39:35 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 15:39:35 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0004_r_000000_0' done.
2013-10-14 15:39:36 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 15:39:36 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0004
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=695
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=10815685
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=338143
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=11346320
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=143877
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=31390
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=681
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=6
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=666
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=775290880
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=130
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=3
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=3
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=3
2013-10-14 15:39:36 org.apache.hadoop.mapred.Counters log
信息:     Map output records=3
2013-10-14 15:39:36 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:36 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:36 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0005
2013-10-14 15:39:36 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 15:39:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 15:39:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 15:39:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 15:39:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 15:39:36 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0005_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:36 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0005_m_000000_0' done.
2013-10-14 15:39:36 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:36 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 15:39:36 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 677 bytes
2013-10-14 15:39:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:36 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0005_r_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:36 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0005_r_000000_0 is allowed to commit now
2013-10-14 15:39:36 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0005_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-4
2013-10-14 15:39:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 15:39:36 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0005_r_000000_0' done.
2013-10-14 15:39:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 15:39:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0005
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=695
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=14103903
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=405093
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=14790888
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=145655
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=31390
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=681
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=6
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=666
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=974651392
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=130
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=3
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=3
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=3
2013-10-14 15:39:37 org.apache.hadoop.mapred.Counters log
信息:     Map output records=3
2013-10-14 15:39:37 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:37 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0006
2013-10-14 15:39:37 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 15:39:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 15:39:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 15:39:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 15:39:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 15:39:37 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0006_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:37 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0006_m_000000_0' done.
2013-10-14 15:39:37 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 15:39:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 677 bytes
2013-10-14 15:39:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:37 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0006_r_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:37 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0006_r_000000_0 is allowed to commit now
2013-10-14 15:39:37 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0006_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-5
2013-10-14 15:39:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 15:39:37 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0006_r_000000_0' done.
2013-10-14 15:39:38 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 15:39:38 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0006
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=695
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=17392121
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=472043
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=18235456
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=147433
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=31390
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=681
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=6
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=666
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1174011904
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=130
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=3
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=3
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=3
2013-10-14 15:39:38 org.apache.hadoop.mapred.Counters log
信息:     Map output records=3
2013-10-14 15:39:38 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:38 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:38 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0007
2013-10-14 15:39:38 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 15:39:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 15:39:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 15:39:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 15:39:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 15:39:38 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0007_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:38 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0007_m_000000_0' done.
2013-10-14 15:39:38 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:38 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 15:39:38 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 677 bytes
2013-10-14 15:39:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:38 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0007_r_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:38 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0007_r_000000_0 is allowed to commit now
2013-10-14 15:39:38 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0007_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-6
2013-10-14 15:39:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 15:39:38 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0007_r_000000_0' done.
2013-10-14 15:39:39 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 15:39:39 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0007
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=695
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=20680339
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=538993
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=21680040
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=149211
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=31390
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=681
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=6
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=666
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1373372416
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=130
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=3
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=3
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=3
2013-10-14 15:39:39 org.apache.hadoop.mapred.Counters log
信息:     Map output records=3
2013-10-14 15:39:39 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:39 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:39 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0008
2013-10-14 15:39:39 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 15:39:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 15:39:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 15:39:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 15:39:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 15:39:40 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0008_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:40 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0008_m_000000_0' done.
2013-10-14 15:39:40 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:40 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 15:39:40 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 677 bytes
2013-10-14 15:39:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:40 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0008_r_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:40 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0008_r_000000_0 is allowed to commit now
2013-10-14 15:39:40 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0008_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-7
2013-10-14 15:39:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 15:39:40 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0008_r_000000_0' done.
2013-10-14 15:39:40 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 15:39:40 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0008
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=695
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=23968557
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=605943
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=25124624
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=150989
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=31390
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=681
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=6
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=666
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1572732928
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=130
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=3
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=3
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=3
2013-10-14 15:39:40 org.apache.hadoop.mapred.Counters log
信息:     Map output records=3
2013-10-14 15:39:41 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:41 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0009
2013-10-14 15:39:41 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 15:39:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 15:39:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 15:39:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 15:39:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 15:39:41 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0009_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:41 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0009_m_000000_0' done.
2013-10-14 15:39:41 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:41 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 15:39:41 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 677 bytes
2013-10-14 15:39:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:41 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0009_r_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:41 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0009_r_000000_0 is allowed to commit now
2013-10-14 15:39:41 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0009_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-8
2013-10-14 15:39:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 15:39:41 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0009_r_000000_0' done.
2013-10-14 15:39:42 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 15:39:42 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0009
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=695
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=27256775
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=673669
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=28569192
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=152767
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=31390
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=681
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=6
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=666
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1772093440
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=130
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=3
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=3
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=3
2013-10-14 15:39:42 org.apache.hadoop.mapred.Counters log
信息:     Map output records=3
2013-10-14 15:39:42 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:42 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:42 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0010
2013-10-14 15:39:42 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 15:39:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 15:39:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 15:39:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 15:39:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 15:39:42 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0010_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:42 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0010_m_000000_0' done.
2013-10-14 15:39:42 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:42 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 15:39:42 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 677 bytes
2013-10-14 15:39:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:42 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0010_r_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:42 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0010_r_000000_0 is allowed to commit now
2013-10-14 15:39:42 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0010_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-9
2013-10-14 15:39:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 15:39:42 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0010_r_000000_0' done.
2013-10-14 15:39:43 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 15:39:43 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0010
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=695
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=30544993
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=741007
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=32013760
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=154545
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=31390
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=681
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=6
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=666
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1966735360
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=130
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=3
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=3
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=3
2013-10-14 15:39:43 org.apache.hadoop.mapred.Counters log
信息:     Map output records=3
2013-10-14 15:39:43 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:43 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:43 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0011
2013-10-14 15:39:43 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 15:39:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 15:39:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 15:39:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 15:39:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 15:39:43 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0011_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:43 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0011_m_000000_0' done.
2013-10-14 15:39:43 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:43 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 15:39:43 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 677 bytes
2013-10-14 15:39:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:43 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0011_r_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:43 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0011_r_000000_0 is allowed to commit now
2013-10-14 15:39:43 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0011_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-10
2013-10-14 15:39:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 15:39:43 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0011_r_000000_0' done.
2013-10-14 15:39:44 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 15:39:44 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0011
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=695
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=33833211
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=808345
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=35458320
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=156323
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=31390
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=681
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=6
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=666
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=2166095872
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=130
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=3
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=3
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=3
2013-10-14 15:39:44 org.apache.hadoop.mapred.Counters log
信息:     Map output records=3
2013-10-14 15:39:44 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-10-14 15:39:44 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 15:39:44 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0012
2013-10-14 15:39:44 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 15:39:44 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0012_m_000000_0 is done. And is in the process of commiting
2013-10-14 15:39:44 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:44 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0012_m_000000_0 is allowed to commit now
2013-10-14 15:39:44 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0012_m_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusteredPoints
2013-10-14 15:39:44 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 15:39:44 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0012_m_000000_0' done.
2013-10-14 15:39:45 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2013-10-14 15:39:45 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0012
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息: Counters: 11
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=41520
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=31390
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=18560374
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=437203
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=19450325
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=120417
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:     Map input records=1000
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=0
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1083047936
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=130
2013-10-14 15:39:45 org.apache.hadoop.mapred.Counters log
信息:     Map output records=1000
Dumping out clusters from clusters: hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-*-final and clusteredPoints: hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusteredPoints
CL-552{n=443 c=[1.631, -0.412] r=[1.563, 1.407]}
	Weight : [props - optional]:  Point:
	1.0: [-2.393, 3.347]
	1.0: [-4.364, 1.905]
	1.0: [-3.275, 0.023]
	1.0: [-2.479, 2.534]
	1.0: [-0.559, 1.223]
	...
	
CL-847{n=77 c=[-2.953, -0.971] r=[1.767, 2.189]}
	Weight : [props - optional]:  Point:
	1.0: [-0.883, -3.320]
	1.0: [-1.099, -6.063]
	1.0: [-0.004, -0.610]
	1.0: [-2.996, -3.610]
	1.0: [3.988, 1.008]
	...

CL-823{n=480 c=[0.219, 2.600] r=[1.479, 1.385]}
	Weight : [props - optional]:  Point:
	1.0: [2.670, 1.851]
	1.0: [2.177, 6.773]
	1.0: [5.537, 2.651]
	1.0: [5.663, 6.868]
	1.0: [5.117, 3.747]
	1.0: [1.912, 2.959]
	...

4). 聚类结果解读
我们可以把上面的日志分解析成3个部分解读

  • a. 初始化环境
  • b. 算法执行
  • c. 打印聚类结果

a. 初始化环境
出初HDFS的数据目录和工作目录,并上传数据文件。


Delete: hdfs://192.168.1.210:9000/user/hdfs/mix_data
Create: hdfs://192.168.1.210:9000/user/hdfs/mix_data
copy from: datafile/randomData.csv to hdfs://192.168.1.210:9000/user/hdfs/mix_data
ls: hdfs://192.168.1.210:9000/user/hdfs/mix_data
==========================================================
name: hdfs://192.168.1.210:9000/user/hdfs/mix_data/randomData.csv, folder: false, size: 36655

b. 算法执行
算法执行,有3个步骤。

  • 1):把原始数据randomData.csv,转成Mahout sequence files of VectorWritable。
  • 2):通过随机的方法,选中kmeans的3个中心,做为初始集群
  • 3):根据迭代次数的设置,执行MapReduce,进行计算

1):把原始数据randomData.csv,转成Mahout sequence files of VectorWritable。

程序源代码:


      InputDriver.runJob(new Path(inPath), new Path(seqFile), "org.apache.mahout.math.RandomAccessSparseVector");

日志输出:

Job complete: job_local_0001

2):通过随机的方法,选中kmeans的3个中心,做为初始集群

程序源代码:


        int k = 3;
        Path seqFilePath = new Path(seqFile);
        Path clustersSeeds = new Path(seeds);
        DistanceMeasure measure = new EuclideanDistanceMeasure();
        clustersSeeds = RandomSeedGenerator.buildRandom(conf, seqFilePath, clustersSeeds, k, measure);

日志输出:

Job complete: job_local_0002

3):根据迭代次数的设置,执行MapReduce,进行计算
程序源代码:


        KMeansDriver.run(conf, seqFilePath, clustersSeeds, new Path(outPath), measure, 0.01, 10, true, 0.01, false);

日志输出:


Job complete: job_local_0003
Job complete: job_local_0004
Job complete: job_local_0005
Job complete: job_local_0006
Job complete: job_local_0007
Job complete: job_local_0008
Job complete: job_local_0009
Job complete: job_local_0010
Job complete: job_local_0011
Job complete: job_local_0012

c. 打印聚类结果


Dumping out clusters from clusters: hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusters-*-final and clusteredPoints: hdfs://192.168.1.210:9000/user/hdfs/mix_data/result/clusteredPoints
CL-552{n=443 c=[1.631, -0.412] r=[1.563, 1.407]}
CL-847{n=77 c=[-2.953, -0.971] r=[1.767, 2.189]}
CL-823{n=480 c=[0.219, 2.600] r=[1.479, 1.385]}

运行结果:有3个中心。

  • Cluster1, 包括443个点,中心坐标[1.631, -0.412]
  • Cluster2, 包括77个点,中心坐标[-2.953, -0.971]
  • Cluster3, 包括480 个点,中心坐标[0.219, 2.600]

5). HDFS产生的目录


# 根目录
~ hadoop fs -ls /user/hdfs/mix_data
Found 4 items
-rw-r--r--   3 Administrator supergroup      36655 2013-10-04 15:31 /user/hdfs/mix_data/randomData.csv
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/seeds
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/seqfile

# 输出目录
~ hadoop fs -ls /user/hdfs/mix_data/result
Found 13 items
-rw-r--r--   3 Administrator supergroup        194 2013-10-04 15:31 /user/hdfs/mix_data/result/_policy
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusteredPoints
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusters-0
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusters-1
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusters-10-final
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusters-2
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusters-3
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusters-4
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusters-5
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusters-6
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusters-7
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusters-8
drwxr-xr-x   - Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/result/clusters-9

# 产生的随机中心种子目录
~ hadoop fs -ls /user/hdfs/mix_data/seeds
Found 1 items
-rw-r--r--   3 Administrator supergroup        599 2013-10-04 15:31 /user/hdfs/mix_data/seeds/part-randomSeed

# 输入文件换成Mahout格式文件的目录
~ hadoop fs -ls /user/hdfs/mix_data/seqfile
Found 2 items
-rw-r--r--   3 Administrator supergroup          0 2013-10-04 15:31 /user/hdfs/mix_data/seqfile/_SUCCESS
-rw-r--r--   3 Administrator supergroup      31390 2013-10-04 15:31 /user/hdfs/mix_data/seqfile/part-m-00000

4. 用R语言可视化结果

分别把聚类后的点,保存到不同的cluster*.csv文件,然后用R语言画图。


c1<-read.csv(file="cluster1.csv",sep=",",header=FALSE)
c2<-read.csv(file="cluster2.csv",sep=",",header=FALSE)
c3<-read.csv(file="cluster3.csv",sep=",",header=FALSE)
y<-rbind(c1,c2,c3)
cols<-c(rep(1,nrow(c1)),rep(2,nrow(c2)),rep(3,nrow(c3)))
plot(y, col=c("black","blue","green")[cols])
center<-matrix(c(1.631, -0.412,-2.953, -0.971,0.219, 2.600),ncol=2,byrow=TRUE)
points(center, col="violetred", pch = 19)

kmeans

从上图中,我们看到有 黑,蓝,绿,三种颜色的空心点,这些点就是原始数据。
3个紫色实点,是Mahout的kmeans后生成的3个中心。

对比文章中用R语言实现的kmeans的分类和中心,都不太一样。 用Maven构建Mahout项目

简单总结一下,在使用kmeans时,根据距离算法,阈值,出始中心,迭代次数的不同,kmeans计算的结果是不相同的。因此,用kmeans算法,我们一般只能得到一个模糊的分类标准,这个标准对于我们认识未知领域的数据集是很有帮助的。不能做为精确衡量数据的指标。

5. 模板项目上传github

https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.8

大家可以下载这个项目,做为开发的起点。


~ git clone https://github.com/bsspirit/maven_mahout_template
~ git checkout mahout-0.8

这样,我们完成了Mahout的聚类算法Kmeans的分步式实现。接下来,我们会继续做关于Mahout中分类的实验!

转载请注明出处:
http://blog.fens.me/hadoop-mahout-kmeans/

打赏作者