• Archive by category "程序算法"
  • (Page 2)

Blog Archives

快速排序的Nodejs实现

算法为王系列文章,涵盖了计算机算法,数据挖掘(机器学习)算法,统计算法,金融算法等的多种跨学科算法组合。在大数据时代的背景下,算法已经成为了金字塔顶的明星。一个好的算法可以创造一个伟大帝国,就像Google。

算法为王的时代正式到来….

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/algorithm-quicksort-nodejs/

quicksort-nodejs

前言

排序算法,是程序开发中最基本的一项任务。教科课上讲的排序算法大概有7-8种,快速排序(QuickSort)是使用最广泛的一种基于比较排序的算法。网上已经有了各种语言的实现。本文则是利用快速排序的原理,实现对数组对象中的属性进行排序,利用Nodejs来实现。

目录

  1. 快速排序介绍
  2. 快速排序算法演示
  3. Nodejs程序实现

1. 快速排序算法介绍

快速排序是由东尼·霍尔所发展的一种排序算法。在平均状况下,排序 n 个项目要Ο(n log n)次比较。在最坏状况下则需要Ο(n^2)次比较,但这种状况并不常见。事实上,快速排序通常明显比其他Ο(n log n) 算法更快,因为它的内部循环(inner loop)可以在大部分的架构上很有效率地被实现出来。快速排序使用分治法(Divide and conquer)策略来把一个串行(list)分为两个子串行(sub-lists)。

快速排序的思想很简单,排序过程只需要三步:

  • 1. 从数列中挑出一个元素,称为 “基准”(pivot)
  • 2. 重新排序数列,所有元素比基准值小的摆放在基准前面,所有元素比基准值大的摆在基准的后面(相同的数可以到任一边)。在这个分区退出之后,该基准就处于数列的中间位置。这个称为分区(partition)操作。
  • 3. 递归地(recursive)把小于基准值元素的子数列和大于基准值元素的子数列排序。

递归的最底部情形,是数列的大小是零或一,也就是永远都已经被排序好了。虽然一直递归下去,但是这个算法总会退出,因为在每次的迭代(iteration)中,它至少会把一个元素摆到它最后的位置去。

摘自: http://zh.wikipedia.org/wiki/快速排序

2. 快速排序算法演示

举例来说,现在有一组数据[9,23,12,11,43,54,43,2,12,66],怎么对其按从小到大顺序排序呢?

quicksort

  • Step1,选择第一个元素9作为分隔值,把小于9的数字放左边,大于等于9的数字放右边。原来的一个数组就被分成了3个部分,左边+分隔值+右边=[2]+9+[23 12 11 43 54 43 12 66]
  • Step2, 选择Step1数组左边的第一元素2作为分隔值,左边只有一个元素,左边排序完成。选择Step1数组右边的第一个元素23作为分隔值,把小于23的数字放左边,大于等于23的数字放右边,得到[ 12 11 12 ] 23 [ 43 54 43 66 ]。
  • Step3,选择Step2数组以23分隔的左边第一个元素12作为分隔值,把小于12的数字放左边,大于等于12的数字放右边,得到[ 11 ] 12 [ 12 ]。选择Step2数组以23分隔的左边第一个元素43作为分割值,把小于43的数字放左边,大于等于43的数字右边,得到 [] 43 [ 54 43 66 ]。
  • Step4,对Step3数组以12分隔的左右两边进行排序,都只有一个元素,排序完成。对Step3数组以43分隔左右两进行排序,左边无元素,右边第一个元素54作为分隔值,得到[ 43 ] 54 [ 66 ]。
  • Step5,对Step4数组以54分隔的左右两边进行排序,都只有一个元素,排序完成。
  • 最后,后整个数组拼接在一起就得到排序结果:[2 9 11 12 12 23 43 43 54 66 ]

我们看到快速排序,其实就是一种分而治之的办法。网上还有很多快速排序的优化方法,可以尽一步减少时间复杂度和空间复杂度的开销。

3. Nodejs程序实现

像快速排序这种成熟的算法,我以为在Nodejs中早就有人已经封装好了程序包。等我用到的时候竟然找不到,所以自己花了点时间,动手实现了快速排序的Node模块。

当然,除了支持对普通数组的快速排序,还支持按数组对象中的某个属性进行快速排序,已满足自己的功能需求。

3.1 普通数组的快速排序

这部分的实现就和上文中演示的思路是一样的,我另外加了方向的判断。可以从小到大排序,也可以从大到小排序。

现实代码如下:


/**
 * 普通数组快速排序
 *
 * @param arr Array 数字数组
 * @param dir asc升序、desc降序
 *
 * @example:
 * sort([1,4,2,5])
 * sort([1,4,2,5],'asc')
 * sort([1,4,2,5],'desc')
 */
exports.sort=function(arr,dir){
    dir=dir||'asc';
    if (arr.length == 0) return [];

    var left = new Array();
    var right = new Array();
    var pivot = arr[0];

    if(dir==='asc'){//升序
        for (var i = 1; i < arr.length; i++) {
            arr[i] < pivot ? left.push(arr[i]): right.push(arr[i]);
        }
    }else{//降序
        for (var i = 1; i < arr.length; i++) {
            arr[i] > pivot ? left.push(arr[i]): right.push(arr[i]);
        }
    }
    return _this.sort(left,dir).concat(pivot, _this.sort(right,dir));
}

运行程序:


var algo = require('./index.js');
var arr = [23,12,11,43,54,43,2,12,66];
console.log(arr);
console.log(algo.quicksort.sort(arr));
console.log(algo.quicksort.sort(arr,'desc'));

//output
[ 23, 12, 11, 43, 54, 43, 2, 12, 66 ]
[ 2, 11, 12, 12, 23, 43, 43, 54, 66 ]
[ 66, 54, 43, 43, 23, 12, 12, 11, 2 ]

3.2 按数组对象中的属性进行快速排序

按数组对象中的属性快速排序,是指在数组中存储的不是数字类型,而是对象类型,如[{name:’小B’,id:12},{name:’小C’,id:21},{name:’小A’,id:2}],然后想要按对象中的id属性把对象排序。这样需求,其实在程序开发中更常见。那我们怎么做呢?其实,原理是一样的,只要把分隔值(pivot)和存储值(pivotObj)分成2个变量就行了。

实现代码如下:


/**
 * 对象数组快速排序
 *
 * @param arr Array 对象数组
 * @param key string 用于排序的属性
 * @param dir asc升序、desc降序
 *
 * @example:
 * sort([{name:'b',id:12},{name:'c',id:21},{name:'a',id:2}],'id')
 * sort([{name:'b',id:12},{name:'c',id:21},{name:'a',id:2}],'id','asc')
 * sort([{name:'b',id:12},{name:'c',id:21},{name:'a',id:2}],'id','desc')
 */
exports.sortObj=function(arr,key,dir){
    key=key||'id';
    dir=dir||'asc';
    if (arr.length == 0) return [];

    var left = new Array();
    var right = new Array();
    var pivot = arr[0][key];//分割值
    var pivotObj = arr[0];//存储值

    if(dir==='asc'){//升序
        for (var i = 1; i < arr.length; i++) {
            arr[i][key] < pivot ? left.push(arr[i]): right.push(arr[i]);
        }
    }else{//降序
        for (var i = 1; i < arr.length; i++) {
            arr[i][key] > pivot ? left.push(arr[i]): right.push(arr[i]);
        }
    }
    return _this.sortObj(left,key,dir).concat(pivotObj, _this.sortObj(right,key,dir));
}

运行程序:


var algo = require('./index.js');
var arrObj = [{name:'b',id:12},{name:'c',id:21},{name:'a',id:2}];
console.log(arrObj);
console.log(algo.quicksort.sortObj(arrObj,'id','asc'));
console.log(algo.quicksort.sortObj(arrObj,'id','desc'));

//output
[ { name: 'b', id: 12 },{ name: 'c', id: 21 },{ name: 'a', id: 2 } ]
[ { name: 'a', id: 2 },{ name: 'b', id: 12 },{ name: 'c', id: 21 } ]
[ { name: 'c', id: 21 },{ name: 'b', id: 12 },{ name: 'a', id: 2 } ]

不用多少代码,就能实现快速排序。程序源代码已上传到github, https://github.com/bsspirit/ape-algorithm。下载后,可以参考example.js文件中,来调用快速排序程序。

同时也在npm发布了,安装命令:

npm install ape-algorithm

有时候写写算法,感觉又回到了学生时代,还是挺有意思的。

转载请注明出处:
http://blog.fens.me/algorithm-quicksort-nodejs/

打赏作者

PeopleRank从社交网络中发现个体价值

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-social-peoplerank/

peoplerank

前言

如果说Google改变了互联网,那么社交网络就改变人们的生活方式。通过社交网络,我们每个个体,都是成为了网络的中心。我们的生活半径,被无限放大,通过6个朋友关系,就可以认识世界上任何一个人。

