• Archive by category "JAVA语言实践"

Blog Archives

基于Zookeeper的分步式队列系统集成案例

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

zookeeper-case

前言

软件系统集成一直是工业界的一个难题,像10年以上的遗留系统集成,公司收购后的多系统集成,全球性的分步式系统集成等。虽然基于SOA的软件架构,从理论上都可以解决这些集成的问题,但是具体实施过程,有些集成项目过于复杂而失败。

随着技术的创新和发展,对于分步式集群应用的集成,有了更好的开源软件的支持,像zookeeper就是一个不错的分步式协作软件平台。本文将通过一个案例介绍Zookeeper的强大。

目录

  1. 项目背景:分布式消息中间件
  2. 需求分析:业务系统升级方案
  3. 架构设计:搭建Zookeeper的分步式协作平台
  4. 程序开发:基于Zookeeper的程序设计
  5. 程序运行

1. 项目背景:分布式消息中间件

随着Hadoop的普及,越来越多的公司开始构建自己的Hadoop系统。有时候,公司内部的不同部门或不同的团队,都有自己的Hadoop集群。这种多集群的方式,既能让每个团队拥有个性化的Hadoop,又能避免大集群的高度其中化运维难度。当数据量不是特别巨大的时候,小型集群会有很多适用的场合。

当然,多个小型集群也有缺点,就是资源配置可能造成浪费。每个团队的Hadoop集群,都要配有服务器和运维人员。有些能力强的团队,构建的hadoop集群,可以达到真正的个性化要求;而有一些能力比较差的团队,搭建的Hadoop集群性能会比较糟糕。

还有一些时候,多个团队需要共同完成一个任务,比如,A团队通过Hadoop集群计算的结果,交给B团队继续工作,B完成了自己任务再交给C团队继续做。这就有点像业务系统的工作流一样,一环一环地传下去,直到最后一部分完成。

在业务系统中,我们经常会用SOA的架构来解决这种问题,每个团队在ESB服务器上部署自己的服务,然后通过消息中间件完成调度任务。对于分步式的多个Hadoop集群系统的协作,同样可以用这种架构来做,只要把消息中间件引擎换成支持分步式的消息中间件的引擎就行了。

Zookeeper就可以做为 分步式消息中间件,来完成上面的说的业务需求。ZooKeeper是Hadoop家族的一款高性能的分布式协作的产品,是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。Zookeeper的安装和使用,请参考文章 ZooKeeper伪分布式集群安装及使用

ZooKeeper提供分布式协作服务,并不需要依赖于Hadoop的环境。

2. 需求分析:业务系统升级方案

下面我将从一个案例出发,来解释如何进行分步式协作平台的系统设计。

2.1 案例介绍

某大型软件公司,从事领域为供应链管理,主要业务包括了 采购管理、应付账款管理、应收账款管理、供应商反复管理、退货管理、销售管理、库存管理、电子商务、系统集成等。

biz

每块业务的逻辑都很复杂,由单独部门进行软件开发和维护,部门之间的系统没有直接通信需求,每个部门完成自己的功能就行了,最后通过数据库来共享数据,实现各功能之间的数据交换。

zk1

随着业务的发展,客户对响应速度要求越来越高,通过数据库来共享数据的方式,已经达不到信息交换的要求,系统进行了第一次升级,通过企业服务总线(ESB)统一管理公司内部所有业务。通过WebServices发布服务,通过Message Queue实现业务功能的调度。

zk2

公司业务规模继续扩大,跨国收购了多家公司。业务系统从原来的一个机房的集中式部署,变成了全球性的多机房的分步式部署。这时,Message Queue已经不能满足多机房跨地域的业务系统的功能需求了,需要一种分步式的消息中间件解决方案,来代替原有消息中间件的服务。

系统进行了第二次升级,采用Zookeeper作为分步式中间件调度引擎。

zk3

通过上面的描述,我们可以看出,当一个公司从小到大,从国内业务发展到全球性业务的时候。
为了配合业务发展,IT系统也是越来越复杂的,从最早的主从数据库设计,到ESB企业系统总线的扩展,再到分步式ESB配合分步式消息系统,每一次的升级都需要软件技术的支撑。

2.2 功能需求

全球性采购业务和全球性销售业务,让公司在市场中处于竞争优势。但由于采购和销售分别是由不同部门进行的软件开发和维护,而且业务往来也在不同的国家和地区。所以在每月底结算时,工作量都特别大。

比如,计算利润表 (请不要纠结于公式的准确性)

当月利润 = 当月销售金额 - 当月采购金额 - 当月其他支出

这样一个非常简单的计算公式,但对于跨国公司和部门来说,一点也不简单的。

