• Posts tagged "Hadoop"

Blog Archives

2017河北民族师范学院:大数据时代的变革

跨界知识聚会系列文章,“知识是用来分享和传承的”,各种会议、论坛、沙龙都是分享知识的绝佳场所。我也有幸作为演讲嘉宾参加了一些国内的大型会议,向大家展示我所做的一些成果。从听众到演讲感觉是不一样的,把知识分享出来,你才能收获更多。

关于作者

  • 张丹, 程序员R,Nodejs,Java
  • weibo:@Conan_Z
  • blog:http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/meeting-collage-20170617

前言

数据中国“百校工程”项目,是由教育部学校规划建设发展中心,联合曙光信息产业股份有限公司发起的,在全国范围内遴选百所高校,部署集人才培养、科研支撑、行业应用及社会服务于一体的“大数据应用创新中心”,与项目院校共同设立“大数据学院”,完成大数据创新生态体系战略的顶层设计。

我非常有幸能参加这个教育类项目,通过顶层大数据专业课程设计,帮助大学生通过4年的在校学习,理解大数据思维,掌握大数据技术,毕业后成为大数据企业的核心人才。

本次分享主要面向,河北民族师范学院的大数据专业的老师和同学们,介绍了大数据给我们生活方式带来的变革。

目录

  1. 我的演讲主题:大数据时代的变革
  2. 会议体验和照片分享

1. 我的演讲主题:大数据时代的变革

大数据时代的变革,PPT下载,主要内容来自我的一篇博文:大数据时代的变革(未发布)。

本次活动为曙光大数据学院大师巡讲第二期,我主要通过问题的角度,论述大数据时代的变革。

下面9个问题,通常都是学生们开始入门大数据时比较关心的。

  1. 大数据是什么?
  2. 为什么需要大数据?
  3. 大数据解决了什么问题?
  4. 大数据给我们的生活带来了哪些改变?
  5. 大数据需要什么技术?
  6. 我们怎么学大数据的技术?
  7. 如何才能会学大数据的技术?
  8. 学会大数据技术能找到什么样的工作?
  9. 金融大数据是什么?

对于大数据这样的一个新专业来了,很学生学习起来是懵的。就像15年前,大学新开办的的电子商务专业一样。我也不知道电子商务这个是什么,专业怎么学,毕业后会有什么样的出路。现在,看阿里巴巴,京东就知道,什么是电子商务了。

对于本次的分享,我其实也想了解到学生们的疑惑,具体在什么地方!对于大数据的是否有兴趣?学习大数据的动力是什么?基础知识的水平怎么样? 在之后课程体系的设计,会针对学生的困扰,进行适当的方案调整,从而满足难度上和时间上的教学目标的要求。

在这里,不得不赞一下,曙光大数据学院提出的“VIP课程体系”。参考了《华盛顿协议》的教育理念,从顶层设计就很大程度领先于国内的课程设计理念,并结合了国内二本院校的特点,落地实施。

教育理念:

  • 用综合项目的设计建立技术应用全貌概念
  • 建立学习目标导向的强驱动机制
  • 项目难度递进促进知识学习的进阶关系
  • 多维度考核促进学习专注度和学习效果

《华盛顿协议》是工程教育本科专业认证的国际互认协议,1989年,由美国、英国、加拿大、爱尔兰、澳大利亚、新西兰6个国家的工程专业团体发起成立,旨在通过校准、系统的工程教育本科专业认证保证工程教育质量,为工程师资格国际互认奠定基础。《华盛顿协议》所有签约成员均为本国(地区)政府授权的、独立的非政府和专业性团体,目前共有15个正式成员、5个预备成员。

2. 会议体验和照片分享

2017年6月17日早上出发,从北京开车到承德,接近3个小时的路程,不算近。一路上和数据中国项目负责人谢总,一直在聊教育,聊产业,聊人才培养,聊了大数据,聊投资,聊了很多。

本次分享可以说是我一个人的专场了,整个会场大概500人左右,基本坐满。

张丹,主讲人,《R的极客理想》系列图书作者

中科曙光项目负责人,谢欧

现场提问

校园记者采访

与老师们的合照

与同学的们的合照

通过同学们的提问,我收集了很多真实的需求。最后,希望能够把我的经验,分享给渴望学习的同学们。祝同学们在4年的学习中学到真知识,未来的舞台是你们的。

转载请注明出处:
http://blog.fens.me/meeting-collage-20170617

打赏作者

基于Zookeeper的分步式队列系统集成案例

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

zookeeper-case

前言

软件系统集成一直是工业界的一个难题,像10年以上的遗留系统集成,公司收购后的多系统集成,全球性的分步式系统集成等。虽然基于SOA的软件架构,从理论上都可以解决这些集成的问题,但是具体实施过程,有些集成项目过于复杂而失败。

随着技术的创新和发展,对于分步式集群应用的集成,有了更好的开源软件的支持,像zookeeper就是一个不错的分步式协作软件平台。本文将通过一个案例介绍Zookeeper的强大。

目录

  1. 项目背景:分布式消息中间件
  2. 需求分析:业务系统升级方案
  3. 架构设计:搭建Zookeeper的分步式协作平台
  4. 程序开发:基于Zookeeper的程序设计
  5. 程序运行

1. 项目背景:分布式消息中间件

随着Hadoop的普及,越来越多的公司开始构建自己的Hadoop系统。有时候,公司内部的不同部门或不同的团队,都有自己的Hadoop集群。这种多集群的方式,既能让每个团队拥有个性化的Hadoop,又能避免大集群的高度其中化运维难度。当数据量不是特别巨大的时候,小型集群会有很多适用的场合。

当然,多个小型集群也有缺点,就是资源配置可能造成浪费。每个团队的Hadoop集群,都要配有服务器和运维人员。有些能力强的团队,构建的hadoop集群,可以达到真正的个性化要求;而有一些能力比较差的团队,搭建的Hadoop集群性能会比较糟糕。

还有一些时候,多个团队需要共同完成一个任务,比如,A团队通过Hadoop集群计算的结果,交给B团队继续工作,B完成了自己任务再交给C团队继续做。这就有点像业务系统的工作流一样,一环一环地传下去,直到最后一部分完成。

在业务系统中,我们经常会用SOA的架构来解决这种问题,每个团队在ESB服务器上部署自己的服务,然后通过消息中间件完成调度任务。对于分步式的多个Hadoop集群系统的协作,同样可以用这种架构来做,只要把消息中间件引擎换成支持分步式的消息中间件的引擎就行了。

Zookeeper就可以做为 分步式消息中间件,来完成上面的说的业务需求。ZooKeeper是Hadoop家族的一款高性能的分布式协作的产品,是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。Zookeeper的安装和使用,请参考文章 ZooKeeper伪分布式集群安装及使用

ZooKeeper提供分布式协作服务,并不需要依赖于Hadoop的环境。

2. 需求分析:业务系统升级方案

下面我将从一个案例出发,来解释如何进行分步式协作平台的系统设计。

2.1 案例介绍

某大型软件公司,从事领域为供应链管理,主要业务包括了 采购管理、应付账款管理、应收账款管理、供应商反复管理、退货管理、销售管理、库存管理、电子商务、系统集成等。

biz

每块业务的逻辑都很复杂,由单独部门进行软件开发和维护,部门之间的系统没有直接通信需求,每个部门完成自己的功能就行了,最后通过数据库来共享数据,实现各功能之间的数据交换。

zk1

随着业务的发展,客户对响应速度要求越来越高,通过数据库来共享数据的方式,已经达不到信息交换的要求,系统进行了第一次升级,通过企业服务总线(ESB)统一管理公司内部所有业务。通过WebServices发布服务,通过Message Queue实现业务功能的调度。

zk2

公司业务规模继续扩大,跨国收购了多家公司。业务系统从原来的一个机房的集中式部署,变成了全球性的多机房的分步式部署。这时,Message Queue已经不能满足多机房跨地域的业务系统的功能需求了,需要一种分步式的消息中间件解决方案,来代替原有消息中间件的服务。

系统进行了第二次升级,采用Zookeeper作为分步式中间件调度引擎。

zk3

通过上面的描述,我们可以看出,当一个公司从小到大,从国内业务发展到全球性业务的时候。
为了配合业务发展,IT系统也是越来越复杂的,从最早的主从数据库设计,到ESB企业系统总线的扩展,再到分步式ESB配合分步式消息系统,每一次的升级都需要软件技术的支撑。