未来的互联网将是属于我们每一个人。

目录

  1. PeopleRank和PageRank
  2. 需求分析:从社交网络中发现个体价值
  3. 算法模型:PeopleRank算法
  4. 架构设计:PeopleRank计算引擎系统架构
  5. 程序开发:PeopleRank算法实现

1. PeopleRank和PageRank

PageRank让Google成为搜索领域的No.1,也是当今最有影响力的互联网公司之一,用技术创新改变人们的生活。PageRank主要用于网页评分计算,把互联网上的所有网页都进行打分,给网页价值的体现。

自2012以来,中国开始进入社交网络的时代,开心网,人人网,新浪微博,腾讯微博,微信等社交网络应用,开始进入大家的生活。最早是由“抢车位”,“偷菜”等社交游戏带动的社交网络的兴起,如今人们会更多的利用社交网络,获取信息和分享信息。我们的互联网,正在从以网页信息为核心的网络,向着以人为核心的网络转变着。

于是有人就提出了,把PageRank模型应用于社交网络,定义以人为核心的个体价值。这样PageRank模型就有了新的应用领域,同时也有了一个新的名字PeopleRank。

关于PageRank的介绍,请参考文章:PageRank算法R语言实现

注:PeopleRank网上还有不同的解释,我这里仅仅表示用来解释“PageRank模型”。

下面我们将从一个PeopleRank的案例来解释,如何从社交网络中发现个体价值。

2. 需求分析:从社交网络中发现个体价值

案例介绍:
以新浪微博为例,给微博中每个用户进行评分!
从新浪微博上,把我们的关注和粉丝的关系都找到。

如下图:我在微博上,随便找了几个微博账号。

weibo-logo

我们的任务是,需要给这些账号评分!

  • 方法一,简单求和:评分=关注数+粉丝数+微博数
  • 方法二,加权求和:评分=a*关注数+b*粉丝数+c*微博数

新建数据文件:weibo.csv


~ vi weibo.csv

A,314,1091,1488
B,196,10455,327
C,557,51635228,13793
D,55,14655464,1681
E,318,547,4899
F,166,145,170
G,17,890,169
H,54,946759,17229

R语言读入数据文件


weibo<-read.csv(file="weibo.csv",header=FALSE)
names(weibo)<-c("id","follow","fans","tweet")

1). 方法一,简单相加法


> data.frame(weibo[1],rank=rowSums(weibo[2:4]))
  id     rank
1  A     2893
2  B    10978
3  C 51649578
4  D 14657200
5  E     5764
6  F      481
7  G     1076
8  H   964042

这种方法简单粗暴的方式,是否能代码公平的打分呢?!

2). 方法二,加权求和

通过a,b,c的3个参数,分别设置权重求和。与方法一存在同样的问题,a,b,c的权值都是人为指定的,也是不能表示公平的打分的。

除了上面的两个方法,你能否想到不一样的思路呢!

3. 算法模型:PeopleRank算法

基于PageRank的理论,我们以每个微博账户的“关注”为链出链接,“粉丝”为链入链接,我们把这种以人为核心的关系,叫PeopleRank。

关于PageRank的介绍,请参考文章:PageRank算法R语言实现

PeopleRank假设条件:

  • 数量假设:如果一个用户节点接收到的其他用户“关注”的数量越多,那么这个用户越重要。
  • 质量假设:用户A的“粉丝”质量不同,质量高的“粉丝”会通“关注”接向其他用户传递更多的权重。所以越是质量高的“用户”关注用户A,则用户A越重要。

衡量PeopleRank的3个指标:

  • 粉丝数
  • 粉丝是否有较高PeopleRank值
  • 粉丝关注了多少人

我们以下的数据为例,构造基于微博的数据模型:
(由于微博数据已增加访问权限,我无法拿到当前的实际数据,我用以前@晒粉丝应用,收集到的微博数据为例,这里ID已经过处理)

测试数据集:people.csv

  • 25个用户
  • 66个关系,关注和粉丝的关系

数据集: people.csv


1,19
1,21
2,11
2,17
2,21
3,1
3,20
3,2
3,7
3,6
3,10
4,3
4,6
5,19
5,11
5,2
6,4
6,12
6,18
6,15
6,10
6,5
7,9
7,18
7,10
8,3
8,11
8,7
8,16
8,14
9,6
10,8
10,18
11,13
11,3
12,9
12,4
12,16
12,5
13,19
13,1
13,6
14,7
14,17
14,19
14,1
14,5
14,2
15,11
15,14
15,12
16,20
17,4
17,6
18,10
18,11
18,15
18,14
19,18
20,10
20,5
21,24
22,11
23,17
24,15
25,24

第一列为用户ID,第二列也是用户ID。第一列用户,关注了第二用户。

以R语言可视化输出

igraph-refs

R语言程序


library(igraph)
people<-read.csv(file="people.csv",header=FALSE)
drawGraph<-function(data){
  names(data)<-c("from","to") 
  g <- graph.data.frame(data, directed=TRUE)
  V(g)$label <- V(g)$name
  V(g)$size <- 15
  E(g)$color <- grey(0.5)
  g2 <- simplify(g)
  plot(g2,layout=layout.circle)
}
drawGraph(people)

用R语言构建PeopleRank的算法原型

  • 构建邻接矩阵
  • 变换概率矩阵
  • 递归计算矩阵特征值
  • 标准化结果
  • 对结果排序输出

R语言算法模型


#构建邻接矩阵
adjacencyMatrix<-function(pages){
  n<-max(apply(pages,2,max))
  A <- matrix(0,n,n)
  for(i in 1:nrow(pages)) A[pages[i,]$dist,pages[i,]$src]<-1
  A
}

#变换概率矩阵
dProbabilityMatrix<-function(G,d=0.85){
  cs <- colSums(G)
  cs[cs==0] <- 1
  n <- nrow(G)
  delta <- (1-d)/n
  A <- matrix(delta,n,n)
  for (i in 1:n) A[i,] <- A[i,] + d*G[i,]/cs
  A
}

#递归计算矩阵特征值
eigenMatrix<-function(G,iter=100){
  n<-nrow(G)
  x <- rep(1,n)
  for (i in 1:iter) x <- G %*% x
  x/sum(x)
}

#直接计算矩阵特征值
calcEigenMatrix<-function(G){
  x <- Re(eigen(G)$vectors[,1])
  x/sum(x)
}

PeopleRank计算,带入数据集people.csv


people<-read.csv(file="people.csv",header=FALSE)
names(people)<-c("src","dist");people
A<-adjacencyMatrix(people);A
G<-dProbabilityMatrix(A);G
q<-calcEigenMatrix(G);

q
[1] 0.03274732 0.03404052 0.05983465 0.03527074 0.04366519 0.07042752 0.02741232
 [8] 0.03378595 0.02118713 0.06537870 0.07788465 0.03491910 0.03910097 0.05076803
[15] 0.06685364 0.01916392 0.02793695 0.09450614 0.05056016 0.03076591 0.02956243
[22] 0.00600000 0.00600000 0.03622806 0.00600000

我们给这25用户进行打分,从高到低进行排序。

对结果排序输出:


result<-data.frame(userid=userid,PR=q[userid])
result
   userid          PR
1      18 0.09450614
2      11 0.07788465
3       6 0.07042752
4      15 0.06685364
5      10 0.06537870
6       3 0.05983465
7      14 0.05076803
8      19 0.05056016
9       5 0.04366519
10     13 0.03910097
11     24 0.03622806
12      4 0.03527074
13     12 0.03491910
14      2 0.03404052
15      8 0.03378595
16      1 0.03274732
17     20 0.03076591
18     21 0.02956243
19     17 0.02793695
20      7 0.02741232
21      9 0.02118713
22     16 0.01916392
23     22 0.00600000
24     23 0.00600000
25     25 0.00600000

查看评分最高的用户18的关系数据:


people[c(which(people$src==18), which(people$dist==18)),]

   src dist
55  18   10
56  18   11
57  18   15
58  18   14
19   6   18
24   7   18
33  10   18
59  19   18

粉丝的PeopleRank排名:


which(result$userid %in% people$src[which(people$dist==18)])

[1]  3  5  8 20

粉丝的关注数:


table(people$src)[people$src[which(people$dist==18)]]

 6  7 10 19 
 6  3  2  1 

数据解释:用户18

  • 有4个粉丝为别是6,7,10,19。(粉丝数)
  • 4个粉丝的PeopleRank排名,是3,5,8,20。(粉丝是否有较高PeopleRank值)
  • 粉丝的关注数量,是6,3,2,1。(粉丝关注了多少人)

因此,通过对上面3个指标的综合打分,用户18是评分最高的用户。

通过R语言实现的计算模型,已经比较符合我们的评分标准了,下面我们把PeopleRank用MapReduce实现,以满足对海量数据的计算需求。

4. 架构设计:PeopleRank计算引擎系统架构

