• Posts tagged "zookeeper"

Blog Archives

基于Zookeeper的分步式队列系统集成案例

Hadoop家族系列文章,主要介绍Hadoop家族产品,常用的项目包括Hadoop, Hive, Pig, HBase, Sqoop, Mahout, Zookeeper, Avro, Ambari, Chukwa,新增加的项目包括,YARN, Hcatalog, Oozie, Cassandra, Hama, Whirr, Flume, Bigtop, Crunch, Hue等。

从2011年开始,中国进入大数据风起云涌的时代,以Hadoop为代表的家族软件,占据了大数据处理的广阔地盘。开源界及厂商,所有数据软件,无一不向Hadoop靠拢。Hadoop也从小众的高富帅领域,变成了大数据开发的标准。在Hadoop原有技术基础之上,出现了Hadoop家族产品,通过“大数据”概念不断创新,推出科技进步。

作为IT界的开发人员,我们也要跟上节奏,抓住机遇,跟着Hadoop一起雄起!

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

zookeeper-case

前言

软件系统集成一直是工业界的一个难题,像10年以上的遗留系统集成,公司收购后的多系统集成,全球性的分步式系统集成等。虽然基于SOA的软件架构,从理论上都可以解决这些集成的问题,但是具体实施过程,有些集成项目过于复杂而失败。

随着技术的创新和发展,对于分步式集群应用的集成,有了更好的开源软件的支持,像zookeeper就是一个不错的分步式协作软件平台。本文将通过一个案例介绍Zookeeper的强大。

目录

  1. 项目背景:分布式消息中间件
  2. 需求分析:业务系统升级方案
  3. 架构设计:搭建Zookeeper的分步式协作平台
  4. 程序开发:基于Zookeeper的程序设计
  5. 程序运行

1. 项目背景:分布式消息中间件

随着Hadoop的普及,越来越多的公司开始构建自己的Hadoop系统。有时候,公司内部的不同部门或不同的团队,都有自己的Hadoop集群。这种多集群的方式,既能让每个团队拥有个性化的Hadoop,又能避免大集群的高度其中化运维难度。当数据量不是特别巨大的时候,小型集群会有很多适用的场合。

当然,多个小型集群也有缺点,就是资源配置可能造成浪费。每个团队的Hadoop集群,都要配有服务器和运维人员。有些能力强的团队,构建的hadoop集群,可以达到真正的个性化要求;而有一些能力比较差的团队,搭建的Hadoop集群性能会比较糟糕。

还有一些时候,多个团队需要共同完成一个任务,比如,A团队通过Hadoop集群计算的结果,交给B团队继续工作,B完成了自己任务再交给C团队继续做。这就有点像业务系统的工作流一样,一环一环地传下去,直到最后一部分完成。

在业务系统中,我们经常会用SOA的架构来解决这种问题,每个团队在ESB服务器上部署自己的服务,然后通过消息中间件完成调度任务。对于分步式的多个Hadoop集群系统的协作,同样可以用这种架构来做,只要把消息中间件引擎换成支持分步式的消息中间件的引擎就行了。

Zookeeper就可以做为 分步式消息中间件,来完成上面的说的业务需求。ZooKeeper是Hadoop家族的一款高性能的分布式协作的产品,是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。Zookeeper的安装和使用,请参考文章 ZooKeeper伪分布式集群安装及使用

ZooKeeper提供分布式协作服务,并不需要依赖于Hadoop的环境。

2. 需求分析:业务系统升级方案

下面我将从一个案例出发,来解释如何进行分步式协作平台的系统设计。

2.1 案例介绍

某大型软件公司,从事领域为供应链管理,主要业务包括了 采购管理、应付账款管理、应收账款管理、供应商反复管理、退货管理、销售管理、库存管理、电子商务、系统集成等。

biz

每块业务的逻辑都很复杂,由单独部门进行软件开发和维护,部门之间的系统没有直接通信需求,每个部门完成自己的功能就行了,最后通过数据库来共享数据,实现各功能之间的数据交换。

zk1

随着业务的发展,客户对响应速度要求越来越高,通过数据库来共享数据的方式,已经达不到信息交换的要求,系统进行了第一次升级,通过企业服务总线(ESB)统一管理公司内部所有业务。通过WebServices发布服务,通过Message Queue实现业务功能的调度。

zk2

公司业务规模继续扩大,跨国收购了多家公司。业务系统从原来的一个机房的集中式部署,变成了全球性的多机房的分步式部署。这时,Message Queue已经不能满足多机房跨地域的业务系统的功能需求了,需要一种分步式的消息中间件解决方案,来代替原有消息中间件的服务。

系统进行了第二次升级,采用Zookeeper作为分步式中间件调度引擎。

zk3

通过上面的描述,我们可以看出,当一个公司从小到大,从国内业务发展到全球性业务的时候。
为了配合业务发展,IT系统也是越来越复杂的,从最早的主从数据库设计,到ESB企业系统总线的扩展,再到分步式ESB配合分步式消息系统,每一次的升级都需要软件技术的支撑。

2.2 功能需求

全球性采购业务和全球性销售业务,让公司在市场中处于竞争优势。但由于采购和销售分别是由不同部门进行的软件开发和维护,而且业务往来也在不同的国家和地区。所以在每月底结算时,工作量都特别大。

比如,计算利润表 (请不要纠结于公式的准确性)

当月利润 = 当月销售金额 - 当月采购金额 - 当月其他支出

这样一个非常简单的计算公式,但对于跨国公司和部门来说,一点也不简单的。