2.2 功能需求

全球性采购业务和全球性销售业务,让公司在市场中处于竞争优势。但由于采购和销售分别是由不同部门进行的软件开发和维护,而且业务往来也在不同的国家和地区。所以在每月底结算时,工作量都特别大。

比如,计算利润表 (请不要纠结于公式的准确性)

当月利润 = 当月销售金额 - 当月采购金额 - 当月其他支出

这样一个非常简单的计算公式,但对于跨国公司和部门来说,一点也不简单的。

从系统角度来看,采购部门要统计采购数据(海量数据),销售部门统计销售数据((海量数据),其他部门统计的其他费用支出(汇总的少量数据),最后系统计算得到当月的利润。

这里要说明的是,采购系统是单独的系统,销售是另外单独的系统,及以其他几十个大大小小的系统,如何能让多个系统,配合起来做这道计算题呢??

3. 架构设计:搭建Zookeeper的分步式协作平台

接下来,我们基于zookeeper来构建一个分步式队列的应用,来解决上面的功能需求。下面内容,排除了ESB的部分,只保留zookeeper进行实现。

  • 采购数据,为海量数据,基于Hadoop存储和分析。
  • 销售数据,为海量数据,基于Hadoop存储和分析。
  • 其他费用支出,为少量数据,基于文件或数据库存储和分析。

我们设计一个同步队列,这个队列有3个条件节点,分别对应采购(purchase),销售(sell),其他费用(other)3个部分。当3个节点都被创建后,程序会自动触发计算利润,并创建利润(profit)节点。上面3个节点的创建,无顺序要求。每个节点只能被创建一次。

zk4

系统环境

  • 2个独立的Hadoop集群
  • 2个独立的Java应用
  • 3个Zookeeper集群节点

图标解释:

  • Hadoop App1,Hadoop App2 是2个独立的Hadoop集群应用
  • Java App3,Java App4 是2个独立的Java应用
  • zk1,zk2,zk3是ZooKeeper集群的3个连接点
  • /queue,是znode的队列目录,假设队列长度为3
  • /queue/purchase,是znode队列中,1号排对者,由Hadoop App1提交,用于统计采购金额。
  • /queue/sell,是znode队列中,2号排对者,由Hadoop App2提交,用于统计销售金额。
  • /queue/other,是znode队列中,3号排对者,由Java App3提交,用于统计其他费用支出金额。
  • /queue/profit,当znode队列中满了,触发创建利润节点。
  • 当/qeueu/profit被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束。

补充说明:

  • 创建/queue/purchase,/queue/sell,/queue/other目录时,没有前后顺序,程序提交后,/queue目录下会生成对应该子目录
  • App1可以通过zk2提交,App2也可通过zk3提交。原则上,找最近路由最近的znode节点提交。
  • 每个应用不能重复提出,直到3个任务都提交,计算利润的任务才会被执行。
  • /queue/profit被创建后,zk的应用会监听到这个事件,通知应用,队列已完成!

这里的同步队列的架构更详细的设计思路,请参考文章 ZooKeeper实现分布式队列Queue

4. 程序开发:基于Zookeeper的程序设计

最终的功能需求:计算2013年01月的利润。

4.1 实验环境

在真正企业开发时,我们的实验环境应该与需求是一致的,但我的硬件条件有限,因些做了一个简化的环境设置。

  • 把zookeeper的完全分步式部署的3台服务器集群节点的,改为一台服务器上3个集群节点。
  • 把2个独立Hadoop集群,改为一个集群的2个独立的MapReduce任务。

开发环境:

  • Win7 64bit
  • JDK 1.6
  • Maven3
  • Juno Service Release 2
  • IP:192.168.1.10

Zookeeper服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Zookeeper: 3.4.5
  • IP: 192.168.1.201
  • 3个集群节点

Hadoop服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Hadoop: 1.0.3
  • IP: 192.168.1.210

4.2 实验数据

3组实验数据:

  • 采购数据,purchase.csv
  • 销售数据,sell.csv
  • 其他费用数据,other.csv

4.2.1 采购数据集

一共4列,分别对应 产品ID,产品数量,产品单价,采购日期。


1,26,1168,2013-01-08
2,49,779,2013-02-12
3,80,850,2013-02-05
4,69,1585,2013-01-26
5,88,1052,2013-01-13
6,84,2363,2013-01-19
7,64,1410,2013-01-12
8,53,910,2013-01-11
9,21,1661,2013-01-19
10,53,2426,2013-02-18
11,64,2022,2013-01-07
12,36,2941,2013-01-28
13,99,3819,2013-01-19
14,64,2563,2013-02-16
15,91,752,2013-02-05
16,65,750,2013-02-04
17,19,2426,2013-02-23
18,19,724,2013-02-05
19,87,137,2013-01-25
20,86,2939,2013-01-14
21,92,159,2013-01-23
22,81,2331,2013-03-01
23,88,998,2013-01-20
24,38,102,2013-02-22
25,32,4813,2013-01-13
26,36,1671,2013-01-19

//省略部分数据

4.2.2 销售数据集

一共4列,分别对应 产品ID,销售数量,销售单价,销售日期。


1,14,1236,2013-01-14
2,19,808,2013-03-06
3,26,886,2013-02-23
4,23,1793,2013-02-09
5,27,1206,2013-01-21
6,27,2648,2013-01-30
7,22,1502,2013-01-19
8,20,1050,2013-01-18
9,13,1778,2013-01-30
10,20,2718,2013-03-14
11,22,2175,2013-01-12
12,16,3284,2013-02-12
13,30,4152,2013-01-30
14,22,2770,2013-03-11
15,28,778,2013-02-23
16,22,874,2013-02-22
17,12,2718,2013-03-22
18,12,747,2013-02-23
19,27,172,2013-02-07
20,27,3282,2013-01-22
21,28,224,2013-02-05
22,26,2613,2013-03-30
23,27,1147,2013-01-31
24,16,141,2013-03-20
25,15,5343,2013-01-21
26,16,1887,2013-01-30
27,12,2535,2013-01-12
28,16,469,2013-01-07
29,29,2395,2013-03-30
30,17,1549,2013-01-30
31,25,4173,2013-03-17

//省略部分数据

4.2.3 其他费用数据集

一共2列,分别对应 发生日期,发生金额


2013-01-02,552
2013-01-03,1092
2013-01-04,1794
2013-01-05,435
2013-01-06,960
2013-01-07,1066
2013-01-08,1354
2013-01-09,880
2013-01-10,1992
2013-01-11,931
2013-01-12,1209
2013-01-13,1491
2013-01-14,804
2013-01-15,480
2013-01-16,1891
2013-01-17,156
2013-01-18,1439
2013-01-19,1018
2013-01-20,1506
2013-01-21,1216
2013-01-22,2045
2013-01-23,400
2013-01-24,1795
2013-01-25,1977
2013-01-26,1002
2013-01-27,226
2013-01-28,1239
2013-01-29,702
2013-01-30,1396

//省略部分数据

4.3 程序设计

我们要编写5个文件:

  • 计算采购金额,Purchase.java
  • 计算销售金额,Sell.java
  • 计算其他费用金额,Other.java
  • 计算利润,Profit.java
  • Zookeeper的调度,ZookeeperJob.java

4.3.1 计算采购金额

采购金额,是基于Hadoop的MapReduce统计计算。


public class Purchase {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class PurchaseMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class PurchaseReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("purchase");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化purchase
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Purchase.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(PurchaseMapper.class);
        job.setReducerClass(PurchaseReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("purchase", "logfile/biz/purchase.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/purchase");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.2 计算销售金额

销售金额,是基于Hadoop的MapReduce统计计算。


public class Sell {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class SellMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class SellReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("sell");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化sell
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Sell.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(SellMapper.class);
        job.setReducerClass(SellReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("sell", "logfile/biz/sell.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.3 计算其他费用金额

其他费用金额,是基于本地文件的统计计算。


public class Other {

    public static String file = "logfile/biz/other.csv";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    private static String month = "2013-01";

    public static void main(String[] args) throws IOException {
        calcOther(file);
    }

    public static int calcOther(String file) throws IOException {
        int money = 0;
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));

        String s = null;
        while ((s = br.readLine()) != null) {
            // System.out.println(s);
            String[] tokens = DELIMITER.split(s);
            if (tokens[0].startsWith(month)) {// 1月的数据
                money += Integer.parseInt(tokens[1]);
            }
        }
        br.close();

        System.out.println("Output:" + month + "," + money);
        return money;
    }
}

4.3.4 计算利润

利润,通过zookeeper分步式自动调度计算利润。


public class Profit {

    public static void main(String[] args) throws Exception {
        profit();
    }

    public static void profit() throws Exception {
        int sell = getSell();
        int purchase = getPurchase();
        int other = getOther();
        int profit = sell - purchase - other;
        System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit);
    }

    public static int getPurchase() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
        return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim());
    }

    public static int getSell() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
        return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim());
    }

    public static int getOther() throws IOException {
        return Other.calcOther(Other.file);
    }

}

