• Archive by category "架构设计"

Blog Archives

基于Zookeeper的分步式队列系统集成案例

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

zookeeper-case

前言

软件系统集成一直是工业界的一个难题,像10年以上的遗留系统集成,公司收购后的多系统集成,全球性的分步式系统集成等。虽然基于SOA的软件架构,从理论上都可以解决这些集成的问题,但是具体实施过程,有些集成项目过于复杂而失败。

随着技术的创新和发展,对于分步式集群应用的集成,有了更好的开源软件的支持,像zookeeper就是一个不错的分步式协作软件平台。本文将通过一个案例介绍Zookeeper的强大。

目录

  1. 项目背景:分布式消息中间件
  2. 需求分析:业务系统升级方案
  3. 架构设计:搭建Zookeeper的分步式协作平台
  4. 程序开发:基于Zookeeper的程序设计
  5. 程序运行

1. 项目背景:分布式消息中间件

随着Hadoop的普及,越来越多的公司开始构建自己的Hadoop系统。有时候,公司内部的不同部门或不同的团队,都有自己的Hadoop集群。这种多集群的方式,既能让每个团队拥有个性化的Hadoop,又能避免大集群的高度其中化运维难度。当数据量不是特别巨大的时候,小型集群会有很多适用的场合。

当然,多个小型集群也有缺点,就是资源配置可能造成浪费。每个团队的Hadoop集群,都要配有服务器和运维人员。有些能力强的团队,构建的hadoop集群,可以达到真正的个性化要求;而有一些能力比较差的团队,搭建的Hadoop集群性能会比较糟糕。

还有一些时候,多个团队需要共同完成一个任务,比如,A团队通过Hadoop集群计算的结果,交给B团队继续工作,B完成了自己任务再交给C团队继续做。这就有点像业务系统的工作流一样,一环一环地传下去,直到最后一部分完成。

在业务系统中,我们经常会用SOA的架构来解决这种问题,每个团队在ESB服务器上部署自己的服务,然后通过消息中间件完成调度任务。对于分步式的多个Hadoop集群系统的协作,同样可以用这种架构来做,只要把消息中间件引擎换成支持分步式的消息中间件的引擎就行了。

Zookeeper就可以做为 分步式消息中间件,来完成上面的说的业务需求。ZooKeeper是Hadoop家族的一款高性能的分布式协作的产品,是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。Zookeeper的安装和使用,请参考文章 ZooKeeper伪分布式集群安装及使用

ZooKeeper提供分布式协作服务,并不需要依赖于Hadoop的环境。

2. 需求分析:业务系统升级方案

下面我将从一个案例出发,来解释如何进行分步式协作平台的系统设计。

2.1 案例介绍

某大型软件公司,从事领域为供应链管理,主要业务包括了 采购管理、应付账款管理、应收账款管理、供应商反复管理、退货管理、销售管理、库存管理、电子商务、系统集成等。

biz

每块业务的逻辑都很复杂,由单独部门进行软件开发和维护,部门之间的系统没有直接通信需求,每个部门完成自己的功能就行了,最后通过数据库来共享数据,实现各功能之间的数据交换。

zk1

随着业务的发展,客户对响应速度要求越来越高,通过数据库来共享数据的方式,已经达不到信息交换的要求,系统进行了第一次升级,通过企业服务总线(ESB)统一管理公司内部所有业务。通过WebServices发布服务,通过Message Queue实现业务功能的调度。

zk2

公司业务规模继续扩大,跨国收购了多家公司。业务系统从原来的一个机房的集中式部署,变成了全球性的多机房的分步式部署。这时,Message Queue已经不能满足多机房跨地域的业务系统的功能需求了,需要一种分步式的消息中间件解决方案,来代替原有消息中间件的服务。

系统进行了第二次升级,采用Zookeeper作为分步式中间件调度引擎。

zk3

通过上面的描述,我们可以看出,当一个公司从小到大,从国内业务发展到全球性业务的时候。
为了配合业务发展,IT系统也是越来越复杂的,从最早的主从数据库设计,到ESB企业系统总线的扩展,再到分步式ESB配合分步式消息系统,每一次的升级都需要软件技术的支撑。

2.2 功能需求

全球性采购业务和全球性销售业务,让公司在市场中处于竞争优势。但由于采购和销售分别是由不同部门进行的软件开发和维护,而且业务往来也在不同的国家和地区。所以在每月底结算时,工作量都特别大。

比如,计算利润表 (请不要纠结于公式的准确性)

当月利润 = 当月销售金额 - 当月采购金额 - 当月其他支出

这样一个非常简单的计算公式,但对于跨国公司和部门来说,一点也不简单的。