从系统角度来看,采购部门要统计采购数据(海量数据),销售部门统计销售数据((海量数据),其他部门统计的其他费用支出(汇总的少量数据),最后系统计算得到当月的利润。

这里要说明的是,采购系统是单独的系统,销售是另外单独的系统,及以其他几十个大大小小的系统,如何能让多个系统,配合起来做这道计算题呢??

3. 架构设计:搭建Zookeeper的分步式协作平台

接下来,我们基于zookeeper来构建一个分步式队列的应用,来解决上面的功能需求。下面内容,排除了ESB的部分,只保留zookeeper进行实现。

  • 采购数据,为海量数据,基于Hadoop存储和分析。
  • 销售数据,为海量数据,基于Hadoop存储和分析。
  • 其他费用支出,为少量数据,基于文件或数据库存储和分析。

我们设计一个同步队列,这个队列有3个条件节点,分别对应采购(purchase),销售(sell),其他费用(other)3个部分。当3个节点都被创建后,程序会自动触发计算利润,并创建利润(profit)节点。上面3个节点的创建,无顺序要求。每个节点只能被创建一次。

zk4

系统环境

  • 2个独立的Hadoop集群
  • 2个独立的Java应用
  • 3个Zookeeper集群节点

图标解释:

  • Hadoop App1,Hadoop App2 是2个独立的Hadoop集群应用
  • Java App3,Java App4 是2个独立的Java应用
  • zk1,zk2,zk3是ZooKeeper集群的3个连接点
  • /queue,是znode的队列目录,假设队列长度为3
  • /queue/purchase,是znode队列中,1号排对者,由Hadoop App1提交,用于统计采购金额。
  • /queue/sell,是znode队列中,2号排对者,由Hadoop App2提交,用于统计销售金额。
  • /queue/other,是znode队列中,3号排对者,由Java App3提交,用于统计其他费用支出金额。
  • /queue/profit,当znode队列中满了,触发创建利润节点。
  • 当/qeueu/profit被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束。

补充说明:

  • 创建/queue/purchase,/queue/sell,/queue/other目录时,没有前后顺序,程序提交后,/queue目录下会生成对应该子目录
  • App1可以通过zk2提交,App2也可通过zk3提交。原则上,找最近路由最近的znode节点提交。
  • 每个应用不能重复提出,直到3个任务都提交,计算利润的任务才会被执行。
  • /queue/profit被创建后,zk的应用会监听到这个事件,通知应用,队列已完成!

这里的同步队列的架构更详细的设计思路,请参考文章 ZooKeeper实现分布式队列Queue

4. 程序开发:基于Zookeeper的程序设计

最终的功能需求:计算2013年01月的利润。

4.1 实验环境

在真正企业开发时,我们的实验环境应该与需求是一致的,但我的硬件条件有限,因些做了一个简化的环境设置。

  • 把zookeeper的完全分步式部署的3台服务器集群节点的,改为一台服务器上3个集群节点。
  • 把2个独立Hadoop集群,改为一个集群的2个独立的MapReduce任务。

开发环境:

  • Win7 64bit
  • JDK 1.6
  • Maven3
  • Juno Service Release 2
  • IP:192.168.1.10

Zookeeper服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Zookeeper: 3.4.5
  • IP: 192.168.1.201
  • 3个集群节点

Hadoop服务器环境:

  • Linux Ubuntu 12.04 LTS 64bit
  • Java 1.6.0_29
  • Hadoop: 1.0.3
  • IP: 192.168.1.210

4.2 实验数据

3组实验数据:

  • 采购数据,purchase.csv
  • 销售数据,sell.csv
  • 其他费用数据,other.csv

4.2.1 采购数据集

一共4列,分别对应 产品ID,产品数量,产品单价,采购日期。


1,26,1168,2013-01-08
2,49,779,2013-02-12
3,80,850,2013-02-05
4,69,1585,2013-01-26
5,88,1052,2013-01-13
6,84,2363,2013-01-19
7,64,1410,2013-01-12
8,53,910,2013-01-11
9,21,1661,2013-01-19
10,53,2426,2013-02-18
11,64,2022,2013-01-07
12,36,2941,2013-01-28
13,99,3819,2013-01-19
14,64,2563,2013-02-16
15,91,752,2013-02-05
16,65,750,2013-02-04
17,19,2426,2013-02-23
18,19,724,2013-02-05
19,87,137,2013-01-25
20,86,2939,2013-01-14
21,92,159,2013-01-23
22,81,2331,2013-03-01
23,88,998,2013-01-20
24,38,102,2013-02-22
25,32,4813,2013-01-13
26,36,1671,2013-01-19

//省略部分数据

4.2.2 销售数据集

一共4列,分别对应 产品ID,销售数量,销售单价,销售日期。


1,14,1236,2013-01-14
2,19,808,2013-03-06
3,26,886,2013-02-23
4,23,1793,2013-02-09
5,27,1206,2013-01-21
6,27,2648,2013-01-30
7,22,1502,2013-01-19
8,20,1050,2013-01-18
9,13,1778,2013-01-30
10,20,2718,2013-03-14
11,22,2175,2013-01-12
12,16,3284,2013-02-12
13,30,4152,2013-01-30
14,22,2770,2013-03-11
15,28,778,2013-02-23
16,22,874,2013-02-22
17,12,2718,2013-03-22
18,12,747,2013-02-23
19,27,172,2013-02-07
20,27,3282,2013-01-22
21,28,224,2013-02-05
22,26,2613,2013-03-30
23,27,1147,2013-01-31
24,16,141,2013-03-20
25,15,5343,2013-01-21
26,16,1887,2013-01-30
27,12,2535,2013-01-12
28,16,469,2013-01-07
29,29,2395,2013-03-30
30,17,1549,2013-01-30
31,25,4173,2013-03-17

//省略部分数据

4.2.3 其他费用数据集

一共2列,分别对应 发生日期,发生金额


2013-01-02,552
2013-01-03,1092
2013-01-04,1794
2013-01-05,435
2013-01-06,960
2013-01-07,1066
2013-01-08,1354
2013-01-09,880
2013-01-10,1992
2013-01-11,931
2013-01-12,1209
2013-01-13,1491
2013-01-14,804
2013-01-15,480
2013-01-16,1891
2013-01-17,156
2013-01-18,1439
2013-01-19,1018
2013-01-20,1506
2013-01-21,1216
2013-01-22,2045
2013-01-23,400
2013-01-24,1795
2013-01-25,1977
2013-01-26,1002
2013-01-27,226
2013-01-28,1239
2013-01-29,702
2013-01-30,1396

//省略部分数据

4.3 程序设计

我们要编写5个文件:

  • 计算采购金额,Purchase.java
  • 计算销售金额,Sell.java
  • 计算其他费用金额,Other.java
  • 计算利润,Profit.java
  • Zookeeper的调度,ZookeeperJob.java

4.3.1 计算采购金额

采购金额,是基于Hadoop的MapReduce统计计算。


public class Purchase {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class PurchaseMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class PurchaseReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("purchase");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化purchase
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Purchase.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(PurchaseMapper.class);
        job.setReducerClass(PurchaseReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("purchase", "logfile/biz/purchase.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/purchase");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/purchase/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.2 计算销售金额

销售金额,是基于Hadoop的MapReduce统计计算。


public class Sell {

    public static final String HDFS = "hdfs://192.168.1.210:9000";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    public static class SellMapper extends Mapper {

        private String month = "2013-01";
        private Text k = new Text(month);
        private IntWritable v = new IntWritable();
        private int money = 0;

        public void map(LongWritable key, Text values, Context context) throws IOException, InterruptedException {
            System.out.println(values.toString());
            String[] tokens = DELIMITER.split(values.toString());
            if (tokens[3].startsWith(month)) {// 1月的数据
                money = Integer.parseInt(tokens[1]) * Integer.parseInt(tokens[2]);//单价*数量
                v.set(money);
                context.write(k, v);
            }
        }
    }

    public static class SellReducer extends Reducer {
        private IntWritable v = new IntWritable();
        private int money = 0;

        @Override
        public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
            for (IntWritable line : values) {
                // System.out.println(key.toString() + "\t" + line);
                money += line.get();
            }
            v.set(money);
            context.write(null, v);
            System.out.println("Output:" + key + "," + money);
        }

    }

    public static void run(Map path) throws IOException, InterruptedException, ClassNotFoundException {
        JobConf conf = config();
        String local_data = path.get("sell");
        String input = path.get("input");
        String output = path.get("output");

        // 初始化sell
        HdfsDAO hdfs = new HdfsDAO(HDFS, conf);
        hdfs.rmr(input);
        hdfs.mkdirs(input);
        hdfs.copyFile(local_data, input);

        Job job = new Job(conf);
        job.setJarByClass(Sell.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.setMapperClass(SellMapper.class);
        job.setReducerClass(SellReducer.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path(input));
        FileOutputFormat.setOutputPath(job, new Path(output));

        job.waitForCompletion(true);
    }

    public static JobConf config() {// Hadoop集群的远程配置信息
        JobConf conf = new JobConf(Purchase.class);
        conf.setJobName("purchase");
        conf.addResource("classpath:/hadoop/core-site.xml");
        conf.addResource("classpath:/hadoop/hdfs-site.xml");
        conf.addResource("classpath:/hadoop/mapred-site.xml");
        return conf;
    }

    public static Map path(){
        Map path = new HashMap();
        path.put("sell", "logfile/biz/sell.csv");// 本地的数据文件
        path.put("input", HDFS + "/user/hdfs/biz/sell");// HDFS的目录
        path.put("output", HDFS + "/user/hdfs/biz/sell/output"); // 输出目录
        return path;
    }

    public static void main(String[] args) throws Exception {
        run(path());
    }

}

4.3.3 计算其他费用金额

其他费用金额,是基于本地文件的统计计算。


public class Other {

    public static String file = "logfile/biz/other.csv";
    public static final Pattern DELIMITER = Pattern.compile("[\t,]");
    private static String month = "2013-01";

    public static void main(String[] args) throws IOException {
        calcOther(file);
    }

    public static int calcOther(String file) throws IOException {
        int money = 0;
        BufferedReader br = new BufferedReader(new FileReader(new File(file)));

        String s = null;
        while ((s = br.readLine()) != null) {
            // System.out.println(s);
            String[] tokens = DELIMITER.split(s);
            if (tokens[0].startsWith(month)) {// 1月的数据
                money += Integer.parseInt(tokens[1]);
            }
        }
        br.close();

        System.out.println("Output:" + month + "," + money);
        return money;
    }
}

4.3.4 计算利润

利润,通过zookeeper分步式自动调度计算利润。


public class Profit {

    public static void main(String[] args) throws Exception {
        profit();
    }

    public static void profit() throws Exception {
        int sell = getSell();
        int purchase = getPurchase();
        int other = getOther();
        int profit = sell - purchase - other;
        System.out.printf("profit = sell - purchase - other = %d - %d - %d = %d\n", sell, purchase, other, profit);
    }

    public static int getPurchase() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Purchase.HDFS, Purchase.config());
        return Integer.parseInt(hdfs.cat(Purchase.path().get("output") + "/part-r-00000").trim());
    }

    public static int getSell() throws Exception {
        HdfsDAO hdfs = new HdfsDAO(Sell.HDFS, Sell.config());
        return Integer.parseInt(hdfs.cat(Sell.path().get("output") + "/part-r-00000").trim());
    }

    public static int getOther() throws IOException {
        return Other.calcOther(Other.file);
    }

}

4.3.5 Zookeeper调度

调度,通过构建分步式队列系统,自动化程序代替人工操作。


public class ZooKeeperJob {

    final public static String QUEUE = "/queue";
    final public static String PROFIT = "/queue/profit";
    final public static String PURCHASE = "/queue/purchase";
    final public static String SELL = "/queue/sell";
    final public static String OTHER = "/queue/other";

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            System.out.println("Please start a task:");
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            doPurchase(zk);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            doSell(zk);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            doOther(zk);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals(PROFIT)) {
                    System.out.println("Queue has Completed!!!");
                }
            }
        });
        return zk;
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => " + PROFIT);
        zk.exists(PROFIT, true);

        if (zk.exists(QUEUE, false) == null) {
            System.out.println("create " + QUEUE);
            zk.create(QUEUE, QUEUE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(QUEUE + " is exist!");
        }
    }

    public static void doPurchase(ZooKeeper zk) throws Exception {
        if (zk.exists(PURCHASE, false) == null) {

            Purchase.run(Purchase.path());

            System.out.println("create " + PURCHASE);
            zk.create(PURCHASE, PURCHASE.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(PURCHASE + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doSell(ZooKeeper zk) throws Exception {
        if (zk.exists(SELL, false) == null) {

            Sell.run(Sell.path());

            System.out.println("create " + SELL);
            zk.create(SELL, SELL.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(SELL + " is exist!");
        }
        isCompleted(zk);
    }

    public static void doOther(ZooKeeper zk) throws Exception {
        if (zk.exists(OTHER, false) == null) {

            Other.calcOther(Other.file);

            System.out.println("create " + OTHER);
            zk.create(OTHER, OTHER.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println(OTHER + " is exist!");
        }
        isCompleted(zk);
    }

    public static void isCompleted(ZooKeeper zk) throws Exception {
        int size = 3;
        List children = zk.getChildren(QUEUE, true);
        int length = children.size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create " + PROFIT);
            Profit.profit();
            zk.create(PROFIT, PROFIT.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

            for (String child : children) {// 清空节点
                zk.delete(QUEUE + "/" + child, -1);
            }
        }
    }
}

5. 运行程序

最后,我们运行整个的程序,包括3个部分。

  • zookeeper服务器
  • hadoop服务器
  • 分步式队列应用

5.1 启动zookeeper服务

启动zookeeper服务器集群:


~ cd toolkit/zookeeper345

# 启动zk集群3个节点
~ bin/zkServer.sh start conf/zk1.cfg
~ bin/zkServer.sh start conf/zk2.cfg
~ bin/zkServer.sh start conf/zk3.cfg

~ jps
4234 QuorumPeerMain
5002 Jps
4275 QuorumPeerMain
4207 QuorumPeerMain

查看zookeeper集群中,各节点的状态


# 查看zk1节点状态
~ bin/zkServer.sh status conf/zk1.cfg
JMX enabled by default
Using config: conf/zk1.cfg
Mode: follower

# 查看zk2节点状态,zk2为leader
~ bin/zkServer.sh status conf/zk2.cfg
JMX enabled by default
Using config: conf/zk2.cfg
Mode: leader

# 查看zk3节点状态
~ bin/zkServer.sh status conf/zk3.cfg
JMX enabled by default
Using config: conf/zk3.cfg
Mode: follower

启动zookeeper客户端:


~ bin/zkCli.sh -server 192.168.1.201:2181

# 查看zk
[zk: 192.168.1.201:2181(CONNECTED) 0] ls /
[queue, queue-fifo, zookeeper]

# /queue路径无子目录
[zk: 192.168.1.201:2181(CONNECTED) 1] ls /queue
[]

5.2 启动Hadoop服务


~ hadoop/hadoop-1.0.3
~ bin/start-all.sh

~ jps
25979 JobTracker
26257 TaskTracker
25576 DataNode
25300 NameNode
12116 Jps
25875 SecondaryNameNode

5.3 启动分步式队列ZookeeperJob

5.3.1 启动统计采购数据程序,设置启动参数1

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
copy from: logfile/biz/purchase.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/purchase
Output:2013-01,9609887
create /queue/purchase
Queue Complete:1/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 3] ls /queue
[purchase]

5.3.2 启动统计销售数据程序,设置启动参数2

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Delete: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Create: hdfs://192.168.1.210:9000/user/hdfs/biz/sell
copy from: logfile/biz/sell.csv to hdfs://192.168.1.210:9000/user/hdfs/biz/sell
Output:2013-01,2950315
create /queue/sell
Queue Complete:2/3

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[purchase, sell]

5.3.3 启动统计其他费用数据程序,设置启动参数3

只显示用户日志,忽略系统日志。


WATCH => /queue/profit
/queue is exist!
Output:2013-01,34193
create /queue/other
Queue Complete:3/3
create /queue/profit
cat: hdfs://192.168.1.210:9000/user/hdfs/biz/sell/output/part-r-00000
2950315

cat: hdfs://192.168.1.210:9000/user/hdfs/biz/purchase/output/part-r-00000
9609887

Output:2013-01,34193
profit = sell - purchase - other = 2950315 - 9609887 - 34193 = -6693765
Queue has Completed!!!

在zk中查看queue目录


[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[profit]

在最后一步,统计其他费用数据程序运行后,从日志中看到3个条件节点都已满足要求。然后,通过同步的分步式队列自动启动了计算利润的程序,并在日志中打印了2013年1月的利润为-6693765。

本文介绍的源代码,已上传到github: https://github.com/bsspirit/maven_hadoop_template/tree/master/src/main/java/org/conan/myzk/hadoop

通过这个复杂的实验,我们成功地用zookeeper实现了分步式队列,并应用到了业务中。当然,实验中也有一些不是特别的严谨的地方,请同学边做边思考。

######################################################
看文字不过瘾,作者视频讲解,请访问网站:http://onbook.me/video
######################################################

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-case/

打赏作者

ZooKeeper实现分布式FIFO队列

让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务。

现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了。这种配置如果简单地放几个web应用,显然是奢侈的浪费。就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的。对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了。

通过虚拟化技术,我们可以将一台服务器,拆分成12台VPS,每台2核CPU,4G内存,40G硬盘,并且支持资源重新分配。多么伟大的技术啊!现在我们有了12个节点的hadoop集群, 让Hadoop跑在云端,让世界加速。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/zookeeper-queue-fifo

zookeeper-fifo-queue

前言

ZooKeeper是一个强大的分布式协作系统,用ZooKeeper可以方便地实现先进先出(FIFO)队列。给“队列”的技术现实多一种选择,标准化我们的程序结构。另一篇,分步式同步队列实现,请参考:ZooKeeper实现分布式队列Queue

关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及使用

目录

  1. 分布式先进先出(FIFO)队列
  2. 设计思路
  3. 程序实现

1. 分布式先进先出(FIFO)队列

在计算机科学中,消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式。消息队列提供了异步的通信协议,消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。

先进先出(FIFO)队列,是消息队列最基本的一种实现形式,先发出的先消费。

2. 设计思路

实现的思路也非常简单,在/queue-fifo的目录下创建 SEQUENTIAL 类型的子目录 /x(i),这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证FIFO。

zookeeper-queue-fifo

应用实例
zookeeper-queue-fifo-app
图标解释

  1. app1,app2,app3是3个独立的业务系统
  2. zk1,zk2,zk3是ZooKeeper集群的3个连接点
  3. /queue-fifo,是znode的队列,按顺序存储数据
  4. /queue-fifo/x1,是znode队列中,1号排对者,由app1提交
  5. /queue-fifo/x2,是znode队列中,2号排对者,由app2提交
  6. app3是消费者,通过zk3连接到znode队列中,找到/queue-fifo中顺序最少的节点消费,删除消费后的节点(红色线表示)

注:

  • 1). app1可以通过zk2提交,app2也可通过zk3提交
  • 2). app1可以提交3次请求,生成x1,x2,x3多个节点
  • 3). app1可以作为消费者,消费队列数据

3. 程序实现

1). 单节点模拟实验

模拟app1,通过zk1,生产2个节点,然后再消费3个节点。


    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        produce(zk, 1);
        produce(zk, 2);
        cosume(zk);
        cosume(zk);
        cosume(zk);
        zk.close();
    }

