• Posts tagged "MapReduce"

Blog Archives

PeopleRank从社交网络中发现个体价值

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-social-peoplerank/

peoplerank

前言

如果说Google改变了互联网,那么社交网络就改变人们的生活方式。通过社交网络,我们每个个体,都是成为了网络的中心。我们的生活半径,被无限放大,通过6个朋友关系,就可以认识世界上任何一个人。

未来的互联网将是属于我们每一个人。

目录

  1. PeopleRank和PageRank
  2. 需求分析:从社交网络中发现个体价值
  3. 算法模型:PeopleRank算法
  4. 架构设计:PeopleRank计算引擎系统架构
  5. 程序开发:PeopleRank算法实现

1. PeopleRank和PageRank

PageRank让Google成为搜索领域的No.1,也是当今最有影响力的互联网公司之一,用技术创新改变人们的生活。PageRank主要用于网页评分计算,把互联网上的所有网页都进行打分,给网页价值的体现。

自2012以来,中国开始进入社交网络的时代,开心网,人人网,新浪微博,腾讯微博,微信等社交网络应用,开始进入大家的生活。最早是由“抢车位”,“偷菜”等社交游戏带动的社交网络的兴起,如今人们会更多的利用社交网络,获取信息和分享信息。我们的互联网,正在从以网页信息为核心的网络,向着以人为核心的网络转变着。

于是有人就提出了,把PageRank模型应用于社交网络,定义以人为核心的个体价值。这样PageRank模型就有了新的应用领域,同时也有了一个新的名字PeopleRank。

关于PageRank的介绍,请参考文章:PageRank算法R语言实现

注:PeopleRank网上还有不同的解释,我这里仅仅表示用来解释“PageRank模型”。

下面我们将从一个PeopleRank的案例来解释,如何从社交网络中发现个体价值。

2. 需求分析:从社交网络中发现个体价值

案例介绍:
以新浪微博为例,给微博中每个用户进行评分!
从新浪微博上,把我们的关注和粉丝的关系都找到。

如下图:我在微博上,随便找了几个微博账号。

weibo-logo

我们的任务是,需要给这些账号评分!

  • 方法一,简单求和:评分=关注数+粉丝数+微博数
  • 方法二,加权求和:评分=a*关注数+b*粉丝数+c*微博数

新建数据文件:weibo.csv


~ vi weibo.csv

A,314,1091,1488
B,196,10455,327
C,557,51635228,13793
D,55,14655464,1681
E,318,547,4899
F,166,145,170
G,17,890,169
H,54,946759,17229

R语言读入数据文件


weibo<-read.csv(file="weibo.csv",header=FALSE)
names(weibo)<-c("id","follow","fans","tweet")

1). 方法一,简单相加法


> data.frame(weibo[1],rank=rowSums(weibo[2:4]))
  id     rank
1  A     2893
2  B    10978
3  C 51649578
4  D 14657200
5  E     5764
6  F      481
7  G     1076
8  H   964042

这种方法简单粗暴的方式,是否能代码公平的打分呢?!

2). 方法二,加权求和

通过a,b,c的3个参数,分别设置权重求和。与方法一存在同样的问题,a,b,c的权值都是人为指定的,也是不能表示公平的打分的。

除了上面的两个方法,你能否想到不一样的思路呢!

3. 算法模型:PeopleRank算法

基于PageRank的理论,我们以每个微博账户的“关注”为链出链接,“粉丝”为链入链接,我们把这种以人为核心的关系,叫PeopleRank。

关于PageRank的介绍,请参考文章:PageRank算法R语言实现

PeopleRank假设条件:

  • 数量假设:如果一个用户节点接收到的其他用户“关注”的数量越多,那么这个用户越重要。
  • 质量假设:用户A的“粉丝”质量不同,质量高的“粉丝”会通“关注”接向其他用户传递更多的权重。所以越是质量高的“用户”关注用户A,则用户A越重要。

衡量PeopleRank的3个指标:

  • 粉丝数
  • 粉丝是否有较高PeopleRank值
  • 粉丝关注了多少人

我们以下的数据为例,构造基于微博的数据模型:
(由于微博数据已增加访问权限,我无法拿到当前的实际数据,我用以前@晒粉丝应用,收集到的微博数据为例,这里ID已经过处理)

测试数据集:people.csv

  • 25个用户
  • 66个关系,关注和粉丝的关系

数据集: people.csv


1,19
1,21
2,11
2,17
2,21
3,1
3,20
3,2
3,7
3,6
3,10
4,3
4,6
5,19
5,11
5,2
6,4
6,12
6,18
6,15
6,10
6,5
7,9
7,18
7,10
8,3
8,11
8,7
8,16
8,14
9,6
10,8
10,18
11,13
11,3
12,9
12,4
12,16
12,5
13,19
13,1
13,6
14,7
14,17
14,19
14,1
14,5
14,2
15,11
15,14
15,12
16,20
17,4
17,6
18,10
18,11
18,15
18,14
19,18
20,10
20,5
21,24
22,11
23,17
24,15
25,24

第一列为用户ID,第二列也是用户ID。第一列用户,关注了第二用户。

以R语言可视化输出

igraph-refs

R语言程序


library(igraph)
people<-read.csv(file="people.csv",header=FALSE)
drawGraph<-function(data){
  names(data)<-c("from","to") 
  g <- graph.data.frame(data, directed=TRUE)
  V(g)$label <- V(g)$name
  V(g)$size <- 15
  E(g)$color <- grey(0.5)
  g2 <- simplify(g)
  plot(g2,layout=layout.circle)
}
drawGraph(people)

用R语言构建PeopleRank的算法原型

  • 构建邻接矩阵
  • 变换概率矩阵
  • 递归计算矩阵特征值
  • 标准化结果
  • 对结果排序输出

R语言算法模型


#构建邻接矩阵
adjacencyMatrix<-function(pages){
  n<-max(apply(pages,2,max))
  A <- matrix(0,n,n)
  for(i in 1:nrow(pages)) A[pages[i,]$dist,pages[i,]$src]<-1
  A
}

#变换概率矩阵
dProbabilityMatrix<-function(G,d=0.85){
  cs <- colSums(G)
  cs[cs==0] <- 1
  n <- nrow(G)
  delta <- (1-d)/n
  A <- matrix(delta,n,n)
  for (i in 1:n) A[i,] <- A[i,] + d*G[i,]/cs
  A
}

#递归计算矩阵特征值
eigenMatrix<-function(G,iter=100){
  n<-nrow(G)
  x <- rep(1,n)
  for (i in 1:iter) x <- G %*% x
  x/sum(x)
}

#直接计算矩阵特征值
calcEigenMatrix<-function(G){
  x <- Re(eigen(G)$vectors[,1])
  x/sum(x)
}

PeopleRank计算,带入数据集people.csv


people<-read.csv(file="people.csv",header=FALSE)
names(people)<-c("src","dist");people
A<-adjacencyMatrix(people);A
G<-dProbabilityMatrix(A);G
q<-calcEigenMatrix(G);

q
[1] 0.03274732 0.03404052 0.05983465 0.03527074 0.04366519 0.07042752 0.02741232
 [8] 0.03378595 0.02118713 0.06537870 0.07788465 0.03491910 0.03910097 0.05076803
[15] 0.06685364 0.01916392 0.02793695 0.09450614 0.05056016 0.03076591 0.02956243
[22] 0.00600000 0.00600000 0.03622806 0.00600000

我们给这25用户进行打分,从高到低进行排序。

对结果排序输出:


result<-data.frame(userid=userid,PR=q[userid])
result
   userid          PR
1      18 0.09450614
2      11 0.07788465
3       6 0.07042752
4      15 0.06685364
5      10 0.06537870
6       3 0.05983465
7      14 0.05076803
8      19 0.05056016
9       5 0.04366519
10     13 0.03910097
11     24 0.03622806
12      4 0.03527074
13     12 0.03491910
14      2 0.03404052
15      8 0.03378595
16      1 0.03274732
17     20 0.03076591
18     21 0.02956243
19     17 0.02793695
20      7 0.02741232
21      9 0.02118713
22     16 0.01916392
23     22 0.00600000
24     23 0.00600000
25     25 0.00600000

查看评分最高的用户18的关系数据:


people[c(which(people$src==18), which(people$dist==18)),]

   src dist
55  18   10
56  18   11
57  18   15
58  18   14
19   6   18
24   7   18
33  10   18
59  19   18

粉丝的PeopleRank排名:


which(result$userid %in% people$src[which(people$dist==18)])

[1]  3  5  8 20

粉丝的关注数:


table(people$src)[people$src[which(people$dist==18)]]

 6  7 10 19 
 6  3  2  1 

数据解释:用户18

  • 有4个粉丝为别是6,7,10,19。(粉丝数)
  • 4个粉丝的PeopleRank排名,是3,5,8,20。(粉丝是否有较高PeopleRank值)
  • 粉丝的关注数量,是6,3,2,1。(粉丝关注了多少人)

因此,通过对上面3个指标的综合打分,用户18是评分最高的用户。

通过R语言实现的计算模型,已经比较符合我们的评分标准了,下面我们把PeopleRank用MapReduce实现,以满足对海量数据的计算需求。

4. 架构设计:PeopleRank计算引擎系统架构

pagerank-spider

上图中,左边是数据爬虫系统,右边是Hadoop的HDFS, MapReduce。

  • 数据爬虫系统实时爬取微博数据
  • 设置系统定时器CRON,每xx小时,增量向HDFS导入数据(userid1,userid2)
  • 完成导入后,设置系统定时器,启动MapReduce程序,运行推荐算法。
  • 完成计算后,设置系统定时器,从HDFS导出推荐结果数据到数据库,方便以后的及时查询。

5. 程序开发:PeopleRank算法实现

win7的开发环境 和 Hadoop的运行环境 ,请参考文章:用Maven构建Hadoop项目

开发步骤:

  • 微博好友的关系数据: people.csv
  • 出始的PR数据:peoplerank.csv
  • 邻接矩阵: AdjacencyMatrix.java
  • PeopleRank计算: PageRank.java
  • PR标准化: Normal.java
  • 启动程序: PageRankJob.java

1). 微博好友的关系数据: people.csv,在上文中已列出

2). 出始的PR数据:peoplerank.csv


1,1
2,1
3,1
4,1
5,1
6,1
7,1
8,1
9,1
10,1
11,1
12,1
13,1
14,1
15,1
16,1
17,1
18,1
19,1
20,1
21,1
22,1
23,1
24,1
25,1

3). 邻接矩阵

矩阵解释:

  • 阻尼系数为0.85
  • 用户数为25
  • reduce以行输出矩阵的列,输出列主要用于分步式存储,下一步需要转成行

部分数据输出:


~ hadoop fs -cat /user/hdfs/pagerank/tmp1/part-r-00000|head -n 4