4.3.5 Zookeeper调度

调度,通过构建分步式队列系统,自动化程序代替人工操作。


public class ZooKeeperJob {

    final public static String QUEUE = "/queue";
    final public static String PROFIT = "/queue/profit";
    final public static String PURCHASE = "/queue/purchase";
    final public static String SELL = "/queue/sell";
    final public static String OTHER = "/queue/other";

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Please start a task:");
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            doPurchase(zk);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            doSell(zk);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            doOther(zk);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) {
                    System.out.println("Queue has Completed!!!");
                }
            }
        });
        return zk;
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => " + PROFIT);
        zk.exists(PROFIT, true);

        if (zk.exists(QUEUE, false) == null) {
            System.out.println("create " + QUEUE);
            zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(QUEUE + " is exist!");
        }
    }

    public static void doPurchase(ZooKeeper zk) throws Exception {
        if (zk.exists(PURCHASE, false) == null) {

            Purchase.run(Purchase.path());

            System.out.println("create " + PURCHASE);
            zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(PURCHASE + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doSell(ZooKeeper zk) throws Exception {
        if (zk.exists(SELL, false) == null) {

            Sell.run(Sell.path());

            System.out.println("create " + SELL);
            zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(SELL + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doOther(ZooKeeper zk) throws Exception {
        if (zk.exists(OTHER, false) == null) {

            Other.calcOther(Other.file);

            System.out.println("create " + OTHER);
            zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(OTHER + " is exist!");
        }
        isCompleted(zk);
    }

    public static void isCompleted(ZooKeeper zk) throws Exception {
        int size = 3;
        List children = zk.getChildren(QUEUE, true);
        int length = children.size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create " + PROFIT);
            Profit.profit();
            zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

            for (String child : children) {// 清空节点
                zk.delete(QUEUE + "/" + child, -1);
            }
        }
    }
}

5. 运行程序

最后,我们运行整个的程序,包括3个部分。

  • zookeeper服务器
  • hadoop服务器
  • 分步式队列应用

5.1 启动zookeeper服务

启动zookeeper服务器集群:


~ cd toolkit/zookeeper345

# 启动zk集群3个节点
~ bin/zkServer.sh start conf/zk1.cfg
~ bin/zkServer.sh start conf/zk2.cfg
~ bin/zkServer.sh start conf/zk3.cfg

~ jps
4234 QuorumPeerMain
5002 Jps
4275 QuorumPeerMain
4207 QuorumPeerMain

查看zookeeper集群中,各节点的状态


# 查看zk1节点状态
~ bin/zkServer.sh status conf/zk1.cfg
JMX enabled by default
Using config: conf/zk1.cfg
Mode: follower

# 查看zk2节点状态,zk2为leader
~ bin/zkServer.sh status conf/zk2.cfg
JMX enabled by default
Using config: conf/zk2.cfg
Mode: leader

# 查看zk3节点状态
~ bin/zkServer.sh status conf/zk3.cfg
JMX enabled by default
Using config: conf/zk3.cfg
Mode: follower

启动zookeeper客户端:


~ bin/zkCli.sh -server 192.168.1.201:2181

# 查看zk
[zk: 192.168.1.201:2181(CONNECTED) 0] ls /
[queue, queue-fifo, zookeeper]

# /queue路径无子目录
[zk: 192.168.1.201:2181(CONNECTED) 1] ls /queue
[]

5.2 启动Hadoop服务


~ hadoop/hadoop-1.0.3
~ bin/start-all.sh

~ jps
25979 JobTracker
26257 TaskTracker
25576 DataNode
25300 NameNode
12116 Jps
25875 SecondaryNameNode

5.3 启动分步式队列ZookeeperJob

5.3.1 启动统计采购数据程序,设置启动参数1

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
copy from: logfile/biz/purchase.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Output:2013-01,9609887
create /queue/purchase
Queue Complete:1/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 3] ls /queue
[purchase]

5.3.2 启动统计销售数据程序,设置启动参数2

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
copy from: logfile/biz/sell.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Output:2013-01,2950315
create /queue/sell
Queue Complete:2/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[purchase, sell]

5.3.3 启动统计其他费用数据程序,设置启动参数3

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Output:2013-01,34193
create /queue/other
Queue Complete:3/3
create /queue/profit
cat: hdfs://192.168.1.210:9000/user/hdfs/biz/sell/output/part-r-00000
2950315

cat: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase/output/part-r-00000
9609887

Output:2013-01,34193
profit = sell - purchase - other = 2950315 - 9609887 - 34193 = -6693765
Queue has Completed!!!

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[profit]

在最后一步,统计其他费用数据程序运行后,从日志中看到3个条件节点都已满足要求。然后,通过同步的分步式队列自动启动了计算利润的程序,并在日志中打印了2013年1月的利润为-6693765。

本文介绍的源代码,已上传到github: https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myzk/hadoop

通过这个复杂的实验,我们成功地用zookeeper实现了分步式队列,并应用到了业务中。当然,实验中也有一些不是特别的严谨的地方,请同学边做边思考。

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

打赏作者

在Ubuntu中安装HBase

R利剑NoSQL系列文章,主要介绍通过R语言连接使用nosql数据库。涉及的NoSQL产品,包括RedisMongoDBHBaseHiveCassandraNeo4j。希望通过我的介绍让广大的R语言爱好者,有更多的开发选择,做出更多地激动人心的应用。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/linux-hbase-install/

linux-hbase

前言

HBase是Hadoop家族中的一个分布式数据库产品,HBase支持高并发读写,列式数据存储,高效的索引,自动分片,自动Region迁移等许多优点,已经越来越多的被界业认可并实施。

目录

  1. 在Ubuntu中环境准备
  2. HBase安装
  3. Thrift安装

1 在Ubuntu中环境准备

HBase是基于Java开发的运行Hadoop平台上分布式NoSQL数据库软件,HBase没有提供Windows系统安装版本。我在这里也只介绍HBase在Linux Ubuntu系统中的安装。

由于HBase是运行在Hadoop平台上面的,因此我们需要先安装好Hadoop的环境,Hadoop的安装请参考文章:[Hadoop历史版本安装](http://blog.fens.me/hadoop-history-source-install/)

HBase没有提供apt的软件源安装,我们需要自己去官方网络下载HBase软件包进行安装。HBase下载页:http://www.apache.org/dyn/closer.cgi/hbase/

系统环境:

  • Linux Ubuntu 12.04.2 LTS 64bit server
  • Java JDK 1.6.0_45
  • Hadoop 1.1.2

2 HBase安装

2.1 下载HBase


# 通过wget命令下载
~ wget http://www.gaidso.com/apache/hbase/stable/hbase-0.94.18.tar.gz

# 解压HBase
~ tar xvf hbase-0.94.18.tar.gz

# 移动HBase目录到文件夹
~ mv hbase-0.94.18/ /home/conan/hadoop/

# 进入目录
~ cd /home/conan/hadoop/hbase-0.94.18

2.2 配置HBase

2.2.1 修改启动文件hbase-env.sh


~ vi conf/hbase-env.sh

#打开注释
export JAVA_HOME=/home/conan/toolkit/jdk16
export HBASE_CLASSPATH=/home/conan/hadoop/hadoop-1.1.2/conf
export HBASE_MANAGES_ZK=true

2.2.2 修改配置文件 hbase-site.xml


~ vi conf/hbase-site.xml

<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://master:9000/hbase</value>
</property>

<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>

<property>
<name>dfs.replication</name>
<value>1</value>
</property>

<property>
<name>hbase.zookeeper.quorum</name>
<value>master</value>
</property>

<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>

<property>
<name>hbase.zookeeper.property.dataDir</name>
<value>/home/conan/hadoop/hdata</value>
</property>
</configuration>

复制hadoop环境的配置文件和类库


~ cp ~/hadoop/hadoop-1.1.2/conf/hdfs-site.xml conf/
~ cp ~/hadoop/hadoop-1.1.2/hadoop-core-1.1.2.jar lib/
~ mkdir /home/conan/hadoop/hdata

2.3 启动hadoop和hbase


~ /home/conan/hadoop/hadoop-1.1.2/bin/start-all.sh
~ /home/conan/hadoop/hbase-0.94.18/bin/start-hbase.sh

# 查看hbase进程
~ jps
13838 TaskTracker
13541 JobTracker
15946 HMaster
16756 Jps
12851 NameNode
13450 SecondaryNameNode
13133 DataNode
15817 HQuorumPeer
16283 HRegionServer

2.4 打开HBase命令行客户端访问Hbase


~ bin/hbase shell
HBase Shell; enter 'help' for list of supported commands.
Type "exit" to leave the HBase Shell
Version 0.94.18, r1577788, Sat Mar 15 04:46:47 UTC 2014

hbase(main):002:0> help
HBase Shell, version 0.94.18, r1577788, Sat Mar 15 04:46:47 UTC 2014
Type 'help "COMMAND"', (e.g. 'help "get"' -- the quotes are necessary) for help on a specific command.
Commands are grouped. Type 'help "COMMAND_GROUP"', (e.g. 'help "general"') for help on a command group.

COMMAND GROUPS:
  Group name: general
  Commands: status, version, whoami

  Group name: ddl
  Commands: alter, alter_async, alter_status, create, describe, disable, disable_all, drop, drop_all, enable, enable_all, exists, is_disabled, is_enabled, list, show_filters

  Group name: dml
  Commands: count, delete, deleteall, get, get_counter, incr, put, scan, truncate

  Group name: tools
  Commands: assign, balance_switch, balancer, close_region, compact, flush, hlog_roll, major_compact, move, split, unassign, zk_dump

  Group name: replication
  Commands: add_peer, disable_peer, enable_peer, list_peers, list_replicated_tables, remove_peer, start_replication, stop_replication

  Group name: snapshot
  Commands: clone_snapshot, delete_snapshot, list_snapshots, restore_snapshot, snapshot

  Group name: security
  Commands: grant, revoke, user_permission

SHELL USAGE:
Quote all names in HBase Shell such as table and column names.  Commas delimit
command parameters.  Type  after entering a command to run it.
Dictionaries of configuration used in the creation and alteration of tables are
Ruby Hashes. They look like this:

  {'key1' => 'value1', 'key2' => 'value2', ...}

and are opened and closed with curley-braces.  Key/values are delimited by the
'=>' character combination.  Usually keys are predefined constants such as
NAME, VERSIONS, COMPRESSION, etc.  Constants do not need to be quoted.  Type
'Object.constants' to see a (messy) list of all constants in the environment.

If you are using binary keys or values and need to enter them in the shell, use
double-quote'd hexadecimal representation. For example:

  hbase> get 't1', "key\x03\x3f\xcd"
  hbase> get 't1', "key\003\023\011"
  hbase> put 't1', "test\xef\xff", 'f1:', "\x01\x33\x40"

The HBase shell is the (J)Ruby IRB with the above HBase-specific commands added.
For more on the HBase Shell, see http://hbase.apache.org/docs/current/book.html

2.5 HBase简单命令操作


#创建一个新表student
hbase(main):003:0> create 'student','info'
0 row(s) in 1.2680 seconds

#查看所有的表
hbase(main):004:0> list
TABLE
student
1 row(s) in 0.0330 seconds

#查看student的表结构
hbase(main):005:0> describe 'student'
DESCRIPTION                                                 ENABLED
 'student', {NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', true
  BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS
  => '3', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL =
 > '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE =
 > '65536', IN_MEMORY => 'false', ENCODE_ON_DISK => 'true',
  BLOCKCACHE => 'true'}
1 row(s) in 0.1100 seconds

#同student表中插入一条数据
hbase(main):007:0> put 'student','mary','info:age','19'
0 row(s) in 0.0490 seconds

#从student表中取出mary的数据
hbase(main):008:0> get 'student','mary'
COLUMN                   CELL
 info:age                timestamp=1396366643298, value=19
1 row(s) in 0.0190 seconds

#让student表失效
hbase(main):009:0> disable 'student'
0 row(s) in 1.2400 seconds

#列出所有表
hbase(main):010:0> list
TABLE
student
1 row(s) in 0.0310 seconds

#删除student表
hbase(main):013:0>  drop 'student'
0 row(s) in 1.1100 seconds

#列出所有表
hbase(main):014:0> list
TABLE
0 row(s) in 0.0400 seconds

3 Thrift安装

安装完成HBase后,我们还需要安装Thrift,因为其他语言调用HBase的时候,是通过Thrift连接的。

Thrift是需要本地编译的,官方没有提供二进制安装包,首先下载thrift-0.9.1,Thrift下载页:http://thrift.apache.org/download

3.1 下载thrift

下载Thrift有两种方式,直接下载源代码发行包,或者通过git下载源代码,请选择其中一种方式下载。

3.1.1 直接下载源代码发行包 thrift-0.9.1.tar.gz


~ wget http://apache.fayea.com/apache-mirror/thrift/0.9.1/thrift-0.9.1.tar.gz
~ tar xvf thrift-0.9.1.tar.gz
~ mv thrift-0.9.1/ /home/conan/hadoop/
~ cd /home/conan/hadoop/

注:后文中的各种错误,都是这个包引起的

3.1.2 通过git下载源代码


~ git clone https://git-wip-us.apache.org/repos/asf/thrift.git thrift-git
~ mv thrift-git/ /home/conan/hadoop/
~ cd /home/conan/hadoop/

为了避免各种出错,建议使用git下载源代码安装

3.2 通过thrift-0.9.1.tar.gz 发行包安装Thrift

Thrift是需要本地编译的,在Thrift解压目录输入./configure,会列Thrift在当前机器所支持的语言环境。

3.2.1 安装Thrift的依赖包


sudo apt-get install libboost-dev libboost-test-dev libboost-program-options-dev libevent-dev automake libtool flex bison pkg-config g++ libssl-dev

如果只是为了连接rhbase,默认配置就可以了。如果除了希望支持rhbase访问,还支持PHP,Python,C++等语言的访问,就需要在系统中,装一些额外的类库。大家可以根据自己的要求,安装对应的软件包并设置Thrift的编译参数。

生成配置脚本


~  ./configure

//省略部分日志输出

thrift 0.9.1

Building C++ Library ......... : yes
Building C (GLib) Library .... : no
Building Java Library ........ : yes
Building C# Library .......... : no
Building Python Library ...... : yes
Building Ruby Library ........ : no
Building Haskell Library ..... : no
Building Perl Library ........ : no
Building PHP Library ......... : no
Building Erlang Library ...... : no
Building Go Library .......... : no
Building D Library ........... : no

C++ Library:
   Build TZlibTransport ...... : yes
   Build TNonblockingServer .. : yes
   Build TQTcpServer (Qt) .... : no

Java Library:
   Using javac ............... : javac
   Using java ................ : java
   Using ant ................. : /home/conan/toolkit/ant184/bin/ant

Python Library:
   Using Python .............. : /usr/bin/python

If something is missing that you think should be present,
please skim the output of configure to find the missing
component.  Details are present in config.log.

我本机的已支持C++, Java与Thrift的通信。

3.2.2 增加Python语言的通信

虽然Python已被显示支持与Thrift但在后面编译过程中,还是缺少一些Python的库,我们需要再增加Python的依赖库

安装Python的依赖包


sudo apt-get install python-all python-all-dev python-all-dbg

3.2.3 增加PHP语言的通信

安装PHP的依赖包


sudo apt-get install php5-dev php5-cli phpunit

生成配置脚本


~  ./configure --enable-thrift_protocol

//省略部分日志输出

thrift 0.9.1

Building C++ Library ......... : yes
Building C (GLib) Library .... : no
Building Java Library ........ : yes
Building C# Library .......... : no
Building Python Library ...... : yes
Building Ruby Library ........ : no
Building Haskell Library ..... : no
Building Perl Library ........ : no
Building PHP Library ......... : yes
Building Erlang Library ...... : no
Building Go Library .......... : no
Building D Library ........... : no

C++ Library:
   Build TZlibTransport ...... : yes
   Build TNonblockingServer .. : yes
   Build TQTcpServer (Qt) .... : no

Java Library:
   Using javac ............... : javac
   Using java ................ : java
   Using ant ................. : /home/conan/toolkit/ant184/bin/ant

Python Library:
   Using Python .............. : /usr/bin/python

PHP Library:
   Using php-config .......... : /usr/bin/php-config

If something is missing that you think should be present,
please skim the output of configure to find the missing
component.  Details are present in config.log.

我们看到Thrift的配置中,增加了对PHP语言的支持。

3.2.4 编译和安装


# 编译Thrift
~ make

//省略部分日志

make[5]: 正在进入目录 `/home/conan/hadoop/thrift-0.9.1/lib/php/src/ext/thrift_protocol'
make[5]: *** 没有指明目标并且找不到 makefile。 停止。
make[5]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/lib/php/src/ext/thrift_protocol'
make[4]: *** [src/ext/thrift_protocol/modules/thrift_protocol.so] 错误 2
make[4]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/lib/php'
make[3]: *** [all-recursive] 错误 1
make[3]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/lib/php'
make[2]: *** [all-recursive] 错误 1
make[2]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/lib'
make[1]: *** [all-recursive] 错误 1
make[1]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1'
make: *** [all] 错误 2

在make生成过程,出现PHP的编译错误。从Thrift的错误列表中,我们可以找到错误描述( https://issues.apache.org/jira/browse/THRIFT-2265 ),这是由于Thrift-0.9.1发行包,打包时缺少了PHP扩展文件造成的错误,并在Thrift-0.9.2版本中修复。

所以,我们如果还想继续使用Thrift-0.9.1版本,则不能支持PHP语言。


# 生成配置信息,不包括PHP模块
~ ./configure --without-php_extension

# 编译Thrift
~ make

编译过程中,又出现了C++编译错误。


Makefile:832: 警告:覆盖关于目标“gen-cpp/ThriftTest.cpp”的命令
Makefile:829: 警告:忽略关于目标“gen-cpp/ThriftTest.cpp”的旧命令
/bin/bash ../../libtool --tag=CXX   --mode=link g++ -Wall -g -O2 -L/usr/lib   -o libtestgencpp.la  ThriftTest_constants.lo ThriftTest_types.lo ../../lib/cpp/libthrift.la -lssl -lcrypto -lrt -lpthread
libtool: link: ar cru .libs/libtestgencpp.a .libs/ThriftTest_constants.o .libs/ThriftTest_types.o
ar: .libs/ThriftTest_constants.o: No such file or directory
make[3]: *** [libtestgencpp.la] 错误 1
make[3]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/test/cpp'
make[2]: *** [all-recursive] 错误 1
make[2]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1/test'
make[1]: *** [all-recursive] 错误 1
make[1]:正在离开目录 `/home/conan/hadoop/thrift-0.9.1'
make: *** [all] 错误 2

对于上面的2个编译错误,我决定换成git源代码的版本重新操作。

3.2 通过git源代码安装Thrift

运行安装命令


# 进行thrift-git目录
~ cd /home/conan/hadoop/thrift-git

# 复制0.9.1标签到新分支thrift-0.9.1
~ git checkout -b thrift-0.9.1 0.9.1

# 产生配置脚本
~ ./bootstrap.sh

# 生成配置信息
~ ./configure

# 编译Thrift
~ make

# 安装Thrift
~ sudo make install

走了许多弯路,终于使用git源代码版本安装好了Thrift。

查看thrift版本


~ thrift -version
Thrift version 0.9.1

接下来,我们启动HBase的Thrift Server服务


# 启动HBase的Thrift服务
~ /home/conan/hadoop/hbase-0.94.18/bin/hbase-daemon.sh start thrift
starting thrift, logging to /home/conan/hadoop/hbase-0.94.18/bin/../logs/hbase-conan-thrift-master.out

# 检查系统进程
~ jps
13838 TaskTracker
13541 JobTracker
15946 HMaster
32120 Jps
12851 NameNode
13450 SecondaryNameNode
13133 DataNode
32001 ThriftServer
15817 HQuorumPeer
16283 HRegionServer

我们看到ThriftServer已被启动,后面我们就可以使用多种语言,通过Thrift来访问HBase了,这样就完成了HBase的安装。

转载请注明出处:
http://blog.fens.me/linux-hbase-install/

打赏作者

读书笔记 Big Data Analytics with R and Hadoop

RHadoop实践系列文章,包含了R语言与Hadoop结合进行海量数据分析。Hadoop主要用来存储海量数据,R语言完成MapReduce 算法,用来替代Java的MapReduce实现。有了RHadoop可以让广大的R语言爱好者,有更强大的工具处理大数据1G, 10G, 100G, TB, PB。 由于大数据所带来的单机性能问题,可能会一去不复返了。

RHadoop实践是一套系列文章,主要包括”Hadoop环境搭建”,”RHadoop安装与使用”,R实现MapReduce的协同过滤算法”,”HBase和rhbase的安装与使用”。对于单独的R语言爱好者,Java爱好者,或者Hadoop爱好者来说,同时具备三种语言知识并不容 易。此文虽为入门文章,但R,Java,Hadoop基础知识还是需要大家提前掌握。

关于作者

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/r-hadoop-book-big-data/

r-hadoop-book-big-data

前言

最近的一本新书Big Data Analytics with R and Hadoop是关于R和Hadoop实践的第一本图书,作者Vignesh Prajapati曾经在图书出版的半年前联系过我,通过Google翻译发现了我的博客,希望把其中的1-2个例子放到他的书中。

没想到这本书,经过半年就出版了,作者效率还是挺高的。受Packt Publishing编辑Amol Bhosle委托为本书写个书评,于是就有本篇文章。

目录

  1. 图书概览
  2. 图书内容剖析
  3. 最后总结

1. 图书概览

本书的几个核心点:R,Hadoop, R+Hadoop, 数据分析案例,机器学习算法案例,R的数据访问接口。

我通过一个思维导图来表达。

Big Data Analytics with R and Hadoop - fens.me

书中最重要的是案例部分,作者分别使用R语言单机实现,以及RHadoop的分步式实现,介绍是多个案例的实践。

2. 图书内容剖析

  • R语言介绍
  • Hadoop介绍
  • R+Hadoop技术方案
  • 数据分析案例
  • 大数据分析案例
  • R语言的数据访问接口

1). R语言介绍

简单地介绍了R安装,RStudio安装,R语言最擅长算法模型:回归,分类,聚类,推荐。

2). Hadoop介绍

主要是介绍了Hadoop安装,在几种不同的Linux系统上,用Apache Hadoop和Cloudera Hadoop二个版本对比安装。

简单介绍了HDFS,dateeode和namenode;讲了MapReduce的工作原理,和用MapReduce对数据处理的过程。

介绍了Hadoop的命令行的使用。

3). R+Hadoop技术方案

作者对于R+Hadoop做了3种技术方案讨论,分别是RHipe, RHadoop, R + Hadoop Streaming。

a. RHipe

RHipe是R与Hadoop的集成编程环境。RHIPE可以让R语言与Hadoop进行通信,访问Hadoop的HDFS和调用MapReduce,让R语言的使用者利用Hadoop的分步式环境进行行大数据的分析。

RHipe官方网站:http://www.datadr.org/

b. RHadoop

RHadoop是由RevolutionAnalytics公司开发的一个R与Hadoop的集成编程环境,与RHipe的功能一样。RHadoop包含三个R包 (rmr,rhdfs,rhbase),分别是对应Hadoop系统架构中的,MapReduce, HDFS, HBase 三个部分。在2013年底,又增加第4个R包plyrmr,用于数据处理操作。本书中并没有涉及plyrmr包。

RHadoop的发布页:https://github.com/RevolutionAnalytics/RHadoop/wiki

RHadoop实践系列文章:http://blog.fens.me/series-rhadoop/

c. R + Hadoop Streaming

Hadoop Streaming是Hadoop提供的,允许任何可执行的脚本作为Mapper和Reducer的一种实现方实。R + Hadoop Streaming就是用R脚本实现Mapper和Reducer。作者用2种方式,进行了测试。

c1. R Script + Hadoop Streaming : 单纯的R语言脚本,通过Shell运行。
c2. HadoopStreaming + Hadoop Streaming: 使用一个封装好的R包HadoopStreaming来实现。

这3种技术方案,是目前R与Hadoop结合实现方案。我对RHadoop和R Script + Hadoop Streaming比较熟悉,经过我测试只有R Script + Hadoop Streaming这种方式,可以用于生产环境,RHadoop性能还是有一些问题的,可以提升的空间还是很大的。书中介绍的另外两种方法,我要有时间,再去试试。不过,我相信R和Hadoop的结合的项目,会越来越多的。

4). 数据分析案例