创建一个与服务器的连接


    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, null);
        return zk;
    }

出始化队列


    public static ZooKeeper connection(String host) throws IOException {
        return new ZooKeeper(host, 60000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
    }

生产者


    public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue-fifo/x" + x + " x" + x);
        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

消费者


    public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
        List list = zk.getChildren("/queue-fifo", true);
        if (list.size() > 0) {
            long min = Long.MAX_VALUE;
            for (String num : list) {
                if (min > Long.parseLong(num.substring(1))) {
                    min = Long.parseLong(num.substring(1));
                }
            }
            System.out.println("delete /queue/x" + min);
            zk.delete("/queue-fifo/x" + min, 0);
        } else {
            System.out.println("No node to cosume");
        }
    }

启动main函数


    public static void main(String[] args) throws Exception {
            doOne();
    }

运行结果:


/queue-fifo is exist!
create /queue-fifo/x1 x1
create /queue-fifo/x2 x2
delete /queue/x10000000032
delete /queue/x20000000033
No node to cosume

完全符合我的们预期,由于produce时,我们创建的节点模式是EPHEMERAL_SEQUENTIAL,所以系统会在x(i)(n),随机生成n=0000000032,输出为x10000000032。

接下来我们看分布式环境。

2). 分布式模拟实验
app1通过zk1生产x1, app2通过zk2生产x2, app3通过zk3消费3个节点