1	0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999
10	0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999
11	0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999
12	0.005999999,0.005999999,0.005999999,0.2185,0.2185,0.005999999,0.005999999,0.005999999,0.2185,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.2185,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999

4). PeopleRank计算: PageRank.java

迭代一次的PeopleRank值


~ hadoop fs -cat /user/hdfs/pagerank/pr/part-r-00000

1       0.716666
10      1.354167
11      2.232500
12      0.575000
13      0.575000
14      0.815833
15      1.354167
16      0.532500
17      1.425000
18      1.850000
19      1.283334
2       0.716667
20      1.141667
21      0.858333
22      0.150000
23      0.150000
24      1.850000
25      0.150000
3       1.170001
4       0.929167
5       1.070833
6       2.275001
7       0.603333
8       0.575000
9       0.645833

5). PR标准化: Normal.java

迭代10次,并标准化的结果:


~ hadoop fs -cat /user/hdfs/pagerank/result/part-r-00000

1       0.032842
10      0.065405
11      0.077670
12      0.034864
13      0.039175
14      0.050574
15      0.066614
16      0.019167
17      0.027990
18      0.094460
19      0.050673
2       0.034054
20      0.030835
21      0.029657
22      0.006000
23      0.006000
24      0.036111
25      0.006000
3       0.059864
4       0.035314
5       0.043805
6       0.070516
7       0.027444
8       0.033715
9       0.021251

我们对结果进行排序


   id       pr
10 18 0.094460
3  11 0.077670
22  6 0.070516
7  15 0.066614
2  10 0.065405
19  3 0.059864
11 19 0.050673
6  14 0.050574
21  5 0.043805
5  13 0.039175
17 24 0.036111
20  4 0.035314
4  12 0.034864
12  2 0.034054
24  8 0.033715
1   1 0.032842
13 20 0.030835
14 21 0.029657
9  17 0.027990
23  7 0.027444
25  9 0.021251
8  16 0.019167
15 22 0.006000
16 23 0.006000
18 25 0.006000

第一名是用户18,第二名是用户11,第三名是用户6,第三名与之前R语言单机计算的结果有些不一样,而且PR值也稍有不同,这是因为我们迭代10次时,特征值还没有完全的收敛,需要更多次的迭代计算,才能得矩阵的特征值。

程序API的实现,请参考文章:PageRank算法并行实现

我们通过PageRank的模型,成功地应用到了社交网络,实现了PeopleRank的计算,通过设计数据挖掘算法,来取代不成熟的人脑思想。算法模型将更客观,更精准。

最后,大家可以利用这个案例的设计思路,认真地了解社交网络,做出属于的自己的算法。

由于时间仓促,代码可能存在bug。请有能力同学,自行发现问题,解决问题!!

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-social-peoplerank/

打赏作者

PageRank算法并行实现

算法为王系列文章,涵盖了计算机算法,数据挖掘(机器学习)算法,统计算法,金融算法等的多种跨学科算法组合。在大数据时代的背景下,算法已经成为了金字塔顶的明星。一个好的算法可以创造一个伟大帝国,就像Google。

算法为王的时代正式到来….

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/algorithm-pagerank-mapreduce/

pagerank-mapreduce

前言

Google通过PageRank算法模型,实现了对全互联网网页的打分。但对于海量数据的处理,在单机下是不可能实现,所以如何将PageRank并行计算,将是本文的重点。

本文将继续上一篇文章 PageRank算法R语言实现,把PageRank单机实现,改成并行实现,利用MapReduce计算框架,在集群中跑起来。

目录

  1. PageRank算法并行化原理
  2. MapReduce分步式编程

1. PageRank算法分步式原理

单机算法原理请参考文章:PageRank算法R语言实现

pagerank-sample

PageRank的分步式算法原理,简单来讲,就是通过矩阵计算实现并行化。

1). 把邻接矩阵的列,按数据行存储

邻接矩阵


          [,1]   [,2]   [,3]   [,4]
[1,] 0.0375000 0.0375 0.0375 0.0375
[2,] 0.3208333 0.0375 0.0375 0.8875
[3,] 0.3208333 0.4625 0.0375 0.0375
[4,] 0.3208333 0.4625 0.8875 0.0375

按行存储HDFS


1       0.037499994,0.32083333,0.32083333,0.32083333
2       0.037499994,0.037499994,0.4625,0.4625
3       0.037499994,0.037499994,0.037499994,0.88750005
4       0.037499994,0.88750005,0.037499994,0.037499994

2). 迭代:求矩阵特征值

pagerank-mr

map过程:

  • input: 邻接矩阵, pr值
  • output: key为pr的行号,value为邻接矩阵和pr值的乘法求和公式

reduce过程:

  • input: key为pr的行号,value为邻接矩阵和pr值的乘法求和公式
  • output: key为pr的行号, value为计算的结果,即pr值

第1次迭代


0.0375000 0.0375 0.0375 0.0375     1     0.150000
0.3208333 0.0375 0.0375 0.8875  *  1  =  1.283333
0.3208333 0.4625 0.0375 0.0375     1     0.858333
0.3208333 0.4625 0.8875 0.0375     1     1.708333

第2次迭代


0.0375000 0.0375 0.0375 0.0375     0.150000      0.150000
0.3208333 0.0375 0.0375 0.8875  *  1.283333  =   1.6445833
0.3208333 0.4625 0.0375 0.0375     0.858333      0.7379167
0.3208333 0.4625 0.8875 0.0375     1.708333      1.4675000

… 10次迭代

特征值


0.1500000
1.4955721
0.8255034
1.5289245

3). 标准化PR值


0.150000                                              0.0375000
1.4955721  / (0.15+1.4955721+0.8255034+1.5289245) =   0.3738930
0.8255034                                             0.2063759
1.5289245                                             0.3822311

2. MapReduce分步式编程

MapReduce流程分解

PageRankJob

HDFS目录

  • input(/user/hdfs/pagerank): HDFS的根目录
  • input_pr(/user/hdfs/pagerank/pr): 临时目录,存储中间结果PR值
  • tmp1(/user/hdfs/pagerank/tmp1):临时目录,存储邻接矩阵
  • tmp2(/user/hdfs/pagerank/tmp2):临时目录,迭代计算PR值,然后保存到input_pr
  • result(/user/hdfs/pagerank/result): PR值输出结果

开发步骤:

  • 网页链接关系数据: page.csv
  • 出始的PR数据:pr.csv
  • 邻接矩阵: AdjacencyMatrix.java
  • PageRank计算: PageRank.java
  • PR标准化: Normal.java
  • 启动程序: PageRankJob.java

1). 网页链接关系数据: page.csv

新建文件:page.csv


1,2
1,3
1,4
2,3
2,4
3,4
4,2

2). 出始的PR数据:pr.csv

设置网页的初始值都是1

新建文件:pr.csv


1,1
2,1
3,1
4,1

3). 邻接矩阵: AdjacencyMatrix.java

adjacencyMatrix

矩阵解释:

  • 阻尼系数为0.85
  • 页面数为4
  • reduce以行输出矩阵的列,输出列主要用于分步式存储,下一步需要转成行

新建程序:AdjacencyMatrix.java


package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class AdjacencyMatrix {

    private static int nums = 4;// 页面数
    private static float d = 0.85f;// 阻尼系数

    public static class AdjacencyMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = PageRankJob.DELIMITER.split(values.toString());
            Text k = new Text(tokens[0]);
            Text v = new Text(tokens[1]);
            context.write(k, v);
        }
    }

    public static class AdjacencyMatrixReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            float[] G = new float[nums];// 概率矩阵列
            Arrays.fill(G, (float) (1 - d) / G.length);

            float[] A = new float[nums];// 近邻矩阵列
            int sum = 0;// 链出数量
            for (Text val : values) {
                int idx = Integer.parseInt(val.toString());
                A[idx - 1] = 1;
                sum++;
            }

            if (sum == 0) {// 分母不能为0
                sum = 1;
            }

            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < A.length; i++) {
                sb.append("," + (float) (G[i] + d * A[i] / sum));
            }

            Text v = new Text(sb.toString().substring(1));
            System.out.println(key + ":" + v.toString());
            context.write(key, v);
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("input");
        String input_pr = path.get("input_pr");
        String output = path.get("tmp1");
        String page = path.get("page");
        String pr = path.get("pr");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.mkdirs(input_pr);
        hdfs.copyFile(page, input);
        hdfs.copyFile(pr, input_pr);

        Job job = new Job(conf);
        job.setJarByClass(AdjacencyMatrix.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(AdjacencyMatrixMapper.class);
        job.setReducerClass(AdjacencyMatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(page));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

4). PageRank计算: PageRank.java

pagerank-step1

矩阵解释:

  • 实现邻接与PR矩阵的乘法
  • map以邻接矩阵的行号为key,由于上一步是输出的是列,所以这里需要转成行
  • reduce计算得到未标准化的特征值

新建文件: PageRank.java


package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class PageRank {

    public static class PageRankMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// tmp1 or result
        private static int nums = 4;// 页面数

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();// 判断读的数据集
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = PageRankJob.DELIMITER.split(values.toString());

            if (flag.equals("tmp1")) {
                String row = values.toString().substring(0,1);
                String[] vals = PageRankJob.DELIMITER.split(values.toString().substring(2));// 矩阵转置
                for (int i = 0; i < vals.length; i++) {
                    Text k = new Text(String.valueOf(i + 1));
                    Text v = new Text(String.valueOf("A:" + (row) + "," + vals[i]));
                    context.write(k, v);
                }

            } else if (flag.equals("pr")) {
                for (int i = 1; i <= nums; i++) {
                    Text k = new Text(String.valueOf(i));
                    Text v = new Text("B:" + tokens[0] + "," + tokens[1]);
                    context.write(k, v);
                }
            }
        }
    }

    public static class PageRankReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            Map<Integer, Float> mapA = new HashMap<Integer, Float>();
            Map<Integer, Float> mapB = new HashMap<Integer, Float>();
            float pr = 0f;

            for (Text line : values) {
                System.out.println(line);
                String vals = line.toString();

                if (vals.startsWith("A:")) {
                    String[] tokenA = PageRankJob.DELIMITER.split(vals.substring(2));
                    mapA.put(Integer.parseInt(tokenA[0]), Float.parseFloat(tokenA[1]));
                }

                if (vals.startsWith("B:")) {
                    String[] tokenB = PageRankJob.DELIMITER.split(vals.substring(2));
                    mapB.put(Integer.parseInt(tokenB[0]), Float.parseFloat(tokenB[1]));
                }
            }

            Iterator iterA = mapA.keySet().iterator();
            while(iterA.hasNext()){
                int idx = iterA.next();
                float A = mapA.get(idx);
                float B = mapB.get(idx);
                pr += A * B;
            }

            context.write(key, new Text(PageRankJob.scaleFloat(pr)));
            // System.out.println(key + ":" + PageRankJob.scaleFloat(pr));
        }

    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("tmp1");
        String output = path.get("tmp2");
        String pr = path.get("input_pr");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(PageRank.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(PageRankMapper.class);
         job.setReducerClass(PageRankReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input), new Path(pr));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);

        hdfs.rmr(pr);
        hdfs.rename(output, pr);
    }
}