pagerank-spider

上图中,左边是数据爬虫系统,右边是Hadoop的HDFS, MapReduce。

  • 数据爬虫系统实时爬取微博数据
  • 设置系统定时器CRON,每xx小时,增量向HDFS导入数据(userid1,userid2)
  • 完成导入后,设置系统定时器,启动MapReduce程序,运行推荐算法。
  • 完成计算后,设置系统定时器,从HDFS导出推荐结果数据到数据库,方便以后的及时查询。

5. 程序开发:PeopleRank算法实现

win7的开发环境 和 Hadoop的运行环境 ,请参考文章:用Maven构建Hadoop项目

开发步骤:

  • 微博好友的关系数据: people.csv
  • 出始的PR数据:peoplerank.csv
  • 邻接矩阵: AdjacencyMatrix.java
  • PeopleRank计算: PageRank.java
  • PR标准化: Normal.java
  • 启动程序: PageRankJob.java

1). 微博好友的关系数据: people.csv,在上文中已列出

2). 出始的PR数据:peoplerank.csv


1,1
2,1
3,1
4,1
5,1
6,1
7,1
8,1
9,1
10,1
11,1
12,1
13,1
14,1
15,1
16,1
17,1
18,1
19,1
20,1
21,1
22,1
23,1
24,1
25,1

3). 邻接矩阵

矩阵解释:

  • 阻尼系数为0.85
  • 用户数为25
  • reduce以行输出矩阵的列,输出列主要用于分步式存储,下一步需要转成行

部分数据输出:


~ hadoop fs -cat /user/hdfs/pagerank/tmp1/part-r-00000|head -n 4

1	0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999
10	0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999
11	0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999
12	0.005999999,0.005999999,0.005999999,0.2185,0.2185,0.005999999,0.005999999,0.005999999,0.2185,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.2185,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999

4). PeopleRank计算: PageRank.java

迭代一次的PeopleRank值


~ hadoop fs -cat /user/hdfs/pagerank/pr/part-r-00000

1       0.716666
10      1.354167
11      2.232500
12      0.575000
13      0.575000
14      0.815833
15      1.354167
16      0.532500
17      1.425000
18      1.850000
19      1.283334
2       0.716667
20      1.141667
21      0.858333
22      0.150000
23      0.150000
24      1.850000
25      0.150000
3       1.170001
4       0.929167
5       1.070833
6       2.275001
7       0.603333
8       0.575000
9       0.645833

5). PR标准化: Normal.java

迭代10次,并标准化的结果:


~ hadoop fs -cat /user/hdfs/pagerank/result/part-r-00000

1       0.032842
10      0.065405
11      0.077670
12      0.034864
13      0.039175
14      0.050574
15      0.066614
16      0.019167
17      0.027990
18      0.094460
19      0.050673
2       0.034054
20      0.030835
21      0.029657
22      0.006000
23      0.006000
24      0.036111
25      0.006000
3       0.059864
4       0.035314
5       0.043805
6       0.070516
7       0.027444
8       0.033715
9       0.021251

我们对结果进行排序


   id       pr
10 18 0.094460
3  11 0.077670
22  6 0.070516
7  15 0.066614
2  10 0.065405
19  3 0.059864
11 19 0.050673
6  14 0.050574
21  5 0.043805
5  13 0.039175
17 24 0.036111
20  4 0.035314
4  12 0.034864
12  2 0.034054
24  8 0.033715
1   1 0.032842
13 20 0.030835
14 21 0.029657
9  17 0.027990
23  7 0.027444
25  9 0.021251
8  16 0.019167
15 22 0.006000
16 23 0.006000
18 25 0.006000

第一名是用户18,第二名是用户11,第三名是用户6,第三名与之前R语言单机计算的结果有些不一样,而且PR值也稍有不同,这是因为我们迭代10次时,特征值还没有完全的收敛,需要更多次的迭代计算,才能得矩阵的特征值。

程序API的实现,请参考文章:PageRank算法并行实现

我们通过PageRank的模型,成功地应用到了社交网络,实现了PeopleRank的计算,通过设计数据挖掘算法,来取代不成熟的人脑思想。算法模型将更客观,更精准。

最后,大家可以利用这个案例的设计思路,认真地了解社交网络,做出属于的自己的算法。

由于时间仓促,代码可能存在bug。请有能力同学,自行发现问题,解决问题!!

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-social-peoplerank/

打赏作者

PageRank算法R语言实现

算法为王系列文章,涵盖了计算机算法,数据挖掘(机器学习)算法,统计算法,金融算法等的多种跨学科算法组合。在大数据时代的背景下,算法已经成为了金字塔顶的明星。一个好的算法可以创造一个伟大帝国,就像Google。

算法为王的时代正式到来….

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/algorithm-pagerank-r/

pagerank-r

前言

Google搜索,早已成为我每天必用的工具,无数次惊叹它搜索结果的准确性。同时,我也在做Google的SEO,推广自己的博客。经过几个月尝试,我的博客PR到2了,外链也有几万个了。总结下来,还是感叹PageRank的神奇!

改变世界的算法,PageRank!

目录

  1. PageRank算法介绍
  2. PageRank算法原理
  3. PageRank算法的R语言实现

1. PageRank算法介绍

PageRank是Google专有的算法,用于衡量特定网页相对于搜索引擎索引中的其他网页而言的重要程度。它由Larry Page 和 Sergey Brin在20世纪90年代后期发明。PageRank实现了将链接价值概念作为排名因素。

PageRank让链接来”投票”

一个页面的“得票数”由所有链向它的页面的重要性来决定,到一个页面的超链接相当于对该页投一票。一个页面的PageRank是由所有链向它的页面(“链入页面”)的重要性经过递归算法得到的。一个有较多链入的页面会有较高的等级,相反如果一个页面没有任何链入页面,那么它没有等级。

简单一句话概括:从许多优质的网页链接过来的网页,必定还是优质网页。

PageRank的计算基于以下两个基本假设:

  • 数量假设:如果一个页面节点接收到的其他网页指向的入链数量越多,那么这个页面越重要
  • 质量假设:指向页面A的入链质量不同,质量高的页面会通过链接向其他页面传递更多的权重。所以越是质量高的页面指向页面A,则页面A越重要。

要提高PageRank有3个要点:

  • 反向链接数
  • 反向链接是否来自PageRank较高的页面
  • 反向链接源页面的链接数

2. PageRank算法原理

在初始阶段:网页通过链接关系构建起有向图,每个页面设置相同的PageRank值,通过若干轮的计算,会得到每个页面所获得的最终PageRank值。随着每一轮的计算进行,网页当前的PageRank值会不断得到更新。

在一轮更新页面PageRank得分的计算中,每个页面将其当前的PageRank值平均分配到本页面包含的出链上,这样每个链接即获得了相应的权值。而每个页面将所有指向本页面的入链所传入的权值求和,即可得到新的PageRank得分。当每个页面都获得了更新后的PageRank值,就完成了一轮PageRank计算。

1). 算法原理

PageRank算法建立在随机冲浪者模型上,其基本思想是:网页的重要性排序是由网页间的链接关系所决定的,算法是依靠网页间的链接结构来评价每个页面的等级和重要性,一个网页的PR值不仅考虑指向它的链接网页数,还有指向’指向它的网页的其他网页本身的重要性。

PageRank具有两大特性:

  • PR值的传递性:网页A指向网页B时,A的PR值也部分传递给B
  • 重要性的传递性:一个重要网页比一个不重要网页传递的权重要多

2). 计算公式:

pagerank-formula

  • PR(pi): pi页面的PageRank值
  • n: 所有页面的数量
  • pi: 不同的网页p1,p2,p3
  • M(i): pi链入网页的集合
  • L(j): pj链出网页的数量
  • d:阻尼系数, 任意时刻,用户到达某页面后并继续向后浏览的概率。
    (1-d=0.15) :表示用户停止点击,随机跳到新URL的概率
    取值范围: 0 < d ≤ 1, Google设为0.85

3). 构造实例:以4个页面的数据为例

pagerank-sample

图片说明:

  • ID=1的页面链向2,3,4页面,所以一个用户从ID=1的页面跳转到2,3,4的概率各为1/3
  • ID=2的页面链向3,4页面,所以一个用户从ID=2的页面跳转到3,4的概率各为1/2
  • ID=3的页面链向4页面,所以一个用户从ID=3的页面跳转到4的概率各为1
  • ID=4的页面链向2页面,所以一个用户从ID=4的页面跳转到2的概率各为1

构造邻接表:


链接源页面     链接目标页面
    1             2,3,4
    2             3,4
    3             4
    4             2

构造邻接矩阵(方阵):

  • 列:源页面
  • 行:目标页面

     [,1] [,2] [,3] [,4]
[1,]    0    0    0    0
[2,]    1    0    0    1
[3,]    1    1    0    0
[4,]    1    1    1    0