public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            produce(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            produce(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            cosume(zk);
            cosume(zk);
            cosume(zk);
            break;
        }
    }

启动main函数


    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3
run1

run1: 执行app1–>zk1


#日志输出
/queue-fifo is exist!
create /queue-fifo/x1 x1

run2: 执行app2–>zk2


#日志输出
/queue-fifo is exist!
create /queue-fifo/x2 x2

run3: 执行app3–>zk3


#日志输出
/queue-fifo is exist!
delete /queue/x10000000034
delete /queue/x20000000035
No node to cosume

我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。

下面贴一下完整的代码:


package org.conan.zookeeper.demo;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class FIFOZooKeeper {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        produce(zk, 1);
        produce(zk, 2);
        cosume(zk);
        cosume(zk);
        cosume(zk);
        zk.close();
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            produce(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            produce(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            cosume(zk);
            cosume(zk);
            cosume(zk);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        return new ZooKeeper(host, 60000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        if (zk.exists("/queue-fifo", false) == null) {
            System.out.println("create /queue-fifo task-queue-fifo");
            zk.create("/queue-fifo", "task-queue-fifo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue-fifo is exist!");
        }
    }

    public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue-fifo/x" + x + " x" + x);
        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
        List list = zk.getChildren("/queue-fifo", true);
        if (list.size() > 0) {
            long min = Long.MAX_VALUE;
            for (String num : list) {
                if (min > Long.parseLong(num.substring(1))) {
                    min = Long.parseLong(num.substring(1));
                }
            }
            System.out.println("delete /queue/x" + min);
            zk.delete("/queue-fifo/x" + min, 0);
        } else {
            System.out.println("No node to cosume");
        }
    }

}

转载请注明出处:
http://blog.fens.me/zookeeper-queue-fifo

打赏作者

ZooKeeper实现分布式队列Queue

让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务。

现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了。这种配置如果简单地放几个web应用,显然是奢侈的浪费。就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的。对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了。

通过虚拟化技术,我们可以将一台服务器,拆分成12台VPS,每台2核CPU,4G内存,40G硬盘,并且支持资源重新分配。多么伟大的技术啊!现在我们有了12个节点的hadoop集群, 让Hadoop跑在云端,让世界加速。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/zookeeper-queue
zookeeper-queue

前言
ZooKeeper是一个分步式的协作系统,何为协作,ZooKeeper价值又有何体现。通过这篇文章的分布式队列的案例,你将了解到ZooKeeper的强大。关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及使用

目录

  1. 分布式队列
  2. 设计思路
  3. 程序实现

1. 分布式队列

队列有很多种产品,大都是消息系统所实现的,像ActiveMQ,JBossMQ,RabbitMQ,IBM-MQ等。分步式队列产品并不太多,像Beanstalkd。

本文实现的分布式对列,是基于ZooKeeper现实的一种同步的分步式队列,当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。

2. 设计思路

创建一个父目录 /queue,每个成员都监控(Watch)标志位目录/queue/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /queue/x(i)的临时目录节点,然后每个成员获取 /queue 目录的所有目录节点,也就是 x(i)。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /queue/start 的出现,如果已经相等就创建 /queue/start。

产品流程图
zookeeper-queue

应用实例
zookeeper-queue-dataprocess

图标解释

  1. app1,app2,app3,app4是4个独立的业务系统
  2. zk1,zk2,zk3是ZooKeeper集群的3个连接点
  3. /queue,是znode的队列,假设队列长度为3
  4. /queue/x1,是znode队列中,1号排对者,由app1提交,同步请求,app1挂载等待
  5. /queue/x2,是znode队列中,2号排对者,由app2提交,同步请求,app2挂起等待
  6. /queue/x3,是znode队列中,3号排对者,由app3提交,同步请求,app3挂起等待
  7. /queue/start,当znode队列中满了,触发创建开始节点
  8. 当/qeueu/start被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束

注:

  • 1). 创建/queue/x1,/queue/x2,/queue/x3没有前后顺序,提交后程序就同步挂起。
  • 2). app1可以通过zk2提交,app2也可通过zk3提交
  • 3). app1可以提交3次请求,生成x1,x2,x3使用队列充满
  • 4). /queue/start被创建后,zk1会监听到这个事件,再告诉app1,队列已完成!