5). PR标准化: Normal.java

normal-step1

矩阵解释:

  • 对PR的计算结果标准化,让所以PR值落在(0,1)区间

新建文件:Normal.java


package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Normal {

    public static class NormalMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text k = new Text("1");

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            context.write(k, values);
        }
    }

    public static class NormalReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

            List vList = new ArrayList();

            float sum = 0f;
            for (Text line : values) {
                vList.add(line.toString());

                String[] vals = PageRankJob.DELIMITER.split(line.toString());
                float f = Float.parseFloat(vals[1]);
                sum += f;
            }

            for (String line : vList) {
                String[] vals = PageRankJob.DELIMITER.split(line.toString());
                Text k = new Text(vals[0]);

                float f = Float.parseFloat(vals[1]);
                Text v = new Text(PageRankJob.scaleFloat((float) (f / sum)));
                context.write(k, v);

                System.out.println(k + ":" + v);
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("input_pr");
        String output = path.get("result");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Normal.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(NormalMapper.class);
        job.setReducerClass(NormalReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

6). 启动程序: PageRankJob.java

新建文件:PageRankJob.java


package org.conan.myhadoop.pagerank;

import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

public class PageRankJob {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) {
        Map<String, String> path = new HashMap<String, String>();
        path.put("page", "logfile/pagerank/page.csv");// 本地的数据文件
        path.put("pr", "logfile/pagerank/pr.csv");// 本地的数据文件

        path.put("input", HDFS + "/user/hdfs/pagerank");// HDFS的目录
        path.put("input_pr", HDFS + "/user/hdfs/pagerank/pr");// pr存储目
        path.put("tmp1", HDFS + "/user/hdfs/pagerank/tmp1");// 临时目录,存放邻接矩阵
        path.put("tmp2", HDFS + "/user/hdfs/pagerank/tmp2");// 临时目录,计算到得PR,覆盖input_pr

        path.put("result", HDFS + "/user/hdfs/pagerank/result");// 计算结果的PR

        try {

            AdjacencyMatrix.run(path);
            int iter = 3;
            for (int i = 0; i < iter; i++) {// 迭代执行
                PageRank.run(path);
            }
            Normal.run(path);

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(PageRankJob.class);
        conf.setJobName("PageRank");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static String scaleFloat(float f) {// 保留6位小数
        DecimalFormat df = new DecimalFormat("##0.000000");
        return df.format(f);
    }
}

程序代码已上传到github:

https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myhadoop/pagerank

这样就实现了,PageRank的并行吧!接下来,我们就可以用PageRank做一些有意思的应用了。

转载请注明出处:
http://blog.fens.me/algorithm-pagerank-mapreduce/

打赏作者

用Hadoop构建电影推荐系统

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mapreduce-recommend/

hadoop-recommand

前言

Netflix电影推荐的百万美金比赛,把“推荐”变成了时下最热门的数据挖掘算法之一。也正是由于Netflix的比赛,让企业界和学科界有了更深层次的技术碰撞。引发了各种网站“推荐”热,个性时代已经到来。

目录

  1. 推荐系统概述
  2. 需求分析:推荐系统指标设计
  3. 算法模型:Hadoop并行算法
  4. 架构设计:推荐系统架构
  5. 程序开发:MapReduce程序实现
  6. 补充内容:对Step4过程优化

1. 推荐系统概述

电子商务网站是个性化推荐系统重要地应用的领域之一,亚马逊就是个性化推荐系统的积极应用者和推广者,亚马逊的推荐系统深入到网站的各类商品,为亚马逊带来了至少30%的销售额。

不光是电商类,推荐系统无处不在。QQ,人人网的好友推荐;新浪微博的你可能感觉兴趣的人;优酷,土豆的电影推荐;豆瓣的图书推荐;大从点评的餐饮推荐;世纪佳缘的相亲推荐;天际网的职业推荐等。

推荐算法分类:

按数据使用划分:

  • 协同过滤算法:UserCF, ItemCF, ModelCF
  • 基于内容的推荐: 用户内容属性和物品内容属性
  • 社会化过滤:基于用户的社会网络关系

按模型划分:

  • 最近邻模型:基于距离的协同过滤算法
  • Latent Factor Mode(SVD):基于矩阵分解的模型
  • Graph:图模型,社会网络图模型

基于用户的协同过滤算法UserCF

基于用户的协同过滤,通过不同用户对物品的评分来评测用户之间的相似性,基于用户之间的相似性做出推荐。简单来讲就是:给用户推荐和他兴趣相似的其他用户喜欢的物品。

用例说明:

image015

算法实现及使用介绍,请参考文章:Mahout推荐算法API详解

基于物品的协同过滤算法ItemCF

基于item的协同过滤,通过用户对不同item的评分来评测item之间的相似性,基于item之间的相似性做出推荐。简单来讲就是:给用户推荐和他之前喜欢的物品相似的物品。

用例说明:

image017

算法实现及使用介绍,请参考文章:Mahout推荐算法API详解

注:基于物品的协同过滤算法,是目前商用最广泛的推荐算法。

协同过滤算法实现,分为2个步骤

  • 1. 计算物品之间的相似度
  • 2. 根据物品的相似度和用户的历史行为给用户生成推荐列表

有关协同过滤的另一篇文章,请参考:RHadoop实践系列之三 R实现MapReduce的协同过滤算法

2. 需求分析:推荐系统指标设计

下面我们将从一个公司案例出发来全面的解释,如何进行推荐系统指标设计。

案例介绍

Netflix电影推荐百万奖金比赛,http://www.netflixprize.com/
Netflix官方网站:www.netflix.com

Netflix,2006年组织比赛是的时候,是一家以在线电影租赁为生的公司。他们根据网友对电影的打分来判断用户有可能喜欢什么电影,并结合会员看过的电影以及口味偏好设置做出判断,混搭出各种电影风格的需求。

收集会员的一些信息,为他们指定个性化的电影推荐后,有许多冷门电影竟然进入了候租榜单。从公司的电影资源成本方面考量,热门电影的成本一般较高,如果Netflix公司能够在电影租赁中增加冷门电影的比例,自然能够提升自身盈利能力。

Netflix公司曾宣称60%左右的会员根据推荐名单定制租赁顺序,如果推荐系统不能准确地猜测会员喜欢的电影类型,容易造成多次租借冷门电影而并不符合个人口味的会员流失。为了更高效地为会员推荐电影,Netflix一直致力于不断改进和完善个性化推荐服务,在2006年推出百万美元大奖,无论是谁能最好地优化Netflix推荐算法就可获奖励100万美元。到2009年,奖金被一个7人开发小组夺得,Netflix随后又立即推出第二个百万美金悬赏。这充分说明一套好的推荐算法系统是多么重要,同时又是多么困难。

netflix_prize

上图为比赛的各支队伍的排名!

补充说明:

  • 1. Netflix的比赛是基于静态数据的,就是给定“训练级”,匹配“结果集”,“结果集”也是提前就做好的,所以这与我们每天运营的系统,其实是不一样的。
  • 2. Netflix用于比赛的数据集是小量的,整个全集才666MB,而实际的推荐系统都要基于大量历史数据的,动不动就会上GB,TB等

Netflix数据下载
部分训练集:http://graphlab.org/wp-content/uploads/2013/07/smallnetflix_mm.train_.gz
部分结果集:http://graphlab.org/wp-content/uploads/2013/07/smallnetflix_mm.validate.gz
完整数据集:http://www.lifecrunch.biz/wp-content/uploads/2011/04/nf_prize_dataset.tar.gz

所以,我们在真实的环境中设计推荐的时候,要全面考量数据量,算法性能,结果准确度等的指标。

  • 推荐算法选型:基于物品的协同过滤算法ItemCF,并行实现
  • 数据量:基于Hadoop架构,支持GB,TB,PB级数据量
  • 算法检验:可以通过 准确率,召回率,覆盖率,流行度 等指标评判。
  • 结果解读:通过ItemCF的定义,合理给出结果解释

3. 算法模型:Hadoop并行算法

这里我使用”Mahout In Action”书里,第一章第六节介绍的分步式基于物品的协同过滤算法进行实现。Chapter 6: Distributing recommendation computations

测试数据集:small.csv


1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.0
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

每行3个字段,依次是用户ID,电影ID,用户对电影的评分(0-5分,每0.5为一个评分点!)

算法的思想:

  • 1. 建立物品的同现矩阵
  • 2. 建立用户对物品的评分矩阵
  • 3. 矩阵计算推荐结果

1). 建立物品的同现矩阵
按用户分组,找到每个用户所选的物品,单独出现计数及两两一组计数。


      [101] [102] [103] [104] [105] [106] [107]
[101]   5     3     4     4     2     2     1
[102]   3     3     3     2     1     1     0
[103]   4     3     4     3     1     2     0
[104]   4     2     3     4     2     2     1
[105]   2     1     1     2     2     1     1
[106]   2     1     2     2     1     2     0
[107]   1     0     0     1     1     0     1

2). 建立用户对物品的评分矩阵
按用户分组,找到每个用户所选的物品及评分


       U3
[101] 2.0
[102] 0.0
[103] 0.0
[104] 4.0
[105] 4.5
[106] 0.0
[107] 5.0

3). 矩阵计算推荐结果
同现矩阵*评分矩阵=推荐结果

alogrithm_1

图片摘自”Mahout In Action”

MapReduce任务设计

aglorithm_2

图片摘自”Mahout In Action”

解读MapRduce任务:

  • 步骤1: 按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵
  • 步骤2: 对物品组合列表进行计数,建立物品的同现矩阵
  • 步骤3: 合并同现矩阵和评分矩阵
  • 步骤4: 计算推荐结果列表

4. 架构设计:推荐系统架构

hadoop-recommand-architect

上图中,左边是Application业务系统,右边是Hadoop的HDFS, MapReduce。

  1. 业务系统记录了用户的行为和对物品的打分
  2. 设置系统定时器CRON,每xx小时,增量向HDFS导入数据(userid,itemid,value,time)。
  3. 完成导入后,设置系统定时器,启动MapReduce程序,运行推荐算法。
  4. 完成计算后,设置系统定时器,从HDFS导出推荐结果数据到数据库,方便以后的及时查询。

5. 程序开发:MapReduce程序实现

win7的开发环境 和 Hadoop的运行环境 ,请参考文章:用Maven构建Hadoop项目

新建Java类:

  • Recommend.java,主任务启动程序
  • Step1.java,按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵
  • Step2.java,对物品组合列表进行计数,建立物品的同现矩阵
  • Step3.java,合并同现矩阵和评分矩阵
  • Step4.java,计算推荐结果列表
  • HdfsDAO.java,HDFS操作工具类