上面篇幅把技术的基础都说清楚,接下来就是核心案例了,书中介绍了如何使用R结合Hadoop进行数据分析。

首先书中介绍了,一个数据分析项目的框架,包括5个部分。

  • 问题
  • 定义数据需求
  • 数据预处理
  • 数据建模及执行
  • 数据可视化

然后,作者介绍了3个应用案例。

  • 网页分类:把一个网站中的网页按重要性进行排名
  • 股票分析:通过历史交易数据计算股票市场的变化
  • 价格预测:预测图书销售的价格,来自Kaggle的一道竞赛题

上面3个案例,都使用R和RHadoop进行实现,都是非常不错的案例,可以给我们的学习和使用提供很好的思路。

5). 机器学习算法案例

接下来,作者介绍了用RHadoop实现的基于大数据机器学习算法,充分结合了R和Hadoop的优势。作者把机器学习算法分为3类,监督学习算法,非监督学习算法,推荐算法。

4个算法的案例:

a. 线性回归

最简单的一种回归算法,可以下面公式表示。

y = ax + e 

在R语言中,一个函数lm()就可以实现。

在大数据的背景下,利用RHadoop,需要自己实现lm()这个函数。


# Reducer
Sum = function(., YY) keyval(1, list(Reduce('+', YY)))