从系统角度来看,采购部门要统计采购数据(海量数据),销售部门统计销售数据((海量数据),其他部门统计的其他费用支出(汇总的少量数据),最后系统计算得到当月的利润。

这里要说明的是,采购系统是单独的系统,销售是另外单独的系统,及以其他几十个大大小小的系统,如何能让多个系统,配合起来做这道计算题呢??

3. 架构设计:搭建Zookeeper的分步式协作平台

接下来,我们基于zookeeper来构建一个分步式队列的应用,来解决上面的功能需求。下面内容,排除了ESB的部分,只保留zookeeper进行实现。

  • 采购数据,为海量数据,基于Hadoop存储和分析。
  • 销售数据,为海量数据,基于Hadoop存储和分析。
  • 其他费用支出,为少量数据,基于文件或数据库存储和分析。

我们设计一个同步队列,这个队列有3个条件节点,分别对应采购(purchase),销售(sell),其他费用(other)3个部分。当3个节点都被创建后,程序会自动触发计算利润,并创建利润(profit)节点。上面3个节点的创建,无顺序要求。每个节点只能被创建一次。

zk4

系统环境

  • 2个独立的Hadoop集群
  • 2个独立的Java应用
  • 3个Zookeeper集群节点

图标解释:

  • Hadoop App1,Hadoop App2 是2个独立的Hadoop集群应用
  • Java App3,Java App4 是2个独立的Java应用
  • zk1,zk2,zk3是ZooKeeper集群的3个连接点
  • /queue,是znode的队列目录,假设队列长度为3
  • /queue/purchase,是znode队列中,1号排对者,由Hadoop App1提交,用于统计采购金额。
  • /queue/sell,是znode队列中,2号排对者,由Hadoop App2提交,用于统计销售金额。
  • /queue/other,是znode队列中,3号排对者,由Java App3提交,用于统计其他费用支出金额。
  • /queue/profit,当znode队列中满了,触发创建利润节点。
  • 当/qeueu/profit被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束。

补充说明:

  • 创建/queue/purchase,/queue/sell,/queue/other目录时,没有前后顺序,程序提交后,/queue目录下会生成对应该子目录
  • App1可以通过zk2提交,App2也可通过zk3提交。原则上,找最近路由最近的znode节点提交。
  • 每个应用不能重复提出,直到3个任务都提交,计算利润的任务才会被执行。
  • /queue/profit被创建后,zk的应用会监听到这个事件,通知应用,队列已完成!

这里的同步队列的架构更详细的设计思路,请参考文章 ZooKeeper实现分布式队列Queue

4. 程序开发:基于Zookeeper的程序设计

最终的功能需求:计算2013年01月的利润。

4.1 实验环境

在真正企业开发时,我们的实验环境应该与需求是一致的,但我的硬件条件有限,因些做了一个简化的环境设置。

  • 把zookeeper的完全分步式部署的3台服务器集群节点的,改为一台服务器上3个集群节点。
  • 把2个独立Hadoop集群,改为一个集群的2个独立的MapReduce任务。

开发环境:

  • Win7 64bit
  • JDK 1.6
  • Maven3
  • Juno Service Release 2
  • IP:192.168.1.10

Zookeeper服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Zookeeper: 3.4.5
  • IP: 192.168.1.201
  • 3个集群节点

Hadoop服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Hadoop: 1.0.3
  • IP: 192.168.1.210

4.2 实验数据

3组实验数据:

  • 采购数据,purchase.csv
  • 销售数据,sell.csv
  • 其他费用数据,other.csv

4.2.1 采购数据集

一共4列,分别对应 产品ID,产品数量,产品单价,采购日期。


1,26,1168,2013-01-08
2,49,779,2013-02-12
3,80,850,2013-02-05
4,69,1585,2013-01-26
5,88,1052,2013-01-13
6,84,2363,2013-01-19
7,64,1410,2013-01-12
8,53,910,2013-01-11
9,21,1661,2013-01-19
10,53,2426,2013-02-18
11,64,2022,2013-01-07
12,36,2941,2013-01-28
13,99,3819,2013-01-19
14,64,2563,2013-02-16
15,91,752,2013-02-05
16,65,750,2013-02-04
17,19,2426,2013-02-23
18,19,724,2013-02-05
19,87,137,2013-01-25
20,86,2939,2013-01-14
21,92,159,2013-01-23
22,81,2331,2013-03-01
23,88,998,2013-01-20
24,38,102,2013-02-22
25,32,4813,2013-01-13
26,36,1671,2013-01-19

//省略部分数据

4.2.2 销售数据集

一共4列,分别对应 产品ID,销售数量,销售单价,销售日期。


1,14,1236,2013-01-14
2,19,808,2013-03-06
3,26,886,2013-02-23
4,23,1793,2013-02-09
5,27,1206,2013-01-21
6,27,2648,2013-01-30
7,22,1502,2013-01-19
8,20,1050,2013-01-18
9,13,1778,2013-01-30
10,20,2718,2013-03-14
11,22,2175,2013-01-12
12,16,3284,2013-02-12
13,30,4152,2013-01-30
14,22,2770,2013-03-11
15,28,778,2013-02-23
16,22,874,2013-02-22
17,12,2718,2013-03-22
18,12,747,2013-02-23
19,27,172,2013-02-07
20,27,3282,2013-01-22
21,28,224,2013-02-05
22,26,2613,2013-03-30
23,27,1147,2013-01-31
24,16,141,2013-03-20
25,15,5343,2013-01-21
26,16,1887,2013-01-30
27,12,2535,2013-01-12
28,16,469,2013-01-07
29,29,2395,2013-03-30
30,17,1549,2013-01-30
31,25,4173,2013-03-17

//省略部分数据

4.2.3 其他费用数据集

一共2列,分别对应 发生日期,发生金额


2013-01-02,552
2013-01-03,1092
2013-01-04,1794
2013-01-05,435
2013-01-06,960
2013-01-07,1066
2013-01-08,1354
2013-01-09,880
2013-01-10,1992
2013-01-11,931
2013-01-12,1209
2013-01-13,1491
2013-01-14,804
2013-01-15,480
2013-01-16,1891
2013-01-17,156
2013-01-18,1439
2013-01-19,1018
2013-01-20,1506
2013-01-21,1216
2013-01-22,2045
2013-01-23,400
2013-01-24,1795
2013-01-25,1977
2013-01-26,1002
2013-01-27,226
2013-01-28,1239
2013-01-29,702
2013-01-30,1396

//省略部分数据

4.3 程序设计

我们要编写5个文件:

  • 计算采购金额,Purchase.java
  • 计算销售金额,Sell.java
  • 计算其他费用金额,Other.java
  • 计算利润,Profit.java
  • Zookeeper的调度,ZookeeperJob.java

4.3.1 计算采购金额

采购金额,是基于Hadoop的MapReduce统计计算。


public class Purchase {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class PurchaseMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class PurchaseReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("purchase");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化purchase
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Purchase.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(PurchaseMapper.class);
        job.setReducerClass(PurchaseReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("purchase", "logfile/biz/purchase.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/purchase");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.2 计算销售金额

销售金额,是基于Hadoop的MapReduce统计计算。


public class Sell {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class SellMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class SellReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("sell");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化sell
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Sell.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(SellMapper.class);
        job.setReducerClass(SellReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("sell", "logfile/biz/sell.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.3 计算其他费用金额

其他费用金额,是基于本地文件的统计计算。


public class Other {

    public static String file = "logfile/biz/other.csv";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    private static String month = "2013-01";

    public static void main(String[] args) throws IOException {
        calcOther(file);
    }

    public static int calcOther(String file) throws IOException {
        int money = 0;
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));

        String s = null;
        while ((s = br.readLine()) != null) {
            // System.out.println(s);
            String[] tokens = DELIMITER.split(s);
            if (tokens[0].startsWith(month)) {// 1月的数据
                money += Integer.parseInt(tokens[1]);
            }
        }
        br.close();

        System.out.println("Output:" + month + "," + money);
        return money;
    }
}

4.3.4 计算利润

利润,通过zookeeper分步式自动调度计算利润。


public class Profit {

    public static void main(String[] args) throws Exception {
        profit();
    }

    public static void profit() throws Exception {
        int sell = getSell();
        int purchase = getPurchase();
        int other = getOther();
        int profit = sell - purchase - other;
        System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit);
    }

    public static int getPurchase() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
        return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim());
    }

    public static int getSell() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
        return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim());
    }

    public static int getOther() throws IOException {
        return Other.calcOther(Other.file);
    }

}

4.3.5 Zookeeper调度

调度,通过构建分步式队列系统,自动化程序代替人工操作。


public class ZooKeeperJob {