1). Recommend.java,主任务启动程序
源代码:


package org.conan.myhadoop.recommend;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

public class Recommend {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) throws Exception {
        Map<String, String> path = new HashMap<String, String>();
        path.put("data", "logfile/small.csv");
        path.put("Step1Input", HDFS + "/user/hdfs/recommend");
        path.put("Step1Output", path.get("Step1Input") + "/step1");
        path.put("Step2Input", path.get("Step1Output"));
        path.put("Step2Output", path.get("Step1Input") + "/step2");
        path.put("Step3Input1", path.get("Step1Output"));
        path.put("Step3Output1", path.get("Step1Input") + "/step3_1");
        path.put("Step3Input2", path.get("Step2Output"));
        path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
        path.put("Step4Input1", path.get("Step3Output1"));
        path.put("Step4Input2", path.get("Step3Output2"));
        path.put("Step4Output", path.get("Step1Input") + "/step4");

        Step1.run(path);
        Step2.run(path);
        Step3.run1(path);
        Step3.run2(path);
        Step4.run(path);
        System.exit(0);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(Recommend.class);
        conf.setJobName("Recommend");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

}

2). Step1.java,按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵

源代码:


package org.conan.myhadoop.recommend;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Step1 {

    public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, IntWritable, Text> {
        private final static IntWritable k = new IntWritable();
        private final static Text v = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
            String[] tokens = Recommend.DELIMITER.split(value.toString());
            int userID = Integer.parseInt(tokens[0]);
            String itemID = tokens[1];
            String pref = tokens[2];
            k.set(userID);
            v.set(itemID + ":" + pref);
            output.collect(k, v);
        }
    }

    public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
        private final static Text v = new Text();

        @Override
        public void reduce(IntWritable key, Iterator values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
            StringBuilder sb = new StringBuilder();
            while (values.hasNext()) {
                sb.append("," + values.next());
            }
            v.set(sb.toString().replaceFirst(",", ""));
            output.collect(key, v);
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input = path.get("Step1Input");
        String output = path.get("Step1Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("data"), input);

        conf.setMapOutputKeyClass(IntWritable.class);
        conf.setMapOutputValueClass(Text.class);

        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Step1_ToItemPreMapper.class);
        conf.setCombinerClass(Step1_ToUserVectorReducer.class);
        conf.setReducerClass(Step1_ToUserVectorReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }

}

计算结果:


~ hadoop fs -cat /user/hdfs/recommend/step1/part-00000

1       102:3.0,103:2.5,101:5.0
2       101:2.0,102:2.5,103:5.0,104:2.0
3       107:5.0,101:2.0,104:4.0,105:4.5
4       101:5.0,103:3.0,104:4.5,106:4.0
5       101:4.0,102:3.0,103:2.0,104:4.0,105:3.5,106:4.0

3). Step2.java,对物品组合列表进行计数,建立物品的同现矩阵
源代码:


package org.conan.myhadoop.recommend;

import java.io.IOException;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Step2 {
    public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static Text k = new Text();
        private final static IntWritable v = new IntWritable(1);

        @Override
        public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            for (int i = 1; i < tokens.length; i++) {
                String itemID = tokens[i].split(":")[0];
                for (int j = 1; j < tokens.length; j++) {
                    String itemID2 = tokens[j].split(":")[0];
                    k.set(itemID + ":" + itemID2);
                    output.collect(k, v);
                }
            }
        }
    }

    public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input = path.get("Step2Input");
        String output = path.get("Step2Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
        conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
        conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }
}

计算结果:


~ hadoop fs -cat /user/hdfs/recommend/step2/part-00000

101:101 5
101:102 3
101:103 4
101:104 4
101:105 2
101:106 2
101:107 1
102:101 3
102:102 3
102:103 3
102:104 2
102:105 1
102:106 1
103:101 4
103:102 3
103:103 4
103:104 3
103:105 1
103:106 2
104:101 4
104:102 2
104:103 3
104:104 4
104:105 2
104:106 2
104:107 1
105:101 2
105:102 1
105:103 1
105:104 2
105:105 2
105:106 1
105:107 1
106:101 2
106:102 1
106:103 2
106:104 2
106:105 1
106:106 2
107:101 1
107:104 1
107:105 1
107:107 1

4). Step3.java,合并同现矩阵和评分矩阵
源代码:


package org.conan.myhadoop.recommend;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Step3 {

    public static class Step31_UserVectorSplitterMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
        private final static IntWritable k = new IntWritable();
        private final static Text v = new Text();

        @Override
        public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            for (int i = 1; i < tokens.length; i++) {
                String[] vector = tokens[i].split(":");
                int itemID = Integer.parseInt(vector[0]);
                String pref = vector[1];

                k.set(itemID);
                v.set(tokens[0] + ":" + pref);
                output.collect(k, v);
            }
        }
    }

    public static void run1(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input = path.get("Step3Input1");
        String output = path.get("Step3Output1");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Step31_UserVectorSplitterMapper.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }

    public static class Step32_CooccurrenceColumnWrapperMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
        private final static Text k = new Text();
        private final static IntWritable v = new IntWritable();

        @Override
        public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            k.set(tokens[0]);
            v.set(Integer.parseInt(tokens[1]));
            output.collect(k, v);
        }
    }

    public static void run2(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input = path.get("Step3Input2");
        String output = path.get("Step3Output2");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Step32_CooccurrenceColumnWrapperMapper.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }

}

计算结果:


~ hadoop fs -cat /user/hdfs/recommend/step3_1/part-00000

101     5:4.0
101     1:5.0
101     2:2.0
101     3:2.0
101     4:5.0
102     1:3.0
102     5:3.0
102     2:2.5
103     2:5.0
103     5:2.0
103     1:2.5
103     4:3.0
104     2:2.0
104     5:4.0
104     3:4.0
104     4:4.5
105     3:4.5
105     5:3.5
106     5:4.0
106     4:4.0
107     3:5.0

~ hadoop fs -cat /user/hdfs/recommend/step3_2/part-00000

101:101 5
101:102 3
101:103 4
101:104 4
101:105 2
101:106 2
101:107 1
102:101 3
102:102 3
102:103 3
102:104 2
102:105 1
102:106 1
103:101 4
103:102 3
103:103 4
103:104 3
103:105 1
103:106 2
104:101 4
104:102 2
104:103 3
104:104 4
104:105 2
104:106 2
104:107 1
105:101 2
105:102 1
105:103 1
105:104 2
105:105 2
105:106 1
105:107 1
106:101 2
106:102 1
106:103 2
106:104 2
106:105 1
106:106 2
107:101 1
107:104 1
107:105 1
107:107 1

5). Step4.java,计算推荐结果列表
源代码:


package org.conan.myhadoop.recommend;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Step4 {

    public static class Step4_PartialMultiplyMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
        private final static IntWritable k = new IntWritable();
        private final static Text v = new Text();

        private final static Map<Integer, List> cooccurrenceMatrix = new HashMap<Integer, List>();

        @Override
        public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());

            String[] v1 = tokens[0].split(":");
            String[] v2 = tokens[1].split(":");

            if (v1.length > 1) {// cooccurrence
                int itemID1 = Integer.parseInt(v1[0]);
                int itemID2 = Integer.parseInt(v1[1]);
                int num = Integer.parseInt(tokens[1]);

                List list = null;
                if (!cooccurrenceMatrix.containsKey(itemID1)) {
                    list = new ArrayList();
                } else {
                    list = cooccurrenceMatrix.get(itemID1);
                }
                list.add(new Cooccurrence(itemID1, itemID2, num));
                cooccurrenceMatrix.put(itemID1, list);
            }

            if (v2.length > 1) {// userVector
                int itemID = Integer.parseInt(tokens[0]);
                int userID = Integer.parseInt(v2[0]);
                double pref = Double.parseDouble(v2[1]);
                k.set(userID);
                for (Cooccurrence co : cooccurrenceMatrix.get(itemID)) {
                    v.set(co.getItemID2() + "," + pref * co.getNum());
                    output.collect(k, v);
                }

            }
        }
    }

    public static class Step4_AggregateAndRecommendReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
        private final static Text v = new Text();

        @Override
        public void reduce(IntWritable key, Iterator values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
            Map<String, Double> result = new HashMap<String, Double>();
            while (values.hasNext()) {
                String[] str = values.next().toString().split(",");
                if (result.containsKey(str[0])) {
                    result.put(str[0], result.get(str[0]) + Double.parseDouble(str[1]));
                } else {
                    result.put(str[0], Double.parseDouble(str[1]));
                }
            }
            Iterator iter = result.keySet().iterator();
            while (iter.hasNext()) {
                String itemID = iter.next();
                double score = result.get(itemID);
                v.set(itemID + "," + score);
                output.collect(key, v);
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException {
        JobConf conf = Recommend.config();

        String input1 = path.get("Step4Input1");
        String input2 = path.get("Step4Input2");
        String output = path.get("Step4Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(Text.class);

        conf.setMapperClass(Step4_PartialMultiplyMapper.class);
        conf.setCombinerClass(Step4_AggregateAndRecommendReducer.class);
        conf.setReducerClass(Step4_AggregateAndRecommendReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input1), new Path(input2));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        RunningJob job = JobClient.runJob(conf);
        while (!job.isComplete()) {
            job.waitForCompletion();
        }
    }

}

class Cooccurrence {
    private int itemID1;
    private int itemID2;
    private int num;

    public Cooccurrence(int itemID1, int itemID2, int num) {
        super();
        this.itemID1 = itemID1;
        this.itemID2 = itemID2;
        this.num = num;
    }

    public int getItemID1() {
        return itemID1;
    }

    public void setItemID1(int itemID1) {
        this.itemID1 = itemID1;
    }

    public int getItemID2() {
        return itemID2;
    }

    public void setItemID2(int itemID2) {
        this.itemID2 = itemID2;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

}

计算结果:


~ hadoop fs -cat /user/hdfs/recommend/step4/part-00000

1       107,5.0
1       106,18.0
1       105,15.5
1       104,33.5
1       103,39.0
1       102,31.5
1       101,44.0
2       107,4.0
2       106,20.5
2       105,15.5
2       104,36.0
2       103,41.5
2       102,32.5
2       101,45.5
3       107,15.5
3       106,16.5
3       105,26.0
3       104,38.0
3       103,24.5
3       102,18.5
3       101,40.0
4       107,9.5
4       106,33.0
4       105,26.0
4       104,55.0
4       103,53.5
4       102,37.0
4       101,63.0
5       107,11.5
5       106,34.5
5       105,32.0
5       104,59.0
5       103,56.5
5       102,42.5
5       101,68.0

对Step4过程优化,请参考本文最后的补充内容。

6). HdfsDAO.java,HDFS操作工具类
详细解释,请参考文章:Hadoop编程调用HDFS

源代码:


package org.conan.myhadoop.hdfs;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;

public class HdfsDAO {

    private static final String HDFS = "hdfs://192.168.1.210:9000/";

    public HdfsDAO(Configuration conf) {
        this(HDFS, conf);
    }

    public HdfsDAO(String hdfs, Configuration conf) {
        this.hdfsPath = hdfs;
        this.conf = conf;
    }

    private String hdfsPath;
    private Configuration conf;

    public static void main(String[] args) throws IOException {
        JobConf conf = config();
        HdfsDAO hdfs = new HdfsDAO(conf);
        hdfs.copyFile("datafile/item.csv", "/tmp/new");
        hdfs.ls("/tmp/new");
    }        

    public static JobConf config(){
        JobConf conf = new JobConf(HdfsDAO.class);
        conf.setJobName("HdfsDAO");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public void mkdirs(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        if (!fs.exists(path)) {
            fs.mkdirs(path);
            System.out.println("Create: " + folder);
        }
        fs.close();
    }

    public void rmr(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.deleteOnExit(path);
        System.out.println("Delete: " + folder);
        fs.close();
    }

    public void ls(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FileStatus[] list = fs.listStatus(path);
        System.out.println("ls: " + folder);
        System.out.println("==========================================================");
        for (FileStatus f : list) {
            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
        }
        System.out.println("==========================================================");
        fs.close();
    }

    public void createFile(String file, String content) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        byte[] buff = content.getBytes();
        FSDataOutputStream os = null;
        try {
            os = fs.create(new Path(file));
            os.write(buff, 0, buff.length);
            System.out.println("Create: " + file);
        } finally {
            if (os != null)
                os.close();
        }
        fs.close();
    }

    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
        fs.close();
    }

    public void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        fs.copyToLocalFile(path, new Path(local));
        System.out.println("download: from" + remote + " to " + local);
        fs.close();
    }

    public void cat(String remoteFile) throws IOException {
        Path path = new Path(remoteFile);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        FSDataInputStream fsdis = null;
        System.out.println("cat: " + remoteFile);
        try {  
            fsdis =fs.open(path);
            IOUtils.copyBytes(fsdis, System.out, 4096, false);  
          } finally {  
            IOUtils.closeStream(fsdis);
            fs.close();
          }
    }
}