# XtX =
values(

# For loading hdfs data in to R
from.dfs(

# MapReduce Job to produce XT*X
mapreduce(
input = X.index,

# Mapper – To calculate and emitting XT*X
map =
function(., Xi) {
yi = y[Xi[,1],]
Xi = Xi[,-1]
keyval(1, list(t(Xi) %*% Xi))},

# Reducer – To reduce the Mapper output by performing sum
operation over them
reduce = Sum,
combine = TRUE)))[[1]]

b. Logistic回归

比线性回归稍微复杂一些,可以用公式表示。


logit(p) = β0 + β1 × x1 + β2 × x2 + ... + βn × xn

R程序还是一个函数glm()。

在大数据的背景下,利用RHadoop,需要自己实现glm()这个函数。


# Mapper – computes the contribution of a subset of points to the
gradient.

lr.map =
  function(., M) {
    Y = M[,1]
    X = M[,-1]
    keyval(1, Y * X * g(-Y * as.numeric(X %*% t(plane))))
}

# Reducer – Perform sum operation over Mapper output.

lr.reduce = function(k, Z) keyval(k, t(as.matrix(apply(Z,2,sum))))

# MapReduce job – Defining MapReduce function for executing logistic
regression

logistic.regression =
  function(input, iterations, dims, alpha){
    plane = t(rep(0, dims))
    g = function(z) 1/(1 + exp(-z))
    for (i in 1:iterations) {
      gradient =
        values(
          from.dfs(
            mapreduce(input, map = lr.map, reduce = lr.reduce, combine = T)))
      plane = plane + alpha * gradient 
    }
    plane 
}

