
Blog Archives

A Parallel Implementation of the PageRank Algorithm

The "Algorithms are King" series covers computer algorithms, data mining (machine learning) algorithms, statistical algorithms, financial algorithms, and other cross-disciplinary combinations of them. Against the backdrop of the big-data era, algorithms have become the stars at the top of the pyramid. A good algorithm can build a great empire, as it did for Google.

The age of algorithms as king has officially arrived....

About the author:

  • Zhang Dan (Conan), programmer: Java, R, PHP, JavaScript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

Please cite the source when reposting:
http://blog.fens.me/algorithm-pagerank-mapreduce/

pagerank-mapreduce

Preface

With its PageRank algorithm, Google scores every web page on the internet. Processing data at that scale is impossible on a single machine, so how to parallelize the PageRank computation is the focus of this article.

This article follows on from the previous post, PageRank算法R语言实现 (PageRank in R), and turns the single-machine implementation into a parallel one that runs on a cluster on top of the MapReduce framework.

Contents

  1. How the PageRank algorithm is parallelized
  2. Distributed programming with MapReduce

1. How the PageRank algorithm is parallelized

For the single-machine algorithm, see the article: PageRank算法R语言实现 (PageRank in R).

pagerank-sample

Put simply, the distributed PageRank algorithm achieves parallelism through matrix computation.

1). Store the columns of the adjacency matrix as data rows

The adjacency matrix (with the damping factor already applied, i.e. the transition matrix):


          [,1]   [,2]   [,3]   [,4]
[1,] 0.0375000 0.0375 0.0375 0.0375
[2,] 0.3208333 0.0375 0.0375 0.8875
[3,] 0.3208333 0.4625 0.0375 0.0375
[4,] 0.3208333 0.4625 0.8875 0.0375

Stored row by row in HDFS:


1       0.037499994,0.32083333,0.32083333,0.32083333
2       0.037499994,0.037499994,0.4625,0.4625
3       0.037499994,0.037499994,0.037499994,0.88750005
4       0.037499994,0.88750005,0.037499994,0.037499994
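
Each stored row i is in fact column i of the matrix above: every entry is (1 - d)/N + d * link / outdegree, with damping factor d = 0.85 and N = 4 pages. For example, page 1 links to pages 2, 3 and 4, so column 1 becomes [0.0375, 0.3208, 0.3208, 0.3208], which is exactly the first stored row. A minimal standalone sketch (not part of the project code) that reproduces one such row:

public class GoogleColumnSketch {
    public static void main(String[] args) {
        float d = 0.85f;                 // damping factor
        int n = 4;                       // number of pages
        int[] links = {0, 1, 1, 1};      // page 1 links to pages 2, 3, 4
        int outDegree = 3;               // number of outgoing links of page 1
        for (int i = 0; i < n; i++) {
            float g = (1 - d) / n + d * links[i] / outDegree;
            System.out.println(g);       // ≈ 0.0375, 0.3208, 0.3208, 0.3208 (column 1, i.e. the first stored row)
        }
    }
}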

2). Iterate: compute the principal eigenvector (the PR vector)

pagerank-mr

The map phase:

  • input: the adjacency matrix and the current PR values
  • output: key = the row number of the PR entry; value = one term of that row's product with the PR vector (matrix entries tagged A, PR entries tagged B)

The reduce phase:

  • input: key = the row number of the PR entry; value = the terms of that row's product with the PR vector
  • output: key = the row number; value = the summed result, i.e. the new PR value (see the small sketch of this dot product below)
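
To make the reduce step concrete, here is a minimal sketch (with hard-coded data, not the project code) of what the reducer computes for one key: the dot product of row 2 of the matrix with the current PR vector, which gives the value 1.283333 shown for iteration 1 below.

public class PageRankReduceSketch {
    public static void main(String[] args) {
        float[] row2 = {0.3208333f, 0.0375f, 0.0375f, 0.8875f}; // A: row 2 of the matrix
        float[] pr = {1f, 1f, 1f, 1f};                          // B: the current PR values
        float sum = 0f;
        for (int i = 0; i < row2.length; i++) {
            sum += row2[i] * pr[i];
        }
        System.out.println("2\t" + sum);                        // ≈ 1.283333, the new PR value of page 2
    }
}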

Iteration 1


0.0375000 0.0375 0.0375 0.0375     1     0.150000
0.3208333 0.0375 0.0375 0.8875  *  1  =  1.283333
0.3208333 0.4625 0.0375 0.0375     1     0.858333
0.3208333 0.4625 0.8875 0.0375     1     1.708333

Iteration 2


0.0375000 0.0375 0.0375 0.0375     0.150000      0.150000
0.3208333 0.0375 0.0375 0.8875  *  1.283333  =   1.6445833
0.3208333 0.4625 0.0375 0.0375     0.858333      0.7379167
0.3208333 0.4625 0.8875 0.0375     1.708333      1.4675000

… after 10 iterations

The eigenvector (unnormalized PR values):


0.1500000
1.4955721
0.8255034
1.5289245

3). Normalize the PR values


0.150000                                              0.0375000
1.4955721  / (0.15+1.4955721+0.8255034+1.5289245) =   0.3738930
0.8255034                                             0.2063759
1.5289245                                             0.3822311

2. Distributed programming with MapReduce

Breaking the flow into MapReduce jobs

PageRankJob

HDFS directories

  • input (/user/hdfs/pagerank): the root HDFS directory for this job
  • input_pr (/user/hdfs/pagerank/pr): temporary directory holding the intermediate PR values
  • tmp1 (/user/hdfs/pagerank/tmp1): temporary directory holding the adjacency matrix
  • tmp2 (/user/hdfs/pagerank/tmp2): temporary directory for the newly computed PR values, which are then saved back to input_pr
  • result (/user/hdfs/pagerank/result): the final PR output

Development steps:

  • Page link data: page.csv
  • Initial PR data: pr.csv
  • Adjacency matrix: AdjacencyMatrix.java
  • PageRank computation: PageRank.java
  • PR normalization: Normal.java
  • Driver program: PageRankJob.java

1). Page link data: page.csv

Create the file page.csv:


1,2
1,3
1,4
2,3
2,4
3,4
4,2

2). Initial PR data: pr.csv

Set every page's initial PR value to 1.

Create the file pr.csv:


1,1
2,1
3,1
4,1

3). Adjacency matrix: AdjacencyMatrix.java

adjacencyMatrix

Notes on the matrix:

  • the damping factor is 0.85
  • there are 4 pages
  • the reducer writes each column of the matrix as one output line; storing columns suits the distributed layout, and the next job converts them back into rows

Create the class AdjacencyMatrix.java:


package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class AdjacencyMatrix {

    private static int nums = 4;// number of pages
    private static float d = 0.85f;// damping factor

    public static class AdjacencyMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = PageRankJob.DELIMITER.split(values.toString());
            Text k = new Text(tokens[0]);
            Text v = new Text(tokens[1]);
            context.write(k, v);
        }
    }

    public static class AdjacencyMatrixReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            float[] G = new float[nums];// column of the probability (Google) matrix
            Arrays.fill(G, (float) (1 - d) / G.length);

            float[] A = new float[nums];// column of the adjacency matrix
            int sum = 0;// number of outgoing links
            for (Text val : values) {
                int idx = Integer.parseInt(val.toString());
                A[idx - 1] = 1;
                sum++;
            }

            if (sum == 0) {// the denominator must not be 0
                sum = 1;
            }

            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < A.length; i++) {
                sb.append("," + (float) (G[i] + d * A[i] / sum));
            }

            Text v = new Text(sb.toString().substring(1));
            System.out.println(key + ":" + v.toString());
            context.write(key, v);
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("input");
        String input_pr = path.get("input_pr");
        String output = path.get("tmp1");
        String page = path.get("page");
        String pr = path.get("pr");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.mkdirs(input_pr);
        hdfs.copyFile(page, input);
        hdfs.copyFile(pr, input_pr);

        Job job = new Job(conf);
        job.setJarByClass(AdjacencyMatrix.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(AdjacencyMatrixMapper.class);
        job.setReducerClass(AdjacencyMatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(page));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

4). PageRank computation: PageRank.java

pagerank-step1

Notes on the matrix:

  • multiplies the adjacency matrix by the PR vector
  • the mapper keys each record by the matrix row number; since the previous job output columns, they are transposed back into rows here
  • the reducer computes the unnormalized eigenvector (PR values)

Create the class PageRank.java:


package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class PageRank {

    public static class PageRankMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// which dataset this split comes from: tmp1 or pr
        private static int nums = 4;// number of pages

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();// identify which dataset is being read
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = PageRankJob.DELIMITER.split(values.toString());

            if (flag.equals("tmp1")) {
                String row = values.toString().substring(0,1);
                String[] vals = PageRankJob.DELIMITER.split(values.toString().substring(2));// matrix transpose
                for (int i = 0; i < vals.length; i++) {
                    Text k = new Text(String.valueOf(i + 1));
                    Text v = new Text(String.valueOf("A:" + (row) + "," + vals[i]));
                    context.write(k, v);
                }

            } else if (flag.equals("pr")) {
                for (int i = 1; i <= nums; i++) {
                    Text k = new Text(String.valueOf(i));
                    Text v = new Text("B:" + tokens[0] + "," + tokens[1]);
                    context.write(k, v);
                }
            }
        }
    }

    public static class PageRankReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Map<Integer, Float> mapA = new HashMap<Integer, Float>();
            Map<Integer, Float> mapB = new HashMap<Integer, Float>();
            float pr = 0f;

            for (Text line : values) {
                System.out.println(line);
                String vals = line.toString();

                if (vals.startsWith("A:")) {
                    String[] tokenA = PageRankJob.DELIMITER.split(vals.substring(2));
                    mapA.put(Integer.parseInt(tokenA[0]), Float.parseFloat(tokenA[1]));
                }

                if (vals.startsWith("B:")) {
                    String[] tokenB = PageRankJob.DELIMITER.split(vals.substring(2));
                    mapB.put(Integer.parseInt(tokenB[0]), Float.parseFloat(tokenB[1]));
                }
            }

            Iterator<Integer> iterA = mapA.keySet().iterator();
            while(iterA.hasNext()){
                int idx = iterA.next();
                float A = mapA.get(idx);
                float B = mapB.get(idx);
                pr += A * B;
            }

            context.write(key, new Text(PageRankJob.scaleFloat(pr)));
            // System.out.println(key + ":" + PageRankJob.scaleFloat(pr));
        }

    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("tmp1");
        String output = path.get("tmp2");
        String pr = path.get("input_pr");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(PageRank.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(PageRankMapper.class);
         job.setReducerClass(PageRankReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input), new Path(pr));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);

        hdfs.rmr(pr);
        hdfs.rename(output, pr);
    }
}

5). PR normalization: Normal.java

normal-step1

Notes:

  • normalizes the computed PR values so that all of them fall into the (0,1) interval and sum to 1

Create the class Normal.java:


package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Normal {

    public static class NormalMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text k = new Text("1");

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            context.write(k, values);
        }
    }

    public static class NormalReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            List<String> vList = new ArrayList<String>();

            float sum = 0f;
            for (Text line : values) {
                vList.add(line.toString());

                String[] vals = PageRankJob.DELIMITER.split(line.toString());
                float f = Float.parseFloat(vals[1]);
                sum += f;
            }

            for (String line : vList) {
                String[] vals = PageRankJob.DELIMITER.split(line);
                Text k = new Text(vals[0]);

                float f = Float.parseFloat(vals[1]);
                Text v = new Text(PageRankJob.scaleFloat((float) (f / sum)));
                context.write(k, v);

                System.out.println(k + ":" + v);
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("input_pr");
        String output = path.get("result");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Normal.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(NormalMapper.class);
        job.setReducerClass(NormalReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

6). Driver program: PageRankJob.java

Create the class PageRankJob.java:


package org.conan.myhadoop.pagerank;

import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

public class PageRankJob {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) {
        Map<String, String> path = new HashMap<String, String>();
        path.put("page", "logfile/pagerank/page.csv");// 本地的数据文件
        path.put("pr", "logfile/pagerank/pr.csv");// 本地的数据文件

        path.put("input", HDFS + "/user/hdfs/pagerank");// HDFS的目录
        path.put("input_pr", HDFS + "/user/hdfs/pagerank/pr");// pr存储目
        path.put("tmp1", HDFS + "/user/hdfs/pagerank/tmp1");// 临时目录,存放邻接矩阵
        path.put("tmp2", HDFS + "/user/hdfs/pagerank/tmp2");// 临时目录,计算到得PR,覆盖input_pr

        path.put("result", HDFS + "/user/hdfs/pagerank/result");// 计算结果的PR

        try {

            AdjacencyMatrix.run(path);
            int iter = 3;
            for (int i = 0; i < iter; i++) {// run the PageRank iterations
                PageRank.run(path);
            }
            Normal.run(path);

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static JobConf config() {// remote configuration for the Hadoop cluster
        JobConf conf = new JobConf(PageRankJob.class);
        conf.setJobName("PageRank");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static String scaleFloat(float f) {// keep 6 decimal places
        DecimalFormat df = new DecimalFormat("##0.000000");
        return df.format(f);
    }
}

The program's source code has been uploaded to GitHub:

https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myhadoop/pagerank

With that, PageRank runs in parallel! Next we can use PageRank for some interesting applications.

Please cite the source when reposting:
http://blog.fens.me/algorithm-pagerank-mapreduce/


Implementing Matrix Multiplication with MapReduce

The Hadoop Family series introduces the products of the Hadoop ecosystem. Commonly used projects include Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari and Chukwa; newer additions include YARN, HCatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue, and more.

Since 2011, China has entered a turbulent era of big data, and the Hadoop family of software has claimed a broad share of big-data processing. Open-source projects and commercial vendors alike have steered all of their data software toward Hadoop, which has gone from a niche playground to the de facto standard for big-data development. On top of the original Hadoop technology, a whole family of products has emerged, continuously innovating around the concept of "big data" and pushing technology forward.

As developers in the IT world, we should keep pace, seize the opportunity, and rise together with Hadoop!


Please cite the source when reposting:
http://blog.fens.me/hadoop-mapreduce-matrix/

hadoop-mapreduce-matrix

Preface

MapReduce opened the door to parallel computing and gave individual developers the ability to process big data. But using MapReduce well, that is, parallelizing an existing single-machine algorithm, is not easy. Very often we have to ask whether the single-machine algorithm can be reformulated in terms of matrices, so matrix operations become the foundation of algorithm parallelization.

The collaborative-filtering algorithms behind recommender systems, for example, are parallelized on MapReduce precisely through this matrix formulation.

Contents

  1. Introduction to matrices
  2. Matrix multiplication in R
  3. Matrix multiplication with MapReduce
  4. Sparse matrix multiplication with MapReduce

1. Introduction to matrices

Matrix: in mathematics, an m×n matrix is a rectangular array of elements arranged in m rows and n columns. The elements can be numbers, symbols, or mathematical expressions. Below is a 2-row, 3-column matrix made up of 6 numbers:


1 2 3
4 5 6

Matrix addition
Matrices of the same size (the same number of rows and columns) can be added or subtracted, element by element.

Example: adding two matrices


1 3 1   +  0 0 5   =   1+0 3+0 1+5   =   1 3 6
1 0 0      7 5 0       1+7 0+5 0+0       8 5 0 

Matrix multiplication
Two matrices can be multiplied if and only if the number of columns of the first equals the number of rows of the second. Matrix multiplication is associative and distributive, but not commutative.

Example: multiplying two matrices


 1 0 2   *   3 1   =  (1*3+0*2+2*1)  (1*1+0*1+2*0)    =  5 1
-1 3 1       2 1      (-1*3+3*2+1*1) (-1*1+3*1+1*0)      4 2
             1 0 
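
Before parallelizing it, it helps to see the same computation as an ordinary single-machine program. Here is a minimal Java sketch (not part of the original code) of the triple loop C[i][k] = sum over j of A[i][j] * B[j][k], applied to the 2×3 and 3×2 matrices above:

public class MatrixMultiplySketch {
    public static void main(String[] args) {
        int[][] a = {{1, 0, 2}, {-1, 3, 1}};        // 2 x 3
        int[][] b = {{3, 1}, {2, 1}, {1, 0}};       // 3 x 2
        int[][] c = new int[a.length][b[0].length]; // 2 x 2 result

        for (int i = 0; i < a.length; i++) {
            for (int k = 0; k < b[0].length; k++) {
                for (int j = 0; j < b.length; j++) {
                    c[i][k] += a[i][j] * b[j][k];
                }
            }
        }
        for (int[] row : c) {
            System.out.println(row[0] + " " + row[1]); // prints 5 1, then 4 2
        }
    }
}

In the MapReduce version below, each result cell C[i][k] becomes a reduce key, and the mappers emit every A[i][j] and B[j][k] that cell needs, tagged with "A:" or "B:".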

2. Matrix multiplication in R


> m1<-matrix(c(1,0,2,-1,3,1),nrow=2,byrow=TRUE);m1
     [,1] [,2] [,3]
[1,]    1    0    2
[2,]   -1    3    1

> m2<-matrix(c(3,1,2,1,1,0),nrow=3,byrow=TRUE);m2
     [,1] [,2]
[1,]    3    1
[2,]    2    1
[3,]    1    0

> m3<-m1 %*% m2;m3
     [,1] [,2]
[1,]    5    1
[2,]    4    2

Matrix multiplication in R could hardly be simpler.

3. Matrix multiplication with MapReduce

Implementation approach:

mapreduce-matrix

  • Create two matrix data files: m1.csv, m2.csv
  • Create the driver program: MainRun.java
  • Create the MapReduce program: MartrixMultiply.java

1). Create the two matrix data files m1.csv and m2.csv

m1.csv


1,0,2
-1,3,1

m2.csv


3,1
2,1
1,0

2). Create the driver program MainRun.java

The driver program:


package org.conan.myhadoop.matrix;

import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

public class MainRun {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) {
        martrixMultiply();
    }
    
    public static void martrixMultiply() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("m1", "logfile/matrix/m1.csv");// 本地的数据文件
        path.put("m2", "logfile/matrix/m2.csv");
        path.put("input", HDFS + "/user/hdfs/matrix");// HDFS的目录
        path.put("input1", HDFS + "/user/hdfs/matrix/m1");
        path.put("input2", HDFS + "/user/hdfs/matrix/m2");
        path.put("output", HDFS + "/user/hdfs/matrix/output");

        try {
            MartrixMultiply.run(path);// run the job
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static JobConf config() {// remote configuration for the Hadoop cluster
        JobConf conf = new JobConf(MainRun.class);
        conf.setJobName("MartrixMultiply");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

}

3). Create the MapReduce program MartrixMultiply.java

The MapReduce program:


package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class MartrixMultiply {

    public static class MatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// m1 or m2

        private int rowNum = 2;// number of rows of matrix A
        private int colNum = 2;// number of columns of matrix B
        private int rowIndexA = 1; // current row of matrix A
        private int rowIndexB = 1; // current row of matrix B

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();// identify which input file
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                for (int i = 1; i <= rowNum; i++) {
                    Text k = new Text(rowIndexA + "," + i);
                    for (int j = 1; j <= tokens.length; j++) {
                        Text v = new Text("A:" + j + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + "  " + v.toString());
                    }

                }
                rowIndexA++;

            } else if (flag.equals("m2")) {
                for (int i = 1; i <= tokens.length; i++) {
                    for (int j = 1; j <= colNum; j++) {
                        Text k = new Text(i + "," + j);
                        Text v = new Text("B:" + rowIndexB + "," + tokens[j - 1]);
                        context.write(k, v);
                        System.out.println(k.toString() + "  " + v.toString());
                    }
                }

                rowIndexB++;
            }
        }
    }

    public static class MatrixReducer extends Reducer<Text, Text, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            System.out.print(key.toString() + ":");

            for (Text line : values) {
                String val = line.toString();
                System.out.print("("+val+")");

                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);

                    // System.out.println("A:" + kv[0] + "," + kv[1]);

                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);

                    // System.out.println("B:" + kv[0] + "," + kv[1]);
                }
            }

            int result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(mapB.get(mapk));
            }
            context.write(key, new IntWritable(result));
            System.out.println();

            // System.out.println("C:" + key.toString() + "," + result);
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(MartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(MatrixMapper.class);
        job.setReducerClass(MatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// load both input datasets
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

}

The run log:


Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix
Create: hdfs://192.168.1.210:9000/user/hdfs/matrix
copy from: logfile/matrix/m1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1
copy from: logfile/matrix/m2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2
2014-1-15 10:48:03 org.apache.hadoop.util.NativeCodeLoader <clinit>
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-1-15 10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2014-1-15 10:48:03 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2014-1-15 10:48:03 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 2
2014-1-15 10:48:03 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
警告: Snappy native library not loaded
2014-1-15 10:48:04 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
2014-1-15 10:48:04 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 79691776/99614720
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 262144/327680
1,1  A:1,1
1,1  A:2,0
1,1  A:3,2
1,2  A:1,1
1,2  A:2,0
1,2  A:3,2
2,1  A:1,-1
2,1  A:2,3
2,1  A:3,1
2,2  A:1,-1
2,2  A:2,3
2,2  A:3,1
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2014-1-15 10:48:04 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2014-1-15 10:48:04 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2014-1-15 10:48:05 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 0% reduce 0%
2014-1-15 10:48:07 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 79691776/99614720
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 262144/327680
1,1  B:1,3
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
1,2  B:1,1
2,1  B:1,3
2,2  B:1,1
1,1  B:2,2
1,2  B:2,1
2,1  B:2,2
2,2  B:2,1
1,1  B:3,1
1,2  B:3,0
2,1  B:3,1
2,2  B:3,0
2014-1-15 10:48:07 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2014-1-15 10:48:07 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2014-1-15 10:48:08 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000001_0' done.
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 2 sorted segments
2014-1-15 10:48:10 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 2 segments left of total size: 294 bytes
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
1,1:(B:1,3)(B:2,2)(B:3,1)(A:1,1)(A:2,0)(A:3,2)
1,2:(A:1,1)(A:2,0)(A:3,2)(B:1,1)(B:2,1)(B:3,0)
2,1:(B:1,3)(B:2,2)(B:3,1)(A:1,-1)(A:2,3)(A:3,1)
2,2:(A:1,-1)(A:2,3)(A:3,1)(B:1,1)(B:2,1)(B:3,0)
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2014-1-15 10:48:10 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 10:48:10 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2014-1-15 10:48:10 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output
2014-1-15 10:48:13 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2014-1-15 10:48:13 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2014-1-15 10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2014-1-15 10:48:14 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=24
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=1713
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=75
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=125314
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=114
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=30
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=302
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Map input records=5
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=48
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=242
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=764215296
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=220
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=24
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=4
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=4
2014-1-15 10:48:14 org.apache.hadoop.mapred.Counters log
信息:     Map output records=24

4. Sparse matrix multiplication with MapReduce

When we apply matrices to real data, the matrices are usually extremely sparse, so to save storage space we normally store only the non-zero entries.

Let's work through a sparse-matrix example:

spraseMatrix

  • Matrix multiplication in R
  • Create two matrix data files: sm1.csv, sm2.csv
  • Modify the driver program: MainRun.java
  • Create the MapReduce program: SparseMartrixMultiply.java

1). Matrix multiplication in R

The R program:


> m1<-matrix(c(1,0,0,3,2,5,0,4,0,0,0,1,4,7,1,2),nrow=4,byrow=TRUE);m1
     [,1] [,2] [,3] [,4]
[1,]    1    0    0    3
[2,]    2    5    0    4
[3,]    0    0    0    1
[4,]    4    7    1    2

> m2<-matrix(c(5,0,0,2,0,0,3,1),nrow=4,byrow=TRUE);m2
     [,1] [,2]
[1,]    5    0
[2,]    0    2
[3,]    0    0
[4,]    3    1

> m3<-m1 %*% m2;m3
     [,1] [,2]
[1,]   14    3
[2,]   22   14
[3,]    3    1
[4,]   26   16

2). Create the two sparse matrix data files sm1.csv and sm2.csv

Only non-zero entries are stored, in three columns: the row in the original matrix, the column in the original matrix, and the value.

sm1.csv


1,1,1
1,4,3
2,1,2
2,2,5
2,4,4
3,4,1
4,1,4
4,2,7
4,3,1
4,4,2

sm2.csv


1,1,5
2,2,2
4,1,3
4,2,1

3). Modify the driver program MainRun.java

Add a launch method for SparseMartrixMultiply:


    public static void main(String[] args) {
        sparseMartrixMultiply();
    }    
    
    public static void sparseMartrixMultiply() {
        Map<String, String> path = new HashMap<String, String>();
        path.put("m1", "logfile/matrix/sm1.csv");// 本地的数据文件
        path.put("m2", "logfile/matrix/sm2.csv");
        path.put("input", HDFS + "/user/hdfs/matrix");// HDFS的目录
        path.put("input1", HDFS + "/user/hdfs/matrix/m1");
        path.put("input2", HDFS + "/user/hdfs/matrix/m2");
        path.put("output", HDFS + "/user/hdfs/matrix/output");

        try {
            SparseMartrixMultiply.run(path);// run the job
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

4). Create the MapReduce program SparseMartrixMultiply.java

spareseMatrix2

  • the map function changes; the reduce function is almost unchanged (it only adds a default of 0 for entries missing from matrix B)
  • the variables that tracked the current row number are removed, because each input line now carries its own row and column

package org.conan.myhadoop.matrix;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class SparseMartrixMultiply {

    public static class SparseMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// m1 or m2

        private int rowNum = 4;// number of rows of matrix A
        private int colNum = 2;// number of columns of matrix B

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getName();// identify which input file
        }
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            String[] tokens = MainRun.DELIMITER.split(values.toString());
            if (flag.equals("m1")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];

                for (int i = 1; i <= colNum; i++) {
                    Text k = new Text(row + "," + i);
                    Text v = new Text("A:" + col + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + "  " + v.toString());
                }

            } else if (flag.equals("m2")) {
                String row = tokens[0];
                String col = tokens[1];
                String val = tokens[2];

                for (int i = 1; i <= rowNum; i++) {
                    Text k = new Text(i + "," + col);
                    Text v = new Text("B:" + row + "," + val);
                    context.write(k, v);
                    System.out.println(k.toString() + "  " + v.toString());

                }
            }
        }
    }

    public static class SparseMatrixReducer extends Reducer<Text, Text, Text, IntWritable> {

        @Override
        public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

            Map<String, String> mapA = new HashMap<String, String>();
            Map<String, String> mapB = new HashMap<String, String>();

            System.out.print(key.toString() + ":");

            for (Text line : values) {
                String val = line.toString();
                System.out.print("(" + val + ")");

                if (val.startsWith("A:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapA.put(kv[0], kv[1]);

                    // System.out.println("A:" + kv[0] + "," + kv[1]);

                } else if (val.startsWith("B:")) {
                    String[] kv = MainRun.DELIMITER.split(val.substring(2));
                    mapB.put(kv[0], kv[1]);

                    // System.out.println("B:" + kv[0] + "," + kv[1]);
                }
            }

            int result = 0;
            Iterator<String> iter = mapA.keySet().iterator();
            while (iter.hasNext()) {
                String mapk = iter.next();
                String bVal = mapB.containsKey(mapk) ? mapB.get(mapk) : "0";
                result += Integer.parseInt(mapA.get(mapk)) * Integer.parseInt(bVal);
            }
            context.write(key, new IntWritable(result));
            System.out.println();

            // System.out.println("C:" + key.toString() + "," + result);
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = MainRun.config();

        String input = path.get("input");
        String input1 = path.get("input1");
        String input2 = path.get("input2");
        String output = path.get("output");

        HdfsDAO hdfs = new HdfsDAO(MainRun.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(path.get("m1"), input1);
        hdfs.copyFile(path.get("m2"), input2);

        Job job = new Job(conf);
        job.setJarByClass(MartrixMultiply.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(SparseMatrixMapper.class);
        job.setReducerClass(SparseMatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input1), new Path(input2));// load both input datasets
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

Run output:


Delete: hdfs://192.168.1.210:9000/user/hdfs/matrix
Create: hdfs://192.168.1.210:9000/user/hdfs/matrix
copy from: logfile/matrix/sm1.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m1
copy from: logfile/matrix/sm2.csv to hdfs://192.168.1.210:9000/user/hdfs/matrix/m2
2014-1-15 11:57:31 org.apache.hadoop.util.NativeCodeLoader <clinit>
警告: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same.
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient copyAndConfigureFiles
警告: No job jar file set.  User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
2014-1-15 11:57:31 org.apache.hadoop.mapreduce.lib.input.FileInputFormat listStatus
信息: Total input paths to process : 2
2014-1-15 11:57:31 org.apache.hadoop.io.compress.snappy.LoadSnappy <clinit>
警告: Snappy native library not loaded
2014-1-15 11:57:31 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Running job: job_local_0001
2014-1-15 11:57:31 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 79691776/99614720
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 262144/327680
1,1  A:1,1
1,2  A:1,1
1,1  A:4,3
1,2  A:4,3
2,1  A:1,2
2,2  A:1,2
2,1  A:2,5
2,2  A:2,5
2,1  A:4,4
2,2  A:4,4
3,1  A:4,1
3,2  A:4,1
4,1  A:1,4
4,2  A:1,4
4,1  A:2,7
4,2  A:2,7
4,1  A:3,1
4,2  A:3,1
4,1  A:4,2
4,2  A:4,2
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
2014-1-15 11:57:31 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2014-1-15 11:57:31 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
2014-1-15 11:57:32 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 0% reduce 0%
2014-1-15 11:57:34 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000000_0' done.
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: io.sort.mb = 100
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: data buffer = 79691776/99614720
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer <init>
信息: record buffer = 262144/327680
1,1  B:1,5
2,1  B:1,5
3,1  B:1,5
4,1  B:1,5
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer flush
信息: Starting flush of map output
1,2  B:2,2
2,2  B:2,2
3,2  B:2,2
4,2  B:2,2
1,1  B:4,3
2,1  B:4,3
3,1  B:4,3
4,1  B:4,3
1,2  B:4,1
2,2  B:4,1
3,2  B:4,1
4,2  B:4,1
2014-1-15 11:57:34 org.apache.hadoop.mapred.MapTask$MapOutputBuffer sortAndSpill
信息: Finished spill 0
2014-1-15 11:57:34 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_m_000001_0 is done. And is in the process of commiting
2014-1-15 11:57:35 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 0%
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_m_000001_0' done.
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task initialize
信息:  Using ResourceCalculatorPlugin : null
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Merging 2 sorted segments
2014-1-15 11:57:37 org.apache.hadoop.mapred.Merger$MergeQueue merge
信息: Down to the last merge-pass, with 2 segments left of total size: 436 bytes
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
1,1:(B:1,5)(B:4,3)(A:1,1)(A:4,3)
1,2:(A:1,1)(A:4,3)(B:2,2)(B:4,1)
2,1:(B:1,5)(B:4,3)(A:1,2)(A:2,5)(A:4,4)
2,2:(A:1,2)(A:2,5)(A:4,4)(B:4,1)(B:2,2)
3,1:(B:1,5)(B:4,3)(A:4,1)
3,2:(A:4,1)(B:2,2)(B:4,1)
4,1:(B:4,3)(B:1,5)(A:1,4)(A:2,7)(A:3,1)(A:4,2)
4,2:(A:1,4)(A:2,7)(A:3,1)(A:4,2)(B:2,2)(B:4,1)
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task done
信息: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
2014-1-15 11:57:37 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: 
2014-1-15 11:57:37 org.apache.hadoop.mapred.Task commit
信息: Task attempt_local_0001_r_000000_0 is allowed to commit now
2014-1-15 11:57:37 org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter commitTask
信息: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://192.168.1.210:9000/user/hdfs/matrix/output
2014-1-15 11:57:40 org.apache.hadoop.mapred.LocalJobRunner$Job statusUpdate
信息: reduce > reduce
2014-1-15 11:57:40 org.apache.hadoop.mapred.Task sendDone
信息: Task 'attempt_local_0001_r_000000_0' done.
2014-1-15 11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息:  map 100% reduce 100%
2014-1-15 11:57:41 org.apache.hadoop.mapred.JobClient monitorAndPrintJob
信息: Job complete: job_local_0001
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息: Counters: 19
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:   File Output Format Counters 
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Bytes Written=53
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:   FileSystemCounters
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_READ=2503
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_READ=266
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     FILE_BYTES_WRITTEN=126274
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     HDFS_BYTES_WRITTEN=347
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:   File Input Format Counters 
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Bytes Read=98
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:   Map-Reduce Framework
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Map output materialized bytes=444
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Map input records=14
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce shuffle bytes=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Spilled Records=72
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Map output bytes=360
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Total committed heap usage (bytes)=764215296
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     SPLIT_RAW_BYTES=220
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Combine input records=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce input records=36
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce input groups=8
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Combine output records=0
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Reduce output records=8
2014-1-15 11:57:41 org.apache.hadoop.mapred.Counters log
信息:     Map output records=36

The program's source code has been uploaded to GitHub:
https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myhadoop/matrix

And with that, matrix multiplication is implemented as a MapReduce program! With matrix computation as a foundation, we can go on to do many more things.

Reference: MapReduce实现大矩阵乘法 (Large Matrix Multiplication with MapReduce)

Please cite the source when reposting:
http://blog.fens.me/hadoop-mapreduce-matrix/


ChinaHadoop Conference 2013: Injecting Statistical Blood into Hadoop with R

The Cross-Disciplinary Knowledge Gatherings series: "knowledge is meant to be shared and passed on," and conferences, forums, and salons are all excellent places to share it. I have been fortunate to speak at several large conferences in China and show some of my work. Being the speaker feels very different from being in the audience; only by sharing knowledge do you gain more.


Please cite the source when reposting:
http://blog.fens.me/hadoop-china-rhadoop-2013/

rhadoop-chinahadoop

Preface

Today I had the honor of speaking at the 2013 ChinaHadoop conference and making a small contribution to promoting R, which I found really exciting. Ever since I started learning R, its cross-disciplinary way of thinking has been broadening my horizons every day. "Only by stepping outside the IT circle can you appreciate how IT is changing the world."

With computing and statistics as tools, combined with domain knowledge, this will surely be the driving force of the coming "data gold rush"! Seize the opportunity of the times; it is time for the post-80s generation to rise!

Contents

  1. The talk content
  2. Event photos

1. The talk content

ChinaHadoop conference homepage: http://www.chinahadoop.com/

R语言为Hadoop注入统计血脉 (Injecting Statistical Blood into Hadoop with R): slides download (PPT下载)

  • 1). Topic: Injecting Statistical Blood into Hadoop with R
  • 2). Basic RHadoop programs
  • 3). Introduction to the distributed ItemCF collaborative-filtering algorithm
  • 4). ItemCF: local R implementation
  • 5). ItemCF: RHadoop implementation
  • 6). ItemCF: Java Hadoop MapReduce implementation
  • 7). ItemCF: Mahout implementation
  • 8). Recommendation results and data visualization

1). Topic: Injecting Statistical Blood into Hadoop with R
Main content: R语言为Hadoop注入统计血脉 (Injecting Statistical Blood into Hadoop with R)

2). Basic RHadoop programs

Main content: RHadoop实践系列之二:RHadoop安装与使用 (RHadoop in Practice, Part 2: Installing and Using RHadoop)

Source code:


#hdfs
library(rhdfs)
hdfs.init()
hdfs.ls("/user/")
hdfs.cat("/user/hdfs/o_t_account/part-m-00000")

#rmr
library(rmr2)
small.ints <- 1:10
sapply(small.ints, function(x) x^2)

small.ints <- to.dfs(keyval(1,1:10))
from.dfs(small.ints)

output<-mapreduce(input = small.ints, map = function(k, v) cbind(v, v^2))
from.dfs(output)

#rmr-wordcount
input<-"/user/hdfs/o_t_account/"
wordcount = function(input, output = NULL, pattern = ","){
  
  wc.map = function(., lines) {
    keyval(unlist( strsplit( x = lines,split = pattern)),1)
  }
  
  wc.reduce =function(word, counts ) {
    keyval(word, sum(counts))
  }         
  
  mapreduce(input = input ,output = output, input.format = "text",
            map = wc.map, reduce = wc.reduce,combine = TRUE)
}
output<-wordcount(input)
from.dfs(output)

3). Introduction to the distributed ItemCF collaborative-filtering algorithm
Main content: RHadoop实践系列之三 R实现MapReduce的协同过滤算法 (RHadoop in Practice, Part 3: Collaborative Filtering with MapReduce in R)

aglorithm_2

4). ItemCF: local R implementation

Main content: RHadoop实践系列之三 R实现MapReduce的协同过滤算法 (RHadoop in Practice, Part 3: Collaborative Filtering with MapReduce in R)

Source code:


library(plyr)

# read the dataset
train<-read.csv(file="small.csv",header=FALSE)
names(train)<-c("user","item","pref") 

# function that returns the sorted list of users
usersUnique<-function(){
  users<-unique(train$user)
  users[order(users)]
}

# function that returns the sorted list of items
itemsUnique<-function(){
  items<-unique(train$item)
  items[order(items)]
}

# the user list
users<-usersUnique() 
users

# the item list
items<-itemsUnique() 
items

# build an index into the item list
index<-function(x) which(items %in% x)
data<-ddply(train,.(user,item,pref),summarize,idx=index(item)) 
data

# co-occurrence matrix
cooccurrence<-function(data){
  n<-length(items)
  co<-matrix(rep(0,n*n),nrow=n)
  for(u in users){
    idx<-index(data$item[which(data$user==u)])
    m<-merge(idx,idx)
    for(i in 1:nrow(m)){
      co[m$x[i],m$y[i]]=co[m$x[i],m$y[i]]+1
    }
  }
  return(co)
}


# recommendation function
recommend<-function(udata=udata,co=coMatrix,num=0){
  n<-length(items)
  
  # all of pref
  pref<-rep(0,n)
  pref[udata$idx]<-udata$pref
  
  # the user's rating matrix (column vector)
  userx<-matrix(pref,nrow=n)
  
  # co-occurrence matrix %*% rating matrix
  r<-co %*% userx
  
  # sort the recommendation results
  r[udata$idx]<-0
  idx<-order(r,decreasing=TRUE)
  topn<-data.frame(user=rep(udata$user[1],length(idx)),item=items[idx],val=r[idx])
  topn<-topn[which(topn$val>0),]
  
  # keep only the top num recommendations
  if(num>0){
    topn<-head(topn,num)
  }
  
  # return the result
  return(topn)
}

co<-cooccurrence(data) 
co


# compute recommendations for every user
recommendation<-data.frame()
for(i in 1:length(users)){
  udata<-data[which(data$user==users[i]),]
  recommendation<-rbind(recommendation,recommend(udata,co,0)) 
} 

recommendation

5). ItemCF: RHadoop implementation

Main content: RHadoop实践系列之三 R实现MapReduce的协同过滤算法 (RHadoop in Practice, Part 3: Collaborative Filtering with MapReduce in R)

Source code:


# load the rmr2 package
library(rmr2)

# read the input data file
train<-read.csv(file="small.csv",header=FALSE)
names(train)<-c("user","item","pref")

# store the dataset in HDFS
train.hdfs = to.dfs(keyval(train$user,train))
from.dfs(train.hdfs)

#STEP 1, build the item co-occurrence matrix
# 1) group by user to get the list of item pairs that occur together
train.mr<-mapreduce(
  train.hdfs, 
  map = function(k, v) {
    keyval(k,v$item)
  }
  ,reduce=function(k,v){
    m<-merge(v,v)
    keyval(m$x,m$y)
  }
)
from.dfs(train.mr)

# 2) count the item pairs to build the item co-occurrence matrix
step2.mr<-mapreduce(
  train.mr,
  map = function(k, v) {
    d<-data.frame(k,v)
    d2<-ddply(d,.(k,v),count)
    
    key<-d2$k
    val<-d2
    keyval(key,val)
  }
)
from.dfs(step2.mr)

# 2. build the user-item rating matrix
train2.mr<-mapreduce(
  train.hdfs, 
  map = function(k, v) {
    df<-v
    key<-df$item
    val<-data.frame(item=df$item,user=df$user,pref=df$pref)
    keyval(key,val)
  }
)
from.dfs(train2.mr)

#3. join the co-occurrence matrix with the rating matrix
eq.hdfs<-equijoin(
  left.input=step2.mr, 
  right.input=train2.mr,
  map.left=function(k,v){
    keyval(k,v)
  },
  map.right=function(k,v){
    keyval(k,v)
  },
  outer = c("left")
)
from.dfs(eq.hdfs)


#4. compute the recommendation list
cal.mr<-mapreduce(
  input=eq.hdfs,
  map=function(k,v){
    val<-v
    na<-is.na(v$user.r)
    if(length(which(na))>0) val<-v[-which(is.na(v$user.r)),]
    keyval(val$k.l,val)
  }
  ,reduce=function(k,v){
    val<-ddply(v,.(k.l,v.l,user.r),summarize,v=freq.l*pref.r)
    keyval(val$k.l,val)
  }
)
from.dfs(cal.mr)


#5. produce the recommendation scores in the input format
result.mr<-mapreduce(
  input=cal.mr,
  map=function(k,v){
    keyval(v$user.r,v)
  }
  ,reduce=function(k,v){
    val<-ddply(v,.(user.r,v.l),summarize,v=sum(v))
    val2<-val[order(val$v,decreasing=TRUE),]
    names(val2)<-c("user","item","pref")
    keyval(val2$user,val2)
  }
)
from.dfs(result.mr)

6). ItemCF: Java Hadoop MapReduce implementation

Main content: 用Hadoop构建电影推荐系统 (Building a Movie Recommender System with Hadoop)

Source code: https://github.com/bsspirit/maven_hadoop_template/releases/tag/recommend

7). ItemCF: Mahout implementation

Main content: Mahout分步式程序开发 基于物品的协同过滤ItemCF (Distributed Development with Mahout: Item-Based Collaborative Filtering, ItemCF)

Source code: https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.8

8). Recommendation results and data visualization

Dataset: small.csv


1,101,5.0
1,102,3.0
1,103,2.5
2,101,2.0
2,102,2.5
2,103,5.0
2,104,2.0
3,101,2.0
3,104,4.0
3,105,4.5
3,107,5.0
4,101,5.0
4,103,3.0
4,104,4.5
4,106,4.0
5,101,4.0
5,102,3.0
5,103,2.0
5,104,4.0
5,105,3.5
5,106,4.0

Result set: result.csv


1,104,33.5
1,106,18
1,105,15.5
1,107,5
2,106,20.5
2,105,15.5
2,107,4
3,103,24.5
3,102,18.5
3,106,16.5
4,102,37
4,105,26
4,107,9.5
5,107,11.5

Social graph visualization in R


library(igraph)

train<-read.csv(file="small.csv",header=FALSE)

drawGraph<-function(data){
  names(data)<-c("from","to","f") 
  g <- graph.data.frame(data, directed=TRUE)
  V(g)$label <- V(g)$name
  V(g)$size <- 25
  V(g)$color <- c(rep("green",5),rep("red",7))
  V(g)$shape <- c(rep("rectangle",5),rep("circle",7))
  E(g)$color <- grey(0.5)
  E(g)$weight<-data$f
  E(g)$width<-scale(E(g)$weight,scale=TRUE)+2
  g2 <- simplify(g)
  plot(g2,edge.label=E(g)$weight,edge.width=E(g)$width,layout=layout.circle)
}

#small
drawGraph(train)

src_graph


#recommendation
recommendation<-read.csv(file="result.csv",header=FALSE)
drawGraph(recommendation)

recommand_graph

2. Event photos

chinahadoop-1

chinahadoop-2

chinaHadoop2013

Please cite the source when reposting:
http://blog.fens.me/hadoop-china-rhadoop-2013/


A Hive Learning Roadmap



Please cite the source when reposting:
http://blog.fens.me/hadoop-hive-roadmap/

hadoop-hive-roadmap-title

Preface

Hive is the data-warehouse product of the Hadoop family. Its defining feature is an SQL-like syntax that wraps the underlying MapReduce process, so that business users with an SQL background can work with big data directly on Hadoop. That one point removes the bottleneck that big-data analysis used to pose for data analysts.

Let's build a Hive environment and help non-developers get a better grasp of big data.

Contents

  1. Introduction to Hive
  2. A Hive learning roadmap
  3. My experience using Hive
  4. Hive use cases

1. Introduction to Hive

Hive originated at Facebook. It makes SQL queries against Hadoop possible, so that even non-programmers can use Hadoop conveniently. Hive is a data-warehouse tool built on Hadoop: it maps structured data files onto database tables, provides full SQL query capability, and translates SQL statements into MapReduce jobs for execution.

Hive is a data-warehouse infrastructure built on top of Hadoop. It provides a set of tools for extract-transform-load (ETL) work and a mechanism for storing, querying, and analyzing large-scale data kept in Hadoop. Hive defines a simple SQL-like query language called HQL, which lets users who know SQL query the data; the same language also lets developers who know MapReduce plug in custom mappers and reducers for complex analyses that the built-in mappers and reducers cannot handle.

For a detailed guide to installing and using Hive, see the article: Hive安装及使用攻略 (A Guide to Installing and Using Hive).
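
To make the point about SQL-like queries being compiled into MapReduce concrete, here is a minimal sketch of querying Hive from Java over the HiveServer2 JDBC driver. The host, credentials, and the user_pv table are placeholders invented for this sketch, not something defined in this article; adjust them to your own environment.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HiveQuerySketch {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.hive.jdbc.HiveDriver");            // HiveServer2 JDBC driver
        Connection con = DriverManager.getConnection(
                "jdbc:hive2://192.168.1.210:10000/default", "hdfs", "");
        Statement stmt = con.createStatement();
        // One HQL statement; Hive compiles it into one or more MapReduce jobs.
        ResultSet rs = stmt.executeQuery(
                "SELECT userid, COUNT(*) AS pv FROM user_pv GROUP BY userid");
        while (rs.next()) {
            System.out.println(rs.getString(1) + "\t" + rs.getLong(2));
        }
        con.close();
    }
}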

2. A Hive learning roadmap

hadoop-hive-roadmap

The Hive knowledge points are laid out in the figure above; I hope it helps others understand Hive better.

Next comes my own experience with it. There is no shortcut for anyone; settle down to it and it is not that hard.

3. My experience using Hive

I use Hive with two goals in mind:

  • 1. Give data analysts with no development experience the ability to handle big data
  • 2. Build a standardized MapReduce development process

1). Giving data analysts with no development experience the ability to handle big data

This matches Hive's design philosophy exactly and has been stressed all along, so it needs no further comment.

2). Building a standardized MapReduce development process

This is the direction we need to work toward.

First, Hive already wraps the MapReduce process in an SQL-like syntax, and that wrapping is itself a standardization of MapReduce.

When we build business features or tools, we add scenario-specific logic on top: a second layer of encapsulation above Hive. In that second layer we should hide as many Hive details as possible, keep the interface narrow, reduce flexibility, and trim the HQL syntax even further, exposing only a dedicated interface that satisfies our system's requirements.

Anyone using this twice-encapsulated interface no longer needs to know what Hive is, let alone what Hadoop is. They only need to know how to write SQL queries (the SQL-92 standard) that run efficiently and meet the business need.

Once this second layer of encapsulation over Hive is in place, we can build a standardized MapReduce development process; a hypothetical sketch of such a narrowed interface follows.
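
The interface below is purely illustrative (the names ReportQueryService, pageViewsByUser and topItems are invented for this sketch, not part of any real project): the application codes against a narrow, dedicated contract, while the implementation behind it issues the corresponding HQL.

import java.util.List;
import java.util.Map;

public interface ReportQueryService {

    /** Page views per user for one day (e.g. "2013-07-01"); backed by an HQL GROUP BY. */
    Map<String, Long> pageViewsByUser(String day);

    /** Top n items by sales in a date range; backed by an HQL ORDER BY ... LIMIT. */
    List<String> topItems(String fromDay, String toDay, int n);
}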

hive-architect-2

Following the approach in the figure above, we can unify how the various applications inside a company depend on Hive. And once the team's skills have grown, Hive can even be peeled away and replaced with a better underlying solution; if the wrapped interface stays the same, the business applications may not even notice that Hive has been swapped out.

This is a process worth going through, and a meaningful one. When I think about building analysis tools on Hadoop, using Hive as the access interface to Hadoop is the most effective choice.

3). A note on operating Hive:
Because Hive is built on Hadoop (put simply, it is an access layer over Hadoop), there is not much to Hive itself, so on the operations side we only need to watch the following points.

  • 1. Use a separate database for the metastore
  • 2. Define sensible table partitions and keys
  • 3. Choose a sensible bucket size
  • 4. Compress tables
  • 5. Define conventions for using external tables
  • 6. Keep the numbers of mappers and reducers under control

4. Hive use cases

Cases that have already been written up as articles.

Related article:
Hadoop家族产品学习路线图 (Learning Roadmap for the Hadoop Family)

Please cite the source when reposting:
http://blog.fens.me/hadoop-hive-roadmap/


Building a Job Recommendation Engine with Mahout



Please cite the source when reposting:
http://blog.fens.me/hadoop-mahout-recommend-job/

mahout-recommender-job

Preface

As big-data thinking is put into practice, recommender systems are receiving more and more attention. Not just e-commerce: all kinds of internet applications are adopting them, including search, social networks, music, dining, and map services.

Before we used recommendation algorithms, we presented items to users by setting up various constraints that matched the natural attributes of the data; that is a rule-based system. For example, when a user bought a product, we would recommend other products in the same category, using the category attribute as the rule. Problems soon appeared: once a user bought different products from several categories in one order, that rule broke down, so we designed further rules: prefer the IT category, prefer higher-priced products, and so on. After a few rounds we kept piling on rules until they could contradict each other, a newly added rule made the recommendations worse and worse, and we could not even explain why.

Recommendation algorithms attack the problem from another angle and solve what rule-based setups cannot. Below we use Mahout to build a job recommendation engine.

Contents

  1. Overview of the Mahout recommendation framework
  2. Requirements: KPI design for the job recommendation engine
  3. Algorithm model: recommendation algorithms
  4. Architecture: system architecture of the job recommendation engine
  5. Development: implementing the recommendation algorithms with Mahout

1. Overview of the Mahout Recommender Framework

The Mahout framework includes a complete recommender engine, with standardized data structures, a variety of algorithm implementations, and a simple development workflow. Mahout's recommender engine is modular and consists of five main parts: the data model, similarity algorithms, neighborhood algorithms, recommender algorithms, and the recommender evaluator.

For a more detailed introduction, see the article: Dissecting the Mahout Recommendation Engine from the Source Code.

2. Requirements Analysis: KPI Design for the Job Recommendation Engine

Below we use a company case study to explain, end to end, how to design the KPIs for a job recommendation engine.

Case description:
A professional social networking site whose main products include personal resume pages, professional networks, microblogs and shared links, job postings, job applications, and education and training.

After registering, users complete their personal profile: education, work experience, project experience, skills, and so on. Then you tell the site whether you are looking for a job. When you choose "yes" (job hunting), the site recommends positions from its database that you might be interested in.

From this short description we can roughly see the site's positioning and core business. There are two key points:

  • Users: keep as many valid, complete user profiles as possible
  • Service: help users find jobs, and help headhunters and companies find employees

The job recommendation engine will therefore become a core feature of this site.

KPI design

  • Job views driven by recommendations: page views (PV) of job pages
  • Job applications driven by recommendations: effective conversions on job pages

3. Algorithm Model: Recommendation Algorithms

Two test datasets:

  • pv.csv: job view records, with user ID and job ID
  • job.csv: basic job information, with job ID, publication date, and salary

1). pv.csv

  • 2 columns: user ID, job ID (userid, jobid)
  • View records: 2,500
  • Users: 1,000, with user IDs 1-1000
  • Jobs: 200, with job IDs 1-200

Sample data:

1,11
2,136
2,187
3,165
3,1
3,24
4,8
4,199
5,32
5,100
6,14
7,59
7,147
8,92
9,165
9,80
9,171
10,45
10,31
10,1
10,152

2). job.csv

  • 3 columns: job ID, publication date, salary (jobid, create_date, salary)
  • Jobs: 200, with job IDs 1-200

Sample data:

1,2013-01-24,5600
2,2011-03-02,5400
3,2011-03-14,8100
4,2012-10-05,2200
5,2011-09-03,14100
6,2011-03-05,6500
7,2012-06-06,37000
8,2013-02-18,5500
9,2010-07-05,7500
10,2010-01-23,6700
11,2011-09-19,5200
12,2010-01-19,29700
13,2013-09-28,6000
14,2013-10-23,3300
15,2010-10-09,2700
16,2010-07-14,5100
17,2010-05-13,29000
18,2010-01-16,21800
19,2013-05-23,5700
20,2011-04-24,5900

To meet the KPIs, let's restate the problem in "technical" terms: we need to make the job recommendations more accurate, so that users click more.

  • 1. Combine recommendation algorithms and pick the ones that score well with the "recommender evaluator"
  • 2. Manually verify the recommendation results
  • 3. Jobs are time-sensitive; recommended jobs should have been published within the last six months
  • 4. Salary should be no less than 80% of the average salary of the jobs the user has viewed

We pick three recommendation algorithm families, UserCF, ItemCF, and SlopeOne, and test seven combinations (a raw-API sketch of the first combination appears after the reference links below).

  • userCF1: LogLikelihoodSimilarity + NearestNUserNeighborhood + GenericBooleanPrefUserBasedRecommender
  • userCF2: CityBlockSimilarity + NearestNUserNeighborhood + GenericBooleanPrefUserBasedRecommender
  • userCF3: UserTanimoto + NearestNUserNeighborhood + GenericBooleanPrefUserBasedRecommender
  • itemCF1: LogLikelihoodSimilarity + GenericBooleanPrefItemBasedRecommender
  • itemCF2: CityBlockSimilarity + GenericBooleanPrefItemBasedRecommender
  • itemCF3: ItemTanimoto + GenericBooleanPrefItemBasedRecommender
  • slopeOne: SlopeOneRecommender

For a detailed introduction to these recommendation algorithms, see the article: Mahout Recommendation Algorithm API Explained.

For details on how the algorithms are combined, see the article: Dissecting the Mahout Recommendation Engine from the Source Code.
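
Before the RecommendFactory-based code below, here is a minimal standalone sketch of how the first combination (userCF1) is assembled directly with the stock Mahout 0.8 Taste API; the user ID 974 and the top-3 count are just example values.

import java.io.File;
import java.util.List;

import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericBooleanPrefUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.LogLikelihoodSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class UserCF1Sketch {
    public static void main(String[] args) throws Exception {
        // pv.csv has no preference values, only (userid,jobid) pairs, so it is treated as boolean data
        DataModel model = new FileDataModel(new File("datafile/job/pv.csv"));
        UserSimilarity similarity = new LogLikelihoodSimilarity(model);
        UserNeighborhood neighborhood = new NearestNUserNeighborhood(2, similarity, model);
        Recommender recommender = new GenericBooleanPrefUserBasedRecommender(model, neighborhood, similarity);
        // top-3 job recommendations for user 974
        List<RecommendedItem> items = recommender.recommend(974L, 3);
        for (RecommendedItem item : items) {
            System.out.println(item.getItemID() + ":" + item.getValue());
        }
    }
}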

4. Architecture Design: System Architecture of the Job Recommendation Engine

mahout-recommend-job-architect

In the figure above, the left side is the Application business system, the right side is Mahout, and the bottom is the Hadoop cluster.

  • 1. When the data volume is not too large and the algorithm is complex, let Mahout read the CSV or database data directly and compute in memory on a single machine. Mahout is a multithreaded application and will use all of the machine's resources in parallel.
  • 2. When the data volume is large, choose a parallelized algorithm (ItemCF): first import the business system's data into HDFS, then run the algorithm with Mahout against HDFS; the performance then depends on the whole Hadoop cluster (a sketch of launching the distributed job follows this list).
  • 3. Save the computed results to a database for easy querying.
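
For option 2, a minimal sketch of launching Mahout 0.8's distributed item-based job (org.apache.mahout.cf.taste.hadoop.item.RecommenderJob) is shown below; the HDFS paths are assumptions, and pv.csv is assumed to have been copied to HDFS already.

import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.cf.taste.hadoop.item.RecommenderJob;

// Sketch of running Mahout's distributed ItemCF job; the Hadoop cluster configuration
// is expected to be on the classpath.
public class ItemCFOnHadoop {
    public static void main(String[] args) throws Exception {
        ToolRunner.run(new RecommenderJob(), new String[] {
                "--input", "/user/hdfs/job/pv.csv",            // (userid,jobid) lines in HDFS
                "--output", "/user/hdfs/job/output",
                "--booleanData", "true",                       // pv.csv has no preference values
                "--similarityClassname", "SIMILARITY_LOGLIKELIHOOD",
                "--numRecommendations", "3",
                "--tempDir", "/user/hdfs/job/tmp"
        });
    }
}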

5. Development: Implementing the Recommendation Algorithms with Mahout

The development environment uses Mahout 0.8; for setup, see the article: Building a Mahout Project with Maven.

New Java classes:

  • RecommenderEvaluator.java: select the algorithms that score well with the "recommender evaluator"
  • RecommenderResult.java: manually compare a specified number of results
  • RecommenderFilterOutdateResult.java: filter out expired jobs
  • RecommenderFilterSalaryResult.java: filter out jobs whose salary is too low

1). RecommenderEvaluator.java: select the algorithms that score well with the "recommender evaluator"
Source code:


public class RecommenderEvaluator {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/job/pv.csv";
        DataModel dataModel = RecommendFactory.buildDataModelNoPref(file);
        userLoglikelihood(dataModel);
        userCityBlock(dataModel);
        userTanimoto(dataModel);
        itemLoglikelihood(dataModel);
        itemCityBlock(dataModel);
        itemTanimoto(dataModel);
        slopeOne(dataModel);
    }

    public static RecommenderBuilder userLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userLoglikelihood");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder userCityBlock(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userCityBlock");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.CITYBLOCK, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder userTanimoto(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userTanimoto");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.TANIMOTO, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemLoglikelihood");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemCityBlock(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemCityBlock");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.CITYBLOCK, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemTanimoto(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemTanimoto");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.TANIMOTO, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder slopeOne(DataModel dataModel) throws TasteException, IOException {
        System.out.println("slopeOne");
        RecommenderBuilder recommenderBuilder = RecommendFactory.slopeOneRecommender();

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder knnLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("knnLoglikelihood");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemKNNRecommender(itemSimilarity, new NonNegativeQuadraticOptimizer(), 10);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        return recommenderBuilder;
    }

    public static RecommenderBuilder knnTanimoto(DataModel dataModel) throws TasteException, IOException {
        System.out.println("knnTanimoto");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.TANIMOTO, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemKNNRecommender(itemSimilarity, new NonNegativeQuadraticOptimizer(), 10);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        return recommenderBuilder;
    }

    public static RecommenderBuilder knnCityBlock(DataModel dataModel) throws TasteException, IOException {
        System.out.println("knnCityBlock");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.CITYBLOCK, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemKNNRecommender(itemSimilarity, new NonNegativeQuadraticOptimizer(), 10);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        return recommenderBuilder;
    }

    public static RecommenderBuilder svd(DataModel dataModel) throws TasteException {
        System.out.println("svd");
        RecommenderBuilder recommenderBuilder = RecommendFactory.svdRecommender(new ALSWRFactorizer(dataModel, 5, 0.05, 10));

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        return recommenderBuilder;
    }

    public static RecommenderBuilder treeClusterLoglikelihood(DataModel dataModel) throws TasteException {
        System.out.println("treeClusterLoglikelihood");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        ClusterSimilarity clusterSimilarity = RecommendFactory.clusterSimilarity(RecommendFactory.SIMILARITY.FARTHEST_NEIGHBOR_CLUSTER, userSimilarity);
        RecommenderBuilder recommenderBuilder = RecommendFactory.treeClusterRecommender(clusterSimilarity, 3);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);

        return recommenderBuilder;
    }
}

Run results (console output):


userLoglikelihood
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.2741487771272658
Recommender IR Evaluator: [Precision:0.6424242424242422,Recall:0.4098360655737705]
userCityBlock
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.575306732961736
Recommender IR Evaluator: [Precision:0.919580419580419,Recall:0.4371584699453552]
userTanimoto
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.5546485136181523
Recommender IR Evaluator: [Precision:0.6625766871165644,Recall:0.41803278688524603]
itemLoglikelihood
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.5398332608612343
Recommender IR Evaluator: [Precision:0.26229508196721296,Recall:0.26229508196721296]
itemCityBlock
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.9251437840891661
Recommender IR Evaluator: [Precision:0.02185792349726776,Recall:0.02185792349726776]
itemTanimoto
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.9176432856689655
Recommender IR Evaluator: [Precision:0.26229508196721296,Recall:0.26229508196721296]
slopeOne
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.0
Recommender IR Evaluator: [Precision:0.01912568306010929,Recall:0.01912568306010929]

Visualizing the "recommender evaluator" output:

difference

evaluator

UserCityBlock evaluates best, the UserCF-based algorithms all do better than the ItemCF ones, and SlopeOne barely scores at all.

2). RecommenderResult.java: manually compare a specified number of results
To get differentiated results, we take UserCityBlock and itemLoglikelihood and compare their recommendations manually.

Source code:


public class RecommenderResult {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/job/pv.csv";
        DataModel dataModel = RecommendFactory.buildDataModelNoPref(file);
        RecommenderBuilder rb1 = RecommenderEvaluator.userCityBlock(dataModel);
        RecommenderBuilder rb2 = RecommenderEvaluator.itemLoglikelihood(dataModel);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            System.out.print("userCityBlock    =>");
            result(uid, rb1, dataModel);
            System.out.print("itemLoglikelihood=>");
            result(uid, rb2, dataModel);
        }
    }

    public static void result(long uid, RecommenderBuilder recommenderBuilder, DataModel dataModel) throws TasteException {
        List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
        RecommendFactory.showItems(uid, list, false);
    }

}

Console output (excerpt):


...
userCityBlock    =>uid:968,(61,0.333333)
itemLoglikelihood=>uid:968,(121,1.429362)(153,1.239939)(198,1.207726)
userCityBlock    =>uid:969,
itemLoglikelihood=>uid:969,(75,1.326499)(30,0.873100)(85,0.763344)
userCityBlock    =>uid:970,
itemLoglikelihood=>uid:970,(13,0.748417)(156,0.748417)(122,0.748417)
userCityBlock    =>uid:971,
itemLoglikelihood=>uid:971,(38,2.060951)(104,1.951208)(83,1.941735)
userCityBlock    =>uid:972,
itemLoglikelihood=>uid:972,(131,1.378395)(4,1.349386)(87,0.881816)
userCityBlock    =>uid:973,
itemLoglikelihood=>uid:973,(196,1.432040)(140,1.398066)(130,1.380335)
userCityBlock    =>uid:974,(19,0.200000)
itemLoglikelihood=>uid:974,(145,1.994049)(121,1.794289)(98,1.738027)
...

Let's look at the recommendations for user uid=974.

Query pv.csv:


> pv[which(pv$userid==974),]
     userid jobid
2426    974   106
2427    974   173
2428    974    82
2429    974   188
2430    974    78

Query job.csv:


> job[job$jobid %in% c(145,121,98,19),]
    jobid create_date salary
19     19  2013-05-23   5700
98     98  2010-01-15   2900
121   121  2010-06-19   5300
145   145  2013-08-02   6800

Both algorithms recommend jobs that were posted back in 2010. These results are not great, so next we filter out outdated jobs and keep only those published in 2013.

3). RecommenderFilterOutdateResult.java: filter out expired jobs
Source code:



public class RecommenderFilterOutdateResult {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/job/pv.csv";
        DataModel dataModel = RecommendFactory.buildDataModelNoPref(file);
        RecommenderBuilder rb1 = RecommenderEvaluator.userCityBlock(dataModel);
        RecommenderBuilder rb2 = RecommenderEvaluator.itemLoglikelihood(dataModel);

        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            System.out.print("userCityBlock    =>");
            filterOutdate(uid, rb1, dataModel);
            System.out.print("itemLoglikelihood=>");
            filterOutdate(uid, rb2, dataModel);
        }
    }

    public static void filterOutdate(long uid, RecommenderBuilder recommenderBuilder, DataModel dataModel) throws TasteException, IOException {
        // Rescore with the set of outdated job IDs so that expired jobs are dropped from the results.
        Set<Long> jobids = getOutdateJobID("datafile/job/job.csv");
        IDRescorer rescorer = new JobRescorer(jobids);
        List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM, rescorer);
        RecommendFactory.showItems(uid, list, true);
    }

    public static Set<Long> getOutdateJobID(String file) throws IOException {
        // Collect the IDs of all jobs published before 2013-01-01.
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));
        Set<Long> jobids = new HashSet<Long>();
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd");
        String s = null;
        while ((s = br.readLine()) != null) {
            String[] cols = s.split(",");
            try {
                Date date = df.parse(cols[1]);
                if (date.getTime() < df.parse("2013-01-01").getTime()) {
                    jobids.add(Long.parseLong(cols[0]));
                }
            } catch (ParseException e) {
                e.printStackTrace();
            }
        }
        br.close();
        return jobids;
    }

}

class JobRescorer implements IDRescorer {
    // IDs of outdated jobs to exclude from the recommendation results.
    final private Set<Long> jobids;

    public JobRescorer(Set<Long> jobs) {
        this.jobids = jobs;
    }

    @Override
    public double rescore(long id, double originalScore) {
        // Returning NaN drops the item from the recommendation list.
        return isFiltered(id) ? Double.NaN : originalScore;
    }

    @Override
    public boolean isFiltered(long id) {
        return jobids.contains(id);
    }
}

Console output (excerpt):


...
itemLoglikelihood=>uid:965,(200,0.829600)(122,0.748417)(170,0.736340)
userCityBlock    =>uid:966,(114,0.250000)
itemLoglikelihood=>uid:966,(114,1.516898)(101,0.864536)(99,0.856057)
userCityBlock    =>uid:967,
itemLoglikelihood=>uid:967,(105,0.873100)(114,0.725016)(168,0.707119)
userCityBlock    =>uid:968,
itemLoglikelihood=>uid:968,(174,0.735004)(39,0.696716)(185,0.696171)
userCityBlock    =>uid:969,
itemLoglikelihood=>uid:969,(197,0.723203)(81,0.710230)(167,0.668358)
userCityBlock    =>uid:970,
itemLoglikelihood=>uid:970,(13,0.748417)(122,0.748417)(28,0.736340)
userCityBlock    =>uid:971,
itemLoglikelihood=>uid:971,(28,1.540753)(174,1.511881)(39,1.435575)
userCityBlock    =>uid:972,
itemLoglikelihood=>uid:972,(14,0.800605)(60,0.794088)(163,0.710230)
userCityBlock    =>uid:973,
itemLoglikelihood=>uid:973,(56,0.795529)(13,0.712680)(120,0.701026)
userCityBlock    =>uid:974,(19,0.200000)
itemLoglikelihood=>uid:974,(145,1.994049)(89,1.578694)(19,1.435193)
...

Let's look again at the recommendations for user uid=974.
Query pv.csv:


> pv[which(pv$userid==974),]
     userid jobid
2426    974   106
2427    974   173
2428    974    82
2429    974   188
2430    974    78

Query job.csv:


> job[job$jobid %in% c(19,145,89),]
    jobid create_date salary
19     19  2013-05-23   5700
89     89  2013-06-15   8400
145   145  2013-08-02   6800

Comparing the results after removing expired jobs: userCityBlock still returns only job 19, while the 2nd and 3rd itemLoglikelihood results are replaced by the lower-scoring jobs 89 and 19.

4). RecommenderFilterSalaryResult.java: filter out jobs whose salary is too low

Let's look at the jobs that user uid=974 has viewed.


> job[job$jobid %in% c(106,173,82,188,78),]
    jobid create_date salary
78     78  2012-01-29   6800
82     82  2010-07-05   7500
106   106  2011-04-25   5200
173   173  2013-09-13   5200
188   188  2010-07-14   6000

The average salary is 6140. We assume that, when browsing jobs, users generally do not look at positions that pay less than they currently earn, so the filter excludes jobs whose salary is below 80% of that average, i.e. it drops recommended jobs with a salary under 4912 (6140 * 0.8 = 4912).

You can implement this yourself by following the pattern of RecommenderFilterOutdateResult.java above; a possible sketch is shown below.
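
A minimal, untested sketch of what such a filter might look like, following the JobRescorer pattern above; the SalaryRescorer name, the jobSalary map, and the fixed threshold are illustrative and may differ from the actual RecommenderFilterSalaryResult.java in the repository.

import java.util.Map;

import org.apache.mahout.cf.taste.recommender.IDRescorer;

// Hypothetical sketch: filter out jobs whose salary is below a per-user threshold
// (80% of the average salary of the jobs the user has viewed).
class SalaryRescorer implements IDRescorer {
    private final Map<Long, Integer> jobSalary; // jobid -> salary, loaded from job.csv
    private final double minSalary;             // e.g. 6140 * 0.8 = 4912 for uid=974

    public SalaryRescorer(Map<Long, Integer> jobSalary, double minSalary) {
        this.jobSalary = jobSalary;
        this.minSalary = minSalary;
    }

    @Override
    public double rescore(long id, double originalScore) {
        // Returning NaN drops the item from the recommendation list.
        return isFiltered(id) ? Double.NaN : originalScore;
    }

    @Override
    public boolean isFiltered(long id) {
        Integer salary = jobSalary.get(id);
        return salary == null || salary < minSalary;
    }
}

It would be passed to recommend(uid, RECOMMENDER_NUM, rescorer) just like JobRescorer; a full implementation would compute the threshold per user from pv.csv and job.csv instead of hard-coding it.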

With that, we have completed the algorithms for a job recommendation engine built with Mahout. Without Mahout, writing this engine ourselves would probably have taken close to half a year; making good use of open-source technology helps us grow at lightning speed!

Source code download:
https://github.com/bsspirit/maven_mahout_template/tree/mahout-0.8/src/main/java/org/conan/mymahout/recommendation/job

######################################################
Not satisfied with just reading? For the author's video walkthroughs, visit: http://onbook.me/video
######################################################

Please cite the source when reprinting:
http://blog.fens.me/hadoop-mahout-recommend-job/