这样我们就自己编程实现了MapReduce化基于物品的协同过滤算法。

RHadoop的实现方案,请参考文章:RHadoop实践系列之三 R实现MapReduce的协同过滤算法

Mahout的实现方案,请参考文章:Mahout分步式程序开发 基于物品的协同过滤ItemCF

我已经把整个MapReduce的实现都放到了github上面:
https://github.com/bsspirit/maven_hadoop_template/releases/tag/recommend

6. 补充内容:对Step4过程优化

在Step4.java这一步运行过程中,Mapper过程在Step4_PartialMultiplyMapper类通过分别读取两个input数据,在内存中进行了计算。

这种方式有明显的限制条件:

  • a. 两个输入数据集,有严格的读入顺序。由于Hadoop不能指定读入顺序,因此在多节点的Hadoop集群环境,读入顺序有可能会发生错误,造成程序的空指针错误。
  • b. 这个计算过程,在内存中实现。如果矩阵过大,会造成单节点的内存不足。

做为优化的方案,我们需要对Step4的过程,实现MapReduce的矩阵乘法,矩阵算法原理请参考文章:用MapReduce实现矩阵乘法

对Step4优化的实现:把矩阵计算通过两个MapReduce过程实现。

  • 矩阵乘法过程类文件:Step4_Update.java
  • 矩阵加法过程类文件:Step4_Update2.java
  • 修改启动程序:Recommend.java

增加文件:Step4_Update.java


package org.conan.myhadoop.recommend;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Step4_Update {

    public static class Step4_PartialMultiplyMapper extends Mapper {

        private String flag;// A同现矩阵 or B评分矩阵

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();// 判断读的数据集

            // System.out.println(flag);
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());

            if (flag.equals("step3_2")) {// 同现矩阵
                String[] v1 = tokens[0].split(":");
                String itemID1 = v1[0];
                String itemID2 = v1[1];
                String num = tokens[1];

                Text k = new Text(itemID1);
                Text v = new Text("A:" + itemID2 + "," + num);

                context.write(k, v);
                // System.out.println(k.toString() + "  " + v.toString());

            } else if (flag.equals("step3_1")) {// 评分矩阵
                String[] v2 = tokens[1].split(":");
                String itemID = tokens[0];
                String userID = v2[0];
                String pref = v2[1];

                Text k = new Text(itemID);
                Text v = new Text("B:" + userID + "," + pref);

                context.write(k, v);
                // System.out.println(k.toString() + "  " + v.toString());
            }
        }

    }

    public static class Step4_AggregateReducer extends Reducer {

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            System.out.println(key.toString() + ":");

            Map mapA = new HashMap();
            Map mapB = new HashMap();

            for (Text line : values) {
                String val = line.toString();
                System.out.println(val);

                if (val.startsWith("A:")) {
                    String[] kv = Recommend.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);

                } else if (val.startsWith("B:")) {
                    String[] kv = Recommend.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);

                }
            }

            double result = 0;
            Iterator iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();// itemID

                int num = Integer.parseInt(mapA.get(mapk));
                Iterator iterb = mapB.keySet().iterator();
                while (iterb.hasNext()) {
                    String mapkb = iterb.next();// userID
                    double pref = Double.parseDouble(mapB.get(mapkb));
                    result = num * pref;// 矩阵乘法相乘计算

                    Text k = new Text(mapkb);
                    Text v = new Text(mapk + "," + result);
                    context.write(k, v);
                    System.out.println(k.toString() + "  " + v.toString());
                }
            }
        }
    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();

        String input1 = path.get("Step5Input1");
        String input2 = path.get("Step5Input2");
        String output = path.get("Step5Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step4_Update.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step4_Update.Step4_PartialMultiplyMapper.class);
        job.setReducerClass(Step4_Update.Step4_AggregateReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

}

增加文件:Step4_Update2.java


package org.conan.myhadoop.recommend;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Step4_Update2 {

    public static class Step4_RecommendMapper extends Mapper {

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = Recommend.DELIMITER.split(values.toString());
            Text k = new Text(tokens[0]);
            Text v = new Text(tokens[1]+","+tokens[2]);
            context.write(k, v);
        }
    }

    public static class Step4_RecommendReducer extends Reducer {
        
        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            System.out.println(key.toString() + ":");
            Map map = new HashMap();// 结果
            
            for (Text line : values) {
                System.out.println(line.toString());
                String[] tokens = Recommend.DELIMITER.split(line.toString());
                String itemID = tokens[0];
                Double score = Double.parseDouble(tokens[1]);
                
                 if (map.containsKey(itemID)) {
                     map.put(itemID, map.get(itemID) + score);// 矩阵乘法求和计算
                 } else {
                     map.put(itemID, score);
                 }
            }
            
            Iterator iter = map.keySet().iterator();
            while (iter.hasNext()) {
                String itemID = iter.next();
                double score = map.get(itemID);
                Text v = new Text(itemID + "," + score);
                context.write(key, v);
            }
        }
    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = Recommend.config();

        String input = path.get("Step6Input");
        String output = path.get("Step6Output");

        HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Step4_Update2.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(Step4_Update2.Step4_RecommendMapper.class);
        job.setReducerClass(Step4_Update2.Step4_RecommendReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

}

修改Recommend.java


package org.conan.myhadoop.recommend;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Recommend {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) throws Exception {
        Map path = new HashMap();
        path.put("data", "logfile/small.csv");
        path.put("Step1Input", HDFS + "/user/hdfs/recommend");
        path.put("Step1Output", path.get("Step1Input") + "/step1");
        path.put("Step2Input", path.get("Step1Output"));
        path.put("Step2Output", path.get("Step1Input") + "/step2");
        path.put("Step3Input1", path.get("Step1Output"));
        path.put("Step3Output1", path.get("Step1Input") + "/step3_1");
        path.put("Step3Input2", path.get("Step2Output"));
        path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
        
        path.put("Step4Input1", path.get("Step3Output1"));
        path.put("Step4Input2", path.get("Step3Output2"));
        path.put("Step4Output", path.get("Step1Input") + "/step4");
        
        path.put("Step5Input1", path.get("Step3Output1"));
        path.put("Step5Input2", path.get("Step3Output2"));
        path.put("Step5Output", path.get("Step1Input") + "/step5");
        
        path.put("Step6Input", path.get("Step5Output"));
        path.put("Step6Output", path.get("Step1Input") + "/step6");       

        Step1.run(path);
        Step2.run(path);
        Step3.run1(path);
        Step3.run2(path);
        //Step4.run(path);
        
        Step4_Update.run(path);
        Step4_Update2.run(path);
               
        System.exit(0);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(Recommend.class);
        conf.setJobName("Recommand");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        conf.set("io.sort.mb", "1024");
        return conf;
    }

}

运行Step4_Update.java,查看输出结果


~ hadoop fs -cat /user/hdfs/recommend/step5/part-r-00000

3       107,2.0
2       107,2.0
1       107,5.0
5       107,4.0
4       107,5.0
3       106,4.0
2       106,4.0
1       106,10.0
5       106,8.0
4       106,10.0
3       105,4.0
2       105,4.0
1       105,10.0
5       105,8.0
4       105,10.0
3       104,8.0
2       104,8.0
1       104,20.0
5       104,16.0
4       104,20.0
3       103,8.0
2       103,8.0
1       103,20.0
5       103,16.0
4       103,20.0
3       102,6.0
2       102,6.0
1       102,15.0
5       102,12.0
4       102,15.0
3       101,10.0
2       101,10.0
1       101,25.0
5       101,20.0
4       101,25.0
2       106,2.5
1       106,3.0
5       106,3.0
2       105,2.5
1       105,3.0
5       105,3.0
2       104,5.0
1       104,6.0
5       104,6.0
2       103,7.5
1       103,9.0
5       103,9.0
2       102,7.5
1       102,9.0
5       102,9.0
2       101,7.5
1       101,9.0
5       101,9.0
2       106,10.0
1       106,5.0
5       106,4.0
4       106,6.0
2       105,5.0
1       105,2.5
5       105,2.0
4       105,3.0
2       104,15.0
1       104,7.5
5       104,6.0
4       104,9.0
2       103,20.0
1       103,10.0
5       103,8.0
4       103,12.0
2       102,15.0
1       102,7.5
5       102,6.0
4       102,9.0
2       101,20.0
1       101,10.0
5       101,8.0
4       101,12.0
3       107,4.0
2       107,2.0
5       107,4.0
4       107,4.5
3       106,8.0
2       106,4.0
5       106,8.0
4       106,9.0
3       105,8.0
2       105,4.0
5       105,8.0
4       105,9.0
3       104,16.0
2       104,8.0
5       104,16.0
4       104,18.0
3       103,12.0
2       103,6.0
5       103,12.0
4       103,13.5
3       102,8.0
2       102,4.0
5       102,8.0
4       102,9.0
3       101,16.0
2       101,8.0
5       101,16.0
4       101,18.0
3       107,4.5
5       107,3.5
3       106,4.5
5       106,3.5
3       105,9.0
5       105,7.0
3       104,9.0
5       104,7.0
3       103,4.5
5       103,3.5
3       102,4.5
5       102,3.5
3       101,9.0
5       101,7.0
5       106,8.0
4       106,8.0
5       105,4.0
4       105,4.0
5       104,8.0
4       104,8.0
5       103,8.0
4       103,8.0
5       102,4.0
4       102,4.0
5       101,8.0
4       101,8.0
3       107,5.0
3       105,5.0
3       104,5.0
3       101,5.0

