• Posts tagged "Hadoop"

Blog Archives

基于Zookeeper的分步式队列系统集成案例

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

zookeeper-case

前言

软件系统集成一直是工业界的一个难题,像10年以上的遗留系统集成,公司收购后的多系统集成,全球性的分步式系统集成等。虽然基于SOA的软件架构,从理论上都可以解决这些集成的问题,但是具体实施过程,有些集成项目过于复杂而失败。

随着技术的创新和发展,对于分步式集群应用的集成,有了更好的开源软件的支持,像zookeeper就是一个不错的分步式协作软件平台。本文将通过一个案例介绍Zookeeper的强大。

目录

  1. 项目背景:分布式消息中间件
  2. 需求分析:业务系统升级方案
  3. 架构设计:搭建Zookeeper的分步式协作平台
  4. 程序开发:基于Zookeeper的程序设计
  5. 程序运行

1. 项目背景:分布式消息中间件

随着Hadoop的普及,越来越多的公司开始构建自己的Hadoop系统。有时候,公司内部的不同部门或不同的团队,都有自己的Hadoop集群。这种多集群的方式,既能让每个团队拥有个性化的Hadoop,又能避免大集群的高度其中化运维难度。当数据量不是特别巨大的时候,小型集群会有很多适用的场合。

当然,多个小型集群也有缺点,就是资源配置可能造成浪费。每个团队的Hadoop集群,都要配有服务器和运维人员。有些能力强的团队,构建的hadoop集群,可以达到真正的个性化要求;而有一些能力比较差的团队,搭建的Hadoop集群性能会比较糟糕。

还有一些时候,多个团队需要共同完成一个任务,比如,A团队通过Hadoop集群计算的结果,交给B团队继续工作,B完成了自己任务再交给C团队继续做。这就有点像业务系统的工作流一样,一环一环地传下去,直到最后一部分完成。

在业务系统中,我们经常会用SOA的架构来解决这种问题,每个团队在ESB服务器上部署自己的服务,然后通过消息中间件完成调度任务。对于分步式的多个Hadoop集群系统的协作,同样可以用这种架构来做,只要把消息中间件引擎换成支持分步式的消息中间件的引擎就行了。

Zookeeper就可以做为 分步式消息中间件,来完成上面的说的业务需求。ZooKeeper是Hadoop家族的一款高性能的分布式协作的产品,是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。Zookeeper的安装和使用,请参考文章 ZooKeeper伪分布式集群安装及使用

ZooKeeper提供分布式协作服务,并不需要依赖于Hadoop的环境。

2. 需求分析:业务系统升级方案

下面我将从一个案例出发,来解释如何进行分步式协作平台的系统设计。

2.1 案例介绍

某大型软件公司,从事领域为供应链管理,主要业务包括了 采购管理、应付账款管理、应收账款管理、供应商反复管理、退货管理、销售管理、库存管理、电子商务、系统集成等。

biz

每块业务的逻辑都很复杂,由单独部门进行软件开发和维护,部门之间的系统没有直接通信需求,每个部门完成自己的功能就行了,最后通过数据库来共享数据,实现各功能之间的数据交换。

zk1

随着业务的发展,客户对响应速度要求越来越高,通过数据库来共享数据的方式,已经达不到信息交换的要求,系统进行了第一次升级,通过企业服务总线(ESB)统一管理公司内部所有业务。通过WebServices发布服务,通过Message Queue实现业务功能的调度。

zk2

公司业务规模继续扩大,跨国收购了多家公司。业务系统从原来的一个机房的集中式部署,变成了全球性的多机房的分步式部署。这时,Message Queue已经不能满足多机房跨地域的业务系统的功能需求了,需要一种分步式的消息中间件解决方案,来代替原有消息中间件的服务。

系统进行了第二次升级,采用Zookeeper作为分步式中间件调度引擎。

zk3

通过上面的描述,我们可以看出,当一个公司从小到大,从国内业务发展到全球性业务的时候。
为了配合业务发展,IT系统也是越来越复杂的,从最早的主从数据库设计,到ESB企业系统总线的扩展,再到分步式ESB配合分步式消息系统,每一次的升级都需要软件技术的支撑。

2.2 功能需求

全球性采购业务和全球性销售业务,让公司在市场中处于竞争优势。但由于采购和销售分别是由不同部门进行的软件开发和维护,而且业务往来也在不同的国家和地区。所以在每月底结算时,工作量都特别大。

比如,计算利润表 (请不要纠结于公式的准确性)

当月利润 = 当月销售金额 - 当月采购金额 - 当月其他支出

这样一个非常简单的计算公式,但对于跨国公司和部门来说,一点也不简单的。

从系统角度来看,采购部门要统计采购数据(海量数据),销售部门统计销售数据((海量数据),其他部门统计的其他费用支出(汇总的少量数据),最后系统计算得到当月的利润。

这里要说明的是,采购系统是单独的系统,销售是另外单独的系统,及以其他几十个大大小小的系统,如何能让多个系统,配合起来做这道计算题呢??

3. 架构设计:搭建Zookeeper的分步式协作平台

接下来,我们基于zookeeper来构建一个分步式队列的应用,来解决上面的功能需求。下面内容,排除了ESB的部分,只保留zookeeper进行实现。

  • 采购数据,为海量数据,基于Hadoop存储和分析。
  • 销售数据,为海量数据,基于Hadoop存储和分析。
  • 其他费用支出,为少量数据,基于文件或数据库存储和分析。

我们设计一个同步队列,这个队列有3个条件节点,分别对应采购(purchase),销售(sell),其他费用(other)3个部分。当3个节点都被创建后,程序会自动触发计算利润,并创建利润(profit)节点。上面3个节点的创建,无顺序要求。每个节点只能被创建一次。

zk4

系统环境

  • 2个独立的Hadoop集群
  • 2个独立的Java应用
  • 3个Zookeeper集群节点

图标解释:

  • Hadoop App1,Hadoop App2 是2个独立的Hadoop集群应用
  • Java App3,Java App4 是2个独立的Java应用
  • zk1,zk2,zk3是ZooKeeper集群的3个连接点
  • /queue,是znode的队列目录,假设队列长度为3
  • /queue/purchase,是znode队列中,1号排对者,由Hadoop App1提交,用于统计采购金额。
  • /queue/sell,是znode队列中,2号排对者,由Hadoop App2提交,用于统计销售金额。
  • /queue/other,是znode队列中,3号排对者,由Java App3提交,用于统计其他费用支出金额。
  • /queue/profit,当znode队列中满了,触发创建利润节点。
  • 当/qeueu/profit被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束。

补充说明:

  • 创建/queue/purchase,/queue/sell,/queue/other目录时,没有前后顺序,程序提交后,/queue目录下会生成对应该子目录
  • App1可以通过zk2提交,App2也可通过zk3提交。原则上,找最近路由最近的znode节点提交。
  • 每个应用不能重复提出,直到3个任务都提交,计算利润的任务才会被执行。
  • /queue/profit被创建后,zk的应用会监听到这个事件,通知应用,队列已完成!

这里的同步队列的架构更详细的设计思路,请参考文章 ZooKeeper实现分布式队列Queue

4. 程序开发:基于Zookeeper的程序设计

最终的功能需求:计算2013年01月的利润。

4.1 实验环境

在真正企业开发时,我们的实验环境应该与需求是一致的,但我的硬件条件有限,因些做了一个简化的环境设置。

  • 把zookeeper的完全分步式部署的3台服务器集群节点的,改为一台服务器上3个集群节点。
  • 把2个独立Hadoop集群,改为一个集群的2个独立的MapReduce任务。

开发环境:

  • Win7 64bit
  • JDK 1.6
  • Maven3
  • Juno Service Release 2
  • IP:192.168.1.10

Zookeeper服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Zookeeper: 3.4.5
  • IP: 192.168.1.201
  • 3个集群节点

Hadoop服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Hadoop: 1.0.3
  • IP: 192.168.1.210

4.2 实验数据

3组实验数据:

  • 采购数据,purchase.csv
  • 销售数据,sell.csv
  • 其他费用数据,other.csv

4.2.1 采购数据集

一共4列,分别对应 产品ID,产品数量,产品单价,采购日期。


1,26,1168,2013-01-08
2,49,779,2013-02-12
3,80,850,2013-02-05
4,69,1585,2013-01-26
5,88,1052,2013-01-13
6,84,2363,2013-01-19
7,64,1410,2013-01-12
8,53,910,2013-01-11
9,21,1661,2013-01-19
10,53,2426,2013-02-18
11,64,2022,2013-01-07
12,36,2941,2013-01-28
13,99,3819,2013-01-19
14,64,2563,2013-02-16
15,91,752,2013-02-05
16,65,750,2013-02-04
17,19,2426,2013-02-23
18,19,724,2013-02-05
19,87,137,2013-01-25
20,86,2939,2013-01-14
21,92,159,2013-01-23
22,81,2331,2013-03-01
23,88,998,2013-01-20
24,38,102,2013-02-22
25,32,4813,2013-01-13
26,36,1671,2013-01-19

//省略部分数据

4.2.2 销售数据集

一共4列,分别对应 产品ID,销售数量,销售单价,销售日期。


1,14,1236,2013-01-14
2,19,808,2013-03-06
3,26,886,2013-02-23
4,23,1793,2013-02-09
5,27,1206,2013-01-21
6,27,2648,2013-01-30
7,22,1502,2013-01-19
8,20,1050,2013-01-18
9,13,1778,2013-01-30
10,20,2718,2013-03-14
11,22,2175,2013-01-12
12,16,3284,2013-02-12
13,30,4152,2013-01-30
14,22,2770,2013-03-11
15,28,778,2013-02-23
16,22,874,2013-02-22
17,12,2718,2013-03-22
18,12,747,2013-02-23
19,27,172,2013-02-07
20,27,3282,2013-01-22
21,28,224,2013-02-05
22,26,2613,2013-03-30
23,27,1147,2013-01-31
24,16,141,2013-03-20
25,15,5343,2013-01-21
26,16,1887,2013-01-30
27,12,2535,2013-01-12
28,16,469,2013-01-07
29,29,2395,2013-03-30
30,17,1549,2013-01-30
31,25,4173,2013-03-17

//省略部分数据

4.2.3 其他费用数据集

一共2列,分别对应 发生日期,发生金额


2013-01-02,552
2013-01-03,1092
2013-01-04,1794
2013-01-05,435
2013-01-06,960
2013-01-07,1066
2013-01-08,1354
2013-01-09,880
2013-01-10,1992
2013-01-11,931
2013-01-12,1209
2013-01-13,1491
2013-01-14,804
2013-01-15,480
2013-01-16,1891
2013-01-17,156
2013-01-18,1439
2013-01-19,1018
2013-01-20,1506
2013-01-21,1216
2013-01-22,2045
2013-01-23,400
2013-01-24,1795
2013-01-25,1977
2013-01-26,1002
2013-01-27,226
2013-01-28,1239
2013-01-29,702
2013-01-30,1396

//省略部分数据

4.3 程序设计

我们要编写5个文件:

  • 计算采购金额,Purchase.java
  • 计算销售金额,Sell.java
  • 计算其他费用金额,Other.java
  • 计算利润,Profit.java
  • Zookeeper的调度,ZookeeperJob.java

4.3.1 计算采购金额

采购金额,是基于Hadoop的MapReduce统计计算。


public class Purchase {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class PurchaseMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class PurchaseReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("purchase");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化purchase
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Purchase.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(PurchaseMapper.class);
        job.setReducerClass(PurchaseReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("purchase", "logfile/biz/purchase.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/purchase");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.2 计算销售金额

销售金额,是基于Hadoop的MapReduce统计计算。


public class Sell {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class SellMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class SellReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("sell");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化sell
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Sell.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(SellMapper.class);
        job.setReducerClass(SellReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("sell", "logfile/biz/sell.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.3 计算其他费用金额

其他费用金额,是基于本地文件的统计计算。


public class Other {

    public static String file = "logfile/biz/other.csv";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    private static String month = "2013-01";

    public static void main(String[] args) throws IOException {
        calcOther(file);
    }

    public static int calcOther(String file) throws IOException {
        int money = 0;
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));

        String s = null;
        while ((s = br.readLine()) != null) {
            // System.out.println(s);
            String[] tokens = DELIMITER.split(s);
            if (tokens[0].startsWith(month)) {// 1月的数据
                money += Integer.parseInt(tokens[1]);
            }
        }
        br.close();

        System.out.println("Output:" + month + "," + money);
        return money;
    }
}

4.3.4 计算利润

利润,通过zookeeper分步式自动调度计算利润。


public class Profit {

    public static void main(String[] args) throws Exception {
        profit();
    }

    public static void profit() throws Exception {
        int sell = getSell();
        int purchase = getPurchase();
        int other = getOther();
        int profit = sell - purchase - other;
        System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit);
    }

    public static int getPurchase() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
        return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim());
    }

    public static int getSell() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
        return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim());
    }

    public static int getOther() throws IOException {
        return Other.calcOther(Other.file);
    }

}