    final public static String QUEUE = "/queue";
    final public static String PROFIT = "/queue/profit";
    final public static String PURCHASE = "/queue/purchase";
    final public static String SELL = "/queue/sell";
    final public static String OTHER = "/queue/other";

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Please start a task:");
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            doPurchase(zk);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            doSell(zk);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            doOther(zk);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) {
                    System.out.println("Queue has Completed!!!");
                }
            }
        });
        return zk;
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => " + PROFIT);
        zk.exists(PROFIT, true);

        if (zk.exists(QUEUE, false) == null) {
            System.out.println("create " + QUEUE);
            zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(QUEUE + " is exist!");
        }
    }

    public static void doPurchase(ZooKeeper zk) throws Exception {
        if (zk.exists(PURCHASE, false) == null) {

            Purchase.run(Purchase.path());

            System.out.println("create " + PURCHASE);
            zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(PURCHASE + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doSell(ZooKeeper zk) throws Exception {
        if (zk.exists(SELL, false) == null) {

            Sell.run(Sell.path());

            System.out.println("create " + SELL);
            zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(SELL + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doOther(ZooKeeper zk) throws Exception {
        if (zk.exists(OTHER, false) == null) {

            Other.calcOther(Other.file);

            System.out.println("create " + OTHER);
            zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(OTHER + " is exist!");
        }
        isCompleted(zk);
    }

    public static void isCompleted(ZooKeeper zk) throws Exception {
        int size = 3;
        List children = zk.getChildren(QUEUE, true);
        int length = children.size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create " + PROFIT);
            Profit.profit();
            zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

            for (String child : children) {// 清空节点
                zk.delete(QUEUE + "/" + child, -1);
            }
        }
    }
}

5. 运行程序

最后,我们运行整个的程序,包括3个部分。

  • zookeeper服务器
  • hadoop服务器
  • 分步式队列应用

5.1 启动zookeeper服务

启动zookeeper服务器集群:


~ cd toolkit/zookeeper345

# 启动zk集群3个节点
~ bin/zkServer.sh start conf/zk1.cfg
~ bin/zkServer.sh start conf/zk2.cfg
~ bin/zkServer.sh start conf/zk3.cfg

~ jps
4234 QuorumPeerMain
5002 Jps
4275 QuorumPeerMain
4207 QuorumPeerMain

查看zookeeper集群中,各节点的状态


# 查看zk1节点状态
~ bin/zkServer.sh status conf/zk1.cfg
JMX enabled by default
Using config: conf/zk1.cfg
Mode: follower

# 查看zk2节点状态,zk2为leader
~ bin/zkServer.sh status conf/zk2.cfg
JMX enabled by default
Using config: conf/zk2.cfg
Mode: leader

# 查看zk3节点状态
~ bin/zkServer.sh status conf/zk3.cfg
JMX enabled by default
Using config: conf/zk3.cfg
Mode: follower

启动zookeeper客户端:


~ bin/zkCli.sh -server 192.168.1.201:2181

# 查看zk
[zk: 192.168.1.201:2181(CONNECTED) 0] ls /
[queue, queue-fifo, zookeeper]

# /queue路径无子目录
[zk: 192.168.1.201:2181(CONNECTED) 1] ls /queue
[]

5.2 启动Hadoop服务


~ hadoop/hadoop-1.0.3
~ bin/start-all.sh

~ jps
25979 JobTracker
26257 TaskTracker
25576 DataNode
25300 NameNode
12116 Jps
25875 SecondaryNameNode

5.3 启动分步式队列ZookeeperJob

5.3.1 启动统计采购数据程序,设置启动参数1

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
copy from: logfile/biz/purchase.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Output:2013-01,9609887
create /queue/purchase
Queue Complete:1/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 3] ls /queue
[purchase]

5.3.2 启动统计销售数据程序,设置启动参数2

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
copy from: logfile/biz/sell.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Output:2013-01,2950315
create /queue/sell
Queue Complete:2/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[purchase, sell]

5.3.3 启动统计其他费用数据程序,设置启动参数3

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Output:2013-01,34193
create /queue/other
Queue Complete:3/3
create /queue/profit
cat: hdfs://192.168.1.210:9000/user/hdfs/biz/sell/output/part-r-00000
2950315

cat: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase/output/part-r-00000
9609887

Output:2013-01,34193
profit = sell - purchase - other = 2950315 - 9609887 - 34193 = -6693765
Queue has Completed!!!

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[profit]

在最后一步,统计其他费用数据程序运行后,从日志中看到3个条件节点都已满足要求。然后,通过同步的分步式队列自动启动了计算利润的程序,并在日志中打印了2013年1月的利润为-6693765。

本文介绍的源代码,已上传到github: https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myzk/hadoop

通过这个复杂的实验,我们成功地用zookeeper实现了分步式队列,并应用到了业务中。当然,实验中也有一些不是特别的严谨的地方,请同学边做边思考。

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

打赏作者

那些年考的那些认证

架构师的信仰系列文章,主要介绍我对系统架构的理解,从我的视角描述各种软件应用系统的架构设计思想和实现思路。

从程序员开始,到架构师一路走来,经历过太多的系统和应用。做过手机游戏,写过编程工具;做过大型Web应用系统,写过公司内部CRM;做过SOA的系统集成,写过基于Hadoop的大数据工具;做过外包,做过电商,做过团购,做过支付,做过SNS,也做过移动SNS。以前只用Java,然后学了PHP,现在用R和Javascript。最后跳出IT圈,进入金融圈,研发量化交易软件。

架构设计就是定义一套完整的程序规范,坚持架构师的信仰,做自己想做的东西。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/architect-certification/

architect-cert

前言

回忆当年在学校的生活,真是幸福,毫无生活压力;时不常逃逃课,学习自己追求的技术,享受编程乐趣。那些年考过了不少有价值的认证,都不知道自己是怎么坚持下来的,5年时间一共完成了10的认证。

回过头来,我自己都不知道当时哪里来的动力,几乎是一件不可能完成的事情。一头走来跌跌撞撞,但最终找到了自己的职业方向。

目录

  1. SCJP
  2. SCWCD
  3. XML-141
  4. Websphere-285 & websphere-286
  5. DB2-700 & DB2-701
  6. SCBCD
  7. SCDJWS
  8. SCMAD

1. SCJP

我记得学Java是从大二开始的,对于专业课完全没有兴趣,同时又觉得C的代码很丑,觉得JSP做网页很牛的样子,就开始了我的Java生涯。自己学了半年,看着书按照例子,可以写一个小程序,但总觉得摸不着门,于是假期回到北京报了一个新东方的SCJP培训班,3天课程,800元。老师很负责把Java基础整个讲了一篇,我听的特别过瘾,(当时真应该留下老师的电话,特别感谢!)。

上完课后,剩下的25天的假期,我完全就在家中对着电脑度过的。从那一刻起,我终于开始对Java入门了。在假期的倒数第二天,我去考了SCJP1.4的考试。2个半小时考试,紧张的不得了,第一次用电脑考试,全英文的试题,硬着头皮一点一点读。题目都是选择题,单选和不定项选择题,对于不定项选择题ABCDEFGH 8个选择项给你,猜对的可能性很小,不过能让参加考试的有思路上的方向。考试过程中有摄像头一直监控,考试的屋子很小,一排5个台式机,每个人有一个小格子,一张草稿纸,外面是嘈杂的市场叫卖声音,坐满了2个半小时,检查了3篇,我最后是82分考过的。

从那时起,我终于开始对自己有信心了,并坚信自己找到自己的方向。

2. SCWCD

考完SCJP,对Java的基础核心有所了解了,但还是不知道Java能干什么。想做Web开发,又要学JSP和Servlet,当时甚至都不知道,JSP与ASP,PHP的区别。按照SCWCD的考试大纲,自己准备复习。虽然,还有补习班可以去学,但动辄4000-5000元的价格,已经不是一个学生可以承担了。

通过Google,在互联网上找到一切可以学习的教程,几乎看了几百份的教程,去相关的论坛天天逛逛。10年前我逛学习网站coderanch,竟然还在: http://www.coderanch.com/forums

当时看着网站上,老外各种晒着自己的认证,真是羡慕嫉妒恨啊!我就憋着,什么时候,我也能晒出我的认证,让别人也羡慕嫉妒一下。嘿嘿。

一咬牙一跺脚用了一个学期,就在屋里把所有的SCWCD认证的相关知识都学透了。这里不得不提一句,当时我认为写的最好的参考书:“SCWCD 认证专家应考指南”。

8821265-1_w

我吃透里面每一句话,完成所有的代码实例。就这样,我在第二个假期回到北京,又考过了SCWCD的认证。

此时,开发一个网站,对于我来说已经很简单了。不仅可以做网站,还可以利用Web容器的机制,实现各种复杂的功能,比如认证和授权,定时器,JNDI等。

3. XML-141

连续考了2个SUN的认证,几乎让我破产了,都是1200元一次大本投入,好在都通过了,不然真的要哭死了。

大三的时候,学校成立了IBM俱乐部,我就报名参加了,加入俱乐部的好处是考IBM的认证从原价1000元一次,变成学生价150元一次。简单一算,考一次SUN的认证可以考8次IBM的认证,接下我的重点马上转向IBM认证。我当时应该所有人中是最大的受益者了。

XML-141,这个认证是对我计算机软件基础知识形成最大支持的一个认证,让我认识到了计算机可以变成世界,也是我所有考过的认证中最难的一个。

这个认证的难点,主要在于知识面非常广,并与现实应用结合,所以对于当时我来说特别的困难。我将近花了1个月时间,从互联网上找到所有关于XML的资料,并每天晚上偷偷跑去旁听计算机研究生的XML课程,长期泡图书馆每天都盯着一本书 XML高级编程(第2版),这是一本看上去就不可能读完的书,16开1051页。伴着有点发霉的味道,第一遍读就花了3个月。对照着上面的例子,运行了所有程序。

507302-1_w

在这个学期结束的时候,我报名了参考XML-141的考试,虽然我认为我已准备的足够充分了,但是我还是高估了自己的基础知识,好像是48分没有通过。这个打击对于我来说是巨大的,我花了一学期的时间,几乎每天是每天12小时在学习,半年的努力竟然不能给自己划一个逗号。可能当时真的是自己没有企业应用的经验,很多案例题的解决方案都出了差错。痛定思痛,从图书馆借走了,“XML高级编程(第2版)” 背着回家了继续苦读。艰苦的又过了一个假期,我把 “XML高级编程(第2版)” 这本书一共看了3遍,而且把里面很多简单介绍的框架和程序包,都去对应的官网下载并运行。这1个月的补习,让我更加了解 为什么需要XML?XML能做什么?XML可以对原来方案有什么改进?JavaEE的Web容器中的以web.xml做为配置文件的意义!

再次开学的时候,我是带来卷土重来的决心回来的。开学的第三周,我又一次去考了XML-141这个认证,并且是90分的成绩通过了。这次考试的通过,我可能就已经具备了架构师的思考方式,并且很深刻地影响了我很多年的程序设计理念。

4. Websphere-285 & websphere-286

同是这个学期,我又完成参加了 Websphere-285和websphere-286的考试。对我来说,这两个考试基本上已经难度不大了,我是在一个下午,连续考了两门了,就像高考连考一样。最后很幸运的都通过了,并且没有给自己太大的压力。

Websphere-285,主要是考Websphere的家族产品,安装和使用。而Websphere-286,有点像SCJP和XML的结合,主要考如何用Websphere家族产品,设计基于JAVA的企业级解决方案。Websphere-286的证书,又依赖于SCJP的认证,考过了以后,Websphere-285很快就寄到了,Websphere-286还需要我先把SCJP的证书传真发给美国的IBM考试中心(就两张纸花了好几十块),这样就获得了这两个证书。

5. DB2-700 & DB2-701

DB2的认证,完全是无意所获得的。大三下学期,IBM的俱乐部开始推广IBM的数据库产品DB2,我也申请到了学生用的正版软件,然后就在自己的电脑上开始了各种倒腾。之前的专业课,学过MS-SQL-Server,对数据库有所了解。不过对于DB2来说,原来的基础知识是远远不够的。在DB2创建数据库,竟然要10行左右的命令,而不是像MS-SQL-Server点一个按钮就完成了的。

建库,建缓冲区,建表空间,定义分区大小,分离表空间和索引空间,再建Schema,建用户,分配权限,完成上面的整个过程,才能开始创建表,然后建视图,定义触发器和存储过程,性能监控,SQL执行计划,再通过JDBC与Java连接。。。一大串东西走下来,内容是丰富的,过程是复杂的,还需要细心整理各种命令脚本。不过对我来说,DB2已经没有特别难度了,只要一步一步仔细地操作一下来,就可以完成了。

DB2-700如同Websphere-285一样,主要是考DB2的家族产品,产品线,安装和使用。而DB2-701就考到了,从建库到存储过程,再到SQL优化,和最后的数据库案例设计。

一个学期又完成了2个认证,到大三结束的时候,我已经有了7个认证。

6. SCDJWS

这些认证意味什么?我开始的时候,很迷茫。当初的想法,是很天真很幼稚的,以为有了认证就可以去找到一份不错的工作;以为有了认证就可以直接去做架构师。但当你去找工作的时候,亮出认证的那一刻,你会发现,招聘的考官仅仅是有点的好奇,拿着各种认证证书,简单扫一遍,然后就是放到旁边,就跟认证不存在一样,继续问着他们那些应届毕业生在课堂中应该学到的那些问题。

我坦率的讲,我不是学计算机的,并且当时的确不了解 教课书里的算法,不会用时间复杂度和空间复杂度去解决数据结构中树或图的问题,我也不懂C和C++。回想我学的Java的所有知识体系中,都没讲要特别地要用Java去实现这些算法,JDK中的各种基础包已经封装好了这些算法。虽然当时各不理解,也几乎没有获得大公司的offer,不过我还是相信自己选的路,因为我知道我能做什么出来。

毕业了,回到北京。通过社会招聘,投简历,很容易的就找到了一份工作。几乎是回到北京的第二天投的简历,第三天就去面试,第四天就上班了。看着别人应聘的辛苦,我的这种上班速度还是有点小快呢。

从之前的面试经历,考认证已经不再是为了找工作了。我遇到的99%考官都不知道,认证是个啥玩意,有个别人的会说,我们这里招过获得SCJP认证的人也挺一般的。所以,我觉得中国和美国在教育上的差距真的是不可逾越的鸿沟。

看到coderanch上那些晒认证的美国人,无一不是通过认证获得了与自己付出相对应的工作,而在中国知识确是无法量化的。在我大三的时候,有个印度的朋友,获得SCJP后,向我请教如何学习Websphere, DB2, SCWCD,并想继续考认证,而且目标很明确的告诉我,有了认证就可以申请去美国的读书了。再一次的心理落差,让我感觉到中国教育的落后,中国人比印度人的出路都少。

从这以后,我考认证就不以找工作为目的了,而变成了为了丰富自己的知识结构而参加的考试,从而给自己的阶段性学习划上句号。

在XML成为标准以后,WebService和SOA成为后来的大火大紫的技术热点。为了明白底层的原理,我又开始准备SCDJWS的认证考试。很庆幸的是,有了XML的基础,在学习WebService的基础的架构的时候,没让我走太多的弯路,像DOM,SAX, SOAP, WSDL, UDDI, JAXB, JAXP, XSTL, XPath, XPointor等的各种技术规范,我都快速熟练地掌握了,对于SOA的框架层次理解地更清晰。虽然工作中难以用到这些,但很多时候别人的问题我已经能通过原理做解释了。我自己已经开始构思自己未来的系统了。

就算基础很好,学习SCDJWS也花了我半年的时间,白天上班晚上学习,拿到这个认证也是不容易的。

7. SCBCD

说起EJB,曾经是Java程序员都希望追求的技术,不是因为EJB有多牛,而EJB足够复杂。在Spring出现之前,Java社区的开发都是来自于SUN, IBM, Oracle的声音,他们在尝试用Java构建整个的软件生态系统,其中EJB就是SUN的企业级应用开发规范中必须的作为核心的一部分,分为Session Bean, Entity Bean, Message Bean。当然这个方案的实体Bean部分的设计比较复杂,比今天的Hibernate的复杂度要高好几个量级。BEA公司基于EJB做的宠物商店的demo,动辄就是上万行代码,让Java程序员感觉这才“高大上”的程序开发。

9037541-1_w

后来Spring框架出现了,expert one-on-one J2EE Development without EJB 一书动摇了大型软件厂商对企业级应用的定义。越来越多的复杂应用,开始像Spring的IOC模式迁移,Java开始变得更轻更自由了,Apache中的很多项目也开始基于Spring去重新构建,或者提供或Spring集成的接口了。后来,我们常用的SSH (Struts+Spring+Hibernate) 变形成了Java的Web开始的MVC铁三角组合。

当Hibernate在持久化层获得市场上的巨大成功以后,SUN开始重新定义EJB的规定,设计EJB3.0来轻量化EJB2.x,减少开发人员的准入门槛。

此时我依然是SUN的忠实粉丝,期待着EJB3.0的改进,我又完成了SCBCD的认证考试。而讽刺的是,完成考试后,我决定了放弃所有的ORM映射的框架,用iBatis的提供的原生SQL来实现持久层调用,同时自己写了一套基于MyBatis(iBatis)的持久层工具,更快速和轻量的构建我自己的Java系统。

8. SCMAD

SCMAD 这个认证是针对 JavaME手机移动的应用开发,考这个认证就有点奇葩了。当时,只是为了丰富自己的知识,虽然做半年过手机游戏,但没有理解很深,以后也不知道有没有机会再做手机相关的应用,为了完善自己的知识结构,从而考的SCMAD。

这个认证考试的时候,已经是我工作的第三年了,我已经没有太多的时间学习了,而且会很容易被身边的其他的事情所影响。复习过程中,虽然整体内容不太多,而且自己也有了一定的实际经验,但是拖的时间很长,花了将近一年的时间复习和准备。最后,也是考试券快过期之前完成的。

从2003年底大二开始到2008年初,5年时间我一共完成了10的认证。回过头来,我自己都不知道,当时哪里来的学习动力,几乎是一件不可能完成的事情。如果没有这些认证所学的基础知识,我现在又何去何从?

一篇博客,记录自己以前的学习经历。告诫后人珍惜时间,未来的成就,都是之前的积累,爆发就在未来的某个瞬间。

转载请注明出处:
http://blog.fens.me/architect-certification/

打赏作者

读书笔记 Big Data Analytics with R and Hadoop

RHadoop实践系列文章,包含了R语言与Hadoop结合进行海量数据分析。Hadoop主要用来存储海量数据,R语言完成MapReduce 算法,用来替代Java的MapReduce实现。有了RHadoop可以让广大的R语言爱好者,有更强大的工具处理大数据1G, 10G, 100G, TB, PB。 由于大数据所带来的单机性能问题,可能会一去不复返了。

RHadoop实践是一套系列文章,主要包括”Hadoop环境搭建”,”RHadoop安装与使用”,R实现MapReduce的协同过滤算法”,”HBase和rhbase的安装与使用”。对于单独的R语言爱好者,Java爱好者,或者Hadoop爱好者来说,同时具备三种语言知识并不容 易。此文虽为入门文章,但R,Java,Hadoop基础知识还是需要大家提前掌握。

关于作者

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/r-hadoop-book-big-data/

r-hadoop-book-big-data

前言

最近的一本新书Big Data Analytics with R and Hadoop是关于R和Hadoop实践的第一本图书,作者Vignesh Prajapati曾经在图书出版的半年前联系过我,通过Google翻译发现了我的博客,希望把其中的1-2个例子放到他的书中。

没想到这本书,经过半年就出版了,作者效率还是挺高的。受Packt Publishing编辑Amol Bhosle委托为本书写个书评,于是就有本篇文章。

目录

  1. 图书概览
  2. 图书内容剖析
  3. 最后总结

1. 图书概览

本书的几个核心点:R,Hadoop, R+Hadoop, 数据分析案例,机器学习算法案例,R的数据访问接口。

我通过一个思维导图来表达。

Big Data Analytics with R and Hadoop - fens.me

书中最重要的是案例部分,作者分别使用R语言单机实现,以及RHadoop的分步式实现,介绍是多个案例的实践。

2. 图书内容剖析

  • R语言介绍
  • Hadoop介绍
  • R+Hadoop技术方案
  • 数据分析案例
  • 大数据分析案例
  • R语言的数据访问接口

1). R语言介绍

简单地介绍了R安装,RStudio安装,R语言最擅长算法模型:回归,分类,聚类,推荐。

2). Hadoop介绍

主要是介绍了Hadoop安装,在几种不同的Linux系统上,用Apache Hadoop和Cloudera Hadoop二个版本对比安装。

简单介绍了HDFS,dateeode和namenode;讲了MapReduce的工作原理,和用MapReduce对数据处理的过程。

介绍了Hadoop的命令行的使用。

3). R+Hadoop技术方案