c. 聚类算法

这里介绍的是快速聚类kmeans,聚类属于非监督学习法。

通过R语言现实,一个函数kmeans()就可以完成了。

通过RHadoop现实,就需要自己重写这个迭代过程。


# distance calculation function
dist.fun = function(C, P) {
  apply(C,1,function(x) colSums((t(P) - x)^2))
}

# k-Means Mapper
kmeans.map = function(., P) {
  nearest = {

  # First interations- Assign random cluster centers
  if(is.null(C))
    sample(1:num.clusters,nrow(P),replace = T)

# Rest of the iterations, where the clusters are assigned # based
on the minimum distance from points

  else {
    D = dist.fun(C, P)
    nearest = max.col(-D)}}
    if(!(combine || in.memory.combine))
      keyval(nearest, P)
    else
      keyval(nearest, cbind(1, P))
}


# k-Means Reducer
kmeans.reduce = {

  # calculating the column average for both of the conditions
  if (!(combine || in.memory.combine) )
    function(., P) t(as.matrix(apply(P, 2, mean)))
  else function(k, P) keyval(k,t(as.matrix(apply(P, 2, sum))))
}


# k-Means MapReduce – for
kmeans.mr = function(P,num.clusters,num.iter,combine,in.memory.combine) {
  C = NULL
  for(i in 1:num.iter ) {
    C =  values(from.dfs(mapreduce(P,map = kmeans.map,reduce = kmeans.reduce)))
    if(combine || in.memory.combine)
    C = C[, -1]/C[, 1]
    if(nrow(C) < num.clusters) {
      C =rbind(C,matrix(rnorm((num.clusters -nrow(C)) * nrow(C)),ncol = row(C)) %*% C) 
    }
  }
  C
}

d. 推荐算法

这里介绍的是协同过滤算法,算法实现就有点复杂了。主要思想就是:

同现矩阵 * 评分矩阵 = 推荐结果

注:由于这个案例出自我的博客,我就直接贴我的文章地址了,也方便中文读者了解算法细节。

R语言的算法实现:http://blog.fens.me/r-mahout-usercf/

RHadoop分步式算法实现:http://blog.fens.me/rhadoop-mapreduce-rmr/

6). R语言的数据访问接口

最后一部分,书中介绍了R语言的各种数据访问接口。

FILE:

  • CSV: read.csv(), write.csv()
  • Excel: xlsx, xlsxjars,rJava

Database:

  • MySQL: RMySQL
  • SQLite: RSQLite
  • PostgreSQL: RPostgreSQL

NoSQL:

  • MongoDB: rmongodb
  • Hive: RHive
  • HBase: RHBase

补充一下,我的博客中也写了R的数据访问接口的文章。

R的多种接口已经打通,这也就说明R已经做好了准备,为工业界带来革命的力量。

3. 最后总结

这是一本不错的图书,从R+Hadoop的角度出发,打开R语言面向大数据应用的思路。这种结合是跨学科碰撞的结果,是市场需求的导向。但由于R+Hadoop还不够成熟,企业级应用依然缺乏成功案例,所以当实施R+Hadoop的应用时,还会碰到非常多的问题。期待有担当的公司和个人,做出被大家认可的产品来。

转载请注明出处:
http://blog.fens.me/r-hadoop-book-big-data/

打赏作者

Mahout构建图书推荐系统

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-mahout-recommend-book/

mahout-recommendation-book

前言

本文是Mahout实现推荐系统的又一案例,用Mahout构建图书推荐系统。与之前的两篇文章,思路上面类似,侧重点在于图书的属性如何利用。本文的数据在自于Amazon网站,由爬虫抓取获得。

目录

  1. 项目背景
  2. 需求分析
  3. 数据说明
  4. 算法模型
  5. 程序开发

1. 项目背景

Amazon是最早的电子商务网站之一,以网上图书起家,最后发展成为音像,电子消费品,游戏,生活用品等的综合性电子商务平台。Amazon的推荐系统,是互联网上最早的商品推荐系统,它为Amazon带来了至少30%的流量,和可观的销售利润。

如今推荐系统已经成为电子商务网站的标配,如果还没有推荐系统都不好意思,说自己是做电商的。

2. 需求分析

推荐系统如此重要,我们应该如果理解?

打开Amazon的Mahout In Action图书页面:
http://www.amazon.com/Mahout-Action-Sean-Owen/dp/1935182684/ref=pd_sim_b_1?ie=UTF8&refRID=0H4H2NSSR8F34R76E2TP

网页上的元素:

  • 广告位:广告商投放广告的位置,网站可以靠网络广告赚钱,一般是网页最好的位置。
  • 平均分:用户对图书的打分
  • 关联规则:通过关联规则,推荐位
  • 协同过滤:通过基于物品的协同过滤算法的,推荐位
  • 图书属性:包括页数,出版社,ISBN,语言等
  • 作者介绍:有关作者的介绍,和作者的其他著作
  • 用户评分:用户评分行为
  • 用户评论:用户评论的内容