转换为概率矩阵(转移矩阵)


     [,1] [,2] [,3] [,4]
[1,] 0      0    0    0
[2,] 1/3    0    0    1
[3,] 1/3  1/2    0    0
[4,] 1/3  1/2    1    0

通过链接关系,我们就构造出了“转移矩阵”。

3. R语言单机算法实现

创建数据文件:page.csv


1,2
1,3
1,4
2,3
2,4
3,4
4,2

分别用下面3种方式实现PageRank:

  • 未考虑阻尼系统的情况
  • 包括考虑阻尼系统的情况
  • 直接用R的特征值计算函数

1). 未考虑阻尼系统的情况

R语言实现


#构建邻接矩阵
adjacencyMatrix<-function(pages){
  n<-max(apply(pages,2,max))
  A <- matrix(0,n,n)
  for(i in 1:nrow(pages)) A[pages[i,]$dist,pages[i,]$src]<-1
  A
}

#变换概率矩阵
probabilityMatrix<-function(G){
  cs <- colSums(G)
  cs[cs==0] <- 1
  n <- nrow(G)
  A <- matrix(0,nrow(G),ncol(G))
  for (i in 1:n) A[i,] <- A[i,] + G[i,]/cs
  A
}

#递归计算矩阵特征值
eigenMatrix<-function(G,iter=100){
  iter<-10
  n<-nrow(G)
  x <- rep(1,n)
  for (i in 1:iter) x <- G %*% x
  x/sum(x)
}

> pages<-read.table(file="page.csv",header=FALSE,sep=",")
> names(pages)<-c("src","dist");pages
  src dist
1   1    2
2   1    3
3   1    4
4   2    3
5   2    4
6   3    4
7   4    2

> A<-adjacencyMatrix(pages);A
     [,1] [,2] [,3] [,4]
[1,]    0    0    0    0
[2,]    1    0    0    1
[3,]    1    1    0    0
[4,]    1    1    1    0

> G<-probabilityMatrix(A);G
          [,1] [,2] [,3] [,4]
[1,] 0.0000000  0.0    0    0
[2,] 0.3333333  0.0    0    1
[3,] 0.3333333  0.5    0    0
[4,] 0.3333333  0.5    1    0

> q<-eigenMatrix(G,100);q
          [,1]
[1,] 0.0000000
[2,] 0.4036458
[3,] 0.1979167
[4,] 0.3984375

结果解读:

  • ID=1的页面,PR值是0,因为没有指向ID=1的页面
  • ID=2的页面,PR值是0.4,权重最高,因为1和4都指向2,4权重较高,并且4只有一个链接指向到2,权重传递没有损失
  • ID=3的页面,PR值是0.19,虽有1和2的指向了3,但是1和2还指向的其他页面,权重被分散了,所以ID=3的页面PR并不高
  • ID=4的页面,PR值是0.39,权重很高,因为被1,2,3都指向了

从上面的结果,我们发现ID=1的页面,PR值是0,那么ID=1的页,就不能向其他页面输出权重了,计算就会不合理!所以,增加d阻尼系数,修正没有链接指向的页面,保证页面的最小PR值>0,。

2). 包括考虑阻尼系统的情况

增加函数:dProbabilityMatrix



#变换概率矩阵,考虑d的情况
dProbabilityMatrix<-function(G,d=0.85){
  cs <- colSums(G)
  cs[cs==0] <- 1
  n <- nrow(G)
  delta <- (1-d)/n
  A <- matrix(delta,nrow(G),ncol(G))
  for (i in 1:n) A[i,] <- A[i,] + d*G[i,]/cs
  A
}

> pages<-read.table(file="page.csv",header=FALSE,sep=",")
> names(pages)<-c("src","dist");pages
  src dist
1   1    2
2   1    3
3   1    4
4   2    3
5   2    4
6   3    4
7   4    2

> A<-adjacencyMatrix(pages);A
     [,1] [,2] [,3] [,4]
[1,]    0    0    0    0
[2,]    1    0    0    1
[3,]    1    1    0    0
[4,]    1    1    1    0

> G<-dProbabilityMatrix(A);G
          [,1]   [,2]   [,3]   [,4]
[1,] 0.0375000 0.0375 0.0375 0.0375
[2,] 0.3208333 0.0375 0.0375 0.8875
[3,] 0.3208333 0.4625 0.0375 0.0375
[4,] 0.3208333 0.4625 0.8875 0.0375

> q<-eigenMatrix(G,100);q
          [,1]
[1,] 0.0375000
[2,] 0.3738930
[3,] 0.2063759
[4,] 0.3822311

增加阻尼系数后,ID=1的页面,就有值了PR(1)=(1-d)/n=(1-0.85)/4=0.0375,即无外链页面的最小值。

3). 直接用R的特征值计算函数

增加函数:calcEigenMatrix


#直接计算矩阵特征值
calcEigenMatrix<-function(G){
  x <- Re(eigen(G)$vectors[,1])
  x/sum(x)
}

> pages<-read.table(file="page.csv",header=FALSE,sep=",")
> names(pages)<-c("src","dist");pages
  src dist
1   1    2
2   1    3
3   1    4
4   2    3
5   2    4
6   3    4
7   4    2

> A<-adjacencyMatrix(pages);A
     [,1] [,2] [,3] [,4]
[1,]    0    0    0    0
[2,]    1    0    0    1
[3,]    1    1    0    0
[4,]    1    1    1    0

> G<-dProbabilityMatrix(A);G
          [,1]   [,2]   [,3]   [,4]
[1,] 0.0375000 0.0375 0.0375 0.0375
[2,] 0.3208333 0.0375 0.0375 0.8875
[3,] 0.3208333 0.4625 0.0375 0.0375
[4,] 0.3208333 0.4625 0.8875 0.0375

> q<-calcEigenMatrix(G);q
[1] 0.0375000 0.3732476 0.2067552 0.3824972

直接计算矩阵特征值,可以有效地减少的循环的操作,提高程序运行效率。

在了解PageRank的原理后,使用R语言构建PageRank模型,是非常容易的。实际应用中,我们也愿意用比较简单的方式建模,验证后,再用其他语言语言去企业应用!

下一篇文章,将介绍如何用MapReduce分步式算法来实现PageRank模型,PageRank算法并行实现

参考文章

转载请注明出处:
http://blog.fens.me/algorithm-pagerank-r/

打赏作者

用MapReduce实现矩阵乘法

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mapreduce-matrix/

hadoop-mapreduce-matrix

前言

MapReduce打开了并行计算的大门,让我们个人开发者有了处理大数据的能力。但想用好MapReduce,把原来单机算法并行化,也不是一件容易事情。很多的时候,我们需要从单机算法能否矩阵化去思考,所以矩阵操作就变成了算法并行化的基础。

像推荐系统的协同过滤算法,就是基于矩阵思想实现MapReduce并行化。

目录

  1. 矩阵介绍
  2. 矩阵乘法的R语言计算
  3. 矩阵乘法的MapReduce计算
  4. 稀疏矩阵乘法的MapReduce计算

1. 矩阵介绍

矩阵: 数学上,一个m×n的矩阵是一个由m行n列元素排列成的矩形阵列。矩阵里的元素可以是数字、符号或数学式。以下是一个由6个数字符素构成的2行3列的矩阵:


1 2 3
4 5 6

矩阵加法
大小相同(行数列数都相同)的矩阵之间可以相互加减,具体是对每个位置上的元素做加减法。

举例:两个矩阵的加法


1 3 1   +  0 0 5   =   1+0 3+0 1+5   =   1 3 6
1 0 0      7 5 0       1+7 0+5 0+0       8 5 0 

矩阵乘法
两个矩阵可以相乘,当且仅当第一个矩阵的列数等于第二个矩阵的行数。矩阵的乘法满足结合律和分配律,但不满足交换律。

举例:两个矩阵的乘法


 1 0 2   *   3 1   =  (1*3+0*2+2*1)  (1*1+0*1+2*0)    =  5 1
-1 3 1       2 1      (-1*3+3*2+1*1) (-1*1+3*1+1*0)      4 2
             1 0 

2. 矩阵乘法的R语言计算


> m1<-matrix(c(1,0,2,-1,3,1),nrow=2,byrow=TRUE);m1
     [,1] [,2] [,3]
[1,]    1    0    2
[2,]   -1    3    1

> m2<-matrix(c(3,1,2,1,1,0),nrow=3,byrow=TRUE);m2
     [,1] [,2]
[1,]    3    1
[2,]    2    1
[3,]    1    0

> m3<-m1 %*% m2;m3
     [,1] [,2]
[1,]    5    1
[2,]    4    2

由R语言实现矩阵的乘法是非常简单的。

3. 矩阵乘法的MapReduce计算

算法实现思路:

mapreduce-matrix

  • 新建2个矩阵数据文件:m1.csv, m2.csv
  • 新建启动程序:MainRun.java
  • 新建MR程序:MartrixMultiply.java