从系统角度来看,采购部门要统计采购数据(海量数据),销售部门统计销售数据((海量数据),其他部门统计的其他费用支出(汇总的少量数据),最后系统计算得到当月的利润。

这里要说明的是,采购系统是单独的系统,销售是另外单独的系统,及以其他几十个大大小小的系统,如何能让多个系统,配合起来做这道计算题呢??

3. 架构设计:搭建Zookeeper的分步式协作平台

接下来,我们基于zookeeper来构建一个分步式队列的应用,来解决上面的功能需求。下面内容,排除了ESB的部分,只保留zookeeper进行实现。

  • 采购数据,为海量数据,基于Hadoop存储和分析。
  • 销售数据,为海量数据,基于Hadoop存储和分析。
  • 其他费用支出,为少量数据,基于文件或数据库存储和分析。

我们设计一个同步队列,这个队列有3个条件节点,分别对应采购(purchase),销售(sell),其他费用(other)3个部分。当3个节点都被创建后,程序会自动触发计算利润,并创建利润(profit)节点。上面3个节点的创建,无顺序要求。每个节点只能被创建一次。

zk4

系统环境

  • 2个独立的Hadoop集群
  • 2个独立的Java应用
  • 3个Zookeeper集群节点

图标解释:

  • Hadoop App1,Hadoop App2 是2个独立的Hadoop集群应用
  • Java App3,Java App4 是2个独立的Java应用
  • zk1,zk2,zk3是ZooKeeper集群的3个连接点
  • /queue,是znode的队列目录,假设队列长度为3
  • /queue/purchase,是znode队列中,1号排对者,由Hadoop App1提交,用于统计采购金额。
  • /queue/sell,是znode队列中,2号排对者,由Hadoop App2提交,用于统计销售金额。
  • /queue/other,是znode队列中,3号排对者,由Java App3提交,用于统计其他费用支出金额。
  • /queue/profit,当znode队列中满了,触发创建利润节点。
  • 当/qeueu/profit被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束。

补充说明:

  • 创建/queue/purchase,/queue/sell,/queue/other目录时,没有前后顺序,程序提交后,/queue目录下会生成对应该子目录
  • App1可以通过zk2提交,App2也可通过zk3提交。原则上,找最近路由最近的znode节点提交。
  • 每个应用不能重复提出,直到3个任务都提交,计算利润的任务才会被执行。
  • /queue/profit被创建后,zk的应用会监听到这个事件,通知应用,队列已完成!

这里的同步队列的架构更详细的设计思路,请参考文章 ZooKeeper实现分布式队列Queue

4. 程序开发:基于Zookeeper的程序设计

最终的功能需求:计算2013年01月的利润。

4.1 实验环境

在真正企业开发时,我们的实验环境应该与需求是一致的,但我的硬件条件有限,因些做了一个简化的环境设置。

  • 把zookeeper的完全分步式部署的3台服务器集群节点的,改为一台服务器上3个集群节点。
  • 把2个独立Hadoop集群,改为一个集群的2个独立的MapReduce任务。

开发环境:

  • Win7 64bit
  • JDK 1.6
  • Maven3
  • Juno Service Release 2
  • IP:192.168.1.10

Zookeeper服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Zookeeper: 3.4.5
  • IP: 192.168.1.201
  • 3个集群节点

Hadoop服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Hadoop: 1.0.3
  • IP: 192.168.1.210

4.2 实验数据

3组实验数据:

  • 采购数据,purchase.csv
  • 销售数据,sell.csv
  • 其他费用数据,other.csv

4.2.1 采购数据集

一共4列,分别对应 产品ID,产品数量,产品单价,采购日期。


1,26,1168,2013-01-08
2,49,779,2013-02-12
3,80,850,2013-02-05
4,69,1585,2013-01-26
5,88,1052,2013-01-13
6,84,2363,2013-01-19
7,64,1410,2013-01-12
8,53,910,2013-01-11
9,21,1661,2013-01-19
10,53,2426,2013-02-18
11,64,2022,2013-01-07
12,36,2941,2013-01-28
13,99,3819,2013-01-19
14,64,2563,2013-02-16
15,91,752,2013-02-05
16,65,750,2013-02-04
17,19,2426,2013-02-23
18,19,724,2013-02-05
19,87,137,2013-01-25
20,86,2939,2013-01-14
21,92,159,2013-01-23
22,81,2331,2013-03-01
23,88,998,2013-01-20
24,38,102,2013-02-22
25,32,4813,2013-01-13
26,36,1671,2013-01-19

//省略部分数据

4.2.2 销售数据集

一共4列,分别对应 产品ID,销售数量,销售单价,销售日期。


1,14,1236,2013-01-14
2,19,808,2013-03-06
3,26,886,2013-02-23
4,23,1793,2013-02-09
5,27,1206,2013-01-21
6,27,2648,2013-01-30
7,22,1502,2013-01-19
8,20,1050,2013-01-18
9,13,1778,2013-01-30
10,20,2718,2013-03-14
11,22,2175,2013-01-12
12,16,3284,2013-02-12
13,30,4152,2013-01-30
14,22,2770,2013-03-11
15,28,778,2013-02-23
16,22,874,2013-02-22
17,12,2718,2013-03-22
18,12,747,2013-02-23
19,27,172,2013-02-07
20,27,3282,2013-01-22
21,28,224,2013-02-05
22,26,2613,2013-03-30
23,27,1147,2013-01-31
24,16,141,2013-03-20
25,15,5343,2013-01-21
26,16,1887,2013-01-30
27,12,2535,2013-01-12
28,16,469,2013-01-07
29,29,2395,2013-03-30
30,17,1549,2013-01-30
31,25,4173,2013-03-17

//省略部分数据

4.2.3 其他费用数据集

一共2列,分别对应 发生日期,发生金额


2013-01-02,552
2013-01-03,1092
2013-01-04,1794
2013-01-05,435
2013-01-06,960
2013-01-07,1066
2013-01-08,1354
2013-01-09,880
2013-01-10,1992
2013-01-11,931
2013-01-12,1209
2013-01-13,1491
2013-01-14,804
2013-01-15,480
2013-01-16,1891
2013-01-17,156
2013-01-18,1439
2013-01-19,1018
2013-01-20,1506
2013-01-21,1216
2013-01-22,2045
2013-01-23,400
2013-01-24,1795
2013-01-25,1977
2013-01-26,1002
2013-01-27,226
2013-01-28,1239
2013-01-29,702
2013-01-30,1396

//省略部分数据

4.3 程序设计

我们要编写5个文件:

  • 计算采购金额,Purchase.java
  • 计算销售金额,Sell.java
  • 计算其他费用金额,Other.java
  • 计算利润,Profit.java
  • Zookeeper的调度,ZookeeperJob.java

4.3.1 计算采购金额

采购金额,是基于Hadoop的MapReduce统计计算。


public class Purchase {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class PurchaseMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class PurchaseReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("purchase");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化purchase
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Purchase.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(PurchaseMapper.class);
        job.setReducerClass(PurchaseReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("purchase", "logfile/biz/purchase.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/purchase");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.2 计算销售金额

销售金额,是基于Hadoop的MapReduce统计计算。


public class Sell {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class SellMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class SellReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("sell");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化sell
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Sell.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(SellMapper.class);
        job.setReducerClass(SellReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("sell", "logfile/biz/sell.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.3 计算其他费用金额

其他费用金额,是基于本地文件的统计计算。


public class Other {

    public static String file = "logfile/biz/other.csv";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    private static String month = "2013-01";

    public static void main(String[] args) throws IOException {
        calcOther(file);
    }

    public static int calcOther(String file) throws IOException {
        int money = 0;
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));

        String s = null;
        while ((s = br.readLine()) != null) {
            // System.out.println(s);
            String[] tokens = DELIMITER.split(s);
            if (tokens[0].startsWith(month)) {// 1月的数据
                money += Integer.parseInt(tokens[1]);
            }
        }
        br.close();

        System.out.println("Output:" + month + "," + money);
        return money;
    }
}

4.3.4 计算利润

利润,通过zookeeper分步式自动调度计算利润。


public class Profit {

    public static void main(String[] args) throws Exception {
        profit();
    }

    public static void profit() throws Exception {
        int sell = getSell();
        int purchase = getPurchase();
        int other = getOther();
        int profit = sell - purchase - other;
        System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit);
    }

    public static int getPurchase() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
        return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim());
    }

    public static int getSell() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
        return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim());
    }

    public static int getOther() throws IOException {
        return Other.calcOther(Other.file);
    }

}