作者对于R+Hadoop做了3种技术方案讨论,分别是RHipe, RHadoop, R + Hadoop Streaming。

a. RHipe

RHipe是R与Hadoop的集成编程环境。RHIPE可以让R语言与Hadoop进行通信,访问Hadoop的HDFS和调用MapReduce,让R语言的使用者利用Hadoop的分步式环境进行行大数据的分析。

RHipe官方网站:http://www.datadr.org/

b. RHadoop

RHadoop是由RevolutionAnalytics公司开发的一个R与Hadoop的集成编程环境,与RHipe的功能一样。RHadoop包含三个R包 (rmr,rhdfs,rhbase),分别是对应Hadoop系统架构中的,MapReduce, HDFS, HBase 三个部分。在2013年底,又增加第4个R包plyrmr,用于数据处理操作。本书中并没有涉及plyrmr包。

RHadoop的发布页:https://github.com/RevolutionAnalytics/RHadoop/wiki

RHadoop实践系列文章:http://blog.fens.me/series-rhadoop/

c. R + Hadoop Streaming

Hadoop Streaming是Hadoop提供的,允许任何可执行的脚本作为Mapper和Reducer的一种实现方实。R + Hadoop Streaming就是用R脚本实现Mapper和Reducer。作者用2种方式,进行了测试。