1).新建2个矩阵数据文件m1.csv, m2.csv

m1.csv


1,0,2
-1,3,1

m2.csv


3,1
2,1
1,0

3).新建启动程序:MainRun.java

启动程序


package org.conan.myhadoop.matrix;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

public class MainRun {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) {
        martrixMultiply();
    }
    
    public static void martrixMultiply() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("m1", "logfile/matrix/m1.csv");// 本地的数据文件
        path.put("m2", "logfile/matrix/m2.csv");
        path.put("input", HDFS + "/user/hdfs/matrix");// HDFS的目录
        path.put("input1", HDFS + "/user/hdfs/matrix/m1");
        path.put("input2", HDFS + "/user/hdfs/matrix/m2");
        path.put("output", HDFS + "/user/hdfs/matrix/output");

        try {
            MartrixMultiply.run(path);// 启动程序
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(MainRun.class);
        conf.setJobName("MartrixMultiply");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

}

3).新建MR程序:MartrixMultiply.java

MapReduce程序


package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class MartrixMultiply {

    public static class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// m1 or m2

        private int rowNum = 2;// 矩阵A的行数
        private int colNum = 2;// 矩阵B的列数
        private int rowIndexA = 1; // 矩阵A,当前在第几行
        private int rowIndexB = 1; // 矩阵B,当前在第几行

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();// 判断读的数据集
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                for (int i = 1; i <= rowNum; i++) {
                    Text k = new Text(rowIndexA + "," + i);
                    for (int j = 1; j <= tokens.length; j++) {
                        Text v = new Text("A:" + j + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + "  " + v.toString());
                    }

                }
                rowIndexA++;

            } else if (flag.equals("m2")) {
                for (int i = 1; i <= tokens.length; i++) {
                    for (int j = 1; j <= colNum; j++) {
                        Text k = new Text(i + "," + j);
                        Text v = new Text("B:" + rowIndexB + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + "  " + v.toString());
                    }
                }

                rowIndexB++;
            }
        }
    }

    public static class MatrixReducer extends Reducer<Text, Text, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            System.out.print(key.toString() + ":");

            for (Text line : values) {
                String val = line.toString();
                System.out.print("("+val+")");

                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);

                    // System.out.println("A:" + kv[0] + "," + kv[1]);

                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);

                    // System.out.println("B:" + kv[0] + "," + kv[1]);
                }
            }

            int result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(mapB.get(mapk));
            }
            context.write(key, new IntWritable(result));
            System.out.println();

            // System.out.println("C:" + key.toString() + "," + result);
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(MartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加载2个输入数据集
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

}

运行日志


Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix
Create: hdfs://192.168.1.210:9000/user/hdfs/matrix
copy from: logfile/matrix/m1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1
copy from: logfile/matrix/m2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2
2014-1-15 10:48:03 org.apache.hadoop.util.NativeCodeLoader <clinit>
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-1-15 10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2014-1-15 10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2014-1-15 10:48:03 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 2
2014-1-15 10:48:03 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
警告: Snappy native library not loaded
2014-1-15 10:48:04 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
2014-1-15 10:48:04 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 79691776/99614720
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 262144/327680
1,1  A:1,1
1,1  A:2,0
1,1  A:3,2
1,2  A:1,1
1,2  A:2,0
1,2  A:3,2
2,1  A:1,-1
2,1  A:2,3
2,1  A:3,1
2,2  A:1,-1
2,2  A:2,3
2,2  A:3,1
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2014-1-15 10:48:04 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2014-1-15 10:48:05 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 0% reduce 0%
2014-1-15 10:48:07 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 79691776/99614720
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 262144/327680
1,1  B:1,3
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
1,2  B:1,1
2,1  B:1,3
2,2  B:1,1
1,1  B:2,2
1,2  B:2,1
2,1  B:2,2
2,2  B:2,1
1,1  B:3,1
1,2  B:3,0
2,1  B:3,1
2,2  B:3,0
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2014-1-15 10:48:08 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000001_0' done.
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 2 sorted segments
2014-1-15 10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 2 segments left of total size: 294 bytes
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
1,1:(B:1,3)(B:2,2)(B:3,1)(A:1,1)(A:2,0)(A:3,2)
1,2:(A:1,1)(A:2,0)(A:3,2)(B:1,1)(B:2,1)(B:3,0)
2,1:(B:1,3)(B:2,2)(B:3,1)(A:1,-1)(A:2,3)(A:3,1)
2,2:(A:1,-1)(A:2,3)(A:3,1)(B:1,1)(B:2,1)(B:3,0)
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2014-1-15 10:48:10 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output
2014-1-15 10:48:13 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2014-1-15 10:48:13 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2014-1-15 10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2014-1-15 10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=24
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=1713
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=75
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=125314
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=114
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=30
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=302
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Map input records=5
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=48
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=242
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=764215296
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=220
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=24
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=4
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=4
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Map output records=24

4. 稀疏矩阵乘法的MapReduce计算

我们在用矩阵处理真实数据的时候,一般都是非常稀疏矩阵,为了节省存储空间,通常只会存储非0的数据。

下面我们来做一个稀疏矩阵:

spraseMatrix

  • R语言的实现矩阵乘法
  • 新建2个矩阵数据文件sm1.csv, sm2.csv
  • 修改启动程序:MainRun.java
  • 新建MR程序:SparseMartrixMultiply.java

1). R语言的实现矩阵乘法

R语言程序


> m1<-matrix(c(1,0,0,3,2,5,0,4,0,0,0,1,4,7,1,2),nrow=4,byrow=TRUE);m1
     [,1] [,2] [,3] [,4]
[1,]    1    0    0    3
[2,]    2    5    0    4
[3,]    0    0    0    1
[4,]    4    7    1    2

> m2<-matrix(c(5,0,0,2,0,0,3,1),nrow=4,byrow=TRUE);m2
     [,1] [,2]
[1,]    5    0
[2,]    0    2
[3,]    0    0
[4,]    3    1

> m3<-m1 %*% m2;m3
     [,1] [,2]
[1,]   14    3
[2,]   22   14
[3,]    3    1
[4,]   26   16

2).新建2个稀疏矩阵数据文件sm1.csv, sm2.csv

只存储非0的数据,3列存储,第一列“原矩阵行”,第二列“原矩阵列”,第三列“原矩阵值”。

sm1.csv


1,1,1
1,4,3
2,1,2
2,2,5
2,4,4
3,4,1
4,1,4
4,2,7
4,3,1
4,4,2

sm2.csv


1,1,5
2,2,2
4,1,3
4,2,1

3).修改启动程序:MainRun.java

增加SparseMartrixMultiply的启动配置


    public static void main(String[] args) {
        sparseMartrixMultiply();
    }    
    
    public static void sparseMartrixMultiply() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("m1", "logfile/matrix/sm1.csv");// 本地的数据文件
        path.put("m2", "logfile/matrix/sm2.csv");
        path.put("input", HDFS + "/user/hdfs/matrix");// HDFS的目录
        path.put("input1", HDFS + "/user/hdfs/matrix/m1");
        path.put("input2", HDFS + "/user/hdfs/matrix/m2");
        path.put("output", HDFS + "/user/hdfs/matrix/output");

        try {
            SparseMartrixMultiply.run(path);// 启动程序
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

4). 新建MR程序:SparseMartrixMultiply.java

spareseMatrix2

  • map函数有修改,reduce函数没有变化
  • 去掉判断所在行和列的变量