4.3.5 Zookeeper调度

调度,通过构建分步式队列系统,自动化程序代替人工操作。


public class ZooKeeperJob {

    final public static String QUEUE = "/queue";
    final public static String PROFIT = "/queue/profit";
    final public static String PURCHASE = "/queue/purchase";
    final public static String SELL = "/queue/sell";
    final public static String OTHER = "/queue/other";

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Please start a task:");
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            doPurchase(zk);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            doSell(zk);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            doOther(zk);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) {
                    System.out.println("Queue has Completed!!!");
                }
            }
        });
        return zk;
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => " + PROFIT);
        zk.exists(PROFIT, true);

        if (zk.exists(QUEUE, false) == null) {
            System.out.println("create " + QUEUE);
            zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(QUEUE + " is exist!");
        }
    }

    public static void doPurchase(ZooKeeper zk) throws Exception {
        if (zk.exists(PURCHASE, false) == null) {

            Purchase.run(Purchase.path());

            System.out.println("create " + PURCHASE);
            zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(PURCHASE + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doSell(ZooKeeper zk) throws Exception {
        if (zk.exists(SELL, false) == null) {

            Sell.run(Sell.path());

            System.out.println("create " + SELL);
            zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(SELL + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doOther(ZooKeeper zk) throws Exception {
        if (zk.exists(OTHER, false) == null) {

            Other.calcOther(Other.file);

            System.out.println("create " + OTHER);
            zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(OTHER + " is exist!");
        }
        isCompleted(zk);
    }

    public static void isCompleted(ZooKeeper zk) throws Exception {
        int size = 3;
        List children = zk.getChildren(QUEUE, true);
        int length = children.size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create " + PROFIT);
            Profit.profit();
            zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

            for (String child : children) {// 清空节点
                zk.delete(QUEUE + "/" + child, -1);
            }
        }
    }
}

5. 运行程序

最后,我们运行整个的程序,包括3个部分。

  • zookeeper服务器
  • hadoop服务器
  • 分步式队列应用

5.1 启动zookeeper服务

启动zookeeper服务器集群:


~ cd toolkit/zookeeper345

# 启动zk集群3个节点
~ bin/zkServer.sh start conf/zk1.cfg
~ bin/zkServer.sh start conf/zk2.cfg
~ bin/zkServer.sh start conf/zk3.cfg

~ jps
4234 QuorumPeerMain
5002 Jps
4275 QuorumPeerMain
4207 QuorumPeerMain

查看zookeeper集群中,各节点的状态


# 查看zk1节点状态
~ bin/zkServer.sh status conf/zk1.cfg
JMX enabled by default
Using config: conf/zk1.cfg
Mode: follower

# 查看zk2节点状态,zk2为leader
~ bin/zkServer.sh status conf/zk2.cfg
JMX enabled by default
Using config: conf/zk2.cfg
Mode: leader

# 查看zk3节点状态
~ bin/zkServer.sh status conf/zk3.cfg
JMX enabled by default
Using config: conf/zk3.cfg
Mode: follower

启动zookeeper客户端:


~ bin/zkCli.sh -server 192.168.1.201:2181

# 查看zk
[zk: 192.168.1.201:2181(CONNECTED) 0] ls /
[queue, queue-fifo, zookeeper]

# /queue路径无子目录
[zk: 192.168.1.201:2181(CONNECTED) 1] ls /queue
[]

5.2 启动Hadoop服务


~ hadoop/hadoop-1.0.3
~ bin/start-all.sh

~ jps
25979 JobTracker
26257 TaskTracker
25576 DataNode
25300 NameNode
12116 Jps
25875 SecondaryNameNode

5.3 启动分步式队列ZookeeperJob

5.3.1 启动统计采购数据程序,设置启动参数1

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
copy from: logfile/biz/purchase.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Output:2013-01,9609887
create /queue/purchase
Queue Complete:1/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 3] ls /queue
[purchase]

5.3.2 启动统计销售数据程序,设置启动参数2

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
copy from: logfile/biz/sell.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Output:2013-01,2950315
create /queue/sell
Queue Complete:2/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[purchase, sell]

5.3.3 启动统计其他费用数据程序,设置启动参数3

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Output:2013-01,34193
create /queue/other
Queue Complete:3/3
create /queue/profit
cat: hdfs://192.168.1.210:9000/user/hdfs/biz/sell/output/part-r-00000
2950315

cat: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase/output/part-r-00000
9609887

Output:2013-01,34193
profit = sell - purchase - other = 2950315 - 9609887 - 34193 = -6693765
Queue has Completed!!!

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[profit]

在最后一步,统计其他费用数据程序运行后,从日志中看到3个条件节点都已满足要求。然后,通过同步的分步式队列自动启动了计算利润的程序,并在日志中打印了2013年1月的利润为-6693765。

本文介绍的源代码,已上传到github: https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myzk/hadoop

通过这个复杂的实验,我们成功地用zookeeper实现了分步式队列,并应用到了业务中。当然,实验中也有一些不是特别的严谨的地方,请同学边做边思考。

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

打赏作者

Passport现实社交网络OAuth登陆

从零开始nodejs系列文章,将介绍如何利Javascript做为服务端脚本,通过Nodejs框架web开发。Nodejs框架是基于V8的引擎,是目前速度最快的Javascript引擎。chrome浏览器就基于V8,同时打开20-30个网页都很流畅。Nodejs标准的web开发框架Express,可以帮助我们迅速建立web站点,比起PHP的开发效率更高,而且学习曲线更低。非常适合小型网站,个性化网站,我们自己的Geek网站!!

关于作者

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/nodejs-oauth-passport/

nodejs-passport-oauth

前言

随着社交网络的发展,开发一个应用门槛越来越低。从一个完整的应用系统,到一个部署在社交网络平台的APP;从数据库–》应用层–》展示层,变成只需要开发展示层。

很多的社交应用,甚至都放弃了用户注册!仅靠大型社交网站的登陆授权,就可以赚到100W以上的用户量。。。

减少用户管理代码开发及维护,更专注于应用本身,个人开发者已经崛起!!

目录

  1. Passport介绍
  2. OAuth介绍
  3. 登陆Github
  4. 登陆LinkedIn

1. Passport介绍

Passport项目,主要是为了解决登陆认证的问题。

Web应用一般有2种登陆认证的形式:

  • 用户名和密码认证登陆
  • OAuth认证登陆

在上一篇文章中,我们介绍了Passport的项目,通过用户名和密码认证登陆。Express结合Passport实现登陆认证

本文将介绍,通过Passport实现OAuth登陆认证。

2. OAuth介绍

OAuth协议为用户资源的授权提供了一个安全的、开放而又简易的标准。与以往的授权方式不同之处是OAuth的授权不会使第三方触及到用户的帐号信息(如用户名与密码),即第三方无需使用用户的用户名与密码就可以申请获得该用户资源的授权,因此OAuth是安全的。

  • 简单:不管是OAUTH服务提供者还是应用开发者,都很易于理解与使用
  • 安全:没有涉及到用户密钥等信息,更安全更灵活
  • 开放:任何服务提供商都可以实现OAUTH,任何软件开发商都可以使用OAUTH

OAuth认证授权就三个步骤,三句话可以概括:

  • 获取未授权的Request Token
  • 获取用户授权的Request Token
  • 用授权的Request Token换取Access Token

OAuth的介绍,摘自:http://baike.baidu.com/view/3948029.htm

3. Github登陆

通过Passport实现Github登陆。我们还使用上一篇文章中的环境:Express结合Passport实现登陆认证

  • 申请开发github应用
  • Passport程序实现
  • 运行Github登陆认证

1). 申请开发github应用

github-app

2). Passport程序实现

  • 安装Passport的github扩展
  • 增加Github认证策略
  • 定义Github认证的路由配置: 登陆,回调,展示

a. 安装Passport的github扩展


D:\workspace\javascript\nodejs-passport>npm install passport-github

b. 增加Github认证策略

修改app.js


passport.use(new GithubStrategy({//对应从Github申请KEY
    clientID: "XXXX",
    clientSecret: "XXXX",
    callbackURL: "http://localhost:3000/auth/github/callback"
},function(accessToken, refreshToken, profile, done) {
    done(null, profile);
}));

c. 定义Github认证的路由配置: 登陆,回调,展示

  • /auth/github: 通过github,登陆
  • /auth/github/callback: github认证成功后,回调
  • /github: 回调验证后,转向展示示

修改app.js


app.all('/github', isLoggedIn);
app.get("/github",user.github);

app.get("/auth/github", passport.authenticate("github",{ scope : "email"}));
app.get("/auth/github/callback",
    passport.authenticate("github",{
        successRedirect: '/github',
        failureRedirect: '/'
    }));

3).运行Github登陆认证