3. 程序实现

1). 单节点模拟实验

模拟app1,通过zk1,提交3个请求


   public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        joinQueue(zk, 1);
        joinQueue(zk, 2);
        joinQueue(zk, 3);
        zk.close();
    }

创建一个与服务器的连接


    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监听/queue/start创建的事件
            public void process(WatchedEvent event) {
                if (event.getPath() != null && event.getPath().equals("/queue/start") && event.getType() == Event.EventType.NodeCreated) {
                    System.out.println("Queue has Completed.Finish testing!!!");
                }
            }
        });
        return zk;
    }

出始化队列


    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => /queue/start");
        zk.exists("/queue/start", true);

        if (zk.exists("/queue", false) == null) {
            System.out.println("create /queue task-queue");
            zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue is exist!");
        }
    }

增加队列节点


    public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue/x" + x + " x" + x);
        zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        isCompleted(zk);
    }

检查队列是否完整


    public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
        int size = 3;
        int length = zk.getChildren("/queue", true).size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create /queue/start start");
            zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } 
    }

启动函数main


public static void main(String[] args) throws Exception {
    doOne();
}

运行结果:


WATCH => /queue/start
/queue is exist!
create /queue/x1 x1
Queue Complete:1/3
create /queue/x2 x2
Queue Complete:2/3
create /queue/x3 x3
Queue Complete:3/3
create /queue/start start
Queue has Completed.Finish testing!!!

完全符合我的们预期。接下来我们看分布式环境

2). 分布式模拟实验

模拟app1通过zk1提交x1,app2通过zk2提交x2,app3通过zk3提交x3


    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            joinQueue(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            joinQueue(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            joinQueue(zk, 3);
            break;
        }
    }

注:

  • 1). 为了简单起见,我们没有增加复杂的多线程控制的机制。
  • 2). 没有调用zk.close()方法,也就是说,app1执行完单独的提交,app1就结束了,但zk1还存在着,所以/queue/x1存在于队列。
  • 3). 程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3

zk-run1

执行app1–>zk1


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x1 x1
Queue Complete:1/3

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 4] ls /queue
[x10000000011]

执行app2–>zk2


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x2 x2
Queue Complete:2/3

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[x20000000012, x10000000011]

执行app3–>zk3


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x3 x3
Queue Complete:3/3
create /queue/start start
Queue has Completed.Finish testing!!!

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[x30000000016, x10000000014, start, x20000000015]

/queue/stats被建立,打印出“Queue has Completed.Finish testing!!!”,代表调用app4完成!

我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。

下面贴一下完整的代码:


package org.conan.zookeeper.demo;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;

public class QueueZooKeeper {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        joinQueue(zk, 1);
        joinQueue(zk, 2);
        joinQueue(zk, 3);
        zk.close();
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            joinQueue(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            joinQueue(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            joinQueue(zk, 3);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals("/queue/start")) {
                    System.out.println("Queue has Completed.Finish testing!!!");
                }
            }
        });
        return zk;
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => /queue/start");
        zk.exists("/queue/start", true);

        if (zk.exists("/queue", false) == null) {
            System.out.println("create /queue task-queue");
            zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue is exist!");
        }
    }

    public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue/x" + x + " x" + x);
        zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        isCompleted(zk);
    }

    public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
        int size = 3;
        int length = zk.getChildren("/queue", true).size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create /queue/start start");
            zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } 
    }

}

转载请注明出处:
http://blog.fens.me/zookeeper-queue

打赏作者

ZooKeeper伪分布式集群安装及使用

让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务。

现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了。这种配置如果简单地放几个web应用,显然是奢侈的浪费。就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的。对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了。

通过虚拟化技术,我们可以将一台服务器,拆分成12台VPS,每台2核CPU,4G内存,40G硬盘,并且支持资源重新分配。多么伟大的技术啊!现在我们有了12个节点的hadoop集群, 让Hadoop跑在云端,让世界加速。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-intro/

zookeeper

前言

ZooKeeper是Hadoop家族的一款高性能的分布式协作的产品。在单机中,系统协作大都是进程级的操作。分布式系统中,服务协作都是跨服务器才能完成的。在ZooKeeper之前,我们对于协作服务大都使用消息中间件,随着分布式系统的普及,用消息中间件完成协作,会有大量的程序开发。ZooKeeper直接面向于分布式系统,可以减少我们自己的开发,帮助我们更好完成分布式系统的数据管理问题。