package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class SparseMartrixMultiply {

    public static class SparseMatrixMapper extends Mapper>LongWritable, Text, Text, Text< {

        private String flag;// m1 or m2

        private int rowNum = 4;// 矩阵A的行数
        private int colNum = 2;// 矩阵B的列数

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();// 判断读的数据集
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];

                for (int i = 1; i >= colNum; i++) {
                    Text k = new Text(row + "," + i);
                    Text v = new Text("A:" + col + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + "  " + v.toString());
                }

            } else if (flag.equals("m2")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];

                for (int i = 1; i >= rowNum; i++) {
                    Text k = new Text(i + "," + col);
                    Text v = new Text("B:" + row + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + "  " + v.toString());

                }
            }
        }
    }

    public static class SparseMatrixReducer extends Reducer>Text, Text, Text, IntWritable< {

        @Override
        public void reduce(Text key, Iterable>Text< values, Context context) throws IOException, InterruptedException {

            Map>String, String< mapA = new HashMap>String, String<();
            Map>String, String< mapB = new HashMap>String, String<();

            System.out.print(key.toString() + ":");

            for (Text line : values) {
                String val = line.toString();
                System.out.print("(" + val + ")");

                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);

                    // System.out.println("A:" + kv[0] + "," + kv[1]);

                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);

                    // System.out.println("B:" + kv[0] + "," + kv[1]);
                }
            }

            int result = 0;
            Iterator>String< iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                String bVal = mapB.containsKey(mapk) ? mapB.get(mapk) : "0";
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(bVal);
            }
            context.write(key, new IntWritable(result));
            System.out.println();

            // System.out.println("C:" + key.toString() + "," + result);
        }
    }

    public static void run(Map>String, String< path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(MartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SparseMatrixMapper.class);
        job.setReducerClass(SparseMatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// 加载2个输入数据集
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

运行输出:


Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix
Create: hdfs://192.168.1.210:9000/user/hdfs/matrix
copy from: logfile/matrix/sm1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1
copy from: logfile/matrix/sm2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2
2014-1-15 11:57:31 org.apache.hadoop.util.NativeCodeLoader >clinit<
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2014-1-15 11:57:31 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 2
2014-1-15 11:57:31 org.apache.hadoop.io.compress.snappy.LoadSnappy >clinit<
警告: Snappy native library not loaded
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
2014-1-15 11:57:31 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer >init<
信息: io.sort.mb = 100
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer >init<
信息: data buffer = 79691776/99614720
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer >init<
信息: record buffer = 262144/327680
1,1  A:1,1
1,2  A:1,1
1,1  A:4,3
1,2  A:4,3
2,1  A:1,2
2,2  A:1,2
2,1  A:2,5
2,2  A:2,5
2,1  A:4,4
2,2  A:4,4
3,1  A:4,1
3,2  A:4,1
4,1  A:1,4
4,2  A:1,4
4,1  A:2,7
4,2  A:2,7
4,1  A:3,1
4,2  A:3,1
4,1  A:4,2
4,2  A:4,2
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2014-1-15 11:57:31 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2014-1-15 11:57:32 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 0% reduce 0%
2014-1-15 11:57:34 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer >init<
信息: io.sort.mb = 100
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer >init<
信息: data buffer = 79691776/99614720
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer >init<
信息: record buffer = 262144/327680
1,1  B:1,5
2,1  B:1,5
3,1  B:1,5
4,1  B:1,5
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
1,2  B:2,2
2,2  B:2,2
3,2  B:2,2
4,2  B:2,2
1,1  B:4,3
2,1  B:4,3
3,1  B:4,3
4,1  B:4,3
1,2  B:4,1
2,2  B:4,1
3,2  B:4,1
4,2  B:4,1
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2014-1-15 11:57:35 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000001_0' done.
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 2 sorted segments
2014-1-15 11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 2 segments left of total size: 436 bytes
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
1,1:(B:1,5)(B:4,3)(A:1,1)(A:4,3)
1,2:(A:1,1)(A:4,3)(B:2,2)(B:4,1)
2,1:(B:1,5)(B:4,3)(A:1,2)(A:2,5)(A:4,4)
2,2:(A:1,2)(A:2,5)(A:4,4)(B:4,1)(B:2,2)
3,1:(B:1,5)(B:4,3)(A:4,1)
3,2:(A:4,1)(B:2,2)(B:4,1)
4,1:(B:4,3)(B:1,5)(A:1,4)(A:2,7)(A:3,1)(A:4,2)
4,2:(A:1,4)(A:2,7)(A:3,1)(A:4,2)(B:2,2)(B:4,1)
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2014-1-15 11:57:37 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output
2014-1-15 11:57:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce < reduce
2014-1-15 11:57:40 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2014-1-15 11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2014-1-15 11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=53
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=2503
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=266
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=126274
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=347
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=98
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=444
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Map input records=14
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=72
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=360
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=764215296
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=220
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=36
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=8
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=8
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Map output records=36

程序源代码,已上传到github:
https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myhadoop/matrix

这样就用MapReduce的程序,实现了矩阵的乘法!有了矩阵计算的基础,接下来,我们就可以做更多的事情了!

参考文章:MapReduce实现大矩阵乘法

转载请注明出处:
http://blog.fens.me/hadoop-mapreduce-matrix/

打赏作者

Mahout推荐算法API详解

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/mahout-recommendation-api

mahout-Recommendation

前言

用Mahout来构建推荐系统,是一件既简单又困难的事情。简单是因为Mahout完整地封装了“协同过滤”算法,并实现了并行化,提供非常简单的API接口;困难是因为我们不了解算法细节,很难去根据业务的场景进行算法配置和调优。

本文将深入算法API去解释Mahout推荐算法底层的一些事。

目录

  1. Mahout推荐算法介绍
  2. 算法评判标准:召回率与准确率
  3. Recommender.java的API接口
  4. 测试程序:RecommenderTest.java
  5. 基于用户的协同过滤算法UserCF
  6. 基于物品的协同过滤算法ItemCF
  7. SlopeOne算法
  8. KNN Linear interpolation item–based推荐算法
  9. SVD推荐算法
  10. Tree Cluster-based 推荐算法
  11. Mahout推荐算法总结

1. Mahout推荐算法介绍

Mahoutt推荐算法,从数据处理能力上,可以划分为2类:

  • 单机内存算法实现
  • 基于Hadoop的分步式算法实现

1). 单机内存算法实现

单机内存算法实现:就是在单机下运行的算法,是由cf.taste项目实现的,像我的们熟悉的UserCF,ItemCF都支持单机内存运行,并且参数可以灵活配置。单机算法的基本实例,请参考文章:用Maven构建Mahout项目

单机内存算法的问题在于,受限于单机的资源。对于中等规模的数据,像1G,10G的数据量,有能力进行计算,但是超过100G的数据量,对于单机来说是不可能完成的任务。

2). 基于Hadoop的分步式算法实现

基于Hadoop的分步式算法实现:就是把单机内存算法并行化,把任务分散到多台计算机一起运行。Mahout提供了ItemCF基于Hadoop并行化算法实现。基于Hadoop的分步式算法实现,请参考文章:
Mahout分步式程序开发 基于物品的协同过滤ItemCF

分步式并行算法的问题在于,如何让单机算法并行化。在单机算法中,我们只需要考虑算法,数据结构,内存,CPU就够了,但是分步式算法还要额外考虑很多的情况,比如多节点的数据合并,数据排序,网路通信的效率,节点宕机重算,数据分步式存储等等的很多问题。

2. 算法评判标准:召回率(recall)与查准率(precision)

Mahout提供了2个评估推荐器的指标,查准率和召回率(查全率),这两个指标是搜索引擎中经典的度量方法。

precision_recall


         相关 不相关
检索到     A    C
未检索到   B    D
  • A:检索到的,相关的 (搜到的也想要的)
  • B:未检索到的,但是相关的 (没搜到,然而实际上想要的)
  • C:检索到的,但是不相关的 (搜到的但没用的)
  • D:未检索到的,也不相关的 (没搜到也没用的)

被检索到的越多越好,这是追求“查全率”,即A/(A+B),越大越好。
被检索到的,越相关的越多越好,不相关的越少越好,这是追求“查准率”,即A/(A+C),越大越好。

在大规模数据集合中,这两个指标是相互制约的。当希望索引出更多的数据的时候,查准率就会下降,当希望索引更准确的时候,会索引更少的数据。

3. Recommender的API接口

1). 系统环境:

  • Win7 64bit
  • Java 1.6.0_45
  • Maven 3
  • Eclipse Juno Service Release 2
  • Mahout 0.8
  • Hadoop 1.1.2

2). Recommender接口文件:
org.apache.mahout.cf.taste.recommender.Recommender.java

mahout-Recommender-class

接口中方法的解释:

  • recommend(long userID, int howMany): 获得推荐结果,给userID推荐howMany个Item
  • recommend(long userID, int howMany, IDRescorer rescorer): 获得推荐结果,给userID推荐howMany个Item,可以根据rescorer对结构重新排序。
  • estimatePreference(long userID, long itemID): 当打分为空,估计用户对物品的打分
  • setPreference(long userID, long itemID, float value): 赋值用户,物品,打分
  • removePreference(long userID, long itemID): 删除用户对物品的打分
  • getDataModel(): 提取推荐数据

通过Recommender接口,我可以猜出核心算法,应该会在子类的estimatePreference()方法中进行实现。

3). 通过继承关系到Recommender接口的子类:

mahout-Recommender-hierarchy

推荐算法实现类:

  • GenericUserBasedRecommender: 基于用户的推荐算法
  • GenericItemBasedRecommender: 基于物品的推荐算法
  • KnnItemBasedRecommender: 基于物品的KNN推荐算法
  • SlopeOneRecommender: Slope推荐算法
  • SVDRecommender: SVD推荐算法
  • TreeClusteringRecommender:TreeCluster推荐算法

下面将分别介绍每种算法的实现。

4. 测试程序:RecommenderTest.java

测试数据集:item.csv


1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.5
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

测试程序:org.conan.mymahout.recommendation.job.RecommenderTest.java


package org.conan.mymahout.recommendation.job;

import java.io.IOException;
import java.util.List;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.common.RandomUtils;