github-auth

程序日志


D:\workspace\javascript\nodejs-passport>node app.js
Express server listening on port 3000
GET / 200 401ms - 594b
GET /stylesheets/style.css 304 9ms
GET /auth/github 302 8ms - 424b
GET /auth/github/callback?code=7cf818c4590e2aacfe90 302 4052ms - 70b
GET /github 200 4ms - 139b
GET /logout 302 2ms - 58b
GET / 200 3ms - 594b
GET /stylesheets/style.css 304 2ms

4. LinkedIn登陆

  • 申请开发LinkedIn应用
  • Passport程序实现
  • 运行LinkedIn登陆认证

1). 申请开发LinkedIn应用

linkedin-app

2). Passport程序实现

  • 安装Passport的LinkedIn扩展
  • 增加LinkedIn认证策略
  • 定义LinkedIn认证的路由配置: 登陆,回调,展示

a. 安装Passport的LinkedIn扩展


D:\workspace\javascript\nodejs-passport>npm install passport-linkedin

b. 增加LinkedIn认证策略

修改app.js


passport.use(new LinkedinStrategy({
    consumerKey: "XXXX",
    consumerSecret: "XXXX",
    callbackURL: "http://localhost:3000/auth/linkedin/callback",
    userAgent: 'localhost'
},function(accessToken, refreshToken, profile, done) {
    done(null, profile);
}));

c. 定义LinkedIn认证的路由配置: 登陆,回调,展示

  • /auth/linkedin: 通过LinkedIn,登陆
  • /auth/linkedin/callback: github认证成功后,回调
  • /linkedin: 回调验证后,转向展示示

修改app.js


app.all('/github', isLoggedIn);
app.get("/github",user.github);

app.all('/linkedin', isLoggedIn);
app.get("/linkedin",user.linkedin);
app.get("/auth/linkedin", passport.authenticate("linkedin",{}));
app.get("/auth/linkedin/callback",
    passport.authenticate("linkedin",{
        successRedirect: '/linkedin',
        failureRedirect: '/'
    }));

3).运行LinkedIn登陆认证

linkedin-auth

程序日志


D:\workspace\javascript\nodejs-passport>node app.js
Express server listening on port 3000
GET / 200 399ms - 638b
GET /stylesheets/style.css 304 4ms
GET /auth/Linkedin 302 3092ms - 256b
GET /auth/linkedin/callback?oauth_token=75--8f032180-afae-489b-bc3c-3326d80bea6f&oauth_verifier=36091 302 3049ms - 74b
GET /linkedin 200 5ms - 76b
GET /logout 302 1ms - 58b
GET / 200 19ms - 638b
GET /stylesheets/style.css 304 2ms