4.3.5 Zookeeper调度

调度,通过构建分步式队列系统,自动化程序代替人工操作。


public class ZooKeeperJob {

    final public static String QUEUE = "/queue";
    final public static String PROFIT = "/queue/profit";
    final public static String PURCHASE = "/queue/purchase";
    final public static String SELL = "/queue/sell";
    final public static String OTHER = "/queue/other";

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Please start a task:");
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            doPurchase(zk);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            doSell(zk);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            doOther(zk);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) {
                    System.out.println("Queue has Completed!!!");
                }
            }
        });
        return zk;
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => " + PROFIT);
        zk.exists(PROFIT, true);

        if (zk.exists(QUEUE, false) == null) {
            System.out.println("create " + QUEUE);
            zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(QUEUE + " is exist!");
        }
    }

    public static void doPurchase(ZooKeeper zk) throws Exception {
        if (zk.exists(PURCHASE, false) == null) {

            Purchase.run(Purchase.path());

            System.out.println("create " + PURCHASE);
            zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(PURCHASE + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doSell(ZooKeeper zk) throws Exception {
        if (zk.exists(SELL, false) == null) {

            Sell.run(Sell.path());

            System.out.println("create " + SELL);
            zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(SELL + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doOther(ZooKeeper zk) throws Exception {
        if (zk.exists(OTHER, false) == null) {

            Other.calcOther(Other.file);

            System.out.println("create " + OTHER);
            zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(OTHER + " is exist!");
        }
        isCompleted(zk);
    }

    public static void isCompleted(ZooKeeper zk) throws Exception {
        int size = 3;
        List children = zk.getChildren(QUEUE, true);
        int length = children.size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create " + PROFIT);
            Profit.profit();
            zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

            for (String child : children) {// 清空节点
                zk.delete(QUEUE + "/" + child, -1);
            }
        }
    }
}

5. 运行程序

最后,我们运行整个的程序,包括3个部分。

  • zookeeper服务器
  • hadoop服务器
  • 分步式队列应用

5.1 启动zookeeper服务

启动zookeeper服务器集群:


~ cd toolkit/zookeeper345

# 启动zk集群3个节点
~ bin/zkServer.sh start conf/zk1.cfg
~ bin/zkServer.sh start conf/zk2.cfg
~ bin/zkServer.sh start conf/zk3.cfg

~ jps
4234 QuorumPeerMain
5002 Jps
4275 QuorumPeerMain
4207 QuorumPeerMain

查看zookeeper集群中,各节点的状态


# 查看zk1节点状态
~ bin/zkServer.sh status conf/zk1.cfg
JMX enabled by default
Using config: conf/zk1.cfg
Mode: follower

# 查看zk2节点状态,zk2为leader
~ bin/zkServer.sh status conf/zk2.cfg
JMX enabled by default
Using config: conf/zk2.cfg
Mode: leader

# 查看zk3节点状态
~ bin/zkServer.sh status conf/zk3.cfg
JMX enabled by default
Using config: conf/zk3.cfg
Mode: follower

启动zookeeper客户端:


~ bin/zkCli.sh -server 192.168.1.201:2181

# 查看zk
[zk: 192.168.1.201:2181(CONNECTED) 0] ls /
[queue, queue-fifo, zookeeper]

# /queue路径无子目录
[zk: 192.168.1.201:2181(CONNECTED) 1] ls /queue
[]

5.2 启动Hadoop服务


~ hadoop/hadoop-1.0.3
~ bin/start-all.sh

~ jps
25979 JobTracker
26257 TaskTracker
25576 DataNode
25300 NameNode
12116 Jps
25875 SecondaryNameNode

5.3 启动分步式队列ZookeeperJob

5.3.1 启动统计采购数据程序,设置启动参数1

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
copy from: logfile/biz/purchase.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Output:2013-01,9609887
create /queue/purchase
Queue Complete:1/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 3] ls /queue
[purchase]

5.3.2 启动统计销售数据程序,设置启动参数2

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
copy from: logfile/biz/sell.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Output:2013-01,2950315
create /queue/sell
Queue Complete:2/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[purchase, sell]

5.3.3 启动统计其他费用数据程序,设置启动参数3

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Output:2013-01,34193
create /queue/other
Queue Complete:3/3
create /queue/profit
cat: hdfs://192.168.1.210:9000/user/hdfs/biz/sell/output/part-r-00000
2950315

cat: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase/output/part-r-00000
9609887

Output:2013-01,34193
profit = sell - purchase - other = 2950315 - 9609887 - 34193 = -6693765
Queue has Completed!!!

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[profit]

在最后一步,统计其他费用数据程序运行后,从日志中看到3个条件节点都已满足要求。然后,通过同步的分步式队列自动启动了计算利润的程序,并在日志中打印了2013年1月的利润为-6693765。

本文介绍的源代码,已上传到github: https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myzk/hadoop

通过这个复杂的实验,我们成功地用zookeeper实现了分步式队列,并应用到了业务中。当然,实验中也有一些不是特别的严谨的地方,请同学边做边思考。

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

打赏作者

在Ubuntu中安装HBase

R利剑NoSQL系列文章,主要介绍通过R语言连接使用nosql数据库。涉及的NoSQL产品,包括RedisMongoDBHBaseHiveCassandraNeo4j。希望通过我的介绍让广大的R语言爱好者,有更多的开发选择,做出更多地激动人心的应用。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/linux-hbase-install/

linux-hbase

前言

HBase是Hadoop家族中的一个分布式数据库产品,HBase支持高并发读写,列式数据存储,高效的索引,自动分片,自动Region迁移等许多优点,已经越来越多的被界业认可并实施。

目录

  1. 在Ubuntu中环境准备
  2. HBase安装
  3. Thrift安装

1 在Ubuntu中环境准备

HBase是基于Java开发的运行Hadoop平台上分布式NoSQL数据库软件,HBase没有提供Windows系统安装版本。我在这里也只介绍HBase在Linux Ubuntu系统中的安装。

由于HBase是运行在Hadoop平台上面的,因此我们需要先安装好Hadoop的环境,Hadoop的安装请参考文章:[Hadoop历史版本安装](http://blog.fens.me/hadoop-history-source-install/)

HBase没有提供apt的软件源安装,我们需要自己去官方网络下载HBase软件包进行安装。HBase下载页:http://www.apache.org/dyn/closer.cgi/hbase/

系统环境:

  • Linux Ubuntu 12.04.2 LTS 64bit server
  • Java JDK 1.6.0_45
  • Hadoop 1.1.2

2 HBase安装

2.1 下载HBase


# 通过wget命令下载
~ wget http://www.gaidso.com/apache/hbase/stable/hbase-0.94.18.tar.gz

# 解压HBase
~ tar xvf hbase-0.94.18.tar.gz

# 移动HBase目录到文件夹
~ mv hbase-0.94.18/ /home/conan/hadoop/

# 进入目录
~ cd /home/conan/hadoop/hbase-0.94.18

2.2 配置HBase

2.2.1 修改启动文件hbase-env.sh


~ vi conf/hbase-env.sh

#打开注释
export JAVA_HOME=/home/conan/toolkit/jdk16
export HBASE_CLASSPATH=/home/conan/hadoop/hadoop-1.1.2/conf
export HBASE_MANAGES_ZK=true

2.2.2 修改配置文件 hbase-site.xml


~ vi conf/hbase-site.xml

<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://master:9000/hbase</value>
</property>

<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>

<property>
<name>dfs.replication</name>
<value>1</value>
</property>

<property>
<name>hbase.zookeeper.quorum</name>
<value>master</value>
</property>

<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>

<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/conan/hadoop/hdata</value>
</property>
</configuration>

复制hadoop环境的配置文件和类库


~ cp ~/hadoop/hadoop-1.1.2/conf/hdfs-site.xml conf/
~ cp ~/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar lib/
~ mkdir /home/conan/hadoop/hdata

2.3 启动hadoop和hbase


~ /home/conan/hadoop/hadoop-1.1.2/bin/start-all.sh
~ /home/conan/hadoop/hbase-0.94.18/bin/start-hbase.sh

# 查看hbase进程
~ jps
13838 TaskTracker
13541 JobTracker
15946 HMaster
16756 Jps
12851 NameNode
13450 SecondaryNameNode
13133 DataNode
15817 HQuorumPeer
16283 HRegionServer

2.4 打开HBase命令行客户端访问Hbase


~ bin/hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 0.94.18, r1577788, Sat Mar 15 04:46:47 UTC 2014

hbase(main):002:0> help
HBase Shell, version 0.94.18, r1577788, Sat Mar 15 04:46:47 UTC 2014
Type 'help "COMMAND"', (e.g. 'help "get"' -- the quotes are necessary) for help on a specific command.
Commands are grouped. Type 'help "COMMAND_GROUP"', (e.g. 'help "general"') for help on a command group.

COMMAND GROUPS:
  Group name: general
  Commands: status, version, whoami

  Group name: ddl
  Commands: alter, alter_async, alter_status, create, describe, disable, disable_all, drop, drop_all, enable, enable_all, exists, is_disabled, is_enabled, list, show_filters

  Group name: dml
  Commands: count, delete, deleteall, get, get_counter, incr, put, scan, truncate

  Group name: tools
  Commands: assign, balance_switch, balancer, close_region, compact, flush, hlog_roll, major_compact, move, split, unassign, zk_dump

  Group name: replication
  Commands: add_peer, disable_peer, enable_peer, list_peers, list_replicated_tables, remove_peer, start_replication, stop_replication

  Group name: snapshot
  Commands: clone_snapshot, delete_snapshot, list_snapshots, restore_snapshot, snapshot

  Group name: security
  Commands: grant, revoke, user_permission

SHELL USAGE:
Quote all names in HBase Shell such as table and column names.  Commas delimit
command parameters.  Type  after entering a command to run it.
Dictionaries of configuration used in the creation and alteration of tables are
Ruby Hashes. They look like this:

  {'key1' => 'value1', 'key2' => 'value2', ...}

and are opened and closed with curley-braces.  Key/values are delimited by the
'=>' character combination.  Usually keys are predefined constants such as
NAME, VERSIONS, COMPRESSION, etc.  Constants do not need to be quoted.  Type
'Object.constants' to see a (messy) list of all constants in the environment.

If you are using binary keys or values and need to enter them in the shell, use
double-quote'd hexadecimal representation. For example:

  hbase> get 't1', "key\x03\x3f\xcd"
  hbase> get 't1', "key\003\023\011"
  hbase> put 't1', "test\xef\xff", 'f1:', "\x01\x33\x40"

The HBase shell is the (J)Ruby IRB with the above HBase-specific commands added.
For more on the HBase Shell, see http://hbase.apache.org/docs/current/book.html

2.5 HBase简单命令操作


#创建一个新表student
hbase(main):003:0> create 'student','info'
0 row(s) in 1.2680 seconds

#查看所有的表
hbase(main):004:0> list
TABLE
student
1 row(s) in 0.0330 seconds

#查看student的表结构
hbase(main):005:0> describe 'student'
DESCRIPTION                                                 ENABLED
 'student', {NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', true
  BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS
  => '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL =
 > '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE =
 > '65536', IN_MEMORY => 'false', ENCODE_ON_DISK => 'true',
  BLOCKCACHE => 'true'}
1 row(s) in 0.1100 seconds

#同student表中插入一条数据
hbase(main):007:0> put 'student','mary','info:age','19'
0 row(s) in 0.0490 seconds

#从student表中取出mary的数据
hbase(main):008:0> get 'student','mary'
COLUMN                   CELL
 info:age                timestamp=1396366643298, value=19
1 row(s) in 0.0190 seconds

#让student表失效
hbase(main):009:0> disable 'student'
0 row(s) in 1.2400 seconds

#列出所有表
hbase(main):010:0> list
TABLE
student
1 row(s) in 0.0310 seconds

#删除student表
hbase(main):013:0>  drop 'student'
0 row(s) in 1.1100 seconds

#列出所有表
hbase(main):014:0> list
TABLE
0 row(s) in 0.0400 seconds

3 Thrift安装

安装完成HBase后,我们还需要安装Thrift,因为其他语言调用HBase的时候,是通过Thrift连接的。

Thrift是需要本地编译的,官方没有提供二进制安装包,首先下载thrift-0.9.1,Thrift下载页:http://thrift.apache.org/download

3.1 下载thrift

下载Thrift有两种方式,直接下载源代码发行包,或者通过git下载源代码,请选择其中一种方式下载。

3.1.1 直接下载源代码发行包 thrift-0.9.1.tar.gz


~ wget http://apache.fayea.com/apache-mirror/thrift/0.9.1/thrift-0.9.1.tar.gz
~ tar xvf thrift-0.9.1.tar.gz
~ mv thrift-0.9.1/ /home/conan/hadoop/
~ cd /home/conan/hadoop/

注:后文中的各种错误,都是这个包引起的

3.1.2 通过git下载源代码


~ git clone https://git-wip-us.apache.org/repos/asf/thrift.git thrift-git
~ mv thrift-git/ /home/conan/hadoop/
~ cd /home/conan/hadoop/

为了避免各种出错,建议使用git下载源代码安装

3.2 通过thrift-0.9.1.tar.gz 发行包安装Thrift

Thrift是需要本地编译的,在Thrift解压目录输入./configure,会列Thrift在当前机器所支持的语言环境。

3.2.1 安装Thrift的依赖包


sudo apt-get install libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev

如果只是为了连接rhbase,默认配置就可以了。如果除了希望支持rhbase访问,还支持PHP,Python,C++等语言的访问,就需要在系统中,装一些额外的类库。大家可以根据自己的要求,安装对应的软件包并设置Thrift的编译参数。

生成配置脚本


~  ./configure

//省略部分日志输出

thrift 0.9.1

Building C++ Library ......... : yes
Building C (GLib) Library .... : no
Building Java Library ........ : yes
Building C# Library .......... : no
Building Python Library ...... : yes
Building Ruby Library ........ : no
Building Haskell Library ..... : no
Building Perl Library ........ : no
Building PHP Library ......... : no
Building Erlang Library ...... : no
Building Go Library .......... : no
Building D Library ........... : no

C++ Library:
   Build TZlibTransport ...... : yes
   Build TNonblockingServer .. : yes
   Build TQTcpServer (Qt) .... : no

Java Library:
   Using javac ............... : javac
   Using java ................ : java
   Using ant ................. : /home/conan/toolkit/ant184/bin/ant

Python Library:
   Using Python .............. : /usr/bin/python

If something is missing that you think should be present,
please skim the output of configure to find the missing
component.  Details are present in config.log.

我本机的已支持C++, Java与Thrift的通信。

3.2.2 增加Python语言的通信

虽然Python已被显示支持与Thrift但在后面编译过程中,还是缺少一些Python的库,我们需要再增加Python的依赖库

安装Python的依赖包


sudo apt-get install python-all python-all-dev python-all-dbg

3.2.3 增加PHP语言的通信

安装PHP的依赖包


sudo apt-get install php5-dev php5-cli phpunit

生成配置脚本


~  ./configure --enable-thrift_protocol

//省略部分日志输出

thrift 0.9.1

Building C++ Library ......... : yes
Building C (GLib) Library .... : no
Building Java Library ........ : yes
Building C# Library .......... : no
Building Python Library ...... : yes
Building Ruby Library ........ : no
Building Haskell Library ..... : no
Building Perl Library ........ : no
Building PHP Library ......... : yes
Building Erlang Library ...... : no
Building Go Library .......... : no
Building D Library ........... : no

C++ Library:
   Build TZlibTransport ...... : yes
   Build TNonblockingServer .. : yes
   Build TQTcpServer (Qt) .... : no

Java Library:
   Using javac ............... : javac
   Using java ................ : java
   Using ant ................. : /home/conan/toolkit/ant184/bin/ant

Python Library:
   Using Python .............. : /usr/bin/python

PHP Library:
   Using php-config .......... : /usr/bin/php-config

If something is missing that you think should be present,
please skim the output of configure to find the missing
component.  Details are present in config.log.

我们看到Thrift的配置中,增加了对PHP语言的支持。

3.2.4 编译和安装


# 编译Thrift
~ make

//省略部分日志

make[5]: 正在进入目录 `/home/conan/hadoop/thrift-0.9.1/lib/php/src/ext/thrift_protocol'
make[5]: *** 没有指明目标并且找不到 makefile。 停止。
make[5]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/lib/php/src/ext/thrift_protocol'
make[4]: *** [src/ext/thrift_protocol/modules/thrift_protocol.so] 错误 2
make[4]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/lib/php'
make[3]: *** [all-recursive] 错误 1
make[3]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/lib/php'
make[2]: *** [all-recursive] 错误 1
make[2]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/lib'
make[1]: *** [all-recursive] 错误 1
make[1]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1'
make: *** [all] 错误 2

在make生成过程,出现PHP的编译错误。从Thrift的错误列表中,我们可以找到错误描述( https://issues.apache.org/jira/browse/THRIFT-2265 ),这是由于Thrift-0.9.1发行包,打包时缺少了PHP扩展文件造成的错误,并在Thrift-0.9.2版本中修复。

所以,我们如果还想继续使用Thrift-0.9.1版本,则不能支持PHP语言。


# 生成配置信息,不包括PHP模块
~ ./configure --without-php_extension

# 编译Thrift
~ make

编译过程中,又出现了C++编译错误。


Makefile:832: 警告:覆盖关于目标“gen-cpp/ThriftTest.cpp”的命令
Makefile:829: 警告:忽略关于目标“gen-cpp/ThriftTest.cpp”的旧命令
/bin/bash ../../libtool --tag=CXX   --mode=link g++ -Wall -g -O2 -L/usr/lib   -o libtestgencpp.la  ThriftTest_constants.lo ThriftTest_types.lo ../../lib/cpp/libthrift.la -lssl -lcrypto -lrt -lpthread
libtool: link: ar cru .libs/libtestgencpp.a .libs/ThriftTest_constants.o .libs/ThriftTest_types.o
ar: .libs/ThriftTest_constants.o: No such file or directory
make[3]: *** [libtestgencpp.la] 错误 1
make[3]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/test/cpp'
make[2]: *** [all-recursive] 错误 1
make[2]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/test'
make[1]: *** [all-recursive] 错误 1
make[1]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1'
make: *** [all] 错误 2

对于上面的2个编译错误,我决定换成git源代码的版本重新操作。

3.2 通过git源代码安装Thrift

运行安装命令


# 进行thrift-git目录
~ cd /home/conan/hadoop/thrift-git

# 复制0.9.1标签到新分支thrift-0.9.1
~ git checkout -b thrift-0.9.1 0.9.1

# 产生配置脚本
~ ./bootstrap.sh

# 生成配置信息
~ ./configure

# 编译Thrift
~ make

# 安装Thrift
~ sudo make install

走了许多弯路,终于使用git源代码版本安装好了Thrift。

查看thrift版本


~ thrift -version
Thrift version 0.9.1

接下来,我们启动HBase的Thrift Server服务


# 启动HBase的Thrift服务
~ /home/conan/hadoop/hbase-0.94.18/bin/hbase-daemon.sh start thrift
starting thrift, logging to /home/conan/hadoop/hbase-0.94.18/bin/../logs/hbase-conan-thrift-master.out

# 检查系统进程
~ jps
13838 TaskTracker
13541 JobTracker
15946 HMaster
32120 Jps
12851 NameNode
13450 SecondaryNameNode
13133 DataNode
32001 ThriftServer
15817 HQuorumPeer
16283 HRegionServer

我们看到ThriftServer已被启动,后面我们就可以使用多种语言,通过Thrift来访问HBase了,这样就完成了HBase的安装。

转载请注明出处:
http://blog.fens.me/linux-hbase-install/

打赏作者

读书笔记 Big Data Analytics with R and Hadoop

RHadoop实践系列文章,包含了R语言与Hadoop结合进行海量数据分析。Hadoop主要用来存储海量数据,R语言完成MapReduce 算法,用来替代Java的MapReduce实现。有了RHadoop可以让广大的R语言爱好者,有更强大的工具处理大数据1G, 10G, 100G, TB, PB。 由于大数据所带来的单机性能问题,可能会一去不复返了。

RHadoop实践是一套系列文章,主要包括”Hadoop环境搭建”,”RHadoop安装与使用”,R实现MapReduce的协同过滤算法”,”HBase和rhbase的安装与使用”。对于单独的R语言爱好者,Java爱好者,或者Hadoop爱好者来说,同时具备三种语言知识并不容 易。此文虽为入门文章,但R,Java,Hadoop基础知识还是需要大家提前掌握。

关于作者

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/r-hadoop-book-big-data/

r-hadoop-book-big-data

前言

最近的一本新书Big Data Analytics with R and Hadoop是关于R和Hadoop实践的第一本图书,作者Vignesh Prajapati曾经在图书出版的半年前联系过我,通过Google翻译发现了我的博客,希望把其中的1-2个例子放到他的书中。

没想到这本书,经过半年就出版了,作者效率还是挺高的。受Packt Publishing编辑Amol Bhosle委托为本书写个书评,于是就有本篇文章。

目录

  1. 图书概览
  2. 图书内容剖析
  3. 最后总结

1. 图书概览

本书的几个核心点:R,Hadoop, R+Hadoop, 数据分析案例,机器学习算法案例,R的数据访问接口。

我通过一个思维导图来表达。

Big Data Analytics with R and Hadoop - fens.me

书中最重要的是案例部分,作者分别使用R语言单机实现,以及RHadoop的分步式实现,介绍是多个案例的实践。

2. 图书内容剖析

  • R语言介绍
  • Hadoop介绍
  • R+Hadoop技术方案
  • 数据分析案例
  • 大数据分析案例
  • R语言的数据访问接口

1). R语言介绍

简单地介绍了R安装,RStudio安装,R语言最擅长算法模型:回归,分类,聚类,推荐。

2). Hadoop介绍

主要是介绍了Hadoop安装,在几种不同的Linux系统上,用Apache Hadoop和Cloudera Hadoop二个版本对比安装。

简单介绍了HDFS,dateeode和namenode;讲了MapReduce的工作原理,和用MapReduce对数据处理的过程。

介绍了Hadoop的命令行的使用。

3). R+Hadoop技术方案

作者对于R+Hadoop做了3种技术方案讨论,分别是RHipe, RHadoop, R + Hadoop Streaming。

a. RHipe

RHipe是R与Hadoop的集成编程环境。RHIPE可以让R语言与Hadoop进行通信,访问Hadoop的HDFS和调用MapReduce,让R语言的使用者利用Hadoop的分步式环境进行行大数据的分析。

RHipe官方网站:http://www.datadr.org/

b. RHadoop

RHadoop是由RevolutionAnalytics公司开发的一个R与Hadoop的集成编程环境,与RHipe的功能一样。RHadoop包含三个R包 (rmr,rhdfs,rhbase),分别是对应Hadoop系统架构中的,MapReduce, HDFS, HBase 三个部分。在2013年底,又增加第4个R包plyrmr,用于数据处理操作。本书中并没有涉及plyrmr包。

RHadoop的发布页:https://github.com/RevolutionAnalytics/RHadoop/wiki

RHadoop实践系列文章:http://blog.fens.me/series-rhadoop/

c. R + Hadoop Streaming

Hadoop Streaming是Hadoop提供的,允许任何可执行的脚本作为Mapper和Reducer的一种实现方实。R + Hadoop Streaming就是用R脚本实现Mapper和Reducer。作者用2种方式,进行了测试。

c1. R Script + Hadoop Streaming : 单纯的R语言脚本,通过Shell运行。
c2. HadoopStreaming + Hadoop Streaming: 使用一个封装好的R包HadoopStreaming来实现。

这3种技术方案,是目前R与Hadoop结合实现方案。我对RHadoop和R Script + Hadoop Streaming比较熟悉,经过我测试只有R Script + Hadoop Streaming这种方式,可以用于生产环境,RHadoop性能还是有一些问题的,可以提升的空间还是很大的。书中介绍的另外两种方法,我要有时间,再去试试。不过,我相信R和Hadoop的结合的项目,会越来越多的。

4). 数据分析案例

上面篇幅把技术的基础都说清楚,接下来就是核心案例了,书中介绍了如何使用R结合Hadoop进行数据分析。

首先书中介绍了,一个数据分析项目的框架,包括5个部分。

  • 问题
  • 定义数据需求
  • 数据预处理
  • 数据建模及执行
  • 数据可视化

然后,作者介绍了3个应用案例。

  • 网页分类:把一个网站中的网页按重要性进行排名
  • 股票分析:通过历史交易数据计算股票市场的变化
  • 价格预测:预测图书销售的价格,来自Kaggle的一道竞赛题

上面3个案例,都使用R和RHadoop进行实现,都是非常不错的案例,可以给我们的学习和使用提供很好的思路。

5). 机器学习算法案例

接下来,作者介绍了用RHadoop实现的基于大数据机器学习算法,充分结合了R和Hadoop的优势。作者把机器学习算法分为3类,监督学习算法,非监督学习算法,推荐算法。

4个算法的案例:

a. 线性回归

最简单的一种回归算法,可以下面公式表示。

y = ax + e 

在R语言中,一个函数lm()就可以实现。

在大数据的背景下,利用RHadoop,需要自己实现lm()这个函数。


# Reducer
Sum = function(., YY) keyval(1, list(Reduce('+', YY)))

# XtX =
values(

# For loading hdfs data in to R
from.dfs(

# MapReduce Job to produce XT*X
mapreduce(
input = X.index,

# Mapper – To calculate and emitting XT*X
map =
function(., Xi) {
yi = y[Xi[,1],]
Xi = Xi[,-1]
keyval(1, list(t(Xi) %*% Xi))},

# Reducer – To reduce the Mapper output by performing sum
operation over them
reduce = Sum,
combine = TRUE)))[[1]]

b. Logistic回归

比线性回归稍微复杂一些,可以用公式表示。


logit(p) = β0 + β1 × x1 + β2 × x2 + ... + βn × xn

R程序还是一个函数glm()。

在大数据的背景下,利用RHadoop,需要自己实现glm()这个函数。


# Mapper – computes the contribution of a subset of points to the
gradient.

lr.map =
  function(., M) {
    Y = M[,1]
    X = M[,-1]
    keyval(1, Y * X * g(-Y * as.numeric(X %*% t(plane))))
}

# Reducer – Perform sum operation over Mapper output.

lr.reduce = function(k, Z) keyval(k, t(as.matrix(apply(Z,2,sum))))

# MapReduce job – Defining MapReduce function for executing logistic
regression

logistic.regression =
  function(input, iterations, dims, alpha){
    plane = t(rep(0, dims))
    g = function(z) 1/(1 + exp(-z))
    for (i in 1:iterations) {
      gradient =
        values(
          from.dfs(
            mapreduce(input, map = lr.map, reduce = lr.reduce, combine = T)))
      plane = plane + alpha * gradient 
    }
    plane 
}

c. 聚类算法

这里介绍的是快速聚类kmeans,聚类属于非监督学习法。

通过R语言现实,一个函数kmeans()就可以完成了。

通过RHadoop现实,就需要自己重写这个迭代过程。


# distance calculation function
dist.fun = function(C, P) {
  apply(C,1,function(x) colSums((t(P) - x)^2))
}

# k-Means Mapper
kmeans.map = function(., P) {
  nearest = {

  # First interations- Assign random cluster centers
  if(is.null(C))
    sample(1:num.clusters,nrow(P),replace = T)

# Rest of the iterations, where the clusters are assigned # based
on the minimum distance from points

  else {
    D = dist.fun(C, P)
    nearest = max.col(-D)}}
    if(!(combine || in.memory.combine))
      keyval(nearest, P)
    else
      keyval(nearest, cbind(1, P))
}


# k-Means Reducer
kmeans.reduce = {

  # calculating the column average for both of the conditions
  if (!(combine || in.memory.combine) )
    function(., P) t(as.matrix(apply(P, 2, mean)))
  else function(k, P) keyval(k,t(as.matrix(apply(P, 2, sum))))
}


# k-Means MapReduce – for
kmeans.mr = function(P,num.clusters,num.iter,combine,in.memory.combine) {
  C = NULL
  for(i in 1:num.iter ) {
    C =  values(from.dfs(mapreduce(P,map = kmeans.map,reduce = kmeans.reduce)))
    if(combine || in.memory.combine)
    C = C[, -1]/C[, 1]
    if(nrow(C) < num.clusters) {
      C =rbind(C,matrix(rnorm((num.clusters -nrow(C)) * nrow(C)),ncol = row(C)) %*% C) 
    }
  }
  C
}

d. 推荐算法

这里介绍的是协同过滤算法,算法实现就有点复杂了。主要思想就是:

同现矩阵 * 评分矩阵 = 推荐结果

注:由于这个案例出自我的博客,我就直接贴我的文章地址了,也方便中文读者了解算法细节。

R语言的算法实现:http://blog.fens.me/r-mahout-usercf/

RHadoop分步式算法实现:http://blog.fens.me/rhadoop-mapreduce-rmr/

6). R语言的数据访问接口

最后一部分,书中介绍了R语言的各种数据访问接口。

FILE:

  • CSV: read.csv(), write.csv()
  • Excel: xlsx, xlsxjars,rJava

Database:

  • MySQL: RMySQL
  • SQLite: RSQLite
  • PostgreSQL: RPostgreSQL

NoSQL:

  • MongoDB: rmongodb
  • Hive: RHive
  • HBase: RHBase

补充一下,我的博客中也写了R的数据访问接口的文章。

R的多种接口已经打通,这也就说明R已经做好了准备,为工业界带来革命的力量。

3. 最后总结

这是一本不错的图书,从R+Hadoop的角度出发,打开R语言面向大数据应用的思路。这种结合是跨学科碰撞的结果,是市场需求的导向。但由于R+Hadoop还不够成熟,企业级应用依然缺乏成功案例,所以当实施R+Hadoop的应用时,还会碰到非常多的问题。期待有担当的公司和个人,做出被大家认可的产品来。

转载请注明出处:
http://blog.fens.me/r-hadoop-book-big-data/

打赏作者

Mahout构建图书推荐系统

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mahout-recommend-book/

mahout-recommendation-book

前言

本文是Mahout实现推荐系统的又一案例,用Mahout构建图书推荐系统。与之前的两篇文章,思路上面类似,侧重点在于图书的属性如何利用。本文的数据在自于Amazon网站,由爬虫抓取获得。

目录

  1. 项目背景
  2. 需求分析
  3. 数据说明
  4. 算法模型
  5. 程序开发

1. 项目背景

Amazon是最早的电子商务网站之一,以网上图书起家,最后发展成为音像,电子消费品,游戏,生活用品等的综合性电子商务平台。Amazon的推荐系统,是互联网上最早的商品推荐系统,它为Amazon带来了至少30%的流量,和可观的销售利润。

如今推荐系统已经成为电子商务网站的标配,如果还没有推荐系统都不好意思,说自己是做电商的。

2. 需求分析

推荐系统如此重要,我们应该如果理解?

打开Amazon的Mahout In Action图书页面:
http://www.amazon.com/Mahout-Action-Sean-Owen/dp/1935182684/ref=pd_sim_b_1?ie=UTF8&refRID=0H4H2NSSR8F34R76E2TP

网页上的元素:

  • 广告位:广告商投放广告的位置,网站可以靠网络广告赚钱,一般是网页最好的位置。
  • 平均分:用户对图书的打分
  • 关联规则:通过关联规则,推荐位
  • 协同过滤:通过基于物品的协同过滤算法的,推荐位
  • 图书属性:包括页数,出版社,ISBN,语言等
  • 作者介绍:有关作者的介绍,和作者的其他著作
  • 用户评分:用户评分行为
  • 用户评论:用户评论的内容

amazon-book

在网页上,其他的推荐位:

amazon-book-2

结合上面2张截图,我们不难发现,推荐对于Amazon的重要性。除了最明显的广告位给了能直接带来利润的广告商,网页中有4处推荐位,分别从不同的维度,用不同的推荐算法,猜用户喜欢的商品。

3. 数据说明

2个数据文件:

  • rating.csv :用户评分行为数据
  • users.csv :用户属性数据

1). book-ratings.csv

  • 3列数据:用户ID,图书ID, 用户对图书的评分
  • 记录数: 4000次的图书评分
  • 用户数: 200个
  • 图书数: 1000个
  • 评分:1-10

数据示例


1,565,3
1,807,2
1,201,1
1,557,9
1,987,10
1,59,5
1,305,6
1,153,3
1,139,7
1,875,5
1,722,10
2,977,4
2,806,3
2,654,8
2,21,8
2,662,5
2,437,6
2,576,3
2,141,8
2,311,4
2,101,3
2,540,9
2,87,3
2,65,8
2,501,6
2,710,5
2,331,9
2,542,4
2,757,9
2,590,7

2). users.csv

  • 3列数据:用户ID,用户性别,用户年龄
  • 用户数: 200个
  • 用户性别: M为男性,F为女性
  • 用户年龄: 11-80岁之间

数据示例


1,M,40
2,M,27
3,M,41
4,F,43
5,F,16
6,M,36
7,F,36
8,F,46
9,M,50
10,M,21
11,F,11
12,M,42
13,F,40
14,F,28
15,M,25
16,M,68
17,M,53
18,F,69
19,F,48
20,F,56
21,F,36

4. 算法模型

本文主要介绍Mahout的基于物品的协同过滤模型,其他的算法模型将不再这里解释。

针对上面的数据,我将用7种算法组合进行测试:有关Mahout算法组合的详细解释,请参考文章:从源代码剖析Mahout推荐引擎

7种算法组合

  • userCF1: EuclideanSimilarity+ NearestNUserNeighborhood+ GenericUserBasedRecommender
  • userCF2: LogLikelihoodSimilarity+ NearestNUserNeighborhood+ GenericUserBasedRecommender
  • userCF3: EuclideanSimilarity+ NearestNUserNeighborhood+ GenericBooleanPrefUserBasedRecommender
  • itemCF1: EuclideanSimilarity + GenericItemBasedRecommender
  • itemCF2: LogLikelihoodSimilarity + GenericItemBasedRecommender
  • itemCF3: EuclideanSimilarity + GenericBooleanPrefItemBasedRecommender
  • slopeOne:SlopeOneRecommender

对上面的算法进行算法评估,有关于算法评估的详细解释,请参考文章:Mahout推荐算法API详解

  • 查准率:
  • 召回率(查全率):

5. 程序开发

系统架构:Mahout中推荐过滤算法支持单机算法和分步式算法两种。

  • 单机算法: 在单机内存计算,支持多种算法推荐算法,部署运行简单,修正处理数据量有限
  • 分步式算法: 基于Hadoop集群运行,支持有限的几种推荐算法,部署运行复杂,支持海量数据

mahout-recommend-job-architect

开发环境

  • Win7 64bit
  • Java 1.6.0_45
  • Maven3
  • Eclipse Juno Service Release 2
  • Mahout-0.8
  • Hadoop-1.1.2

开发环境mahout版本为0.8。 请参考文章:用Maven构建Mahout项目

新建Java类:

  • BookEvaluator.java, 选出“评估推荐器”验证得分较高的算法
  • BookResult.java, 对指定数量的结果人工比较
  • BookFilterGenderResult.java,只保留男性用户的图书列表

1). BookEvaluator.java, 选出“评估推荐器”验证得分较高的算法

源代码


package org.conan.mymahout.recommendation.book;

import java.io.IOException;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.similarity.ItemSimilarity;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class BookEvaluator {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/book/rating.csv";
        DataModel dataModel = RecommendFactory.buildDataModel(file);
        userEuclidean(dataModel);
        userLoglikelihood(dataModel);
        userEuclideanNoPref(dataModel);
        itemEuclidean(dataModel);
        itemLoglikelihood(dataModel);
        itemEuclideanNoPref(dataModel);
        slopeOne(dataModel);
    }

    public static RecommenderBuilder userEuclidean(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userEuclidean");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
    
    public static RecommenderBuilder userLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userLoglikelihood");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
    
    public static RecommenderBuilder userEuclideanNoPref(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userEuclideanNoPref");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemEuclidean(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemEuclidean");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemLoglikelihood");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
    
    public static RecommenderBuilder itemEuclideanNoPref(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemEuclideanNoPref");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder slopeOne(DataModel dataModel) throws TasteException, IOException {
        System.out.println("slopeOne");
        RecommenderBuilder recommenderBuilder = RecommendFactory.slopeOneRecommender();

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
}

控制台输出:


userEuclidean
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.33333325386047363
Recommender IR Evaluator: [Precision:0.3010752688172043,Recall:0.08542713567839195]
userLoglikelihood
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.5245869159698486
Recommender IR Evaluator: [Precision:0.11764705882352945,Recall:0.017587939698492466]
userEuclideanNoPref
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:4.288461538461536
Recommender IR Evaluator: [Precision:0.09045226130653267,Recall:0.09296482412060306]
itemEuclidean
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:1.408880928305655
Recommender IR Evaluator: [Precision:0.0,Recall:0.0]
itemLoglikelihood
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.448554412835434
Recommender IR Evaluator: [Precision:0.0,Recall:0.0]
itemEuclideanNoPref
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.5665197873957957
Recommender IR Evaluator: [Precision:0.6005025125628134,Recall:0.6055276381909548]
slopeOne
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.6893078179405814
Recommender IR Evaluator: [Precision:0.0,Recall:0.0]

可视化“评估推荐器”输出:

推荐的结果的平均距离

difference

推荐器的评分

evaluator

只有itemEuclideanNoPref算法评估的结果是非常好的,其他算法的结果都不太好。

2). BookResult.java, 对指定数量的结果人工比较

为得到差异化结果,我们分别取4个算法:userEuclidean,itemEuclidean,userEuclideanNoPref,itemEuclideanNoPref,对推荐结果人工比较。

源代码


package org.conan.mymahout.recommendation.book;

import java.io.IOException;
import java.util.List;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;

public class BookResult {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/book/rating.csv";
        DataModel dataModel = RecommendFactory.buildDataModel(file);
        RecommenderBuilder rb1 = BookEvaluator.userEuclidean(dataModel);
        RecommenderBuilder rb2 = BookEvaluator.itemEuclidean(dataModel);
        RecommenderBuilder rb3 = BookEvaluator.userEuclideanNoPref(dataModel);
        RecommenderBuilder rb4 = BookEvaluator.itemEuclideanNoPref(dataModel);
        
        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            System.out.print("userEuclidean       =>");
            result(uid, rb1, dataModel);
            System.out.print("itemEuclidean       =>");
            result(uid, rb2, dataModel);
            System.out.print("userEuclideanNoPref =>");
            result(uid, rb3, dataModel);
            System.out.print("itemEuclideanNoPref =>");
            result(uid, rb4, dataModel);
        }
    }

    public static void result(long uid, RecommenderBuilder recommenderBuilder, DataModel dataModel) throws TasteException {
        List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
        RecommendFactory.showItems(uid, list, false);
    }
}

控制台输出:只截取部分结果


...
userEuclidean       =>uid:63,
itemEuclidean       =>uid:63,(984,9.000000)(690,9.000000)(943,8.875000)
userEuclideanNoPref =>uid:63,(4,1.000000)(723,1.000000)(300,1.000000)
itemEuclideanNoPref =>uid:63,(867,3.791667)(947,3.083333)(28,2.750000)
userEuclidean       =>uid:64,
itemEuclidean       =>uid:64,(368,8.615385)(714,8.200000)(290,8.142858)
userEuclideanNoPref =>uid:64,(860,1.000000)(490,1.000000)(64,1.000000)
itemEuclideanNoPref =>uid:64,(409,3.950000)(715,3.830627)(901,3.444048)
userEuclidean       =>uid:65,(939,7.000000)
itemEuclidean       =>uid:65,(550,9.000000)(334,9.000000)(469,9.000000)
userEuclideanNoPref =>uid:65,(939,2.000000)(185,1.000000)(736,1.000000)
itemEuclideanNoPref =>uid:65,(666,4.166667)(96,3.093931)(345,2.958333)
userEuclidean       =>uid:66,
itemEuclidean       =>uid:66,(971,9.900000)(656,9.600000)(918,9.577709)
userEuclideanNoPref =>uid:66,(6,1.000000)(492,1.000000)(676,1.000000)
itemEuclideanNoPref =>uid:66,(185,3.650000)(533,3.617307)(172,3.500000)
userEuclidean       =>uid:67,
itemEuclidean       =>uid:67,(663,9.700000)(987,9.625000)(486,9.600000)
userEuclideanNoPref =>uid:67,(732,1.000000)(828,1.000000)(113,1.000000)
itemEuclideanNoPref =>uid:67,(724,3.000000)(279,2.950000)(890,2.750000)
...

我们查看uid=65的用户推荐信息:

查看user.csv数据集


> user[65,]
userid gender age
65     65      M  14

用户65,男性,14岁。

以itemEuclideanNoPref的算法的推荐结果,查看bookid=666的图书评分情况


> rating[which(rating$bookid==666),]
userid bookid pref
646      44    666   10
1327     89    666    7
2470    165    666    3
2697    179    666    7

发现有4个用户对666的图书评分,查看这4个用户的属性数据


> user[c(44,89,165,179),]
userid gender age
44      44      F  76
89      89      M  40
165    165      F  59
179    179      F  68

这4个用户,3女1男。

我们假设男性和男性有相同的图书兴趣,女性和女性有相同的图书偏好。因为用户65是男性,所以我们接下来排除女性的评分者,只保留男性评分者的评分记录。

3). BookFilterGenderResult.java,只保留男性用户的图书列表

源代码


package org.conan.mymahout.recommendation.book;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.IDRescorer;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;

public class BookFilterGenderResult {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/book/rating.csv";
        DataModel dataModel = RecommendFactory.buildDataModel(file);
        RecommenderBuilder rb1 = BookEvaluator.userEuclidean(dataModel);
        RecommenderBuilder rb2 = BookEvaluator.itemEuclidean(dataModel);
        RecommenderBuilder rb3 = BookEvaluator.userEuclideanNoPref(dataModel);
        RecommenderBuilder rb4 = BookEvaluator.itemEuclideanNoPref(dataModel);
        
        long uid = 65;
        System.out.print("userEuclidean       =>");
        filterGender(uid, rb1, dataModel);
        System.out.print("itemEuclidean       =>");
        filterGender(uid, rb2, dataModel);
        System.out.print("userEuclideanNoPref =>");
        filterGender(uid, rb3, dataModel);
        System.out.print("itemEuclideanNoPref =>");
        filterGender(uid, rb4, dataModel);
    }

    /**
     * 对用户性别进行过滤
     */
    public static void filterGender(long uid, RecommenderBuilder recommenderBuilder, DataModel dataModel) throws TasteException, IOException {
        Set userids = getMale("datafile/book/user.csv");

        //计算男性用户打分过的图书
        Set bookids = new HashSet();
        for (long uids : userids) {
            LongPrimitiveIterator iter = dataModel.getItemIDsFromUser(uids).iterator();
            while (iter.hasNext()) {
                long bookid = iter.next();
                bookids.add(bookid);
            }
        }

        IDRescorer rescorer = new FilterRescorer(bookids);
        List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM, rescorer);
        RecommendFactory.showItems(uid, list, false);
    }

    /**
     * 获得男性用户ID
     */
    public static Set getMale(String file) throws IOException {
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));
        Set userids = new HashSet();
        String s = null;
        while ((s = br.readLine()) != null) {
            String[] cols = s.split(",");
            if (cols[1].equals("M")) {// 判断男性用户
                userids.add(Long.parseLong(cols[0]));
            }
        }
        br.close();
        return userids;
    }

}

/**
 * 对结果重计算
 */
class FilterRescorer implements IDRescorer {
    final private Set userids;

    public FilterRescorer(Set userids) {
        this.userids = userids;
    }

    @Override
    public double rescore(long id, double originalScore) {
        return isFiltered(id) ? Double.NaN : originalScore;
    }

    @Override
    public boolean isFiltered(long id) {
        return userids.contains(id);
    }
}

控制台输出:


userEuclidean       =>uid:65,
itemEuclidean       =>uid:65,(784,8.090909)(276,8.000000)(476,7.666667)
userEuclideanNoPref =>uid:65,
itemEuclideanNoPref =>uid:65,(887,2.250000)(356,2.166667)(430,1.866667)

我们发现,由于只保留男性的评分记录,数据量就变得比较少了,基于用户的协同过滤算法,已经没有输出的结果了。基于物品的协同过滤算法,结果集也有所变化。

对于itemEuclideanNoPref算法,输出排名第一条为ID为887的图书。

我再进一步向下追踪:查询哪些用户对图书887进行了打分。


> rating[which(rating$bookid==887),]
userid bookid pref
1280     85    887    2
1743    119    887    8
2757    184    887    4
2791    186    887    5

有4个用户对图书887评分,再分别查看这个用户的属性


> user[c(85,119,184,186),]
userid gender age
85      85      F  31
119    119      F  49
184    184      M  27
186    186      M  35

其中2男,2女。由于我们的算法,已经排除了女性的评分,我们可以推断图书887的推荐应该来自于2个男性的评分者的推荐。

分别计算用户65,与用户184和用户186的评分的图书交集。


rat65<-rating[which(rating$userid==65),]
rat184<-rating[which(rating$userid==184),]
rat186<-rating[which(rating$userid==186),]

> intersect(rat65$bookid ,rat184$bookid)
integer(0)
> intersect(rat65$bookid ,rat186$bookid)
[1]  65 375

最后发现,用户65与用户186都给图书65和图书375打过分。我们再打分出用户186的评分记录。


> rat186
userid bookid pref
2790    186     65    7
2791    186    887    5
2792    186    529    3
2793    186    375    6
2794    186    566    7
2795    186    169    4
2796    186    907    1
2797    186    821    2
2798    186    720    5
2799    186    642    5
2800    186    137    3
2801    186    744    1
2802    186    896    2
2803    186    156    6
2804    186    392    3
2805    186    386    3
2806    186    901    7
2807    186     69    6
2808    186    845    6
2809    186    998    3

用户186,还给图书887打过分,所以对于给65用户推荐图书887,是合理的。

我们通过一个实际的图书推荐的案例,更进一步地了解了如何用Mahout构建推荐系统。

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-mahout-recommend-book/

打赏作者

PeopleRank从社交网络中发现个体价值

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-social-peoplerank/

peoplerank

前言

如果说Google改变了互联网,那么社交网络就改变人们的生活方式。通过社交网络,我们每个个体,都是成为了网络的中心。我们的生活半径,被无限放大,通过6个朋友关系,就可以认识世界上任何一个人。

未来的互联网将是属于我们每一个人。

目录

  1. PeopleRank和PageRank
  2. 需求分析:从社交网络中发现个体价值
  3. 算法模型:PeopleRank算法
  4. 架构设计:PeopleRank计算引擎系统架构
  5. 程序开发:PeopleRank算法实现

1. PeopleRank和PageRank

PageRank让Google成为搜索领域的No.1,也是当今最有影响力的互联网公司之一,用技术创新改变人们的生活。PageRank主要用于网页评分计算,把互联网上的所有网页都进行打分,给网页价值的体现。

自2012以来,中国开始进入社交网络的时代,开心网,人人网,新浪微博,腾讯微博,微信等社交网络应用,开始进入大家的生活。最早是由“抢车位”,“偷菜”等社交游戏带动的社交网络的兴起,如今人们会更多的利用社交网络,获取信息和分享信息。我们的互联网,正在从以网页信息为核心的网络,向着以人为核心的网络转变着。

于是有人就提出了,把PageRank模型应用于社交网络,定义以人为核心的个体价值。这样PageRank模型就有了新的应用领域,同时也有了一个新的名字PeopleRank。

关于PageRank的介绍,请参考文章:PageRank算法R语言实现

注:PeopleRank网上还有不同的解释,我这里仅仅表示用来解释“PageRank模型”。

下面我们将从一个PeopleRank的案例来解释,如何从社交网络中发现个体价值。

2. 需求分析:从社交网络中发现个体价值

案例介绍:
以新浪微博为例,给微博中每个用户进行评分!
从新浪微博上,把我们的关注和粉丝的关系都找到。

如下图:我在微博上,随便找了几个微博账号。

weibo-logo

我们的任务是,需要给这些账号评分!

  • 方法一,简单求和:评分=关注数+粉丝数+微博数
  • 方法二,加权求和:评分=a*关注数+b*粉丝数+c*微博数

新建数据文件:weibo.csv


~ vi weibo.csv

A,314,1091,1488
B,196,10455,327
C,557,51635228,13793
D,55,14655464,1681
E,318,547,4899
F,166,145,170
G,17,890,169
H,54,946759,17229

R语言读入数据文件


weibo<-read.csv(file="weibo.csv",header=FALSE)
names(weibo)<-c("id","follow","fans","tweet")

1). 方法一,简单相加法


> data.frame(weibo[1],rank=rowSums(weibo[2:4]))
  id     rank
1  A     2893
2  B    10978
3  C 51649578
4  D 14657200
5  E     5764
6  F      481
7  G     1076
8  H   964042

这种方法简单粗暴的方式,是否能代码公平的打分呢?!

2). 方法二,加权求和

通过a,b,c的3个参数,分别设置权重求和。与方法一存在同样的问题,a,b,c的权值都是人为指定的,也是不能表示公平的打分的。

除了上面的两个方法,你能否想到不一样的思路呢!

3. 算法模型:PeopleRank算法

基于PageRank的理论,我们以每个微博账户的“关注”为链出链接,“粉丝”为链入链接,我们把这种以人为核心的关系,叫PeopleRank。

关于PageRank的介绍,请参考文章:PageRank算法R语言实现

PeopleRank假设条件:

  • 数量假设:如果一个用户节点接收到的其他用户“关注”的数量越多,那么这个用户越重要。
  • 质量假设:用户A的“粉丝”质量不同,质量高的“粉丝”会通“关注”接向其他用户传递更多的权重。所以越是质量高的“用户”关注用户A,则用户A越重要。

衡量PeopleRank的3个指标:

  • 粉丝数
  • 粉丝是否有较高PeopleRank值
  • 粉丝关注了多少人

我们以下的数据为例,构造基于微博的数据模型:
(由于微博数据已增加访问权限,我无法拿到当前的实际数据,我用以前@晒粉丝应用,收集到的微博数据为例,这里ID已经过处理)

测试数据集:people.csv

  • 25个用户
  • 66个关系,关注和粉丝的关系

数据集: people.csv


1,19
1,21
2,11
2,17
2,21
3,1
3,20
3,2
3,7
3,6
3,10
4,3
4,6
5,19
5,11
5,2
6,4
6,12
6,18
6,15
6,10
6,5
7,9
7,18
7,10
8,3
8,11
8,7
8,16
8,14
9,6
10,8
10,18
11,13
11,3
12,9
12,4
12,16
12,5
13,19
13,1
13,6
14,7
14,17
14,19
14,1
14,5
14,2
15,11
15,14
15,12
16,20
17,4
17,6
18,10
18,11
18,15
18,14
19,18
20,10
20,5
21,24
22,11
23,17
24,15
25,24

第一列为用户ID,第二列也是用户ID。第一列用户,关注了第二用户。

以R语言可视化输出

igraph-refs

R语言程序


library(igraph)
people<-read.csv(file="people.csv",header=FALSE)
drawGraph<-function(data){
  names(data)<-c("from","to") 
  g <- graph.data.frame(data, directed=TRUE)
  V(g)$label <- V(g)$name
  V(g)$size <- 15
  E(g)$color <- grey(0.5)
  g2 <- simplify(g)
  plot(g2,layout=layout.circle)
}
drawGraph(people)

用R语言构建PeopleRank的算法原型

  • 构建邻接矩阵
  • 变换概率矩阵
  • 递归计算矩阵特征值
  • 标准化结果
  • 对结果排序输出

R语言算法模型


#构建邻接矩阵
adjacencyMatrix<-function(pages){
  n<-max(apply(pages,2,max))
  A <- matrix(0,n,n)
  for(i in 1:nrow(pages)) A[pages[i,]$dist,pages[i,]$src]<-1
  A
}

#变换概率矩阵
dProbabilityMatrix<-function(G,d=0.85){
  cs <- colSums(G)
  cs[cs==0] <- 1
  n <- nrow(G)
  delta <- (1-d)/n
  A <- matrix(delta,n,n)
  for (i in 1:n) A[i,] <- A[i,] + d*G[i,]/cs
  A
}

#递归计算矩阵特征值
eigenMatrix<-function(G,iter=100){
  n<-nrow(G)
  x <- rep(1,n)
  for (i in 1:iter) x <- G %*% x
  x/sum(x)
}

#直接计算矩阵特征值
calcEigenMatrix<-function(G){
  x <- Re(eigen(G)$vectors[,1])
  x/sum(x)
}

PeopleRank计算,带入数据集people.csv


people<-read.csv(file="people.csv",header=FALSE)
names(people)<-c("src","dist");people
A<-adjacencyMatrix(people);A
G<-dProbabilityMatrix(A);G
q<-calcEigenMatrix(G);

q
[1] 0.03274732 0.03404052 0.05983465 0.03527074 0.04366519 0.07042752 0.02741232
 [8] 0.03378595 0.02118713 0.06537870 0.07788465 0.03491910 0.03910097 0.05076803
[15] 0.06685364 0.01916392 0.02793695 0.09450614 0.05056016 0.03076591 0.02956243
[22] 0.00600000 0.00600000 0.03622806 0.00600000

我们给这25用户进行打分,从高到低进行排序。

对结果排序输出:


result<-data.frame(userid=userid,PR=q[userid])
result
   userid          PR
1      18 0.09450614
2      11 0.07788465
3       6 0.07042752
4      15 0.06685364
5      10 0.06537870
6       3 0.05983465
7      14 0.05076803
8      19 0.05056016
9       5 0.04366519
10     13 0.03910097
11     24 0.03622806
12      4 0.03527074
13     12 0.03491910
14      2 0.03404052
15      8 0.03378595
16      1 0.03274732
17     20 0.03076591
18     21 0.02956243
19     17 0.02793695
20      7 0.02741232
21      9 0.02118713
22     16 0.01916392
23     22 0.00600000
24     23 0.00600000
25     25 0.00600000

查看评分最高的用户18的关系数据:


people[c(which(people$src==18), which(people$dist==18)),]

   src dist
55  18   10
56  18   11
57  18   15
58  18   14
19   6   18
24   7   18
33  10   18
59  19   18

粉丝的PeopleRank排名:


which(result$userid %in% people$src[which(people$dist==18)])

[1]  3  5  8 20

粉丝的关注数:


table(people$src)[people$src[which(people$dist==18)]]

 6  7 10 19 
 6  3  2  1 

数据解释:用户18

  • 有4个粉丝为别是6,7,10,19。(粉丝数)
  • 4个粉丝的PeopleRank排名,是3,5,8,20。(粉丝是否有较高PeopleRank值)
  • 粉丝的关注数量,是6,3,2,1。(粉丝关注了多少人)

因此,通过对上面3个指标的综合打分,用户18是评分最高的用户。

通过R语言实现的计算模型,已经比较符合我们的评分标准了,下面我们把PeopleRank用MapReduce实现,以满足对海量数据的计算需求。

4. 架构设计:PeopleRank计算引擎系统架构

pagerank-spider

上图中,左边是数据爬虫系统,右边是Hadoop的HDFS, MapReduce。

  • 数据爬虫系统实时爬取微博数据
  • 设置系统定时器CRON,每xx小时,增量向HDFS导入数据(userid1,userid2)
  • 完成导入后,设置系统定时器,启动MapReduce程序,运行推荐算法。
  • 完成计算后,设置系统定时器,从HDFS导出推荐结果数据到数据库,方便以后的及时查询。

5. 程序开发:PeopleRank算法实现

win7的开发环境 和 Hadoop的运行环境 ,请参考文章:用Maven构建Hadoop项目

开发步骤:

  • 微博好友的关系数据: people.csv
  • 出始的PR数据:peoplerank.csv
  • 邻接矩阵: AdjacencyMatrix.java
  • PeopleRank计算: PageRank.java
  • PR标准化: Normal.java
  • 启动程序: PageRankJob.java

1). 微博好友的关系数据: people.csv,在上文中已列出

2). 出始的PR数据:peoplerank.csv


1,1
2,1
3,1
4,1
5,1
6,1
7,1
8,1
9,1
10,1
11,1
12,1
13,1
14,1
15,1
16,1
17,1
18,1
19,1
20,1
21,1
22,1
23,1
24,1
25,1

3). 邻接矩阵

矩阵解释:

  • 阻尼系数为0.85
  • 用户数为25
  • reduce以行输出矩阵的列,输出列主要用于分步式存储,下一步需要转成行

部分数据输出:


~ hadoop fs -cat /user/hdfs/pagerank/tmp1/part-r-00000|head -n 4

1	0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999
10	0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999
11	0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999
12	0.005999999,0.005999999,0.005999999,0.2185,0.2185,0.005999999,0.005999999,0.005999999,0.2185,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.2185,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999

4). PeopleRank计算: PageRank.java

迭代一次的PeopleRank值


~ hadoop fs -cat /user/hdfs/pagerank/pr/part-r-00000

1       0.716666
10      1.354167
11      2.232500
12      0.575000
13      0.575000
14      0.815833
15      1.354167
16      0.532500
17      1.425000
18      1.850000
19      1.283334
2       0.716667
20      1.141667
21      0.858333
22      0.150000
23      0.150000
24      1.850000
25      0.150000
3       1.170001
4       0.929167
5       1.070833
6       2.275001
7       0.603333
8       0.575000
9       0.645833

5). PR标准化: Normal.java

迭代10次,并标准化的结果:


~ hadoop fs -cat /user/hdfs/pagerank/result/part-r-00000

1       0.032842
10      0.065405
11      0.077670
12      0.034864
13      0.039175
14      0.050574
15      0.066614
16      0.019167
17      0.027990
18      0.094460
19      0.050673
2       0.034054
20      0.030835
21      0.029657
22      0.006000
23      0.006000
24      0.036111
25      0.006000
3       0.059864
4       0.035314
5       0.043805
6       0.070516
7       0.027444
8       0.033715
9       0.021251

我们对结果进行排序


   id       pr
10 18 0.094460
3  11 0.077670
22  6 0.070516
7  15 0.066614
2  10 0.065405
19  3 0.059864
11 19 0.050673
6  14 0.050574
21  5 0.043805
5  13 0.039175
17 24 0.036111
20  4 0.035314
4  12 0.034864
12  2 0.034054
24  8 0.033715
1   1 0.032842
13 20 0.030835
14 21 0.029657
9  17 0.027990
23  7 0.027444
25  9 0.021251
8  16 0.019167
15 22 0.006000
16 23 0.006000
18 25 0.006000

第一名是用户18,第二名是用户11,第三名是用户6,第三名与之前R语言单机计算的结果有些不一样,而且PR值也稍有不同,这是因为我们迭代10次时,特征值还没有完全的收敛,需要更多次的迭代计算,才能得矩阵的特征值。

程序API的实现,请参考文章:PageRank算法并行实现

我们通过PageRank的模型,成功地应用到了社交网络,实现了PeopleRank的计算,通过设计数据挖掘算法,来取代不成熟的人脑思想。算法模型将更客观,更精准。

最后,大家可以利用这个案例的设计思路,认真地了解社交网络,做出属于的自己的算法。

由于时间仓促,代码可能存在bug。请有能力同学,自行发现问题,解决问题!!

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-social-peoplerank/

打赏作者