public class RecommenderTest {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        RandomUtils.useTestSeed();
        String file = "datafile/item.csv";
        DataModel dataModel = RecommendFactory.buildDataModel(file);
        slopeOne(dataModel);
    }

    public static void userCF(DataModel dataModel) throws TasteException{}
    public static void itemCF(DataModel dataModel) throws TasteException{}
    public static void slopeOne(DataModel dataModel) throws TasteException{}

    ...

每种算法都一个单独的方法进行算法测试,如userCF(),itemCF(),slopeOne()….

5. 基于用户的协同过滤算法UserCF

基于用户的协同过滤,通过不同用户对物品的评分来评测用户之间的相似性,基于用户之间的相似性做出推荐。简单来讲就是:给用户推荐和他兴趣相似的其他用户喜欢的物品。

举例说明:

image015

基于用户的 CF 的基本思想相当简单,基于用户对物品的偏好找到相邻邻居用户,然后将邻居用户喜欢的推荐给当前用户。计算上,就是将一个用户对所有物品的偏好作为一个向量来计算用户之间的相似度,找到 K 邻居后,根据邻居的相似度权重以及他们对物品的偏好,预测当前用户没有偏好的未涉及物品,计算得到一个排序的物品列表作为推荐。图 2 给出了一个例子,对于用户 A,根据用户的历史偏好,这里只计算得到一个邻居 – 用户 C,然后将用户 C 喜欢的物品 D 推荐给用户 A。

上文中图片和解释文字,摘自: https://www.ibm.com/developerworks/cn/web/1103_zhaoct_recommstudy2/

算法API: org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender


  @Override
  public float estimatePreference(long userID, long itemID) throws TasteException {
    DataModel model = getDataModel();
    Float actualPref = model.getPreferenceValue(userID, itemID);
    if (actualPref != null) {
      return actualPref;
    }
    long[] theNeighborhood = neighborhood.getUserNeighborhood(userID);
    return doEstimatePreference(userID, theNeighborhood, itemID);
  }

 protected float doEstimatePreference(long theUserID, long[] theNeighborhood, long itemID) throws TasteException {
    if (theNeighborhood.length == 0) {
      return Float.NaN;
    }
    DataModel dataModel = getDataModel();
    double preference = 0.0;
    double totalSimilarity = 0.0;
    int count = 0;
    for (long userID : theNeighborhood) {
      if (userID != theUserID) {
        // See GenericItemBasedRecommender.doEstimatePreference() too
        Float pref = dataModel.getPreferenceValue(userID, itemID);
        if (pref != null) {
          double theSimilarity = similarity.userSimilarity(theUserID, userID);
          if (!Double.isNaN(theSimilarity)) {
            preference += theSimilarity * pref;
            totalSimilarity += theSimilarity;
            count++;
          }
        }
      }
    }
    // Throw out the estimate if it was based on no data points, of course, but also if based on
    // just one. This is a bit of a band-aid on the 'stock' item-based algorithm for the moment.
    // The reason is that in this case the estimate is, simply, the user's rating for one item
    // that happened to have a defined similarity. The similarity score doesn't matter, and that
    // seems like a bad situation.
    if (count <= 1) {
      return Float.NaN;
    }
    float estimate = (float) (preference / totalSimilarity);
    if (capper != null) {
      estimate = capper.capEstimate(estimate);
    }
    return estimate;
  }

测试程序:


    public static void userCF(DataModel dataModel) throws TasteException {
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:1.0
Recommender IR Evaluator: [Precision:0.5,Recall:0.5]
uid:1,(104,4.333333)(106,4.000000)
uid:2,(105,4.049678)
uid:3,(103,3.512787)(102,2.747869)
uid:4,(102,3.000000)

用R语言重写UserCF的实现,请参考文章:用R解析Mahout用户推荐协同过滤算法(UserCF)

6. 基于物品的协同过滤算法ItemCF

基于item的协同过滤,通过用户对不同item的评分来评测item之间的相似性,基于item之间的相似性做出推荐。简单来讲就是:给用户推荐和他之前喜欢的物品相似的物品。

举例说明:

image017

基于物品的 CF 的原理和基于用户的 CF 类似,只是在计算邻居时采用物品本身,而不是从用户的角度,即基于用户对物品的偏好找到相似的物品,然后根据用户的历史偏好,推荐相似的物品给他。从计算的角度看,就是将所有用户对某个物品的偏好作为一个向量来计算物品之间的相似度,得到物品的相似物品后,根据用户历史的偏好预测当前用户还没有表示偏好的物品,计算得到一个排序的物品列表作为推荐。图 3 给出了一个例子,对于物品 A,根据所有用户的历史偏好,喜欢物品 A 的用户都喜欢物品 C,得出物品 A 和物品 C 比较相似,而用户 C 喜欢物品 A,那么可以推断出用户 C 可能也喜欢物品 C。

上文中图片和解释文字,摘自: https://www.ibm.com/developerworks/cn/web/1103_zhaoct_recommstudy2/

算法API: org.apache.mahout.cf.taste.impl.recommender.GenericItemBasedRecommender


  @Override
  public float estimatePreference(long userID, long itemID) throws TasteException {
    PreferenceArray preferencesFromUser = getDataModel().getPreferencesFromUser(userID);
    Float actualPref = getPreferenceForItem(preferencesFromUser, itemID);
    if (actualPref != null) {
      return actualPref;
    }
    return doEstimatePreference(userID, preferencesFromUser, itemID);
  }

protected float doEstimatePreference(long userID, PreferenceArray preferencesFromUser, long itemID)
    throws TasteException {
    double preference = 0.0;
    double totalSimilarity = 0.0;
    int count = 0;
    double[] similarities = similarity.itemSimilarities(itemID, preferencesFromUser.getIDs());
    for (int i = 0; i < similarities.length; i++) {
      double theSimilarity = similarities[i];
      if (!Double.isNaN(theSimilarity)) {
        // Weights can be negative!
        preference += theSimilarity * preferencesFromUser.getValue(i);
        totalSimilarity += theSimilarity;
        count++;
      }
    }
    // Throw out the estimate if it was based on no data points, of course, but also if based on
    // just one. This is a bit of a band-aid on the 'stock' item-based algorithm for the moment.
    // The reason is that in this case the estimate is, simply, the user's rating for one item
    // that happened to have a defined similarity. The similarity score doesn't matter, and that
    // seems like a bad situation.
    if (count <= 1) {
      return Float.NaN;
    }
    float estimate = (float) (preference / totalSimilarity);
    if (capper != null) {
      estimate = capper.capEstimate(estimate);
    }
    return estimate;
  }

测试程序:


    public static void itemCF(DataModel dataModel) throws TasteException {
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.8676552772521973
Recommender IR Evaluator: [Precision:0.5,Recall:1.0]
uid:1,(105,3.823529)(104,3.722222)(106,3.478261)
uid:2,(106,2.984848)(105,2.537037)(107,2.000000)
uid:3,(106,3.648649)(102,3.380000)(103,3.312500)
uid:4,(107,4.722222)(105,4.313953)(102,4.025000)
uid:5,(107,3.736842)

7. SlopeOne算法

这个算法在mahout-0.8版本中,已经被@Deprecated。

SlopeOne是一种简单高效的协同过滤算法。通过均差计算进行评分。SlopeOne论文下载(PDF)

1). 举例说明:
用户X,Y,Z,对于物品A,B进行打分,如下表,求Z对B的打分是多少?

slopeone

Slope one算法认为:平均值可以代替某两个未知个体之间的打分差异,事物A对事物B的平均差是:((5 - 4) + (4 - 2)) / 2 = 1.5,就得到Z对B的打分是,3-1.5 = 1.5。

Slope one算法将用户的评分之间的关系看作简单的线性关系:

Y = mX + b

2). 平均加权计算:
用户X,Y,Z,对于物品A,B,C进行打分,如下表,求Z对A的打分是多少?

slopeone2

  • 1. 计算A和B的平均差, ((5-3)+(3-4))/2=0.5
  • 2. 计算A和C的平均差, (5-2)/1=3
  • 3. Z对A的评分,通过AB得到, 2+0.5=2.5
  • 4. Z对A的评分,通过AC得到,5+3=8
  • 5. 通过加权平均计算Z对A的评分:A和B都有评价的用户数为2,A和C都有评价的用户数为1,权重为别是2和1, (2*2.5+1*8)/(2+1)=13/3=4.33

通过这种简单的方式,我们可以快速计算出一个评分项,完成推荐过程!

算法API: org.apache.mahout.cf.taste.impl.recommender.slopeone.SlopeOneRecommender