5. 完整的应用

项目代码我已上传到了github, 项目地址:https://github.com/bsspirit/nodejs-passport

下载及安装


git clone https://github.com/bsspirit/nodejs-passport
npm install

我们非常方便地,实现了Github和LinkedIn登陆认证,是否已经体会到Passport的强大了!把权限这种基础功能,进行合理的封装,是会帮我们节省大量的工作量的。

转载请注明出处:
http://blog.fens.me/nodejs-oauth-passport/

打赏作者

快速搭建Web环境 Angularjs + Express3 + Bootstrap3

AngularJS体验式编程系列文章,将介绍如何用angularjs构建一个强大的web前端系统。angularjs是由Google团队开发的一款非常优秀web前端框架。在当前如此多的web框架下,angularjs能脱颖而出,从架构设计上就高人一等,双向数据绑定,依赖注入,指令,MVC,模板。Angular.js创新地把后台技术融入前端开发,扫去jQuery一度的光芒。用angularjs就像写后台代码,更规范,更结构化,更可控。

关于作者

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/angularjs-express3-bootstrap3/

angular-basic

前言

Angularjs越用越顺手,不仅代码量比jQuery少很多,而且实现思路特别清晰,构建大型的Web前端项目,真是最适合不过了。

Bootstrap让界面美观大方,就连像我这种不懂UE的人,也能做出专业级的水准。再结合Nodejs的Express做后端,三剑合并,太无敌了,大有统一前端开发的趋势,前途不可估量!

目录

  1. 从零开始手工创建Express3项目
  2. 新建Angularjs目录及文件
  3. 配置bower
  4. 配置Angularjs项目
  5. 增加Bootstrap
  6. 完整的项目

1. 从零开始手工创建Express3项目

系统环境:

  • Win7 64bit 旗舰版
  • node v0.10.5
  • npm 1.2.19
  • bower 1.1.2

本文截图中使用的开发工具是WebStorm,请参考文章:AngularJS最理想开发工具WebStorm

创建express项目


~ D:\workspace\javascript>express -e angular-basic
~ D:\workspace\javascript>cd angular-basic && npm install

生成的express项目目录

express

修改app.js的配置

  • 修改ejs: 文件扩展名ejs为html
  • 设置angular: 启动路径为”/”
  • 设置angular: 启动文件为app/index.html

~ vi app.js

var express = require('express')
    , path = require('path')
    , ejs = require('ejs')
    , app = express()
    , server = require('http').createServer(app);

app.set('port', process.env.PORT || 3000);
app.set('views', __dirname + '/views');
app.engine('.html', ejs.__express);
app.set('view engine', 'html'); //替换文件扩展名ejs为html
app.use(express.favicon());
app.use(express.logger('dev'));
app.use(express.bodyParser());
app.use(express.methodOverride());
app.use(app.router);
app.use(express.static(path.join(__dirname, 'app')));

if (app.get('env') === 'development') {
    app.use(express.errorHandler());
}

// angular启动页
app.get('/', function (req, res) {
    res.sendfile('app/index.html');
});

server.listen(app.get('port'), function () {
    console.log('Express server listening on port ' + app.get('port'));
});

如何单独创建Express3的项目,请参考文章:Nodejs开发框架Express3.0开发手记–从零开始

2. 新建Angularjs目录及文件

创建Angularjs需要的目录及文件


D:\workspace\javascript\angular-basic>mkdir app
D:\workspace\javascript\angular-basic>mkdir app\scripts
D:\workspace\javascript\angular-basic>mkdir app\scripts\angular
D:\workspace\javascript\angular-basic>mkdir app\styles
D:\workspace\javascript\angular-basic>mkdir app\views
D:\workspace\javascript\angular-basic>mkdir app\views\component
D:\workspace\javascript\angular-basic>mkdir app\views\tpl

D:\workspace\javascript\angular-basic>touch app\index.html
D:\workspace\javascript\angular-basic>touch app\scripts\angular\app.js
D:\workspace\javascript\angular-basic>touch app\scripts\angular\controllers.js
D:\workspace\javascript\angular-basic>touch app\styles\main.css
D:\workspace\javascript\angular-basic>touch app\views\tpl\welcome.html

D:\workspace\javascript\angular-basic>echo "aaaa" > app\index.html

创建的Angularjs目录及文件

angular

目录解释:

  • app目录: Angular项目的根目录
  • scripts目录: 存放Javascript脚本目录
  • scripts\angular目录: 存放Angular Javascript脚本目录
  • styles目录: 存放css的目录
  • views目录: 存放html的目录
  • views\component目录: 存放html的自定义组件目录
  • views\tpl目录: 存放html的目录

文件解释:

  • app\index.html: Angular项目的入口文件
  • styles\main.css: Angular项目的css文件
  • scripts\angular\app.js: Angular项目全局配置文件
  • scripts\angular\controllers.js: Angular项目全局控制器文件/li>
  • views\tpl\welcome.html: 欢迎页

删除不需要的文件目录


D:\workspace\javascript\angular-basic>rm -rf public
D:\workspace\javascript\angular-basic>rm -rf routes

启动node服务器,检查入口页的配置


D:\workspace\javascript\angular-basic>node app.js
Express server listening on port 3000
GET / 200 11ms - 9b

indexpage

界面显示”aaaa”,说明node启动express,已经指向到app\index.html的页面。

3. 配置bower

接下来,我要通过bower来安装Angularjs和Bootstrap,以及其他依赖的前端库。关于bower的详细使用,请参考文章:bower解决js的依赖管理

新建文件:

    • .bowerrc: bower的环境设置,用于指定bower的依赖库的存放位置
    • bower.json: bower的依赖管理

新建文件: .bowerrc


~ vi .bowerrc

{
    "directory": "app/bower_components"
}

新建文件: bower.json


~ vi bower.json