运行Step4_Update2.java,查看输出结果


~ hadoop fs -cat /user/hdfs/recommend/step6/part-r-00000

1       107,5.0
1       106,18.0
1       105,15.5
1       104,33.5
1       103,39.0
1       102,31.5
1       101,44.0
2       107,4.0
2       106,20.5
2       105,15.5
2       104,36.0
2       103,41.5
2       102,32.5
2       101,45.5
3       107,15.5
3       106,16.5
3       105,26.0
3       104,38.0
3       103,24.5
3       102,18.5
3       101,40.0
4       107,9.5
4       106,33.0
4       105,26.0
4       104,55.0
4       103,53.5
4       102,37.0
4       101,63.0
5       107,11.5
5       106,34.5
5       105,32.0
5       104,59.0
5       103,56.5
5       102,42.5
5       101,68.0

这样我们就把原来内存中计算的部分,通过MapReduce实现了,结果与之间Step4的结果一致。

代码已经更新到github,请需要的同学更新查看。
https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myhadoop/recommend

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-mapreduce-recommend/

打赏作者

Mahout分步式程序开发 基于物品的协同过滤ItemCF

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mahout-mapreduce-itemcf/

mahout-hadoop-itemcf

前言

Mahout是Hadoop家族一员,从血缘就继承了Hadoop程序的特点,支持HDFS访问和MapReduce分步式算法。随着Mahout的发展,从0.7版本开始,Mahout做了重大的升级。移除了部分算法的单机内存计算,只支持基于Hadoop的MapReduce平行计算。

从这点上,我们能看出Mahout走向大数据,坚持并行化的决心!相信在Hadoop的大框架下,Mahout最终能成为一个大数据的明星产品!

目录

  1. Mahout开发环境介绍
  2. Mahout基于Hadoop的分步环境介绍
  3. 用Mahout实现协同过滤ItemCF
  4. 模板项目上传github

1. Mahout开发环境介绍

用Maven构建Mahout项目 文章中,我们已经配置好了基于Maven的Mahout的开发环境,我们将继续完成Mahout的分步式的程序开发。

本文的mahout版本为0.8。

开发环境:

  • Win7 64bit
  • Java 1.6.0_45
  • Maven 3
  • Eclipse Juno Service Release 2
  • Mahout 0.8
  • Hadoop 1.1.2

找到pom.xml,修改mahout版本为0.8

<mahout.version>0.8</mahout.version>

然后,下载依赖库。

~ mvn clean install

由于 org.conan.mymahout.cluster06.Kmeans.java 类代码,是基于mahout-0.6的,所以会报错。我们可以先注释这个文件。

2. Mahout基于Hadoop的分步环境介绍

hadoop-mahout-cluster-dev

如上图所示,我们可以选择在win7中开发,也可以在linux中开发,开发过程我们可以在本地环境进行调试,标配的工具都是Maven和Eclipse。

Mahout在运行过程中,会把MapReduce的算法程序包,自动发布的Hadoop的集群环境中,这种开发和运行模式,就和真正的生产环境差不多了。

3. 用Mahout实现协同过滤ItemCF

实现步骤:

  • 1. 准备数据文件: item.csv
  • 2. Java程序:HdfsDAO.java
  • 3. Java程序:ItemCFHadoop.java
  • 4. 运行程序
  • 5. 推荐结果解读

1). 准备数据文件: item.csv
上传测试数据到HDFS,单机内存实验请参考文章:用Maven构建Mahout项目


~ hadoop fs -mkdir /user/hdfs/userCF
~ hadoop fs -copyFromLocal /home/conan/datafiles/item.csv /user/hdfs/userCF

~ hadoop fs -cat /user/hdfs/userCF/item.csv
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

2). Java程序:HdfsDAO.java
HdfsDAO.java,是一个HDFS操作的工具,用API实现Hadoop的各种HDFS命令,请参考文章:Hadoop编程调用HDFS

我们这里会用到HdfsDAO.java类中的一些方法:


        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(inPath);
        hdfs.mkdirs(inPath);
        hdfs.copyFile(localFile, inPath);
        hdfs.ls(inPath);
        hdfs.cat(inFile);

3). Java程序:ItemCFHadoop.java
用Mahout实现分步式算法,我们看到Mahout in Action中的解释。

aglorithm_2

实现程序:


package org.conan.mymahout.recommendation;

import org.apache.hadoop.mapred.JobConf;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;
import org.conan.mymahout.hdfs.HdfsDAO;

public class ItemCFHadoop {

    private static final String HDFS = "hdfs://192.168.1.210:9000";

    public static void main(String[] args) throws Exception {
        String localFile = "datafile/item.csv";
        String inPath = HDFS + "/user/hdfs/userCF";
        String inFile = inPath + "/item.csv";
        String outPath = HDFS + "/user/hdfs/userCF/result/";
        String outFile = outPath + "/part-r-00000";
        String tmpPath = HDFS + "/tmp/" + System.currentTimeMillis();

        JobConf conf = config();
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(inPath);
        hdfs.mkdirs(inPath);
        hdfs.copyFile(localFile, inPath);
        hdfs.ls(inPath);
        hdfs.cat(inFile);

        StringBuilder sb = new StringBuilder();
        sb.append("--input ").append(inPath);
        sb.append(" --output ").append(outPath);
        sb.append(" --booleanData true");
        sb.append(" --similarityClassname org.apache.mahout.math.hadoop.similarity.cooccurrence.measures.EuclideanDistanceSimilarity");
        sb.append(" --tempDir ").append(tmpPath);
        args = sb.toString().split(" ");

        RecommenderJob job = new RecommenderJob();
        job.setConf(conf);
        job.run(args);

        hdfs.cat(outFile);
    }

    public static JobConf config() {
        JobConf conf = new JobConf(ItemCFHadoop.class);
        conf.setJobName("ItemCFHadoop");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }
}

RecommenderJob.java,实际上就是封装了,上面整个图的分步式并行算法的执行过程!如果没有这层封装,我们需要自己去实现图中8个步骤MapReduce算法。

关于上面算法的深度剖析,请参考文章:R实现MapReduce的协同过滤算法

4). 运行程序
控制台输出:


Delete: hdfs://192.168.1.210:9000/user/hdfs/userCF
Create: hdfs://192.168.1.210:9000/user/hdfs/userCF
copy from: datafile/item.csv to hdfs://192.168.1.210:9000/user/hdfs/userCF
ls: hdfs://192.168.1.210:9000/user/hdfs/userCF
==========================================================
name: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv, folder: false, size: 229
==========================================================
cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv
1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2013-10-14 10:26:35 org.apache.hadoop.util.NativeCodeLoader 
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-10-14 10:26:35 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:35 org.apache.hadoop.io.compress.snappy.LoadSnappy 
警告: Snappy native library not loaded
2013-10-14 10:26:36 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:36 org.apache.hadoop.io.compress.CodecPool getCompressor
信息: Got brand-new compressor
2013-10-14 10:26:36 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:36 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:36 org.apache.hadoop.io.compress.CodecPool getDecompressor
信息: Got brand-new decompressor
2013-10-14 10:26:36 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 42 bytes
2013-10-14 10:26:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2013-10-14 10:26:36 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/preparePreferenceMatrix/itemIDIndex
2013-10-14 10:26:36 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:36 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2013-10-14 10:26:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=187
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=3287330
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=916
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=3443292
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=645
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=229
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=46
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Map input records=21
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=14
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=84
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=376569856
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=116
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=21
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=7
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=7
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=7
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=7
2013-10-14 10:26:37 org.apache.hadoop.mapred.Counters log
信息:     Map output records=21
2013-10-14 10:26:37 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:37 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0002
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:37 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0002_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0002_m_000000_0' done.
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 68 bytes
2013-10-14 10:26:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0002_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0002_r_000000_0 is allowed to commit now
2013-10-14 10:26:37 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0002_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/preparePreferenceMatrix/userVectors
2013-10-14 10:26:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:37 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0002_r_000000_0' done.
2013-10-14 10:26:38 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:38 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0002
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息: Counters: 20
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:   org.apache.mahout.cf.taste.hadoop.item.ToUserVectorsReducer$Counters
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     USERS=5
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=288
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=6574274
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=1374
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=6887592
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=1120
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=229
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=72
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Map input records=21
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=42
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=63
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=575930368
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=116
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=21
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=5
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=5
2013-10-14 10:26:38 org.apache.hadoop.mapred.Counters log
信息:     Map output records=21
2013-10-14 10:26:38 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:38 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0003
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:38 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0003_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0003_m_000000_0' done.
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:38 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 89 bytes
2013-10-14 10:26:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0003_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0003_r_000000_0 is allowed to commit now
2013-10-14 10:26:38 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0003_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/preparePreferenceMatrix/ratingMatrix
2013-10-14 10:26:38 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:38 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0003_r_000000_0' done.
2013-10-14 10:26:39 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:39 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0003
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息: Counters: 21
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=335
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:   org.apache.mahout.cf.taste.hadoop.preparation.ToItemVectorsMapper$Elements
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     USER_RATINGS_NEGLECTED=0
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     USER_RATINGS_USED=21
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=9861349
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=1950
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=10331958
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=1751
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=288
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=93
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Map input records=5
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=14
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=336
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=775290880
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=157
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=21
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=7
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=7
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=7
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=7
2013-10-14 10:26:39 org.apache.hadoop.mapred.Counters log
信息:     Map output records=21
2013-10-14 10:26:39 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:39 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0004
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:39 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0004_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:39 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0004_m_000000_0' done.
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:39 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:39 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 118 bytes
2013-10-14 10:26:39 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0004_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:39 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0004_r_000000_0 is allowed to commit now
2013-10-14 10:26:39 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0004_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/weights
2013-10-14 10:26:39 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:39 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0004_r_000000_0' done.
2013-10-14 10:26:40 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:40 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0004
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息: Counters: 20
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=381
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=13148476
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=2628
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=13780408
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=2551
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=335
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:   org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob$Counters
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     ROWS=7
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=122
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Map input records=7
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=16
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=516
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=974651392
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=158
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=24
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=8
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=8
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=8
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=5
2013-10-14 10:26:40 org.apache.hadoop.mapred.Counters log
信息:     Map output records=24
2013-10-14 10:26:40 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:40 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0005
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:40 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0005_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0005_m_000000_0' done.
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:40 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 121 bytes
2013-10-14 10:26:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0005_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0005_r_000000_0 is allowed to commit now
2013-10-14 10:26:40 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0005_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/pairwiseSimilarity
2013-10-14 10:26:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:40 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0005_r_000000_0' done.
2013-10-14 10:26:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0005
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息: Counters: 21
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=392
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=16435577
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=3488
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=17230010
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=3408
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=381
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:   org.apache.mahout.math.hadoop.similarity.cooccurrence.RowSimilarityJob$Counters
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     PRUNED_COOCCURRENCES=0
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     COOCCURRENCES=57
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=125
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Map input records=5
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=14
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=744
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1174011904
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=129
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=21
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=7
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=7
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=7
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=7
2013-10-14 10:26:41 org.apache.hadoop.mapred.Counters log
信息:     Map output records=21
2013-10-14 10:26:41 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0006
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:41 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0006_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0006_m_000000_0' done.
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:41 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 158 bytes
2013-10-14 10:26:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0006_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0006_r_000000_0 is allowed to commit now
2013-10-14 10:26:41 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0006_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/similarityMatrix
2013-10-14 10:26:41 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:41 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0006_r_000000_0' done.
2013-10-14 10:26:42 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:42 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0006
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=554
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=19722740
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=4342
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=20674772
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=4354
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=392
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=162
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Map input records=7
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=14
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=599
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1373372416
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=140
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=25
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=7
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=7
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=7
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=7
2013-10-14 10:26:42 org.apache.hadoop.mapred.Counters log
信息:     Map output records=25
2013-10-14 10:26:42 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:42 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:42 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0007
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0007_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0007_m_000000_0' done.
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:42 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0007_m_000001_0 is done. And is in the process of commiting
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0007_m_000001_0' done.
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 2 sorted segments
2013-10-14 10:26:42 org.apache.hadoop.io.compress.CodecPool getDecompressor
信息: Got brand-new decompressor
2013-10-14 10:26:42 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 2 segments left of total size: 233 bytes
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0007_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0007_r_000000_0 is allowed to commit now
2013-10-14 10:26:42 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0007_r_000000_0' to hdfs://192.168.1.210:9000/tmp/1381717594500/partialMultiply
2013-10-14 10:26:42 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:42 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0007_r_000000_0' done.
2013-10-14 10:26:43 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:43 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0007
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=572
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=34517913
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=8751
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=36182630
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=7934
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=0
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=241
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Map input records=12
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=56
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=453
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=2558459904
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=665
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=28
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=7
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=7
2013-10-14 10:26:43 org.apache.hadoop.mapred.Counters log
信息:     Map output records=28
2013-10-14 10:26:43 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 1
2013-10-14 10:26:43 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0008
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-10-14 10:26:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-10-14 10:26:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-10-14 10:26:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-10-14 10:26:43 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0008_m_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0008_m_000000_0' done.
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-10-14 10:26:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 1 sorted segments
2013-10-14 10:26:43 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 1 segments left of total size: 206 bytes
2013-10-14 10:26:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0008_r_000000_0 is done. And is in the process of commiting
2013-10-14 10:26:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0008_r_000000_0 is allowed to commit now
2013-10-14 10:26:43 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0008_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/userCF/result
2013-10-14 10:26:43 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-10-14 10:26:43 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0008_r_000000_0' done.
2013-10-14 10:26:44 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-10-14 10:26:44 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0008
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=217
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=26299802
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=7357
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=27566408
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=6269
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=572
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=210
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Map input records=7
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=42
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=927
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1971453952
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=137
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=21
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=5
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=5
2013-10-14 10:26:44 org.apache.hadoop.mapred.Counters log
信息:     Map output records=21
cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/result//part-r-00000
1	[104:1.280239,106:1.1462644,105:1.0653841,107:0.33333334]
2	[106:1.560478,105:1.4795978,107:0.69935876]
3	[103:1.2475469,106:1.1944525,102:1.1462644]
4	[102:1.6462644,105:1.5277859,107:0.69935876]
5	[107:1.1993587]

5). 推荐结果解读
我们可以把上面的日志分解析成3个部分解读

  • a. 初始化环境
  • b. 算法执行
  • c. 打印推荐结果

a. 初始化环境
出初HDFS的数据目录和工作目录,并上传数据文件。


Delete: hdfs://192.168.1.210:9000/user/hdfs/userCF
Create: hdfs://192.168.1.210:9000/user/hdfs/userCF
copy from: datafile/item.csv to hdfs://192.168.1.210:9000/user/hdfs/userCF
ls: hdfs://192.168.1.210:9000/user/hdfs/userCF
==========================================================
name: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv, folder: false, size: 229
==========================================================
cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/item.csv

b. 算法执行
分别执行,上图中对应的8种MapReduce算法。
Job complete: job_local_0001
Job complete: job_local_0002
Job complete: job_local_0003
Job complete: job_local_0004
Job complete: job_local_0005
Job complete: job_local_0006
Job complete: job_local_0007
Job complete: job_local_0008

c. 打印推荐结果

方便我们看到计算后的推荐结果


cat: hdfs://192.168.1.210:9000/user/hdfs/userCF/result//part-r-00000
1	[104:1.280239,106:1.1462644,105:1.0653841,107:0.33333334]
2	[106:1.560478,105:1.4795978,107:0.69935876]
3	[103:1.2475469,106:1.1944525,102:1.1462644]
4	[102:1.6462644,105:1.5277859,107:0.69935876]
5	[107:1.1993587]

4. 模板项目上传github

https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.8

大家可以下载这个项目,做为开发的起点。


~ git clone https://github.com/bsspirit/maven_mahout_template
~ git checkout mahout-0.8

我们完成了基于物品的协同过滤分步式算法实现,下面将继续介绍Mahout的Kmeans的分步式算法实现,请参考文章:Mahout分步式程序开发 聚类Kmeans

转载请注明出处:
http://blog.fens.me/hadoop-mahout-mapreduce-itemcf/

打赏作者

用Maven构建Hadoop项目

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-maven-eclipse/

hadoop-maven

前言

Hadoop的MapReduce环境是一个复杂的编程环境,所以我们要尽可能地简化构建MapReduce项目的过程。Maven是一个很不错的自动化项目构建工具,通过Maven来帮助我们从复杂的环境配置中解脱出来,从而标准化开发过程。所以,写MapReduce之前,让我们先花点时间把刀磨快!!当然,除了Maven还有其他的选择Gradle(推荐), Ivy….

后面将会有介绍几篇MapReduce开发的文章,都要依赖于本文中Maven的构建的MapReduce环境。

目录

  1. Maven介绍
  2. Maven安装(win)
  3. Hadoop开发环境介绍
  4. 用Maven构建Hadoop环境
  5. MapReduce程序开发
  6. 模板项目上传github

1. Maven介绍

Apache Maven,是一个Java的项目管理及自动构建工具,由Apache软件基金会所提供。基于项目对象模型(缩写:POM)概念,Maven利用一个中央信息片断能管理一个项目的构建、报告和文档等步骤。曾是Jakarta项目的子项目,现为独立Apache项目。

maven的开发者在他们开发网站上指出,maven的目标是要使得项目的构建更加容易,它把编译、打包、测试、发布等开发过程中的不同环节有机的串联了起来,并产生一致的、高质量的项目信息,使得项目成员能够及时地得到反馈。maven有效地支持了测试优先、持续集成,体现了鼓励沟通,及时反馈的软件开发理念。如果说Ant的复用是建立在”拷贝–粘贴”的基础上的,那么Maven通过插件的机制实现了项目构建逻辑的真正复用。

2. Maven安装(win)

下载Maven:http://maven.apache.org/download.cgi

下载最新的xxx-bin.zip文件,在win上解压到 D:\toolkit\maven3

并把maven/bin目录设置在环境变量PATH:

win7-maven

然后,打开命令行输入mvn,我们会看到mvn命令的运行效果


~ C:\Users\Administrator>mvn
[INFO] Scanning for projects...
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 0.086s
[INFO] Finished at: Mon Sep 30 18:26:58 CST 2013
[INFO] Final Memory: 2M/179M
[INFO] ------------------------------------------------------------------------
[ERROR] No goals have been specified for this build. You must specify a valid lifecycle phase or a goal in the format : or :[:]:. Available lifecycle phases are: validate, initialize, generate-sources, process-sources, generate-resources, process-resources, compile, process-class
es, generate-test-sources, process-test-sources, generate-test-resources, process-test-resources, test-compile, process-test-classes, test, prepare-package, package, pre-integration-test, integration-test, post-integration-test, verify, install, deploy, pre-clean, clean, post-clean, pre-site, site, post-site, site-deploy. -> [Help 1]
[ERROR]
[ERROR] To see the full stack trace of the errors, re-run Maven with the -e switch.
[ERROR] Re-run Maven using the -X switch to enable full debug logging.
[ERROR]
[ERROR] For more information about the errors and possible solutions, please read the following articles:
[ERROR] [Help 1] http://cwiki.apache.org/confluence/display/MAVEN/NoGoalSpecifiedException

安装Eclipse的Maven插件:Maven Integration for Eclipse

Maven的Eclipse插件配置

eclipse-maven

3. Hadoop开发环境介绍

hadoop-dev

如上图所示,我们可以选择在win中开发,也可以在linux中开发,本地启动Hadoop或者远程调用Hadoop,标配的工具都是Maven和Eclipse。

Hadoop集群系统环境:

  • Linux: Ubuntu 12.04.2 LTS 64bit Server
  • Java: 1.6.0_29
  • Hadoop: hadoop-1.0.3,单节点,IP:192.168.1.210

4. 用Maven构建Hadoop环境

  • 1. 用Maven创建一个标准化的Java项目
  • 2. 导入项目到eclipse
  • 3. 增加hadoop依赖,修改pom.xml
  • 4. 下载依赖
  • 5. 从Hadoop集群环境下载hadoop配置文件
  • 6. 配置本地host

1). 用Maven创建一个标准化的Java项目