c1. R Script + Hadoop Streaming : 单纯的R语言脚本,通过Shell运行。
c2. HadoopStreaming + Hadoop Streaming: 使用一个封装好的R包HadoopStreaming来实现。

这3种技术方案,是目前R与Hadoop结合实现方案。我对RHadoop和R Script + Hadoop Streaming比较熟悉,经过我测试只有R Script + Hadoop Streaming这种方式,可以用于生产环境,RHadoop性能还是有一些问题的,可以提升的空间还是很大的。书中介绍的另外两种方法,我要有时间,再去试试。不过,我相信R和Hadoop的结合的项目,会越来越多的。

4). 数据分析案例

上面篇幅把技术的基础都说清楚,接下来就是核心案例了,书中介绍了如何使用R结合Hadoop进行数据分析。

首先书中介绍了,一个数据分析项目的框架,包括5个部分。

  • 问题
  • 定义数据需求
  • 数据预处理
  • 数据建模及执行
  • 数据可视化

然后,作者介绍了3个应用案例。

  • 网页分类:把一个网站中的网页按重要性进行排名
  • 股票分析:通过历史交易数据计算股票市场的变化
  • 价格预测:预测图书销售的价格,来自Kaggle的一道竞赛题

上面3个案例,都使用R和RHadoop进行实现,都是非常不错的案例,可以给我们的学习和使用提供很好的思路。

5). 机器学习算法案例

接下来,作者介绍了用RHadoop实现的基于大数据机器学习算法,充分结合了R和Hadoop的优势。作者把机器学习算法分为3类,监督学习算法,非监督学习算法,推荐算法。

4个算法的案例:

a. 线性回归

最简单的一种回归算法,可以下面公式表示。

y = ax + e 

在R语言中,一个函数lm()就可以实现。

在大数据的背景下,利用RHadoop,需要自己实现lm()这个函数。


# Reducer
Sum = function(., YY) keyval(1, list(Reduce('+', YY)))

# XtX =
values(

# For loading hdfs data in to R
from.dfs(

# MapReduce Job to produce XT*X
mapreduce(
input = X.index,

# Mapper – To calculate and emitting XT*X
map =
function(., Xi) {
yi = y[Xi[,1],]
Xi = Xi[,-1]
keyval(1, list(t(Xi) %*% Xi))},

# Reducer – To reduce the Mapper output by performing sum
operation over them
reduce = Sum,
combine = TRUE)))[[1]]

b. Logistic回归

比线性回归稍微复杂一些,可以用公式表示。


logit(p) = β0 + β1 × x1 + β2 × x2 + ... + βn × xn

R程序还是一个函数glm()。

在大数据的背景下,利用RHadoop,需要自己实现glm()这个函数。


# Mapper – computes the contribution of a subset of points to the
gradient.

lr.map =
  function(., M) {
    Y = M[,1]
    X = M[,-1]
    keyval(1, Y * X * g(-Y * as.numeric(X %*% t(plane))))
}

# Reducer – Perform sum operation over Mapper output.

lr.reduce = function(k, Z) keyval(k, t(as.matrix(apply(Z,2,sum))))

# MapReduce job – Defining MapReduce function for executing logistic
regression

logistic.regression =
  function(input, iterations, dims, alpha){
    plane = t(rep(0, dims))
    g = function(z) 1/(1 + exp(-z))
    for (i in 1:iterations) {
      gradient =
        values(
          from.dfs(
            mapreduce(input, map = lr.map, reduce = lr.reduce, combine = T)))
      plane = plane + alpha * gradient 
    }
    plane 
}

c. 聚类算法

这里介绍的是快速聚类kmeans,聚类属于非监督学习法。

通过R语言现实,一个函数kmeans()就可以完成了。

通过RHadoop现实,就需要自己重写这个迭代过程。


# distance calculation function
dist.fun = function(C, P) {
  apply(C,1,function(x) colSums((t(P) - x)^2))
}

# k-Means Mapper
kmeans.map = function(., P) {
  nearest = {

  # First interations- Assign random cluster centers
  if(is.null(C))
    sample(1:num.clusters,nrow(P),replace = T)

# Rest of the iterations, where the clusters are assigned # based
on the minimum distance from points

  else {
    D = dist.fun(C, P)
    nearest = max.col(-D)}}
    if(!(combine || in.memory.combine))
      keyval(nearest, P)
    else
      keyval(nearest, cbind(1, P))
}


# k-Means Reducer
kmeans.reduce = {

  # calculating the column average for both of the conditions
  if (!(combine || in.memory.combine) )
    function(., P) t(as.matrix(apply(P, 2, mean)))
  else function(k, P) keyval(k,t(as.matrix(apply(P, 2, sum))))
}


# k-Means MapReduce – for
kmeans.mr = function(P,num.clusters,num.iter,combine,in.memory.combine) {
  C = NULL
  for(i in 1:num.iter ) {
    C =  values(from.dfs(mapreduce(P,map = kmeans.map,reduce = kmeans.reduce)))
    if(combine || in.memory.combine)
    C = C[, -1]/C[, 1]
    if(nrow(C) < num.clusters) {
      C =rbind(C,matrix(rnorm((num.clusters -nrow(C)) * nrow(C)),ncol = row(C)) %*% C) 
    }
  }
  C
}

d. 推荐算法

这里介绍的是协同过滤算法,算法实现就有点复杂了。主要思想就是:

同现矩阵 * 评分矩阵 = 推荐结果

注:由于这个案例出自我的博客,我就直接贴我的文章地址了,也方便中文读者了解算法细节。

R语言的算法实现:http://blog.fens.me/r-mahout-usercf/

RHadoop分步式算法实现:http://blog.fens.me/rhadoop-mapreduce-rmr/

6). R语言的数据访问接口

最后一部分,书中介绍了R语言的各种数据访问接口。

FILE:

  • CSV: read.csv(), write.csv()
  • Excel: xlsx, xlsxjars,rJava

Database:

  • MySQL: RMySQL
  • SQLite: RSQLite
  • PostgreSQL: RPostgreSQL

NoSQL:

  • MongoDB: rmongodb
  • Hive: RHive
  • HBase: RHBase

补充一下,我的博客中也写了R的数据访问接口的文章。

R的多种接口已经打通,这也就说明R已经做好了准备,为工业界带来革命的力量。

3. 最后总结

这是一本不错的图书,从R+Hadoop的角度出发,打开R语言面向大数据应用的思路。这种结合是跨学科碰撞的结果,是市场需求的导向。但由于R+Hadoop还不够成熟,企业级应用依然缺乏成功案例,所以当实施R+Hadoop的应用时,还会碰到非常多的问题。期待有担当的公司和个人,做出被大家认可的产品来。

转载请注明出处:
http://blog.fens.me/r-hadoop-book-big-data/

打赏作者

PeopleRank从社交网络中发现个体价值

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-social-peoplerank/

peoplerank

前言