{
    "name": "angular-basic",
    "version": "0.0.1",
    "dependencies": {
        "angular": "~1.2.12-build.2226",
        "angular-route": "~1.2.12-build.2226"
    },
    "devDependencies": {
    }
}

运行bower,下载Angular依赖库


D:\workspace\javascript\angular-basic>bower install
bower angular-route#~1.2.12-build.2226           cached git://github.com/angular/bower-angular-route.git#1.2.13-build.2242
bower angular-route#~1.2.12-build.2226         validate 1.2.13-build.2242 against git://github.com/angular/bower-angular-route.git#~1.2.12-build.2226
bower angular#~1.2.12-build.2226                 cached git://github.com/angular/bower-angular.git#1.2.13-build.2242
bower angular#~1.2.12-build.2226               validate 1.2.13-build.2242 against git://github.com/angular/bower-angular.git#~1.2.12-build.2226
bower angular#1.2.13-build.2242+sha.e645f7c      cached git://github.com/angular/bower-angular.git#1.2.13-build.2242
bower angular#1.2.13-build.2242+sha.e645f7c    validate 1.2.13-build.2242 against git://github.com/angular/bower-angular.git#1.2.13-build.2242+sha.e645f7c
bower angular-route#~1.2.12-build.2226          install angular-route#1.2.13-build.2242
bower angular#~1.2.12-build.2226                install angular#1.2.13-build.2242

angular-route#1.2.13-build.2242 app\bower_components\angular-route
└── angular#1.2.13-build.2242

angular#1.2.13-build.2242 app\bower_components\angular

下载最新版本的angular和angular-route,类库存放在app/bower_components目录下面。

bower-angular

4. 配置Angularjs项目

  • 修改index.html: 在入口文件,页面模板
  • 修改welcome.html: 欢迎页
  • 修改app.js: 全局配置
  • 修改controller.js: 控制器

修改index.html


<!DOCTYPE html>
<html lang="zh-cn">
<head>
<meta charset="utf-8">
<title>Angular-basic</title>
<meta name="description" content="Copyright http://blog.fens.me">
<link rel="stylesheet" href="styles/main.css">
</head>
<body ng-app="app">

<ul>
<li><a href="http://blog.fens.me/angularjs-express3-bootstrap3/">快速搭建Web环境 Angularjs + Express3 + Bootstrap3</a></li>
<li>http://blog.fens.me/angularjs-express3-bootstrap3</li>
</ul>

<div ng-view></div>

<script src="bower_components/angular/angular.min.js"></script>
<script src="bower_components/angular-route/angular-route.min.js"></script>
<script src="scripts/angular/app.js"></script>
<script src="scripts/angular/controllers.js"></script>

</body>
</html>

修改welcome.html


Welcome {{ username }}

修改app.js


'use strict';

var app = angular.module('app', ['ngRoute']);

app.config(['$routeProvider', '$locationProvider', function ($routeProvider, $locationProvider) {
    $routeProvider
        .when('/', {templateUrl: '/views/tpl/welcome.html', controller: 'WelcomeCtrl'})
        .otherwise({redirectTo: '/'});
    $locationProvider.html5Mode(true);
}]);

修改controller.js


'use strict';

function WelcomeCtrl($scope){
    $scope.username = 'Conan_Z';
}

重新启动node,查看Angular项目。


D:\workspace\javascript\angular-basic>node app.js
Express server listening on port 3000
GET / 304 8ms
GET /styles/main.css 304 3ms
GET /scripts/angular/app.js 304 6ms
GET /scripts/angular/controllers.js 304 11ms
GET /bower_components/angular-route/angular-route.min.js 200 18ms - 3.82kb
GET /bower_components/angular/angular.min.js 200 19ms - 98.03kb
GET /views/tpl/welcome.html 304 9ms
GET /bower_components/angular-route/angular-route.min.js.map 200 21ms - 9.61kb
GET /bower_components/angular/angular.min.js.map 200 26ms - 264.16kb

界面显示:

angular-start

index.html中配置的链接已经显示,同时welcome.html页面中配置的Welcome Conan_Z,也显示出来了。关于路由和模板配置,请参考文章:AngularJS路由和模板

5. 增加Bootstrap

接下来,增加Bootstrap-v3,让界面好看起来。我们还是有bower来管理Bootstrap的依赖。

通过命令行,增加类库,并写入的bower.json文件


D:\workspace\javascript\angular-basic>bower install bootstrap --save
D:\workspace\javascript\angular-basic>bower install angular-bootstrap --save
  • 修改index.html: 增加css, js的引用
  • 修改welcome.html: 增加bootstrap的效果

修改index.html


<!DOCTYPE html>
<html lang="zh-cn">
<head>
<meta charset="utf-8">
<title>Angular-basic</title>
<meta name="description" content="Copyright http://blog.fens.me">
<link rel="stylesheet" href="bower_components/bootstrap/dist/css/bootstrap.min.css">
<link rel="stylesheet" href="styles/main.css">
</head>
<body ng-app="app">

<div class="container">
<h2 class="text-primary">
<a href="http://blog.fens.me/angularjs-express3-bootstarp3/">快速搭建Web环境 Angularjs + Express3 + Bootstarp3</a>
</h2>
<p>http://blog.fens.me/angularjs-express3-bootstarp3</p>

<div class="row">
<div class=".col-lg-12">
<div ng-view></div>
</div>
</div>
</div>

<script src="bower_components/angular/angular.min.js"></script>
<script src="bower_components/bootstrap/dist/js/bootstrap.min.js"></script>
<script src="bower_components/angular-route/angular-route.min.js"></script>
<script src="bower_components/angular-bootstrap/ui-bootstrap-tpls.min.js"></script>
<script src="scripts/angular/app.js"></script>
<script src="scripts/angular/controllers.js"></script>

</body>
</html>

修改welcome.html


<hr/>
<form class="form-inline" role="form">
<div class="form-group">
<label>Welcome</label>
<input type="text" class="form-control" ng-model="username" placeholder="Enter email">
</div>
</form>
<p> {{ username }}</p>

angular-bootstrap

这样就用手动的方式的搭建了:Angularjs + Express3 + Bootstrap3的组合。

6. 完整的项目