@Override
  public float estimatePreference(long userID, long itemID) throws TasteException {
    DataModel model = getDataModel();
    Float actualPref = model.getPreferenceValue(userID, itemID);
    if (actualPref != null) {
      return actualPref;
    }
    return doEstimatePreference(userID, itemID);
  }
  
  private float doEstimatePreference(long userID, long itemID) throws TasteException {
    double count = 0.0;
    double totalPreference = 0.0;
    PreferenceArray prefs = getDataModel().getPreferencesFromUser(userID);
    RunningAverage[] averages = diffStorage.getDiffs(userID, itemID, prefs);
    int size = prefs.length();
    for (int i = 0; i < size; i++) {
      RunningAverage averageDiff = averages[i];
      if (averageDiff != null) {
        double averageDiffValue = averageDiff.getAverage();
        if (weighted) {
          double weight = averageDiff.getCount();
          if (stdDevWeighted) {
            double stdev = ((RunningAverageAndStdDev) averageDiff).getStandardDeviation();
            if (!Double.isNaN(stdev)) {
              weight /= 1.0 + stdev;
            }
            // If stdev is NaN, then it is because count is 1. Because we're weighting by count,
            // the weight is already relatively low. We effectively assume stdev is 0.0 here and
            // that is reasonable enough. Otherwise, dividing by NaN would yield a weight of NaN
            // and disqualify this pref entirely
            // (Thanks Daemmon)
          }
          totalPreference += weight * (prefs.getValue(i) + averageDiffValue);
          count += weight;
        } else {
          totalPreference += prefs.getValue(i) + averageDiffValue;
          count += 1.0;
        }
      }
    }
    if (count <= 0.0) {
      RunningAverage itemAverage = diffStorage.getAverageItemPref(itemID);
      return itemAverage == null ? Float.NaN : (float) itemAverage.getAverage();
    } else {
      return (float) (totalPreference / count);
    }
  }

测试程序:


    public static void slopeOne(DataModel dataModel) throws TasteException {
        RecommenderBuilder recommenderBuilder = RecommendFactory.slopeOneRecommender();

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:1.3333333333333333
Recommender IR Evaluator: [Precision:0.25,Recall:0.5]
uid:1,(105,5.750000)(104,5.250000)(106,4.500000)
uid:2,(105,2.286115)(106,1.500000)
uid:3,(106,2.000000)(102,1.666667)(103,1.625000)
uid:4,(105,4.976859)(102,3.509071)

8. KNN Linear interpolation item–based推荐算法

这个算法在mahout-0.8版本中,已经被@Deprecated。

算法来自论文:
This algorithm is based in the paper of Robert M. Bell and Yehuda Koren in ICDM '07.

(TODO未完)

算法API: org.apache.mahout.cf.taste.impl.recommender.knn.KnnItemBasedRecommender


@Override
  protected float doEstimatePreference(long theUserID, PreferenceArray preferencesFromUser, long itemID)
    throws TasteException {
    
    DataModel dataModel = getDataModel();
    int size = preferencesFromUser.length();
    FastIDSet possibleItemIDs = new FastIDSet(size);
    for (int i = 0; i < size; i++) {
      possibleItemIDs.add(preferencesFromUser.getItemID(i));
    }
    possibleItemIDs.remove(itemID);
    
    List mostSimilar = mostSimilarItems(itemID, possibleItemIDs.iterator(),
      neighborhoodSize, null);
    long[] theNeighborhood = new long[mostSimilar.size() + 1];
    theNeighborhood[0] = -1;
  
    List usersRatedNeighborhood = Lists.newArrayList();
    int nOffset = 0;
    for (RecommendedItem rec : mostSimilar) {
      theNeighborhood[nOffset++] = rec.getItemID();
    }
    
    if (!mostSimilar.isEmpty()) {
      theNeighborhood[mostSimilar.size()] = itemID;
      for (int i = 0; i < theNeighborhood.length; i++) {
        PreferenceArray usersNeighborhood = dataModel.getPreferencesForItem(theNeighborhood[i]);
        int size1 = usersRatedNeighborhood.isEmpty() ? usersNeighborhood.length() : usersRatedNeighborhood.size();
        for (int j = 0; j < size1; j++) {
          if (i == 0) {
            usersRatedNeighborhood.add(usersNeighborhood.getUserID(j));
          } else {
            if (j >= usersRatedNeighborhood.size()) {
              break;
            }
            long index = usersRatedNeighborhood.get(j);
            if (!usersNeighborhood.hasPrefWithUserID(index) || index == theUserID) {
              usersRatedNeighborhood.remove(index);
              j--;
            }
          }
        }
      }
    }

    double[] weights = null;
    if (!mostSimilar.isEmpty()) {
      weights = getInterpolations(itemID, theNeighborhood, usersRatedNeighborhood);
    }
    
    int i = 0;
    double preference = 0.0;
    double totalSimilarity = 0.0;
    for (long jitem : theNeighborhood) {
      
      Float pref = dataModel.getPreferenceValue(theUserID, jitem);
      
      if (pref != null) {
        double weight = weights[i];
        preference += pref * weight;
        totalSimilarity += weight;
      }
      i++;
      
    }
    return totalSimilarity == 0.0 ? Float.NaN : (float) (preference / totalSimilarity);
  }
  
}

测试程序:


    public static void itemKNN(DataModel dataModel) throws TasteException {
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemKNNRecommender(itemSimilarity, new NonNegativeQuadraticOptimizer(), 10);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:1.5
Recommender IR Evaluator: [Precision:0.5,Recall:1.0]
uid:1,(107,5.000000)(104,3.501168)(106,3.498198)
uid:2,(105,2.878995)(106,2.878086)(107,2.000000)
uid:3,(103,3.667444)(102,3.667161)(106,3.667019)
uid:4,(107,4.750247)(102,4.122755)(105,4.122709)
uid:5,(107,3.833621)

9. SVD推荐算法

(TODO未完)

算法API: org.apache.mahout.cf.taste.impl.recommender.svd.SVDRecommender


@Override
  public float estimatePreference(long userID, long itemID) throws TasteException {
    double[] userFeatures = factorization.getUserFeatures(userID);
    double[] itemFeatures = factorization.getItemFeatures(itemID);
    double estimate = 0;
    for (int feature = 0; feature < userFeatures.length; feature++) {
      estimate += userFeatures[feature] * itemFeatures[feature];
    }
    return (float) estimate;
  }

测试程序:


    public static void svd(DataModel dataModel) throws TasteException {
        RecommenderBuilder recommenderBuilder = RecommendFactory.svdRecommender(new ALSWRFactorizer(dataModel, 10, 0.05, 10));

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.09990564982096355
Recommender IR Evaluator: [Precision:0.5,Recall:1.0]
uid:1,(104,4.032909)(105,3.390885)(107,1.858541)
uid:2,(105,3.761718)(106,2.951908)(107,1.561116)
uid:3,(103,5.593422)(102,2.458930)(106,-0.091259)
uid:4,(105,4.068329)(102,3.534025)(107,0.206257)
uid:5,(107,0.105169)

10. Tree Cluster-based 推荐算法

这个算法在mahout-0.8版本中,已经被@Deprecated。

(TODO未完)

算法API: org.apache.mahout.cf.taste.impl.recommender.TreeClusteringRecommender


  @Override
  public float estimatePreference(long userID, long itemID) throws TasteException {
    DataModel model = getDataModel();
    Float actualPref = model.getPreferenceValue(userID, itemID);
    if (actualPref != null) {
      return actualPref;
    }
    buildClusters();
    List topRecsForUser = topRecsByUserID.get(userID);
    if (topRecsForUser != null) {
      for (RecommendedItem item : topRecsForUser) {
        if (itemID == item.getItemID()) {
          return item.getValue();
        }
      }
    }
    // Hmm, we have no idea. The item is not in the user's cluster
    return Float.NaN;
  }

测试程序:


    public static void treeCluster(DataModel dataModel) throws TasteException {
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        ClusterSimilarity clusterSimilarity = RecommendFactory.clusterSimilarity(RecommendFactory.SIMILARITY.FARTHEST_NEIGHBOR_CLUSTER, userSimilarity);
        RecommenderBuilder recommenderBuilder = RecommendFactory.treeClusterRecommender(clusterSimilarity, 10);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
            RecommendFactory.showItems(uid, list, true);
        }
    }

程序输出:


AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:NaN
Recommender IR Evaluator: [Precision:NaN,Recall:0.0]

11. Mahout推荐算法总结

算法及适用场景:

recommender-intro

算法评分的结果:

recommender-score

通过对上面几种算法的一平分比较:itemCF,itemKNN,SVD的Rrecision,Recall的评分值是最好的,并且itemCF和SVD的AVERAGE_ABSOLUTE_DIFFERENCE是最低的,所以,从算法的角度知道了,哪个算法是更准确的或者会索引到更多的数据集。

另外的一些因素:

  • 1. 这3个指标,并不能直接决定计算结果一定itemCF,SVD好
  • 2. 各种算法的参数我们并没有调优
  • 3. 数据量和数据分布,是影响算法的评分

程序源代码下载

https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.8/src/main/java/org/conan/mymahout/recommendation/job

转载请注明出处:
http://blog.fens.me/mahout-recommendation-api

打赏作者