amazon-book

在网页上,其他的推荐位:

amazon-book-2

结合上面2张截图,我们不难发现,推荐对于Amazon的重要性。除了最明显的广告位给了能直接带来利润的广告商,网页中有4处推荐位,分别从不同的维度,用不同的推荐算法,猜用户喜欢的商品。

3. 数据说明

2个数据文件:

  • rating.csv :用户评分行为数据
  • users.csv :用户属性数据

1). book-ratings.csv

  • 3列数据:用户ID,图书ID, 用户对图书的评分
  • 记录数: 4000次的图书评分
  • 用户数: 200个
  • 图书数: 1000个
  • 评分:1-10

数据示例


1,565,3
1,807,2
1,201,1
1,557,9
1,987,10
1,59,5
1,305,6
1,153,3
1,139,7
1,875,5
1,722,10
2,977,4
2,806,3
2,654,8
2,21,8
2,662,5
2,437,6
2,576,3
2,141,8
2,311,4
2,101,3
2,540,9
2,87,3
2,65,8
2,501,6
2,710,5
2,331,9
2,542,4
2,757,9
2,590,7

2). users.csv

  • 3列数据:用户ID,用户性别,用户年龄
  • 用户数: 200个
  • 用户性别: M为男性,F为女性
  • 用户年龄: 11-80岁之间

数据示例


1,M,40
2,M,27
3,M,41
4,F,43
5,F,16
6,M,36
7,F,36
8,F,46
9,M,50
10,M,21
11,F,11
12,M,42
13,F,40
14,F,28
15,M,25
16,M,68
17,M,53
18,F,69
19,F,48
20,F,56
21,F,36

4. 算法模型

本文主要介绍Mahout的基于物品的协同过滤模型,其他的算法模型将不再这里解释。

针对上面的数据,我将用7种算法组合进行测试:有关Mahout算法组合的详细解释,请参考文章:从源代码剖析Mahout推荐引擎

7种算法组合

  • userCF1: EuclideanSimilarity+ NearestNUserNeighborhood+ GenericUserBasedRecommender
  • userCF2: LogLikelihoodSimilarity+ NearestNUserNeighborhood+ GenericUserBasedRecommender
  • userCF3: EuclideanSimilarity+ NearestNUserNeighborhood+ GenericBooleanPrefUserBasedRecommender
  • itemCF1: EuclideanSimilarity + GenericItemBasedRecommender
  • itemCF2: LogLikelihoodSimilarity + GenericItemBasedRecommender
  • itemCF3: EuclideanSimilarity + GenericBooleanPrefItemBasedRecommender
  • slopeOne:SlopeOneRecommender

对上面的算法进行算法评估,有关于算法评估的详细解释,请参考文章:Mahout推荐算法API详解

  • 查准率:
  • 召回率(查全率):

5. 程序开发

系统架构:Mahout中推荐过滤算法支持单机算法和分步式算法两种。

  • 单机算法: 在单机内存计算,支持多种算法推荐算法,部署运行简单,修正处理数据量有限
  • 分步式算法: 基于Hadoop集群运行,支持有限的几种推荐算法,部署运行复杂,支持海量数据

mahout-recommend-job-architect

开发环境

  • Win7 64bit
  • Java 1.6.0_45
  • Maven3
  • Eclipse Juno Service Release 2
  • Mahout-0.8
  • Hadoop-1.1.2

开发环境mahout版本为0.8。 请参考文章:用Maven构建Mahout项目

新建Java类:

  • BookEvaluator.java, 选出“评估推荐器”验证得分较高的算法
  • BookResult.java, 对指定数量的结果人工比较
  • BookFilterGenderResult.java,只保留男性用户的图书列表

1). BookEvaluator.java, 选出“评估推荐器”验证得分较高的算法

源代码


package org.conan.mymahout.recommendation.book;

import java.io.IOException;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.similarity.ItemSimilarity;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class BookEvaluator {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/book/rating.csv";
        DataModel dataModel = RecommendFactory.buildDataModel(file);
        userEuclidean(dataModel);
        userLoglikelihood(dataModel);
        userEuclideanNoPref(dataModel);
        itemEuclidean(dataModel);
        itemLoglikelihood(dataModel);
        itemEuclideanNoPref(dataModel);
        slopeOne(dataModel);
    }

    public static RecommenderBuilder userEuclidean(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userEuclidean");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
    
    public static RecommenderBuilder userLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userLoglikelihood");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
    
    public static RecommenderBuilder userEuclideanNoPref(DataModel dataModel) throws TasteException, IOException {
        System.out.println("userEuclideanNoPref");
        UserSimilarity userSimilarity = RecommendFactory.userSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        UserNeighborhood userNeighborhood = RecommendFactory.userNeighborhood(RecommendFactory.NEIGHBORHOOD.NEAREST, userSimilarity, dataModel, NEIGHBORHOOD_NUM);
        RecommenderBuilder recommenderBuilder = RecommendFactory.userRecommender(userSimilarity, userNeighborhood, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemEuclidean(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemEuclidean");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder itemLoglikelihood(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemLoglikelihood");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.LOGLIKELIHOOD, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, true);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
    
    public static RecommenderBuilder itemEuclideanNoPref(DataModel dataModel) throws TasteException, IOException {
        System.out.println("itemEuclideanNoPref");
        ItemSimilarity itemSimilarity = RecommendFactory.itemSimilarity(RecommendFactory.SIMILARITY.EUCLIDEAN, dataModel);
        RecommenderBuilder recommenderBuilder = RecommendFactory.itemRecommender(itemSimilarity, false);

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }

    public static RecommenderBuilder slopeOne(DataModel dataModel) throws TasteException, IOException {
        System.out.println("slopeOne");
        RecommenderBuilder recommenderBuilder = RecommendFactory.slopeOneRecommender();

        RecommendFactory.evaluate(RecommendFactory.EVALUATOR.AVERAGE_ABSOLUTE_DIFFERENCE, recommenderBuilder, null, dataModel, 0.7);
        RecommendFactory.statsEvaluator(recommenderBuilder, null, dataModel, 2);
        return recommenderBuilder;
    }
}

控制台输出:


userEuclidean
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:0.33333325386047363
Recommender IR Evaluator: [Precision:0.3010752688172043,Recall:0.08542713567839195]
userLoglikelihood
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.5245869159698486
Recommender IR Evaluator: [Precision:0.11764705882352945,Recall:0.017587939698492466]
userEuclideanNoPref
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:4.288461538461536
Recommender IR Evaluator: [Precision:0.09045226130653267,Recall:0.09296482412060306]
itemEuclidean
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:1.408880928305655
Recommender IR Evaluator: [Precision:0.0,Recall:0.0]
itemLoglikelihood
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.448554412835434
Recommender IR Evaluator: [Precision:0.0,Recall:0.0]
itemEuclideanNoPref
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.5665197873957957
Recommender IR Evaluator: [Precision:0.6005025125628134,Recall:0.6055276381909548]
slopeOne
AVERAGE_ABSOLUTE_DIFFERENCE Evaluater Score:2.6893078179405814
Recommender IR Evaluator: [Precision:0.0,Recall:0.0]

可视化“评估推荐器”输出:

推荐的结果的平均距离

difference

推荐器的评分

evaluator

只有itemEuclideanNoPref算法评估的结果是非常好的,其他算法的结果都不太好。

2). BookResult.java, 对指定数量的结果人工比较

为得到差异化结果,我们分别取4个算法:userEuclidean,itemEuclidean,userEuclideanNoPref,itemEuclideanNoPref,对推荐结果人工比较。

源代码


package org.conan.mymahout.recommendation.book;

import java.io.IOException;
import java.util.List;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;

public class BookResult {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/book/rating.csv";
        DataModel dataModel = RecommendFactory.buildDataModel(file);
        RecommenderBuilder rb1 = BookEvaluator.userEuclidean(dataModel);
        RecommenderBuilder rb2 = BookEvaluator.itemEuclidean(dataModel);
        RecommenderBuilder rb3 = BookEvaluator.userEuclideanNoPref(dataModel);
        RecommenderBuilder rb4 = BookEvaluator.itemEuclideanNoPref(dataModel);
        