项目代码已上传的github,项目地址: https://github.com/bsspirit/angular-basic

项目下载及安装


git clone https://github.com/bsspirit/angular-basic
npm install
bower install

项目运行


node app.js

当然,对于大型的Angular项目,我们可以选择用Yeoman的种子构建,请参考文章:AngularJS从构建项目开始

但有时Yeoman的项目,更新不够及时,比如bootstrap已到v3了,种子项目还是bootstrap的v2,而且v3不兼容v2。这时也许手动构建自己的项目,才是更好的选择。

转载请注明出处:
http://blog.fens.me/angularjs-express3-bootstrap3/

打赏作者

Async多任务时间管理

从零开始nodejs系列文章,将介绍如何利Javascript做为服务端脚本,通过Nodejs框架web开发。Nodejs框架是基于V8的引擎,是目前速度最快的Javascript引擎。chrome浏览器就基于V8,同时打开20-30个网页都很流畅。Nodejs标准的web开发框架Express,可以帮助我们迅速建立web站点,比起PHP的开发效率更高,而且学习曲线更低。非常适合小型网站,个性化网站,我们自己的Geek网站!!

关于作者

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/nodejs-async-timer/

nodejs-async-timer

前言

做服务器端开发时,经常会遇到时间管理的功能需求,比如每2秒刷新一次,每三分钟做一次统计计算,周一至周五9点30启动一个定时任务等等。

很多时候我们会把这些定时任务,交给linux系统的Crontab来实现。不过,有时为了增加系统的灵活性,我们需要在服务器后台实现。

对于单线程的Nodejs,如何控制多任务的时间管理呢?

目录

  1. 需求描述
  2. Nodejs的实现方案setInterval
  3. Async多任务时间管理

1. 需求描述

基于Nodejs的express3构建的web框架,需要在周一至周五,早上9点15分时,分别启动程序A和程序B,程序C。下午16点程序A,B,C停止。

程序A: 每1秒去redis取数据一次,保留在Nodejs的全局变量G中。
程序B: 每10秒去mysql取数据一次,通过websocket直接访问给客户端。
程序C: 每5秒对全局变量G,进行平均值计算,然后通过websocket直接访问给客户端。

2. Nodejs的实现方案setInterval

系统环境:

  • win7 64bit
  • Nodejs:v0.10.5
  • Npm:1.2.19

出初化项目:


~ cd D:\workspace\javascript\nodejs-async\demo
~ express -e timers
~ cd timers && npm install
~ npm install moment
~ npm install twix

编辑文件:app.js,在文件最面下增加新代码


...

//moment,twix时间工具
var  moment = require('moment')
    ,twix = require('twix');

//判断程序启动时间
function isTime(){
    var hms = 'HHmmss';
    return moment("091500",hms).twix(moment("160000",hms)).contains(moment());
}

//打印日志
if(isTime()){
    console.log("===============Working time===================");
}

//日志时间格式化
function now() {
    return moment().format("HH:mm:ss");
}

//全局变量G
var G = 0;

//模拟程序A
function A() {
    console.log(now() + " A(s1)=> {G:" + (G++) + "} Cache G");
}

//模拟程序B
function B() {
    console.log(now() + " B(s10)=> {B:10} TO client");
}

//模拟程序C
function C() {
    console.log(now() + " C(s5)=> {G:" + (G / 5) + "} TO client");
    G = 0;
}

//分别对A,B,C程序进行时间管理
setInterval(function () {
    if(isTime()){
        A()
    };
}, 1000);

setInterval(function () {
    if(isTime()){
        C();
    }
}, 5 * 1000);

setInterval(function () {
    if(isTime()) {
        B();
    }
}, 10 * 1000);

运行nodejs,查看日志输出


D:\workspace\javascript\nodejs-async\demo\timers>node app.js
===============Working time===================
Express server listening on port 3000
15:02:24 A(s1)=> {G:0} Cache G
15:02:25 A(s1)=> {G:1} Cache G
15:02:26 A(s1)=> {G:2} Cache G
15:02:27 A(s1)=> {G:3} Cache G
15:02:28 A(s1)=> {G:4} Cache G
15:02:28 C(s5)=> {G:1} TO client
15:02:29 A(s1)=> {G:0} Cache G
15:02:30 A(s1)=> {G:1} Cache G
15:02:31 A(s1)=> {G:2} Cache G
15:02:32 A(s1)=> {G:3} Cache G
15:02:33 A(s1)=> {G:4} Cache G
15:02:33 C(s5)=> {G:1} TO client
15:02:33 B(s10)=> {B:10} TO client
15:02:34 A(s1)=> {G:0} Cache G
15:02:35 A(s1)=> {G:1} Cache G
15:02:36 A(s1)=> {G:2} Cache G
15:02:37 A(s1)=> {G:3} Cache G
15:02:38 A(s1)=> {G:4} Cache G
15:02:38 C(s5)=> {G:1} TO client
15:02:39 A(s1)=> {G:0} Cache G
15:02:40 A(s1)=> {G:1} Cache G
15:02:41 A(s1)=> {G:2} Cache G
15:02:42 A(s1)=> {G:3} Cache G
  • 程序A,每1秒运行一次,给G+1。
  • 程序B,每10秒运行一次,输出到客户端。
  • 程序C,每5秒运行一次,取G的平均值,给G赋值为0,输出到客户端。

虽然完成了功能需求,但是代码不美观!如果再增加任务D,E,F….代码不好维护。

3. Async多任务时间管理

下面我们用async包,对上面的浏览进行封装。

Async包函数的详细介绍,请参考文章:Nodejs异步流程控制Async

安装async依赖


~ npm install async

新建一个文件:async-app.js,


~ cp app.js async-app.js

也就是重新复制一份app.js, 把setInterval的函数都去掉。

增加async的时间管理代码


var async = require('async');

var arr = [
    {fun: A, delay: 1000, test: isTime},
    {fun: B, delay: 10 * 1000, test: isTime},
    {fun: C, delay: 5 * 1000, test: isTime}
];