如果说Google改变了互联网,那么社交网络就改变人们的生活方式。通过社交网络,我们每个个体,都是成为了网络的中心。我们的生活半径,被无限放大,通过6个朋友关系,就可以认识世界上任何一个人。

未来的互联网将是属于我们每一个人。

目录

  1. PeopleRank和PageRank
  2. 需求分析:从社交网络中发现个体价值
  3. 算法模型:PeopleRank算法
  4. 架构设计:PeopleRank计算引擎系统架构
  5. 程序开发:PeopleRank算法实现

1. PeopleRank和PageRank

PageRank让Google成为搜索领域的No.1,也是当今最有影响力的互联网公司之一,用技术创新改变人们的生活。PageRank主要用于网页评分计算,把互联网上的所有网页都进行打分,给网页价值的体现。

自2012以来,中国开始进入社交网络的时代,开心网,人人网,新浪微博,腾讯微博,微信等社交网络应用,开始进入大家的生活。最早是由“抢车位”,“偷菜”等社交游戏带动的社交网络的兴起,如今人们会更多的利用社交网络,获取信息和分享信息。我们的互联网,正在从以网页信息为核心的网络,向着以人为核心的网络转变着。

于是有人就提出了,把PageRank模型应用于社交网络,定义以人为核心的个体价值。这样PageRank模型就有了新的应用领域,同时也有了一个新的名字PeopleRank。

关于PageRank的介绍,请参考文章:PageRank算法R语言实现

注:PeopleRank网上还有不同的解释,我这里仅仅表示用来解释“PageRank模型”。

下面我们将从一个PeopleRank的案例来解释,如何从社交网络中发现个体价值。

2. 需求分析:从社交网络中发现个体价值

案例介绍:
以新浪微博为例,给微博中每个用户进行评分!
从新浪微博上,把我们的关注和粉丝的关系都找到。

如下图:我在微博上,随便找了几个微博账号。

weibo-logo

我们的任务是,需要给这些账号评分!

  • 方法一,简单求和:评分=关注数+粉丝数+微博数
  • 方法二,加权求和:评分=a*关注数+b*粉丝数+c*微博数

新建数据文件:weibo.csv


~ vi weibo.csv

A,314,1091,1488
B,196,10455,327
C,557,51635228,13793
D,55,14655464,1681
E,318,547,4899
F,166,145,170
G,17,890,169
H,54,946759,17229

R语言读入数据文件


weibo<-read.csv(file="weibo.csv",header=FALSE)
names(weibo)<-c("id","follow","fans","tweet")

1). 方法一,简单相加法


> data.frame(weibo[1],rank=rowSums(weibo[2:4]))
  id     rank
1  A     2893
2  B    10978
3  C 51649578
4  D 14657200
5  E     5764
6  F      481
7  G     1076
8  H   964042

这种方法简单粗暴的方式,是否能代码公平的打分呢?!

2). 方法二,加权求和

通过a,b,c的3个参数,分别设置权重求和。与方法一存在同样的问题,a,b,c的权值都是人为指定的,也是不能表示公平的打分的。

除了上面的两个方法,你能否想到不一样的思路呢!

3. 算法模型:PeopleRank算法

基于PageRank的理论,我们以每个微博账户的“关注”为链出链接,“粉丝”为链入链接,我们把这种以人为核心的关系,叫PeopleRank。

关于PageRank的介绍,请参考文章:PageRank算法R语言实现

PeopleRank假设条件:

  • 数量假设:如果一个用户节点接收到的其他用户“关注”的数量越多,那么这个用户越重要。
  • 质量假设:用户A的“粉丝”质量不同,质量高的“粉丝”会通“关注”接向其他用户传递更多的权重。所以越是质量高的“用户”关注用户A,则用户A越重要。

衡量PeopleRank的3个指标:

  • 粉丝数
  • 粉丝是否有较高PeopleRank值
  • 粉丝关注了多少人

我们以下的数据为例,构造基于微博的数据模型:
(由于微博数据已增加访问权限,我无法拿到当前的实际数据,我用以前@晒粉丝应用,收集到的微博数据为例,这里ID已经过处理)

测试数据集:people.csv

  • 25个用户
  • 66个关系,关注和粉丝的关系

数据集: people.csv


1,19
1,21
2,11
2,17
2,21
3,1
3,20
3,2
3,7
3,6
3,10
4,3
4,6
5,19
5,11
5,2
6,4
6,12
6,18
6,15
6,10
6,5
7,9
7,18
7,10
8,3
8,11
8,7
8,16
8,14
9,6
10,8
10,18
11,13
11,3
12,9
12,4
12,16
12,5
13,19
13,1
13,6
14,7
14,17
14,19
14,1
14,5
14,2
15,11
15,14
15,12
16,20
17,4
17,6
18,10
18,11
18,15
18,14
19,18
20,10
20,5
21,24
22,11
23,17
24,15
25,24

第一列为用户ID,第二列也是用户ID。第一列用户,关注了第二用户。

以R语言可视化输出

igraph-refs

R语言程序


library(igraph)
people<-read.csv(file="people.csv",header=FALSE)
drawGraph<-function(data){
  names(data)<-c("from","to") 
  g <- graph.data.frame(data, directed=TRUE)
  V(g)$label <- V(g)$name
  V(g)$size <- 15
  E(g)$color <- grey(0.5)
  g2 <- simplify(g)
  plot(g2,layout=layout.circle)
}
drawGraph(people)

用R语言构建PeopleRank的算法原型

  • 构建邻接矩阵
  • 变换概率矩阵
  • 递归计算矩阵特征值
  • 标准化结果
  • 对结果排序输出

R语言算法模型


#构建邻接矩阵
adjacencyMatrix<-function(pages){
  n<-max(apply(pages,2,max))
  A <- matrix(0,n,n)
  for(i in 1:nrow(pages)) A[pages[i,]$dist,pages[i,]$src]<-1
  A
}

#变换概率矩阵
dProbabilityMatrix<-function(G,d=0.85){
  cs <- colSums(G)
  cs[cs==0] <- 1
  n <- nrow(G)
  delta <- (1-d)/n
  A <- matrix(delta,n,n)
  for (i in 1:n) A[i,] <- A[i,] + d*G[i,]/cs
  A
}

#递归计算矩阵特征值
eigenMatrix<-function(G,iter=100){
  n<-nrow(G)
  x <- rep(1,n)
  for (i in 1:iter) x <- G %*% x
  x/sum(x)
}

#直接计算矩阵特征值
calcEigenMatrix<-function(G){
  x <- Re(eigen(G)$vectors[,1])
  x/sum(x)
}

PeopleRank计算,带入数据集people.csv


people<-read.csv(file="people.csv",header=FALSE)
names(people)<-c("src","dist");people
A<-adjacencyMatrix(people);A
G<-dProbabilityMatrix(A);G
q<-calcEigenMatrix(G);

q
[1] 0.03274732 0.03404052 0.05983465 0.03527074 0.04366519 0.07042752 0.02741232
 [8] 0.03378595 0.02118713 0.06537870 0.07788465 0.03491910 0.03910097 0.05076803
[15] 0.06685364 0.01916392 0.02793695 0.09450614 0.05056016 0.03076591 0.02956243
[22] 0.00600000 0.00600000 0.03622806 0.00600000

我们给这25用户进行打分,从高到低进行排序。

对结果排序输出:


result<-data.frame(userid=userid,PR=q[userid])
result
   userid          PR
1      18 0.09450614
2      11 0.07788465
3       6 0.07042752
4      15 0.06685364
5      10 0.06537870
6       3 0.05983465
7      14 0.05076803
8      19 0.05056016
9       5 0.04366519
10     13 0.03910097
11     24 0.03622806
12      4 0.03527074
13     12 0.03491910
14      2 0.03404052
15      8 0.03378595
16      1 0.03274732
17     20 0.03076591
18     21 0.02956243
19     17 0.02793695
20      7 0.02741232
21      9 0.02118713
22     16 0.01916392
23     22 0.00600000
24     23 0.00600000
25     25 0.00600000

查看评分最高的用户18的关系数据:


people[c(which(people$src==18), which(people$dist==18)),]

   src dist
55  18   10
56  18   11
57  18   15
58  18   14
19   6   18
24   7   18
33  10   18
59  19   18

粉丝的PeopleRank排名:


which(result$userid %in% people$src[which(people$dist==18)])

[1]  3  5  8 20

粉丝的关注数:


table(people$src)[people$src[which(people$dist==18)]]

 6  7 10 19 
 6  3  2  1 