        LongPrimitiveIterator iter = dataModel.getUserIDs();
        while (iter.hasNext()) {
            long uid = iter.nextLong();
            System.out.print("userEuclidean       =>");
            result(uid, rb1, dataModel);
            System.out.print("itemEuclidean       =>");
            result(uid, rb2, dataModel);
            System.out.print("userEuclideanNoPref =>");
            result(uid, rb3, dataModel);
            System.out.print("itemEuclideanNoPref =>");
            result(uid, rb4, dataModel);
        }
    }

    public static void result(long uid, RecommenderBuilder recommenderBuilder, DataModel dataModel) throws TasteException {
        List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM);
        RecommendFactory.showItems(uid, list, false);
    }
}

控制台输出:只截取部分结果


...
userEuclidean       =>uid:63,
itemEuclidean       =>uid:63,(984,9.000000)(690,9.000000)(943,8.875000)
userEuclideanNoPref =>uid:63,(4,1.000000)(723,1.000000)(300,1.000000)
itemEuclideanNoPref =>uid:63,(867,3.791667)(947,3.083333)(28,2.750000)
userEuclidean       =>uid:64,
itemEuclidean       =>uid:64,(368,8.615385)(714,8.200000)(290,8.142858)
userEuclideanNoPref =>uid:64,(860,1.000000)(490,1.000000)(64,1.000000)
itemEuclideanNoPref =>uid:64,(409,3.950000)(715,3.830627)(901,3.444048)
userEuclidean       =>uid:65,(939,7.000000)
itemEuclidean       =>uid:65,(550,9.000000)(334,9.000000)(469,9.000000)
userEuclideanNoPref =>uid:65,(939,2.000000)(185,1.000000)(736,1.000000)
itemEuclideanNoPref =>uid:65,(666,4.166667)(96,3.093931)(345,2.958333)
userEuclidean       =>uid:66,
itemEuclidean       =>uid:66,(971,9.900000)(656,9.600000)(918,9.577709)
userEuclideanNoPref =>uid:66,(6,1.000000)(492,1.000000)(676,1.000000)
itemEuclideanNoPref =>uid:66,(185,3.650000)(533,3.617307)(172,3.500000)
userEuclidean       =>uid:67,
itemEuclidean       =>uid:67,(663,9.700000)(987,9.625000)(486,9.600000)
userEuclideanNoPref =>uid:67,(732,1.000000)(828,1.000000)(113,1.000000)
itemEuclideanNoPref =>uid:67,(724,3.000000)(279,2.950000)(890,2.750000)
...

我们查看uid=65的用户推荐信息:

查看user.csv数据集


> user[65,]
userid gender age
65     65      M  14

用户65,男性,14岁。

以itemEuclideanNoPref的算法的推荐结果,查看bookid=666的图书评分情况


> rating[which(rating$bookid==666),]
userid bookid pref
646      44    666   10
1327     89    666    7
2470    165    666    3
2697    179    666    7

发现有4个用户对666的图书评分,查看这4个用户的属性数据


> user[c(44,89,165,179),]
userid gender age
44      44      F  76
89      89      M  40
165    165      F  59
179    179      F  68

这4个用户,3女1男。

我们假设男性和男性有相同的图书兴趣,女性和女性有相同的图书偏好。因为用户65是男性,所以我们接下来排除女性的评分者,只保留男性评分者的评分记录。

3). BookFilterGenderResult.java,只保留男性用户的图书列表

源代码


package org.conan.mymahout.recommendation.book;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.eval.RecommenderBuilder;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.IDRescorer;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;

public class BookFilterGenderResult {

    final static int NEIGHBORHOOD_NUM = 2;
    final static int RECOMMENDER_NUM = 3;

    public static void main(String[] args) throws TasteException, IOException {
        String file = "datafile/book/rating.csv";
        DataModel dataModel = RecommendFactory.buildDataModel(file);
        RecommenderBuilder rb1 = BookEvaluator.userEuclidean(dataModel);
        RecommenderBuilder rb2 = BookEvaluator.itemEuclidean(dataModel);
        RecommenderBuilder rb3 = BookEvaluator.userEuclideanNoPref(dataModel);
        RecommenderBuilder rb4 = BookEvaluator.itemEuclideanNoPref(dataModel);
        
        long uid = 65;
        System.out.print("userEuclidean       =>");
        filterGender(uid, rb1, dataModel);
        System.out.print("itemEuclidean       =>");
        filterGender(uid, rb2, dataModel);
        System.out.print("userEuclideanNoPref =>");
        filterGender(uid, rb3, dataModel);
        System.out.print("itemEuclideanNoPref =>");
        filterGender(uid, rb4, dataModel);
    }

    /**
     * 对用户性别进行过滤
     */
    public static void filterGender(long uid, RecommenderBuilder recommenderBuilder, DataModel dataModel) throws TasteException, IOException {
        Set userids = getMale("datafile/book/user.csv");

        //计算男性用户打分过的图书
        Set bookids = new HashSet();
        for (long uids : userids) {
            LongPrimitiveIterator iter = dataModel.getItemIDsFromUser(uids).iterator();
            while (iter.hasNext()) {
                long bookid = iter.next();
                bookids.add(bookid);
            }
        }

        IDRescorer rescorer = new FilterRescorer(bookids);
        List list = recommenderBuilder.buildRecommender(dataModel).recommend(uid, RECOMMENDER_NUM, rescorer);
        RecommendFactory.showItems(uid, list, false);
    }

    /**
     * 获得男性用户ID
     */
    public static Set getMale(String file) throws IOException {
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));
        Set userids = new HashSet();
        String s = null;
        while ((s = br.readLine()) != null) {
            String[] cols = s.split(",");
            if (cols[1].equals("M")) {// 判断男性用户
                userids.add(Long.parseLong(cols[0]));
            }
        }
        br.close();
        return userids;
    }

}

/**
 * 对结果重计算
 */
class FilterRescorer implements IDRescorer {
    final private Set userids;

    public FilterRescorer(Set userids) {
        this.userids = userids;
    }

    @Override
    public double rescore(long id, double originalScore) {
        return isFiltered(id) ? Double.NaN : originalScore;
    }

    @Override
    public boolean isFiltered(long id) {
        return userids.contains(id);
    }
}

控制台输出:


userEuclidean       =>uid:65,
itemEuclidean       =>uid:65,(784,8.090909)(276,8.000000)(476,7.666667)
userEuclideanNoPref =>uid:65,
itemEuclideanNoPref =>uid:65,(887,2.250000)(356,2.166667)(430,1.866667)

我们发现,由于只保留男性的评分记录,数据量就变得比较少了,基于用户的协同过滤算法,已经没有输出的结果了。基于物品的协同过滤算法,结果集也有所变化。

对于itemEuclideanNoPref算法,输出排名第一条为ID为887的图书。

我再进一步向下追踪:查询哪些用户对图书887进行了打分。


> rating[which(rating$bookid==887),]
userid bookid pref
1280     85    887    2
1743    119    887    8
2757    184    887    4
2791    186    887    5

有4个用户对图书887评分,再分别查看这个用户的属性


> user[c(85,119,184,186),]
userid gender age
85      85      F  31
119    119      F  49
184    184      M  27
186    186      M  35

其中2男,2女。由于我们的算法,已经排除了女性的评分,我们可以推断图书887的推荐应该来自于2个男性的评分者的推荐。

分别计算用户65,与用户184和用户186的评分的图书交集。


rat65<-rating[which(rating$userid==65),]
rat184<-rating[which(rating$userid==184),]
rat186<-rating[which(rating$userid==186),]

> intersect(rat65$bookid ,rat184$bookid)
integer(0)
> intersect(rat65$bookid ,rat186$bookid)
[1]  65 375

最后发现,用户65与用户186都给图书65和图书375打过分。我们再打分出用户186的评分记录。


> rat186
userid bookid pref
2790    186     65    7
2791    186    887    5
2792    186    529    3
2793    186    375    6
2794    186    566    7
2795    186    169    4
2796    186    907    1
2797    186    821    2
2798    186    720    5
2799    186    642    5
2800    186    137    3
2801    186    744    1
2802    186    896    2
2803    186    156    6
2804    186    392    3
2805    186    386    3
2806    186    901    7
2807    186     69    6
2808    186    845    6
2809    186    998    3

用户186,还给图书887打过分,所以对于给65用户推荐图书887,是合理的。

我们通过一个实际的图书推荐的案例,更进一步地了解了如何用Mahout构建推荐系统。

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-mahout-recommend-book/

打赏作者