async.each(arr, function (item, callback) {
    async.whilst(item.test,function(cb) {
            item.fun();
            setTimeout(cb, item.delay);
        },function(err) {
            console.log("Not working time!");
        }
    );
}, function (err) {
    log('Error: ' + err);
});

运行nodejs,查看日志输出


D:\workspace\javascript\nodejs-async\demo\timers>node async-app.js
===============Working time===================
15:13:57 A(s1)=> {G:0} Cache G
15:13:57 B(s10)=> {B:10} TO client
15:13:57 C(s5)=> {G:0.2} TO client
Express server listening on port 3000
15:13:58 A(s1)=> {G:0} Cache G
15:13:59 A(s1)=> {G:1} Cache G
15:14:00 A(s1)=> {G:2} Cache G
15:14:01 A(s1)=> {G:3} Cache G
15:14:02 C(s5)=> {G:0.8} TO client
15:14:02 A(s1)=> {G:0} Cache G
15:14:03 A(s1)=> {G:1} Cache G
15:14:04 A(s1)=> {G:2} Cache G
15:14:05 A(s1)=> {G:3} Cache G
15:14:06 A(s1)=> {G:4} Cache G
15:14:07 B(s10)=> {B:10} TO client
15:14:07 C(s5)=> {G:1} TO client
15:14:07 A(s1)=> {G:0} Cache G
15:14:08 A(s1)=> {G:1} Cache G
15:14:09 A(s1)=> {G:2} Cache G
15:14:10 A(s1)=> {G:3} Cache G
15:14:11 A(s1)=> {G:4} Cache G
15:14:12 C(s5)=> {G:1} TO client
15:14:12 A(s1)=> {G:0} Cache G
  • 构建一个arr数组,封装调用的A,B,C的参数。
  • 使用async.each函数,对arr的item异步并行
  • 使用async.whilst函数,对任务启动时间进行判断,并根据delay运行任务

程序运行结果与setInterval一样。而代码更利于维护,一旦需要增减任务,简单地修改arr的数组就行了,其他的代码都不用动!!

很难想象,其他的语言能像js一样,有如此之重构的能力。越来越感到惊喜!

转载请注明出处:
http://blog.fens.me/nodejs-async-timer/

打赏作者

Hive学习路线图

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-hive-roadmap/

hadoop-hive-roadmap-title

前言

Hive是Hadoop家族中一款数据仓库产品,Hive最大的特点就是提供了类SQL的语法,封装了底层的MapReduce过程,让有SQL基础的业务人员,也可以直接利用Hadoop进行大数据的操作。就是这一个点,解决了原数据分析人员对于大数据分析的瓶颈。

让我们把Hive的环境构建起来,帮助非开发人员也能更好地了解大数据。

目录

  1. Hive介绍
  2. Hive学习路线图
  3. 我的使用经历
  4. Hive的使用案例

1. Hive介绍

Hive起源于Facebook,它使得针对Hadoop进行SQL查询成为可能,从而非程序员也可以方便地使用。Hive是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张数据库表,并提供完整的SQL查询功能,可以将SQL语句转换为MapReduce任务运行。

Hive是建立在 Hadoop 上的数据仓库基础构架。它提供了一系列的工具,可以用来进行数据提取转化加载(ETL),这是一种可以存储、查询和分析存储在 Hadoop 中的大规模数据的机制。Hive 定义了简单的类 SQL 查询语言,称为 HQL,它允许熟悉 SQL 的用户查询数据。同时,这个语言也允许熟悉 MapReduce 开发者的开发自定义的 mapper 和 reducer 来处理内建的 mapper 和 reducer 无法完成的复杂的分析工作。

详细地Hive的安装和使用介绍,请参考文章:Hive安装及使用攻略

2. Hive学习路线图

hadoop-hive-roadmap

Hive的知识点,我已经列在图中,希望帮助其他人更好的了解Hive。

接下来,是我的使用经历,谁都没有捷径。把心踏实下来,就不那么难了。

3. 我的使用经历

我使用Hive有两个考虑:

  • 1. 帮助无开发经验的数据分析人员,有能力处理大数据
  • 2. 构建标准化的MapReduce开发过程

1). 帮助无开发经验的数据分析人员,有能力处理大数据

完全符合与Hive的设计理念,一直在强调,无需多言。

2). 构建标准化的MapReduce开发过程

这个方面是我们需要努力的方向。

首先,Hive已经用类SQL的语法封装了MapReduce过程,这个封装过程就是MapReduce的标准化的过程。

我们在做业务或者工具时,会针对场景用逻辑封装,这是第二层封装是在Hive之上的封装。在第二层封装时,我们要尽可能多的屏蔽Hive的细节,让接口单一化,低少灵活性,再次精简HQL的语法结构。只满足我们的系统要求,专用的接口。

在使用二次封装的接口时,我们已经可以不用知道Hive是什么, 更不用知道Hadoop是什么。我们只需要知道,SQL查询(SQL92标准),怎么写效率高,怎么写可以完成业务需要就可以了。

当我们完成了Hive的二次封装后,我们可以构建标准化的MapReduce开发过程。

hive-architect-2

通过上图的思路,我们可以统一企业内部各种应用对于Hive的依赖,并且当人员素质升高后,有可以剥离Hive,用更优秀的底层解决方案来替换,如果封装的接口的不变,甚至替换Hive时业务使用都不知道,我们已经替换了Hive。

这个过程是需要经历的,也是有意义的。当我在考虑构建Hadoop分析工具时,以Hive作为Hadoop访问接口是最有效的。

3). 有关Hive的运维:
因为Hive是基于Hadoop构建的,简单地说就是一套Hadoop的访问接口,Hive本身并没有太多的东西,所以运维上面我们注意下面几个问题就行了。

  • 1. 使用单独的数据库存储元数据
  • 2. 定义合理的表分区和键
  • 3. 设置合理的bucket数据量
  • 4. 进行表压缩
  • 5. 定义外部表使用规范
  • 6. 合理的控制Mapper, Reducer数量

4. Hive的使用案例

已经整理成文章的案例

相关文章:
Hadoop家族产品学习路线图

转载请注明出处:
http://blog.fens.me/hadoop-hive-roadmap/

打赏作者