数据解释:用户18

  • 有4个粉丝为别是6,7,10,19。(粉丝数)
  • 4个粉丝的PeopleRank排名,是3,5,8,20。(粉丝是否有较高PeopleRank值)
  • 粉丝的关注数量,是6,3,2,1。(粉丝关注了多少人)

因此,通过对上面3个指标的综合打分,用户18是评分最高的用户。

通过R语言实现的计算模型,已经比较符合我们的评分标准了,下面我们把PeopleRank用MapReduce实现,以满足对海量数据的计算需求。

4. 架构设计:PeopleRank计算引擎系统架构

pagerank-spider

上图中,左边是数据爬虫系统,右边是Hadoop的HDFS, MapReduce。

  • 数据爬虫系统实时爬取微博数据
  • 设置系统定时器CRON,每xx小时,增量向HDFS导入数据(userid1,userid2)
  • 完成导入后,设置系统定时器,启动MapReduce程序,运行推荐算法。
  • 完成计算后,设置系统定时器,从HDFS导出推荐结果数据到数据库,方便以后的及时查询。

5. 程序开发:PeopleRank算法实现

win7的开发环境 和 Hadoop的运行环境 ,请参考文章:用Maven构建Hadoop项目

开发步骤:

  • 微博好友的关系数据: people.csv
  • 出始的PR数据:peoplerank.csv
  • 邻接矩阵: AdjacencyMatrix.java
  • PeopleRank计算: PageRank.java
  • PR标准化: Normal.java
  • 启动程序: PageRankJob.java

1). 微博好友的关系数据: people.csv,在上文中已列出

2). 出始的PR数据:peoplerank.csv


1,1
2,1
3,1
4,1
5,1
6,1
7,1
8,1
9,1
10,1
11,1
12,1
13,1
14,1
15,1
16,1
17,1
18,1
19,1
20,1
21,1
22,1
23,1
24,1
25,1

3). 邻接矩阵

矩阵解释:

  • 阻尼系数为0.85
  • 用户数为25
  • reduce以行输出矩阵的列,输出列主要用于分步式存储,下一步需要转成行

部分数据输出:


~ hadoop fs -cat /user/hdfs/pagerank/tmp1/part-r-00000|head -n 4

1	0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999
10	0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999
11	0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.43100002,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999
12	0.005999999,0.005999999,0.005999999,0.2185,0.2185,0.005999999,0.005999999,0.005999999,0.2185,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.2185,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999,0.005999999

4). PeopleRank计算: PageRank.java

迭代一次的PeopleRank值


~ hadoop fs -cat /user/hdfs/pagerank/pr/part-r-00000

1       0.716666
10      1.354167
11      2.232500
12      0.575000
13      0.575000
14      0.815833
15      1.354167
16      0.532500
17      1.425000
18      1.850000
19      1.283334
2       0.716667
20      1.141667
21      0.858333
22      0.150000
23      0.150000
24      1.850000
25      0.150000
3       1.170001
4       0.929167
5       1.070833
6       2.275001
7       0.603333
8       0.575000
9       0.645833

5). PR标准化: Normal.java

迭代10次,并标准化的结果:


~ hadoop fs -cat /user/hdfs/pagerank/result/part-r-00000

1       0.032842
10      0.065405
11      0.077670
12      0.034864
13      0.039175
14      0.050574
15      0.066614
16      0.019167
17      0.027990
18      0.094460
19      0.050673
2       0.034054
20      0.030835
21      0.029657
22      0.006000
23      0.006000
24      0.036111
25      0.006000
3       0.059864
4       0.035314
5       0.043805
6       0.070516
7       0.027444
8       0.033715
9       0.021251

我们对结果进行排序


   id       pr
10 18 0.094460
3  11 0.077670
22  6 0.070516
7  15 0.066614
2  10 0.065405
19  3 0.059864
11 19 0.050673
6  14 0.050574
21  5 0.043805
5  13 0.039175
17 24 0.036111
20  4 0.035314
4  12 0.034864
12  2 0.034054
24  8 0.033715
1   1 0.032842
13 20 0.030835
14 21 0.029657
9  17 0.027990
23  7 0.027444
25  9 0.021251
8  16 0.019167
15 22 0.006000
16 23 0.006000
18 25 0.006000

第一名是用户18,第二名是用户11,第三名是用户6,第三名与之前R语言单机计算的结果有些不一样,而且PR值也稍有不同,这是因为我们迭代10次时,特征值还没有完全的收敛,需要更多次的迭代计算,才能得矩阵的特征值。

程序API的实现,请参考文章:PageRank算法并行实现

我们通过PageRank的模型,成功地应用到了社交网络,实现了PeopleRank的计算,通过设计数据挖掘算法,来取代不成熟的人脑思想。算法模型将更客观,更精准。

最后,大家可以利用这个案例的设计思路,认真地了解社交网络,做出属于的自己的算法。

由于时间仓促,代码可能存在bug。请有能力同学,自行发现问题,解决问题!!

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-social-peoplerank/

打赏作者

PageRank算法并行实现

算法为王系列文章,涵盖了计算机算法,数据挖掘(机器学习)算法,统计算法,金融算法等的多种跨学科算法组合。在大数据时代的背景下,算法已经成为了金字塔顶的明星。一个好的算法可以创造一个伟大帝国,就像Google。

算法为王的时代正式到来….

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/algorithm-pagerank-mapreduce/

pagerank-mapreduce

前言

Google通过PageRank算法模型,实现了对全互联网网页的打分。但对于海量数据的处理,在单机下是不可能实现,所以如何将PageRank并行计算,将是本文的重点。

本文将继续上一篇文章 PageRank算法R语言实现,把PageRank单机实现,改成并行实现,利用MapReduce计算框架,在集群中跑起来。

目录

  1. PageRank算法并行化原理
  2. MapReduce分步式编程

1. PageRank算法分步式原理

单机算法原理请参考文章:PageRank算法R语言实现

pagerank-sample

PageRank的分步式算法原理,简单来讲,就是通过矩阵计算实现并行化。

1). 把邻接矩阵的列,按数据行存储

邻接矩阵


          [,1]   [,2]   [,3]   [,4]
[1,] 0.0375000 0.0375 0.0375 0.0375
[2,] 0.3208333 0.0375 0.0375 0.8875
[3,] 0.3208333 0.4625 0.0375 0.0375
[4,] 0.3208333 0.4625 0.8875 0.0375

按行存储HDFS


1       0.037499994,0.32083333,0.32083333,0.32083333
2       0.037499994,0.037499994,0.4625,0.4625
3       0.037499994,0.037499994,0.037499994,0.88750005
4       0.037499994,0.88750005,0.037499994,0.037499994

2). 迭代:求矩阵特征值

pagerank-mr

map过程:

  • input: 邻接矩阵, pr值
  • output: key为pr的行号,value为邻接矩阵和pr值的乘法求和公式

reduce过程:

  • input: key为pr的行号,value为邻接矩阵和pr值的乘法求和公式
  • output: key为pr的行号, value为计算的结果,即pr值

第1次迭代


0.0375000 0.0375 0.0375 0.0375     1     0.150000
0.3208333 0.0375 0.0375 0.8875  *  1  =  1.283333
0.3208333 0.4625 0.0375 0.0375     1     0.858333
0.3208333 0.4625 0.8875 0.0375     1     1.708333

第2次迭代


0.0375000 0.0375 0.0375 0.0375     0.150000      0.150000
0.3208333 0.0375 0.0375 0.8875  *  1.283333  =   1.6445833
0.3208333 0.4625 0.0375 0.0375     0.858333      0.7379167
0.3208333 0.4625 0.8875 0.0375     1.708333      1.4675000

… 10次迭代

特征值


0.1500000
1.4955721
0.8255034
1.5289245

3). 标准化PR值


0.150000                                              0.0375000
1.4955721  / (0.15+1.4955721+0.8255034+1.5289245) =   0.3738930
0.8255034                                             0.2063759
1.5289245                                             0.3822311

2. MapReduce分步式编程

MapReduce流程分解

PageRankJob

HDFS目录

  • input(/user/hdfs/pagerank): HDFS的根目录
  • input_pr(/user/hdfs/pagerank/pr): 临时目录,存储中间结果PR值
  • tmp1(/user/hdfs/pagerank/tmp1):临时目录,存储邻接矩阵
  • tmp2(/user/hdfs/pagerank/tmp2):临时目录,迭代计算PR值,然后保存到input_pr
  • result(/user/hdfs/pagerank/result): PR值输出结果