目录

  1. zookeeper介绍
  2. zookeeper单节点安装
  3. zookeeper伪分布式集群安装
  4. zookeeper命令行操作
  5. Java编程现实命令行操作

1. zookeeper介绍

ZooKeeper是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。ZooKeeper本身可以以Standalone模式安装运行,不过它的长处在于通过分布式ZooKeeper集群(一个Leader,多个Follower),基于一定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。

ZooKeeper是作为分布式协调服务,是不需要依赖于Hadoop的环境,也可以为其他的分布式环境提供服务。

2. zookeeper单节点安装Standalones模式

系统环境:

  • Linux Ubuntu 12.04.2 LTS 64bit server
  • Java: 1.6.0_29 64-Bit Server VM
~ uname -a
Linux conan 3.5.0-23-generic #35~precise1-Ubuntu SMP Fri Jan 25 17:13:26 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

~ cat /etc/issue
Ubuntu 12.04.2 LTS \n \l

~ java -version
java version "1.6.0_29"
Java(TM) SE Runtime Environment (build 1.6.0_29-b11)
Java HotSpot(TM) 64-Bit Server VM (build 20.4-b02, mixed mode)

下载zookeeper

~ mkdir /home/conan/toolkit
~ cd /home/conan/toolkit
~ wget http://apache.dataguru.cn/zookeeper/stable/zookeeper-3.4.5.tar.gz
~ tar xvf zookeeper-3.4.5.tar.gz
~ mv zookeeper-3.4.5 zookeeper345
~ cd zookeeper345
~ ls -l
drwxr-xr-x  2 conan conan    4096 Aug 12 04:34 bin
-rw-r--r--  1 conan conan   75988 Aug 12 04:34 build.xml
-rw-r--r--  1 conan conan   70223 Aug 12 04:34 CHANGES.txt
drwxr-xr-x  2 conan conan    4096 Aug 12 04:34 conf
drwxr-xr-x 10 conan conan    4096 Aug 12 04:34 contrib
drwxr-xr-x  2 conan conan    4096 Aug 12 04:34 dist-maven
drwxr-xr-x  6 conan conan    4096 Aug 12 04:34 docs
-rw-r--r--  1 conan conan    1953 Aug 12 04:34 ivysettings.xml
-rw-r--r--  1 conan conan    3120 Aug 12 04:34 ivy.xml
drwxr-xr-x  4 conan conan    4096 Aug 12 04:34 lib
-rw-r--r--  1 conan conan   11358 Aug 12 04:34 LICENSE.txt
-rw-r--r--  1 conan conan     170 Aug 12 04:34 NOTICE.txt
-rw-r--r--  1 conan conan    1770 Aug 12 04:34 README_packaging.txt
-rw-r--r--  1 conan conan    1585 Aug 12 04:34 README.txt
drwxr-xr-x  5 conan conan    4096 Aug 12 04:34 recipes
drwxr-xr-x  8 conan conan    4096 Aug 12 04:34 src
-rw-r--r--  1 conan conan 1315806 Aug 12 04:34 zookeeper-3.4.5.jar
-rw-r--r--  1 conan conan     833 Aug 12 04:34 zookeeper-3.4.5.jar.asc
-rw-r--r--  1 conan conan      33 Aug 12 04:34 zookeeper-3.4.5.jar.md5
-rw-r--r--  1 conan conan      41 Aug 12 04:34 zookeeper-3.4.5.jar.sha1

修改配置文件conf/zoo.cfg


~ mkdir /home/conan/zoo/zk0
~ cp conf/zoo_sample.cfg conf/zoo.cfg

~ vi conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/conan/zoo/zk0
clientPort=2181

非常简单,我们已经配置好了的zookeeper单节点

启动zookeeper


~ bin/zkServer.sh
JMX enabled by default
Using config: /home/conan/zoo/zk0/zookeeper345/bin/../conf/zoo.cfg
Usage: bin/zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}
conan@conan:~/zoo/zk0/zookeeper345$ bin/zkServer.sh start
JMX enabled by default
Using config: /home/conan/zoo/zk0/zookeeper345/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

#zk的服务显示为QuorumPeerMain
~ jps
5321 QuorumPeerMain
5338 Jps

#查看运行状态
~ bin/zkServer.sh status
JMX enabled by default
Using config: /home/conan/zoo/zk0/zookeeper345/bin/../conf/zoo.cfg
Mode: standalone

单节点的时,Mode会显示为standalone

停止ZooKeeper服务


~ bin/zkServer.sh stop
JMX enabled by default
Using config: /home/conan/zoo/zk0/zookeeper345/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

3. zookeeper伪分布式集群安装

所谓 “伪分布式集群” 就是在,在一台PC中,启动多个ZooKeeper的实例。“完全分布式集群” 是每台PC,启动一个ZooKeeper实例。

由于我的测试环境PC数量有限,所以在一台PC中,启动3个ZooKeeper的实例。

创建环境目录


~ mkdir /home/conan/zoo/zk1
~ mkdir /home/conan/zoo/zk2
~ mkdir /home/conan/zoo/zk3

#新建myid文件
~ echo "1" > /home/conan/zoo/zk1/myid
~ echo "2" > /home/conan/zoo/zk2/myid
~ echo "3" > /home/conan/zoo/zk3/myid

分别修改配置文件
修改:dataDir,clientPort
增加:集群的实例,server.X,”X”表示每个目录中的myid的值


~ vi /home/conan/toolkit/zookeeper345/conf/zk1.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/conan/zoo/zk1
clientPort=2181
server.1=192.168.1.201:2888:3888
server.2=192.168.1.201:2889:3889
server.3=192.168.1.201:2890:3890

~ vi /home/conan/toolkit/zookeeper345/conf/zk2.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/conan/zoo/zk2
clientPort=2182
server.1=192.168.1.201:2888:3888
server.2=192.168.1.201:2889:3889
server.3=192.168.1.201:2890:3890

~ vi /home/conan/toolkit/zookeeper345/conf/zk3.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/conan/zoo/zk3
clientPort=2183
server.1=192.168.1.201:2888:3888
server.2=192.168.1.201:2889:3889
server.3=192.168.1.201:2890:3890

3个节点的ZooKeeper集群配置完成,接下来我们的启动服务。

启动集群


~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh start zk1.cfg
~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh start zk2.cfg
~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh start zk3.cfg

~ jps
5422 QuorumPeerMain
5395 QuorumPeerMain
5463 QuorumPeerMain
5494 Jps

#查看节点状态
~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh status zk1.cfg
JMX enabled by default
Using config: /home/conan/toolkit/zookeeper345/bin/../conf/zk1.cfg
Mode: follower

~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh status zk2.cfg
JMX enabled by default
Using config: /home/conan/toolkit/zookeeper345/bin/../conf/zk2.cfg
Mode: leader

~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh status zk3.cfg
JMX enabled by default
Using config: /home/conan/toolkit/zookeeper345/bin/../conf/zk3.cfg
Mode: follower

我们可以看到zk2是leader,zk1和zk3是follower

查看ZooKeeper物理文件目录结构