~ D:\workspace\java>mvn archetype:generate -DarchetypeGroupId=org.apache.maven.archetypes -DgroupId=org.conan.myhadoop.mr
-DartifactId=myHadoop -DpackageName=org.conan.myhadoop.mr -Dversion=1.0-SNAPSHOT -DinteractiveMode=false
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom >>>
[INFO]
[INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom <<<
[INFO]
[INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---
[INFO] Generating project in Batch mode
[INFO] No archetype defined. Using maven-archetype-quickstart (org.apache.maven.archetypes:maven-archetype-quickstart:1.
0)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archet
ype-quickstart-1.0.jar
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archety
pe-quickstart-1.0.jar (5 KB at 4.3 KB/sec)
Downloading: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archet
ype-quickstart-1.0.pom
Downloaded: http://repo.maven.apache.org/maven2/org/apache/maven/archetypes/maven-archetype-quickstart/1.0/maven-archety
pe-quickstart-1.0.pom (703 B at 1.6 KB/sec)
[INFO] ----------------------------------------------------------------------------
[INFO] Using following parameters for creating project from Old (1.x) Archetype: maven-archetype-quickstart:1.0
[INFO] ----------------------------------------------------------------------------
[INFO] Parameter: groupId, Value: org.conan.myhadoop.mr
[INFO] Parameter: packageName, Value: org.conan.myhadoop.mr
[INFO] Parameter: package, Value: org.conan.myhadoop.mr
[INFO] Parameter: artifactId, Value: myHadoop
[INFO] Parameter: basedir, Value: D:\workspace\java
[INFO] Parameter: version, Value: 1.0-SNAPSHOT
[INFO] project created from Old (1.x) Archetype in dir: D:\workspace\java\myHadoop
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 8.896s
[INFO] Finished at: Sun Sep 29 20:57:07 CST 2013
[INFO] Final Memory: 9M/179M
[INFO] ------------------------------------------------------------------------

进入项目,执行mvn命令


~ D:\workspace\java>cd myHadoop
~ D:\workspace\java\myHadoop>mvn clean install
[INFO]
[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ myHadoop ---
[INFO] Building jar: D:\workspace\java\myHadoop\target\myHadoop-1.0-SNAPSHOT.jar
[INFO]
[INFO] --- maven-install-plugin:2.3.1:install (default-install) @ myHadoop ---
[INFO] Installing D:\workspace\java\myHadoop\target\myHadoop-1.0-SNAPSHOT.jar to C:\Users\Administrator\.m2\repository\o
rg\conan\myhadoop\mr\myHadoop\1.0-SNAPSHOT\myHadoop-1.0-SNAPSHOT.jar
[INFO] Installing D:\workspace\java\myHadoop\pom.xml to C:\Users\Administrator\.m2\repository\org\conan\myhadoop\mr\myHa
doop\1.0-SNAPSHOT\myHadoop-1.0-SNAPSHOT.pom
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 4.348s
[INFO] Finished at: Sun Sep 29 20:58:43 CST 2013
[INFO] Final Memory: 11M/179M
[INFO] ------------------------------------------------------------------------

2). 导入项目到eclipse

我们创建好了一个基本的maven项目,然后导入到eclipse中。 这里我们最好已安装好了Maven的插件。

hadoop-eclipse

3). 增加hadoop依赖

这里我使用hadoop-1.0.3版本,修改文件:pom.xml


~ vi pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.conan.myhadoop.mr</groupId>
<artifactId>myHadoop</artifactId>
<packaging>jar</packaging>
<version>1.0-SNAPSHOT</version>
<name>myHadoop</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.0.3</version>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

4). 下载依赖

下载依赖:

~ mvn clean install

在eclipse中刷新项目:

hadoop-eclipse-maven

项目的依赖程序,被自动加载的库路径下面。

5). 从Hadoop集群环境下载hadoop配置文件

    • core-site.xml
    • hdfs-site.xml
    • mapred-site.xml

查看core-site.xml


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://master:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/conan/hadoop/tmp</value>
</property>
<property>
<name>io.sort.mb</name>
<value>256</value>
</property>
</configuration>

查看hdfs-site.xml


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>dfs.data.dir</name>
<value>/home/conan/hadoop/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>

查看mapred-site.xml


<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>
<property>
<name>mapred.job.tracker</name>
<value>hdfs://master:9001</value>
</property>
</configuration>

保存在src/main/resources/hadoop目录下面

hadoop-config

删除原自动生成的文件:App.java和AppTest.java

6).配置本地host,增加master的域名指向


~ vi c:/Windows/System32/drivers/etc/hosts

192.168.1.210 master

6. MapReduce程序开发

编写一个简单的MapReduce程序,实现wordcount功能。

新一个Java文件:WordCount.java


package org.conan.myhadoop.mr;

import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class WordCountMapper extends MapReduceBase implements Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                output.collect(word, one);
            }

        }
    }

    public static class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        @Override
        public void reduce(Text key, Iterator values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
            int sum = 0;
            while (values.hasNext()) {
                sum += values.next().get();
            }
            result.set(sum);
            output.collect(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        String input = "hdfs://192.168.1.210:9000/user/hdfs/o_t_account";
        String output = "hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result";

        JobConf conf = new JobConf(WordCount.class);
        conf.setJobName("WordCount");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(WordCountMapper.class);
        conf.setCombinerClass(WordCountReducer.class);
        conf.setReducerClass(WordCountReducer.class);

        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(input));
        FileOutputFormat.setOutputPath(conf, new Path(output));

        JobClient.runJob(conf);
        System.exit(0);
    }

}

启动Java APP.

控制台错误


2013-9-30 19:25:02 org.apache.hadoop.util.NativeCodeLoader 
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-9-30 19:25:02 org.apache.hadoop.security.UserGroupInformation doAs
严重: PriviledgedActionException as:Administrator cause:java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator1702422322\.staging to 0700
Exception in thread "main" java.io.IOException: Failed to set permissions of path: \tmp\hadoop-Administrator\mapred\staging\Administrator1702422322\.staging to 0700
	at org.apache.hadoop.fs.FileUtil.checkReturnValue(FileUtil.java:689)
	at org.apache.hadoop.fs.FileUtil.setPermission(FileUtil.java:662)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:509)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:344)
	at org.apache.hadoop.fs.FilterFileSystem.mkdirs(FilterFileSystem.java:189)
	at org.apache.hadoop.mapreduce.JobSubmissionFiles.getStagingDir(JobSubmissionFiles.java:116)
	at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:856)
	at org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:396)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
	at org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:850)
	at org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:824)
	at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1261)
	at org.conan.myhadoop.mr.WordCount.main(WordCount.java:78)

这个错误是win中开发特有的错误,文件权限问题,在Linux下可以正常运行。

解决方法是,修改/hadoop-1.0.3/src/core/org/apache/hadoop/fs/FileUtil.java文件

688-692行注释,然后重新编译源代码,重新打一个hadoop.jar的包。


685 private static void checkReturnValue(boolean rv, File p,
686                                        FsPermission permission
687                                        ) throws IOException {
688     /*if (!rv) {
689       throw new IOException("Failed to set permissions of path: " + p +
690                             " to " +
691                             String.format("%04o", permission.toShort()));
692     }*/
693   }

我这里自己打了一个hadoop-core-1.0.3.jar包,放到了lib下面。

我们还要替换maven中的hadoop类库。


~ cp lib/hadoop-core-1.0.3.jar C:\Users\Administrator\.m2\repository\org\apache\hadoop\hadoop-core\1.0.3\hadoop-core-1.0.3.jar

再次启动Java APP,控制台输出:


2013-9-30 19:50:49 org.apache.hadoop.util.NativeCodeLoader 
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2013-9-30 19:50:49 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2013-9-30 19:50:49 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2013-9-30 19:50:49 org.apache.hadoop.io.compress.snappy.LoadSnappy 
警告: Snappy native library not loaded
2013-9-30 19:50:49 org.apache.hadoop.mapred.FileInputFormat listStatus
信息: Total input paths to process : 4
2013-9-30 19:50:50 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
2013-9-30 19:50:50 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:50 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:50 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2013-9-30 19:50:51 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 0% reduce 0%
2013-9-30 19:50:53 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00003:0+119
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:53 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:53 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2013-9-30 19:50:54 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2013-9-30 19:50:56 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00000:0+113
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000001_0' done.
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:56 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:56 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000002_0 is done. And is in the process of commiting
2013-9-30 19:50:59 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00001:0+110
2013-9-30 19:50:59 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00001:0+110
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000002_0' done.
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask runOldMapper
信息: numReduceTasks: 1
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: io.sort.mb = 100
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: data buffer = 79691776/99614720
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer 
信息: record buffer = 262144/327680
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2013-9-30 19:50:59 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2013-9-30 19:50:59 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000003_0 is done. And is in the process of commiting
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: hdfs://192.168.1.210:9000/user/hdfs/o_t_account/part-m-00002:0+79
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000003_0' done.
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 4 sorted segments
2013-9-30 19:51:02 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 4 segments left of total size: 442 bytes
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2013-9-30 19:51:02 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2013-9-30 19:51:02 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2013-9-30 19:51:02 org.apache.hadoop.mapred.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result
2013-9-30 19:51:05 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2013-9-30 19:51:05 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2013-9-30 19:51:06 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2013-9-30 19:51:06 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息: Counters: 20
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=421
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=348
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=7377
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=1535
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=209510
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=348
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=458
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map input records=11
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=30
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=509
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=1838546944
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map input bytes=421
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=452
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=22
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=15
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=13
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=15
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=13
2013-9-30 19:51:06 org.apache.hadoop.mapred.Counters log
信息:     Map output records=22

成功运行了wordcount程序,通过命令我们查看输出结果


~ hadoop fs -ls hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result

Found 2 items
-rw-r--r--   3 Administrator supergroup          0 2013-09-30 19:51 /user/hdfs/o_t_account/result/_SUCCESS
-rw-r--r--   3 Administrator supergroup        348 2013-09-30 19:51 /user/hdfs/o_t_account/result/part-00000

~ hadoop fs -cat hdfs://192.168.1.210:9000/user/hdfs/o_t_account/result/part-00000

1,abc@163.com,2013-04-22        1
10,ade121@sohu.com,2013-04-23   1
11,addde@sohu.com,2013-04-23    1
17:21:24.0      5
2,dedac@163.com,2013-04-22      1
20:21:39.0      6
3,qq8fed@163.com,2013-04-22     1
4,qw1@163.com,2013-04-22        1
5,af3d@163.com,2013-04-22       1
6,ab34@163.com,2013-04-22       1
7,q8d1@gmail.com,2013-04-23     1
8,conan@gmail.com,2013-04-23    1
9,adeg@sohu.com,2013-04-23      1

这样,我们就实现了在win7中的开发,通过Maven构建Hadoop依赖环境,在Eclipse中开发MapReduce的程序,然后运行JavaAPP。Hadoop应用会自动把我们的MR程序打成jar包,再上传的远程的hadoop环境中运行,返回日志在Eclipse控制台输出。

7. 模板项目上传github

https://github.com/bsspirit/maven_hadoop_template

大家可以下载这个项目,做为开发的起点。

~ git clone https://github.com/bsspirit/maven_hadoop_template.git

我们完成第一步,下面就将正式进入MapReduce开发实践。

 

转载请注明出处:
http://blog.fens.me/hadoop-maven-eclipse/

打赏作者