开发步骤:

  • 网页链接关系数据: page.csv
  • 出始的PR数据:pr.csv
  • 邻接矩阵: AdjacencyMatrix.java
  • PageRank计算: PageRank.java
  • PR标准化: Normal.java
  • 启动程序: PageRankJob.java

1). 网页链接关系数据: page.csv

新建文件:page.csv


1,2
1,3
1,4
2,3
2,4
3,4
4,2

2). 出始的PR数据:pr.csv

设置网页的初始值都是1

新建文件:pr.csv


1,1
2,1
3,1
4,1

3). 邻接矩阵: AdjacencyMatrix.java

adjacencyMatrix

矩阵解释:

  • 阻尼系数为0.85
  • 页面数为4
  • reduce以行输出矩阵的列,输出列主要用于分步式存储,下一步需要转成行

新建程序:AdjacencyMatrix.java


package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.Arrays;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class AdjacencyMatrix {

    private static int nums = 4;// 页面数
    private static float d = 0.85f;// 阻尼系数

    public static class AdjacencyMatrixMapper extends Mapper<LongWritable, Text, Text, Text> {

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = PageRankJob.DELIMITER.split(values.toString());
            Text k = new Text(tokens[0]);
            Text v = new Text(tokens[1]);
            context.write(k, v);
        }
    }

    public static class AdjacencyMatrixReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            float[] G = new float[nums];// 概率矩阵列
            Arrays.fill(G, (float) (1 - d) / G.length);

            float[] A = new float[nums];// 近邻矩阵列
            int sum = 0;// 链出数量
            for (Text val : values) {
                int idx = Integer.parseInt(val.toString());
                A[idx - 1] = 1;
                sum++;
            }

            if (sum == 0) {// 分母不能为0
                sum = 1;
            }

            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < A.length; i++) {
                sb.append("," + (float) (G[i] + d * A[i] / sum));
            }

            Text v = new Text(sb.toString().substring(1));
            System.out.println(key + ":" + v.toString());
            context.write(key, v);
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("input");
        String input_pr = path.get("input_pr");
        String output = path.get("tmp1");
        String page = path.get("page");
        String pr = path.get("pr");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.mkdirs(input_pr);
        hdfs.copyFile(page, input);
        hdfs.copyFile(pr, input_pr);

        Job job = new Job(conf);
        job.setJarByClass(AdjacencyMatrix.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(AdjacencyMatrixMapper.class);
        job.setReducerClass(AdjacencyMatrixReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(page));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

4). PageRank计算: PageRank.java

pagerank-step1

矩阵解释:

  • 实现邻接与PR矩阵的乘法
  • map以邻接矩阵的行号为key,由于上一步是输出的是列,所以这里需要转成行
  • reduce计算得到未标准化的特征值

新建文件: PageRank.java


package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class PageRank {

    public static class PageRankMapper extends Mapper<LongWritable, Text, Text, Text> {

        private String flag;// tmp1 or result
        private static int nums = 4;// 页面数

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit split = (FileSplit) context.getInputSplit();
            flag = split.getPath().getParent().getName();// 判断读的数据集
        }

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = PageRankJob.DELIMITER.split(values.toString());

            if (flag.equals("tmp1")) {
                String row = values.toString().substring(0,1);
                String[] vals = PageRankJob.DELIMITER.split(values.toString().substring(2));// 矩阵转置
                for (int i = 0; i < vals.length; i++) {
                    Text k = new Text(String.valueOf(i + 1));
                    Text v = new Text(String.valueOf("A:" + (row) + "," + vals[i]));
                    context.write(k, v);
                }

            } else if (flag.equals("pr")) {
                for (int i = 1; i <= nums; i++) {
                    Text k = new Text(String.valueOf(i));
                    Text v = new Text("B:" + tokens[0] + "," + tokens[1]);
                    context.write(k, v);
                }
            }
        }
    }

    public static class PageRankReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            Map<Integer, Float> mapA = new HashMap<Integer, Float>();
            Map<Integer, Float> mapB = new HashMap<Integer, Float>();
            float pr = 0f;

            for (Text line : values) {
                System.out.println(line);
                String vals = line.toString();

                if (vals.startsWith("A:")) {
                    String[] tokenA = PageRankJob.DELIMITER.split(vals.substring(2));
                    mapA.put(Integer.parseInt(tokenA[0]), Float.parseFloat(tokenA[1]));
                }

                if (vals.startsWith("B:")) {
                    String[] tokenB = PageRankJob.DELIMITER.split(vals.substring(2));
                    mapB.put(Integer.parseInt(tokenB[0]), Float.parseFloat(tokenB[1]));
                }
            }

            Iterator iterA = mapA.keySet().iterator();
            while(iterA.hasNext()){
                int idx = iterA.next();
                float A = mapA.get(idx);
                float B = mapB.get(idx);
                pr += A * B;
            }

            context.write(key, new Text(PageRankJob.scaleFloat(pr)));
            // System.out.println(key + ":" + PageRankJob.scaleFloat(pr));
        }

    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("tmp1");
        String output = path.get("tmp2");
        String pr = path.get("input_pr");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(PageRank.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(PageRankMapper.class);
         job.setReducerClass(PageRankReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input), new Path(pr));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);

        hdfs.rmr(pr);
        hdfs.rename(output, pr);
    }
}

5). PR标准化: Normal.java

normal-step1

矩阵解释:

  • 对PR的计算结果标准化,让所以PR值落在(0,1)区间

新建文件:Normal.java


package org.conan.myhadoop.pagerank;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.conan.myhadoop.hdfs.HdfsDAO;

public class Normal {

    public static class NormalMapper extends Mapper<LongWritable, Text, Text, Text> {

        Text k = new Text("1");

        @Override
        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            context.write(k, values);
        }
    }

    public static class NormalReducer extends Reducer<Text, Text, Text, Text> {

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {

            List vList = new ArrayList();

            float sum = 0f;
            for (Text line : values) {
                vList.add(line.toString());

                String[] vals = PageRankJob.DELIMITER.split(line.toString());
                float f = Float.parseFloat(vals[1]);
                sum += f;
            }

            for (String line : vList) {
                String[] vals = PageRankJob.DELIMITER.split(line.toString());
                Text k = new Text(vals[0]);

                float f = Float.parseFloat(vals[1]);
                Text v = new Text(PageRankJob.scaleFloat((float) (f / sum)));
                context.write(k, v);

                System.out.println(k + ":" + v);
            }
        }
    }

    public static void run(Map<String, String> path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = PageRankJob.config();

        String input = path.get("input_pr");
        String output = path.get("result");

        HdfsDAO hdfs = new HdfsDAO(PageRankJob.HDFS, conf);
        hdfs.rmr(output);

        Job job = new Job(conf);
        job.setJarByClass(Normal.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setMapperClass(NormalMapper.class);
        job.setReducerClass(NormalReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }
}

6). 启动程序: PageRankJob.java

新建文件:PageRankJob.java


package org.conan.myhadoop.pagerank;

import java.text.DecimalFormat;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Pattern;

import org.apache.hadoop.mapred.JobConf;

public class PageRankJob {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static void main(String[] args) {
        Map<String, String> path = new HashMap<String, String>();
        path.put("page", "logfile/pagerank/page.csv");// 本地的数据文件
        path.put("pr", "logfile/pagerank/pr.csv");// 本地的数据文件

        path.put("input", HDFS + "/user/hdfs/pagerank");// HDFS的目录
        path.put("input_pr", HDFS + "/user/hdfs/pagerank/pr");// pr存储目
        path.put("tmp1", HDFS + "/user/hdfs/pagerank/tmp1");// 临时目录,存放邻接矩阵
        path.put("tmp2", HDFS + "/user/hdfs/pagerank/tmp2");// 临时目录,计算到得PR,覆盖input_pr

        path.put("result", HDFS + "/user/hdfs/pagerank/result");// 计算结果的PR

        try {

            AdjacencyMatrix.run(path);
            int iter = 3;
            for (int i = 0; i < iter; i++) {// 迭代执行
                PageRank.run(path);
            }
            Normal.run(path);

        } catch (Exception e) {
            e.printStackTrace();
        }
        System.exit(0);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(PageRankJob.class);
        conf.setJobName("PageRank");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static String scaleFloat(float f) {// 保留6位小数
        DecimalFormat df = new DecimalFormat("##0.000000");
        return df.format(f);
    }
}

程序代码已上传到github:

https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myhadoop/pagerank

这样就实现了,PageRank的并行吧!接下来,我们就可以用PageRank做一些有意思的应用了。

转载请注明出处:
http://blog.fens.me/algorithm-pagerank-mapreduce/

打赏作者