~ tree  -L 3 /home/conan/zoo
/home/conan/zoo
├── zk0
├── zk1
│   ├── myid
│   ├── version-2
│   │   ├── acceptedEpoch
│   │   ├── currentEpoch
│   │   ├── log.100000001
│   │   └── snapshot.0
│   └── zookeeper_server.pid
├── zk2
│   ├── myid
│   ├── version-2
│   │   ├── acceptedEpoch
│   │   ├── currentEpoch
│   │   ├── log.100000001
│   │   └── snapshot.0
│   └── zookeeper_server.pid
└── zk3
    ├── myid
    ├── version-2
    │   ├── acceptedEpoch
    │   ├── currentEpoch
    │   ├── log.100000001
    │   └── snapshot.100000000
    └── zookeeper_server.pid

7 directories, 18 files

4. zookeeper命令行操作

我们通过客户端连接ZooKeeper的集群,我们可以任意的zookeeper是进行连接。


~ /home/conan/toolkit/zookeeper345/bin/zkCli.sh -server 192.168.1.201:2181

Connecting to 192.168.1.201
2013-08-12 05:25:39,260 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
2013-08-12 05:25:39,267 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=conan
2013-08-12 05:25:39,269 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.6.0_29
2013-08-12 05:25:39,269 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Sun Microsystems Inc.
2013-08-12 05:25:39,270 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/home/conan/toolkit/jdk16/jre
2013-08-12 05:25:39,270 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/home/conan/toolkit/zookeeper345/bin/../build/classes:/home/conan/toolkit/zookeeper345/bin/../build/lib/*.jar:/home/conan/toolkit/zookeeper345/bin/../lib/slf4j-log4j12-1.6.1.jar:/home/conan/toolkit/zookeeper345/bin/../lib/slf4j-api-1.6.1.jar:/home/conan/toolkit/zookeeper345/bin/../lib/netty-3.2.2.Final.jar:/home/conan/toolkit/zookeeper345/bin/../lib/log4j-1.2.15.jar:/home/conan/toolkit/zookeeper345/bin/../lib/jline-0.9.94.jar:/home/conan/toolkit/zookeeper345/bin/../zookeeper-3.4.5.jar:/home/conan/toolkit/zookeeper345/bin/../src/java/lib/*.jar:/home/conan/toolkit/zookeeper345/bin/../conf:
2013-08-12 05:25:39,271 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/home/conan/toolkit/jdk16/jre/lib/amd64/server:/home/conan/toolkit/jdk16/jre/lib/amd64:/home/conan/toolkit/jdk16/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2013-08-12 05:25:39,275 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2013-08-12 05:25:39,276 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=
2013-08-12 05:25:39,276 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
2013-08-12 05:25:39,277 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
2013-08-12 05:25:39,281 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=3.5.0-23-generic
2013-08-12 05:25:39,282 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=conan
2013-08-12 05:25:39,282 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/home/conan
2013-08-12 05:25:39,283 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/home/conan/zoo
2013-08-12 05:25:39,284 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=192.168.1.201 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@22ba6c83
Welcome to ZooKeeper!
JLine support is enabled
[zk: 192.168.1.201(CONNECTING) 0] 2013-08-12 05:25:39,336 [myid:] - INFO  [main-SendThread(192.168.1.201:2181):ClientCnxn$SendThread@966] - Opening socket connection to server 192.168.1.201/192.168.1.201:2181. Will not attempt to authenticate using SASL (Unable to locate a login configuration)
2013-08-12 05:25:39,345 [myid:] - INFO  [main-SendThread(192.168.1.201:2181):ClientCnxn$SendThread@849] - Socket connection established to 192.168.1.201/192.168.1.201:2181, initiating session
2013-08-12 05:25:39,384 [myid:] - INFO  [main-SendThread(192.168.1.201:2181):ClientCnxn$SendThread@1207] - Session establishment complete on server 192.168.1.201/192.168.1.201:2181, sessionid = 0x1406f3c1ef90001, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

[zk: 192.168.1.201(CONNECTED) 0]

集群已连接,下面我们要使用一下,ZooKeeper的命令行操作。

命令行操作
通过help打印命令行帮助


[zk: 192.168.1.201(CONNECTED) 1] help
ZooKeeper -server host:port cmd args
        connect host:port
        get path [watch]
        ls path [watch]
        set path data [version]
        rmr path
        delquota [-n|-b] path
        quit
        printwatches on|off
        create [-s] [-e] path data acl
        stat path [watch]
        close
        ls2 path [watch]
        history
        listquota path
        setAcl path acl
        getAcl path
        sync path
        redo cmdno
        addauth scheme auth
        delete path [version]
        setquota -n|-b val path

ZooKeeper的结构,很像是目录结构,我们看到了像ls这样熟悉的命令。


#ls,查看/目录内容
[zk: 192.168.1.201(CONNECTED) 1] ls /
[zookeeper]

#create,创建一个znode节点
[zk: 192.168.1.201(CONNECTED) 2] create /node conan
Created /node

#ls,再查看/目录
[zk: 192.168.1.201(CONNECTED) 3] ls /
[node, zookeeper]

#get,查看/node的数据信息
[zk: 192.168.1.201(CONNECTED) 4] get /node
conan
cZxid = 0x100000006
ctime = Mon Aug 12 05:32:49 CST 2013
mZxid = 0x100000006
mtime = Mon Aug 12 05:32:49 CST 2013
pZxid = 0x100000006
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

#set,修改数据
[zk: 192.168.1.201(CONNECTED) 5] set /node fens.me
cZxid = 0x100000006
ctime = Mon Aug 12 05:32:49 CST 2013
mZxid = 0x100000007
mtime = Mon Aug 12 05:34:32 CST 2013
pZxid = 0x100000006
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0

#get,再查看/node的数据信息,已改为fens.me
[zk: 192.168.1.201(CONNECTED) 6] get /node
fens.me
cZxid = 0x100000006
ctime = Mon Aug 12 05:32:49 CST 2013
mZxid = 0x100000007
mtime = Mon Aug 12 05:34:32 CST 2013
pZxid = 0x100000006
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0

#delete,删除/node
[zk: 192.168.1.201(CONNECTED) 7] delete /node
[zk: 192.168.1.201(CONNECTED) 8] ls /
[zookeeper]

#quit,退出客户端连接
[zk: 192.168.1.201(CONNECTED) 19] quit
Quitting...
2013-08-12 05:40:29,304 [myid:] - INFO  [main:ZooKeeper@684] - Session: 0x1406f3c1ef90002 closed
2013-08-12 05:40:29,305 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@509] - EventThread shut down

5. Java编程现实命令行操作


package org.conan.zookeeper.demo;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class BasicDemo1 {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        // 创建一个与服务器的连接
        ZooKeeper zk = new ZooKeeper("192.168.1.201:2181", 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                System.out.println("EVENT:" + event.getType());
            }
        });

        // 查看根节点
        System.out.println("ls / => " + zk.getChildren("/", true));

        // 创建一个目录节点
        if (zk.exists("/node", true) == null) {
            zk.create("/node", "conan".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("create /node conan");
            // 查看/node节点数据
            System.out.println("get /node => " + new String(zk.getData("/node", false, null)));
            // 查看根节点
            System.out.println("ls / => " + zk.getChildren("/", true));
        }

        // 创建一个子目录节点
        if (zk.exists("/node/sub1", true) == null) {
            zk.create("/node/sub1", "sub1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("create /node/sub1 sub1");
            // 查看node节点
            System.out.println("ls /node => " + zk.getChildren("/node", true));
        }

        // 修改节点数据
        if (zk.exists("/node", true) != null) {
            zk.setData("/node", "changed".getBytes(), -1);
            // 查看/node节点数据
            System.out.println("get /node => " + new String(zk.getData("/node", false, null)));
        }

        // 删除节点
        if (zk.exists("/node/sub1", true) != null) {
            zk.delete("/node/sub1", -1);
            zk.delete("/node", -1);
            // 查看根节点
            System.out.println("ls / => " + zk.getChildren("/", true));
        }

        // 关闭连接
        zk.close();
    }
}

运行结果:

2013-08-12 15:33:29,699 [myid:] - INFO  [main:Environment@97] - Client environment:zookeeper.version=3.3.1-942149, built on 05/07/2010 17:14 GMT
2013-08-12 15:33:29,702 [myid:] - INFO  [main:Environment@97] - Client environment:host.name=PC201304202140
2013-08-12 15:33:29,702 [myid:] - INFO  [main:Environment@97] - Client environment:java.version=1.6.0_45
2013-08-12 15:33:29,702 [myid:] - INFO  [main:Environment@97] - Client environment:java.vendor=Sun Microsystems Inc.
2013-08-12 15:33:29,702 [myid:] - INFO  [main:Environment@97] - Client environment:java.home=D:\toolkit\java\jdk6\jre
2013-08-12 15:33:29,703 [myid:] - INFO  [main:Environment@97] - Client environment:java.class.path=D:\workspace\java\zkdemo\target\classes;C:\Users\Administrator\.m2\repository\org\apache\hadoop\zookeeper\3.3.1\zookeeper-3.3.1.jar;C:\Users\Administrator\.m2\repository\log4j\log4j\1.2.15\log4j-1.2.15.jar;C:\Users\Administrator\.m2\repository\javax\mail\mail\1.4\mail-1.4.jar;C:\Users\Administrator\.m2\repository\javax\activation\activation\1.1\activation-1.1.jar;C:\Users\Administrator\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\Administrator\.m2\repository\junit\junit\3.8.1\junit-3.8.1.jar
2013-08-12 15:33:29,703 [myid:] - INFO  [main:Environment@97] - Client environment:java.library.path=D:\toolkit\java\jdk6\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;D:\toolkit\Rtools\bin;D:\toolkit\Rtools\gcc-4.6.3\bin;C:\Program Files (x86)\Common Files\NetSarang;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;D:\toolkit\Git\cmd;D:\toolkit\Git\bin;C:\Program Files (x86)\Microsoft SQL Server\100\Tools\Binn\;C:\Program Files\Microsoft SQL Server\100\Tools\Binn\;C:\Program Files\Microsoft SQL Server\100\DTS\Binn\;c:\Program Files (x86)\Common Files\Ulead Systems\MPEG;C:\Program Files (x86)\QuickTime\QTSystem\;D:\toolkit\MiKTex\miktex\bin\x64\;D:\toolkit\sshclient;D:\toolkit\ant19\bin;D:\toolkit\eclipse;D:\toolkit\gradle15\bin;D:\toolkit\java\jdk6\bin;D:\toolkit\maven3\bin;D:\toolkit\mysql56\bin;D:\toolkit\python27;D:\toolkit\putty;C:\Program Files\R\R-3.0.1\bin\x64;D:\toolkit\mongodb243\bin;D:\toolkit\php54;D:\toolkit\nginx140;D:\toolkit\nodejs;D:\toolkit\npm12\bin;D:\toolkit\java\jdk6\jre\bin\server;.
2013-08-12 15:33:29,703 [myid:] - INFO  [main:Environment@97] - Client environment:java.io.tmpdir=C:\Users\ADMINI~1\AppData\Local\Temp\
2013-08-12 15:33:29,704 [myid:] - INFO  [main:Environment@97] - Client environment:java.compiler=
2013-08-12 15:33:29,704 [myid:] - INFO  [main:Environment@97] - Client environment:os.name=Windows 7
2013-08-12 15:33:29,704 [myid:] - INFO  [main:Environment@97] - Client environment:os.arch=amd64
2013-08-12 15:33:29,705 [myid:] - INFO  [main:Environment@97] - Client environment:os.version=6.1
2013-08-12 15:33:29,705 [myid:] - INFO  [main:Environment@97] - Client environment:user.name=Administrator
2013-08-12 15:33:29,705 [myid:] - INFO  [main:Environment@97] - Client environment:user.home=C:\Users\Administrator
2013-08-12 15:33:29,706 [myid:] - INFO  [main:Environment@97] - Client environment:user.dir=D:\workspace\java\zkdemo
2013-08-12 15:33:29,707 [myid:] - INFO  [main:ZooKeeper@373] - Initiating client connection, connectString=192.168.1.201:2181 sessionTimeout=60000 watcher=org.conan.zookeeper.demo.BasicDemo1$1@3dfeca64
2013-08-12 15:33:29,731 [myid:] - INFO  [main-SendThread():ClientCnxn$SendThread@1000] - Opening socket connection to server /192.168.1.201:2181
2013-08-12 15:33:38,736 [myid:] - INFO  [main-SendThread(192.168.1.201:2181):ClientCnxn$SendThread@908] - Socket connection established to 192.168.1.201/192.168.1.201:2181, initiating session
2013-08-12 15:33:38,804 [myid:] - INFO  [main-SendThread(192.168.1.201:2181):ClientCnxn$SendThread@701] - Session establishment complete on server 192.168.1.201/192.168.1.201:2181, sessionid = 0x1406f3c1ef9000d, negotiated timeout = 60000
EVENT:None
ls / => [zookeeper]
EVENT:NodeCreated
EVENT:NodeChildrenChanged
create /node conan
get /node => conan
ls / => [node, zookeeper]
EVENT:NodeCreated
create /node/sub1 sub1
ls /node => [sub1]
EVENT:NodeDataChanged
get /node => changed
EVENT:NodeDeleted
EVENT:NodeChildrenChanged
EVENT:NodeChildrenChanged
ls / => [zookeeper]
2013-08-12 15:33:38,877 [myid:] - INFO  [main:ZooKeeper@538] - Session: 0x1406f3c1ef9000d closed

pom.xml,maven配置文件

<?xml version="1.0" encoding="UTF-8"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>org.conan.zookeeper</groupId>
<artifactId>zkdemo</artifactId>
<version>0.0.1</version>
<name>zkdemo</name>
<description>zkdemo</description>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.1</version>
<exclusions>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

基础讲完,接下来就让我动手管理起,我们自己的分布式系统吧。

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-intro/

打赏作者