Blog Archives

ZooKeeper实现分布式FIFO队列

让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务。

现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了。这种配置如果简单地放几个web应用,显然是奢侈的浪费。就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的。对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了。

通过虚拟化技术,我们可以将一台服务器,拆分成12台VPS,每台2核CPU,4G内存,40G硬盘,并且支持资源重新分配。多么伟大的技术啊!现在我们有了12个节点的hadoop集群, 让Hadoop跑在云端,让世界加速。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/zookeeper-queue-fifo

zookeeper-fifo-queue

前言

ZooKeeper是一个强大的分布式协作系统,用ZooKeeper可以方便地实现先进先出(FIFO)队列。给“队列”的技术现实多一种选择,标准化我们的程序结构。另一篇,分步式同步队列实现,请参考:ZooKeeper实现分布式队列Queue

关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及使用

目录

  1. 分布式先进先出(FIFO)队列
  2. 设计思路
  3. 程序实现

1. 分布式先进先出(FIFO)队列

在计算机科学中,消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式。消息队列提供了异步的通信协议,消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。

先进先出(FIFO)队列,是消息队列最基本的一种实现形式,先发出的先消费。

2. 设计思路

实现的思路也非常简单,在/queue-fifo的目录下创建 SEQUENTIAL 类型的子目录 /x(i),这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证FIFO。

zookeeper-queue-fifo

应用实例
zookeeper-queue-fifo-app
图标解释

  1. app1,app2,app3是3个独立的业务系统
  2. zk1,zk2,zk3是ZooKeeper集群的3个连接点
  3. /queue-fifo,是znode的队列,按顺序存储数据
  4. /queue-fifo/x1,是znode队列中,1号排对者,由app1提交
  5. /queue-fifo/x2,是znode队列中,2号排对者,由app2提交
  6. app3是消费者,通过zk3连接到znode队列中,找到/queue-fifo中顺序最少的节点消费,删除消费后的节点(红色线表示)

注:

  • 1). app1可以通过zk2提交,app2也可通过zk3提交
  • 2). app1可以提交3次请求,生成x1,x2,x3多个节点
  • 3). app1可以作为消费者,消费队列数据

3. 程序实现

1). 单节点模拟实验

模拟app1,通过zk1,生产2个节点,然后再消费3个节点。


    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        produce(zk, 1);
        produce(zk, 2);
        cosume(zk);
        cosume(zk);
        cosume(zk);
        zk.close();
    }

创建一个与服务器的连接


    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, null);
        return zk;
    }

出始化队列


    public static ZooKeeper connection(String host) throws IOException {
        return new ZooKeeper(host, 60000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
    }

生产者


    public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue-fifo/x" + x + " x" + x);
        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

消费者


    public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
        List list = zk.getChildren("/queue-fifo", true);
        if (list.size() > 0) {
            long min = Long.MAX_VALUE;
            for (String num : list) {
                if (min > Long.parseLong(num.substring(1))) {
                    min = Long.parseLong(num.substring(1));
                }
            }
            System.out.println("delete /queue/x" + min);
            zk.delete("/queue-fifo/x" + min, 0);
        } else {
            System.out.println("No node to cosume");
        }
    }

启动main函数


    public static void main(String[] args) throws Exception {
            doOne();
    }

运行结果:


/queue-fifo is exist!
create /queue-fifo/x1 x1
create /queue-fifo/x2 x2
delete /queue/x10000000032
delete /queue/x20000000033
No node to cosume

完全符合我的们预期,由于produce时,我们创建的节点模式是EPHEMERAL_SEQUENTIAL,所以系统会在x(i)(n),随机生成n=0000000032,输出为x10000000032。

接下来我们看分布式环境。

2). 分布式模拟实验
app1通过zk1生产x1, app2通过zk2生产x2, app3通过zk3消费3个节点


public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            produce(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            produce(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            cosume(zk);
            cosume(zk);
            cosume(zk);
            break;
        }
    }

启动main函数


    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3
run1

run1: 执行app1–>zk1


#日志输出
/queue-fifo is exist!
create /queue-fifo/x1 x1

run2: 执行app2–>zk2


#日志输出
/queue-fifo is exist!
create /queue-fifo/x2 x2

run3: 执行app3–>zk3


#日志输出
/queue-fifo is exist!
delete /queue/x10000000034
delete /queue/x20000000035
No node to cosume

我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。

下面贴一下完整的代码:


package org.conan.zookeeper.demo;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class FIFOZooKeeper {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        produce(zk, 1);
        produce(zk, 2);
        cosume(zk);
        cosume(zk);
        cosume(zk);
        zk.close();
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            produce(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            produce(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            cosume(zk);
            cosume(zk);
            cosume(zk);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        return new ZooKeeper(host, 60000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        if (zk.exists("/queue-fifo", false) == null) {
            System.out.println("create /queue-fifo task-queue-fifo");
            zk.create("/queue-fifo", "task-queue-fifo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue-fifo is exist!");
        }
    }

    public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue-fifo/x" + x + " x" + x);
        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
        List list = zk.getChildren("/queue-fifo", true);
        if (list.size() > 0) {
            long min = Long.MAX_VALUE;
            for (String num : list) {
                if (min > Long.parseLong(num.substring(1))) {
                    min = Long.parseLong(num.substring(1));
                }
            }
            System.out.println("delete /queue/x" + min);
            zk.delete("/queue-fifo/x" + min, 0);
        } else {
            System.out.println("No node to cosume");
        }
    }

}

转载请注明出处:
http://blog.fens.me/zookeeper-queue-fifo

打赏作者

ZooKeeper实现分布式队列Queue

让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务。

现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了。这种配置如果简单地放几个web应用,显然是奢侈的浪费。就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的。对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了。

通过虚拟化技术,我们可以将一台服务器,拆分成12台VPS,每台2核CPU,4G内存,40G硬盘,并且支持资源重新分配。多么伟大的技术啊!现在我们有了12个节点的hadoop集群, 让Hadoop跑在云端,让世界加速。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/zookeeper-queue
zookeeper-queue

前言
ZooKeeper是一个分步式的协作系统,何为协作,ZooKeeper价值又有何体现。通过这篇文章的分布式队列的案例,你将了解到ZooKeeper的强大。关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及使用

目录

  1. 分布式队列
  2. 设计思路
  3. 程序实现

1. 分布式队列

队列有很多种产品,大都是消息系统所实现的,像ActiveMQ,JBossMQ,RabbitMQ,IBM-MQ等。分步式队列产品并不太多,像Beanstalkd。

本文实现的分布式对列,是基于ZooKeeper现实的一种同步的分步式队列,当一个队列的成员都聚齐时,这个队列才可用,否则一直等待所有成员到达。

2. 设计思路

创建一个父目录 /queue,每个成员都监控(Watch)标志位目录/queue/start 是否存在,然后每个成员都加入这个队列,加入队列的方式就是创建 /queue/x(i)的临时目录节点,然后每个成员获取 /queue 目录的所有目录节点,也就是 x(i)。判断 i 的值是否已经是成员的个数,如果小于成员个数等待 /queue/start 的出现,如果已经相等就创建 /queue/start。

产品流程图
zookeeper-queue

应用实例
zookeeper-queue-dataprocess

图标解释

  1. app1,app2,app3,app4是4个独立的业务系统
  2. zk1,zk2,zk3是ZooKeeper集群的3个连接点
  3. /queue,是znode的队列,假设队列长度为3
  4. /queue/x1,是znode队列中,1号排对者,由app1提交,同步请求,app1挂载等待
  5. /queue/x2,是znode队列中,2号排对者,由app2提交,同步请求,app2挂起等待
  6. /queue/x3,是znode队列中,3号排对者,由app3提交,同步请求,app3挂起等待
  7. /queue/start,当znode队列中满了,触发创建开始节点
  8. 当/qeueu/start被创建后,app4被启动,所有zk的连接通知同步程序(红色线),队列已完成,所有程序结束

注:

  • 1). 创建/queue/x1,/queue/x2,/queue/x3没有前后顺序,提交后程序就同步挂起。
  • 2). app1可以通过zk2提交,app2也可通过zk3提交
  • 3). app1可以提交3次请求,生成x1,x2,x3使用队列充满
  • 4). /queue/start被创建后,zk1会监听到这个事件,再告诉app1,队列已完成!

3. 程序实现

1). 单节点模拟实验

模拟app1,通过zk1,提交3个请求


   public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        joinQueue(zk, 1);
        joinQueue(zk, 2);
        joinQueue(zk, 3);
        zk.close();
    }

创建一个与服务器的连接


    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监听/queue/start创建的事件
            public void process(WatchedEvent event) {
                if (event.getPath() != null && event.getPath().equals("/queue/start") && event.getType() == Event.EventType.NodeCreated) {
                    System.out.println("Queue has Completed.Finish testing!!!");
                }
            }
        });
        return zk;
    }

出始化队列


    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => /queue/start");
        zk.exists("/queue/start", true);

        if (zk.exists("/queue", false) == null) {
            System.out.println("create /queue task-queue");
            zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue is exist!");
        }
    }

增加队列节点


    public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue/x" + x + " x" + x);
        zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        isCompleted(zk);
    }

检查队列是否完整


    public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
        int size = 3;
        int length = zk.getChildren("/queue", true).size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create /queue/start start");
            zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } 
    }

启动函数main


public static void main(String[] args) throws Exception {
    doOne();
}

运行结果:


WATCH => /queue/start
/queue is exist!
create /queue/x1 x1
Queue Complete:1/3
create /queue/x2 x2
Queue Complete:2/3
create /queue/x3 x3
Queue Complete:3/3
create /queue/start start
Queue has Completed.Finish testing!!!

完全符合我的们预期。接下来我们看分布式环境

2). 分布式模拟实验

模拟app1通过zk1提交x1,app2通过zk2提交x2,app3通过zk3提交x3


    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            joinQueue(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            joinQueue(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            joinQueue(zk, 3);
            break;
        }
    }

注:

  • 1). 为了简单起见,我们没有增加复杂的多线程控制的机制。
  • 2). 没有调用zk.close()方法,也就是说,app1执行完单独的提交,app1就结束了,但zk1还存在着,所以/queue/x1存在于队列。
  • 3). 程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3

zk-run1

执行app1–>zk1


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x1 x1
Queue Complete:1/3

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 4] ls /queue
[x10000000011]

执行app2–>zk2


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x2 x2
Queue Complete:2/3

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 5] ls /queue
[x20000000012, x10000000011]

执行app3–>zk3


#日志输出
WATCH => /queue/start
/queue is exist!
create /queue/x3 x3
Queue Complete:3/3
create /queue/start start
Queue has Completed.Finish testing!!!

#zookeeper控制台
[zk: 192.168.1.201:2181(CONNECTED) 6] ls /queue
[x30000000016, x10000000014, start, x20000000015]

/queue/stats被建立,打印出“Queue has Completed.Finish testing!!!”,代表调用app4完成!

我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。

下面贴一下完整的代码:


package org.conan.zookeeper.demo;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;

public class QueueZooKeeper {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        joinQueue(zk, 1);
        joinQueue(zk, 2);
        joinQueue(zk, 3);
        zk.close();
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            joinQueue(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            joinQueue(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            joinQueue(zk, 3);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeCreated && event.getPath().equals("/queue/start")) {
                    System.out.println("Queue has Completed.Finish testing!!!");
                }
            }
        });
        return zk;
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        System.out.println("WATCH => /queue/start");
        zk.exists("/queue/start", true);

        if (zk.exists("/queue", false) == null) {
            System.out.println("create /queue task-queue");
            zk.create("/queue", "task-queue".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue is exist!");
        }
    }

    public static void joinQueue(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue/x" + x + " x" + x);
        zk.create("/queue/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        isCompleted(zk);
    }

    public static void isCompleted(ZooKeeper zk) throws KeeperException, InterruptedException {
        int size = 3;
        int length = zk.getChildren("/queue", true).size();

        System.out.println("Queue Complete:" + length + "/" + size);
        if (length >= size) {
            System.out.println("create /queue/start start");
            zk.create("/queue/start", "start".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        } 
    }

}

转载请注明出处:
http://blog.fens.me/zookeeper-queue

打赏作者

ZooKeeper伪分布式集群安装及使用

让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务。

现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了。这种配置如果简单地放几个web应用,显然是奢侈的浪费。就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的。对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了。

通过虚拟化技术,我们可以将一台服务器,拆分成12台VPS,每台2核CPU,4G内存,40G硬盘,并且支持资源重新分配。多么伟大的技术啊!现在我们有了12个节点的hadoop集群, 让Hadoop跑在云端,让世界加速。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-intro/

zookeeper

前言

ZooKeeper是Hadoop家族的一款高性能的分布式协作的产品。在单机中,系统协作大都是进程级的操作。分布式系统中,服务协作都是跨服务器才能完成的。在ZooKeeper之前,我们对于协作服务大都使用消息中间件,随着分布式系统的普及,用消息中间件完成协作,会有大量的程序开发。ZooKeeper直接面向于分布式系统,可以减少我们自己的开发,帮助我们更好完成分布式系统的数据管理问题。

目录

  1. zookeeper介绍
  2. zookeeper单节点安装
  3. zookeeper伪分布式集群安装
  4. zookeeper命令行操作
  5. Java编程现实命令行操作

1. zookeeper介绍

ZooKeeper是一个为分布式应用所设计的分布的、开源的协调服务,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,简化分布式应用协调及其管理的难度,提供高性能的分布式服务。ZooKeeper本身可以以Standalone模式安装运行,不过它的长处在于通过分布式ZooKeeper集群(一个Leader,多个Follower),基于一定的策略来保证ZooKeeper集群的稳定性和可用性,从而实现分布式应用的可靠性。

ZooKeeper是作为分布式协调服务,是不需要依赖于Hadoop的环境,也可以为其他的分布式环境提供服务。

2. zookeeper单节点安装Standalones模式

系统环境:

  • Linux Ubuntu 12.04.2 LTS 64bit server
  • Java: 1.6.0_29 64-Bit Server VM
~ uname -a
Linux conan 3.5.0-23-generic #35~precise1-Ubuntu SMP Fri Jan 25 17:13:26 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

~ cat /etc/issue
Ubuntu 12.04.2 LTS \n \l

~ java -version
java version "1.6.0_29"
Java(TM) SE Runtime Environment (build 1.6.0_29-b11)
Java HotSpot(TM) 64-Bit Server VM (build 20.4-b02, mixed mode)

下载zookeeper

~ mkdir /home/conan/toolkit
~ cd /home/conan/toolkit
~ wget http://apache.dataguru.cn/zookeeper/stable/zookeeper-3.4.5.tar.gz
~ tar xvf zookeeper-3.4.5.tar.gz
~ mv zookeeper-3.4.5 zookeeper345
~ cd zookeeper345
~ ls -l
drwxr-xr-x  2 conan conan    4096 Aug 12 04:34 bin
-rw-r--r--  1 conan conan   75988 Aug 12 04:34 build.xml
-rw-r--r--  1 conan conan   70223 Aug 12 04:34 CHANGES.txt
drwxr-xr-x  2 conan conan    4096 Aug 12 04:34 conf
drwxr-xr-x 10 conan conan    4096 Aug 12 04:34 contrib
drwxr-xr-x  2 conan conan    4096 Aug 12 04:34 dist-maven
drwxr-xr-x  6 conan conan    4096 Aug 12 04:34 docs
-rw-r--r--  1 conan conan    1953 Aug 12 04:34 ivysettings.xml
-rw-r--r--  1 conan conan    3120 Aug 12 04:34 ivy.xml
drwxr-xr-x  4 conan conan    4096 Aug 12 04:34 lib
-rw-r--r--  1 conan conan   11358 Aug 12 04:34 LICENSE.txt
-rw-r--r--  1 conan conan     170 Aug 12 04:34 NOTICE.txt
-rw-r--r--  1 conan conan    1770 Aug 12 04:34 README_packaging.txt
-rw-r--r--  1 conan conan    1585 Aug 12 04:34 README.txt
drwxr-xr-x  5 conan conan    4096 Aug 12 04:34 recipes
drwxr-xr-x  8 conan conan    4096 Aug 12 04:34 src
-rw-r--r--  1 conan conan 1315806 Aug 12 04:34 zookeeper-3.4.5.jar
-rw-r--r--  1 conan conan     833 Aug 12 04:34 zookeeper-3.4.5.jar.asc
-rw-r--r--  1 conan conan      33 Aug 12 04:34 zookeeper-3.4.5.jar.md5
-rw-r--r--  1 conan conan      41 Aug 12 04:34 zookeeper-3.4.5.jar.sha1

修改配置文件conf/zoo.cfg


~ mkdir /home/conan/zoo/zk0
~ cp conf/zoo_sample.cfg conf/zoo.cfg

~ vi conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/conan/zoo/zk0
clientPort=2181

非常简单,我们已经配置好了的zookeeper单节点

启动zookeeper


~ bin/zkServer.sh
JMX enabled by default
Using config: /home/conan/zoo/zk0/zookeeper345/bin/../conf/zoo.cfg
Usage: bin/zkServer.sh {start|start-foreground|stop|restart|status|upgrade|print-cmd}
conan@conan:~/zoo/zk0/zookeeper345$ bin/zkServer.sh start
JMX enabled by default
Using config: /home/conan/zoo/zk0/zookeeper345/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

#zk的服务显示为QuorumPeerMain
~ jps
5321 QuorumPeerMain
5338 Jps

#查看运行状态
~ bin/zkServer.sh status
JMX enabled by default
Using config: /home/conan/zoo/zk0/zookeeper345/bin/../conf/zoo.cfg
Mode: standalone

单节点的时,Mode会显示为standalone

停止ZooKeeper服务


~ bin/zkServer.sh stop
JMX enabled by default
Using config: /home/conan/zoo/zk0/zookeeper345/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

3. zookeeper伪分布式集群安装

所谓 “伪分布式集群” 就是在,在一台PC中,启动多个ZooKeeper的实例。“完全分布式集群” 是每台PC,启动一个ZooKeeper实例。

由于我的测试环境PC数量有限,所以在一台PC中,启动3个ZooKeeper的实例。

创建环境目录


~ mkdir /home/conan/zoo/zk1
~ mkdir /home/conan/zoo/zk2
~ mkdir /home/conan/zoo/zk3

#新建myid文件
~ echo "1" > /home/conan/zoo/zk1/myid
~ echo "2" > /home/conan/zoo/zk2/myid
~ echo "3" > /home/conan/zoo/zk3/myid

分别修改配置文件
修改:dataDir,clientPort
增加:集群的实例,server.X,”X”表示每个目录中的myid的值


~ vi /home/conan/toolkit/zookeeper345/conf/zk1.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/conan/zoo/zk1
clientPort=2181
server.1=192.168.1.201:2888:3888
server.2=192.168.1.201:2889:3889
server.3=192.168.1.201:2890:3890

~ vi /home/conan/toolkit/zookeeper345/conf/zk2.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/conan/zoo/zk2
clientPort=2182
server.1=192.168.1.201:2888:3888
server.2=192.168.1.201:2889:3889
server.3=192.168.1.201:2890:3890

~ vi /home/conan/toolkit/zookeeper345/conf/zk3.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/home/conan/zoo/zk3
clientPort=2183
server.1=192.168.1.201:2888:3888
server.2=192.168.1.201:2889:3889
server.3=192.168.1.201:2890:3890

3个节点的ZooKeeper集群配置完成,接下来我们的启动服务。

启动集群


~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh start zk1.cfg
~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh start zk2.cfg
~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh start zk3.cfg

~ jps
5422 QuorumPeerMain
5395 QuorumPeerMain
5463 QuorumPeerMain
5494 Jps

#查看节点状态
~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh status zk1.cfg
JMX enabled by default
Using config: /home/conan/toolkit/zookeeper345/bin/../conf/zk1.cfg
Mode: follower

~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh status zk2.cfg
JMX enabled by default
Using config: /home/conan/toolkit/zookeeper345/bin/../conf/zk2.cfg
Mode: leader

~ /home/conan/toolkit/zookeeper345/bin/zkServer.sh status zk3.cfg
JMX enabled by default
Using config: /home/conan/toolkit/zookeeper345/bin/../conf/zk3.cfg
Mode: follower

我们可以看到zk2是leader,zk1和zk3是follower

查看ZooKeeper物理文件目录结构


~ tree  -L 3 /home/conan/zoo
/home/conan/zoo
├── zk0
├── zk1
│   ├── myid
│   ├── version-2
│   │   ├── acceptedEpoch
│   │   ├── currentEpoch
│   │   ├── log.100000001
│   │   └── snapshot.0
│   └── zookeeper_server.pid
├── zk2
│   ├── myid
│   ├── version-2
│   │   ├── acceptedEpoch
│   │   ├── currentEpoch
│   │   ├── log.100000001
│   │   └── snapshot.0
│   └── zookeeper_server.pid
└── zk3
    ├── myid
    ├── version-2
    │   ├── acceptedEpoch
    │   ├── currentEpoch
    │   ├── log.100000001
    │   └── snapshot.100000000
    └── zookeeper_server.pid

7 directories, 18 files

4. zookeeper命令行操作

我们通过客户端连接ZooKeeper的集群,我们可以任意的zookeeper是进行连接。


~ /home/conan/toolkit/zookeeper345/bin/zkCli.sh -server 192.168.1.201:2181

Connecting to 192.168.1.201
2013-08-12 05:25:39,260 [myid:] - INFO  [main:Environment@100] - Client environment:zookeeper.version=3.4.5-1392090, built on 09/30/2012 17:52 GMT
2013-08-12 05:25:39,267 [myid:] - INFO  [main:Environment@100] - Client environment:host.name=conan
2013-08-12 05:25:39,269 [myid:] - INFO  [main:Environment@100] - Client environment:java.version=1.6.0_29
2013-08-12 05:25:39,269 [myid:] - INFO  [main:Environment@100] - Client environment:java.vendor=Sun Microsystems Inc.
2013-08-12 05:25:39,270 [myid:] - INFO  [main:Environment@100] - Client environment:java.home=/home/conan/toolkit/jdk16/jre
2013-08-12 05:25:39,270 [myid:] - INFO  [main:Environment@100] - Client environment:java.class.path=/home/conan/toolkit/zookeeper345/bin/../build/classes:/home/conan/toolkit/zookeeper345/bin/../build/lib/*.jar:/home/conan/toolkit/zookeeper345/bin/../lib/slf4j-log4j12-1.6.1.jar:/home/conan/toolkit/zookeeper345/bin/../lib/slf4j-api-1.6.1.jar:/home/conan/toolkit/zookeeper345/bin/../lib/netty-3.2.2.Final.jar:/home/conan/toolkit/zookeeper345/bin/../lib/log4j-1.2.15.jar:/home/conan/toolkit/zookeeper345/bin/../lib/jline-0.9.94.jar:/home/conan/toolkit/zookeeper345/bin/../zookeeper-3.4.5.jar:/home/conan/toolkit/zookeeper345/bin/../src/java/lib/*.jar:/home/conan/toolkit/zookeeper345/bin/../conf:
2013-08-12 05:25:39,271 [myid:] - INFO  [main:Environment@100] - Client environment:java.library.path=/home/conan/toolkit/jdk16/jre/lib/amd64/server:/home/conan/toolkit/jdk16/jre/lib/amd64:/home/conan/toolkit/jdk16/jre/../lib/amd64:/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib
2013-08-12 05:25:39,275 [myid:] - INFO  [main:Environment@100] - Client environment:java.io.tmpdir=/tmp
2013-08-12 05:25:39,276 [myid:] - INFO  [main:Environment@100] - Client environment:java.compiler=
2013-08-12 05:25:39,276 [myid:] - INFO  [main:Environment@100] - Client environment:os.name=Linux
2013-08-12 05:25:39,277 [myid:] - INFO  [main:Environment@100] - Client environment:os.arch=amd64
2013-08-12 05:25:39,281 [myid:] - INFO  [main:Environment@100] - Client environment:os.version=3.5.0-23-generic
2013-08-12 05:25:39,282 [myid:] - INFO  [main:Environment@100] - Client environment:user.name=conan
2013-08-12 05:25:39,282 [myid:] - INFO  [main:Environment@100] - Client environment:user.home=/home/conan
2013-08-12 05:25:39,283 [myid:] - INFO  [main:Environment@100] - Client environment:user.dir=/home/conan/zoo
2013-08-12 05:25:39,284 [myid:] - INFO  [main:ZooKeeper@438] - Initiating client connection, connectString=192.168.1.201 sessionTimeout=30000 watcher=org.apache.zookeeper.ZooKeeperMain$MyWatcher@22ba6c83
Welcome to ZooKeeper!
JLine support is enabled
[zk: 192.168.1.201(CONNECTING) 0] 2013-08-12 05:25:39,336 [myid:] - INFO  [main-SendThread(192.168.1.201:2181):ClientCnxn$SendThread@966] - Opening socket connection to server 192.168.1.201/192.168.1.201:2181. Will not attempt to authenticate using SASL (Unable to locate a login configuration)
2013-08-12 05:25:39,345 [myid:] - INFO  [main-SendThread(192.168.1.201:2181):ClientCnxn$SendThread@849] - Socket connection established to 192.168.1.201/192.168.1.201:2181, initiating session
2013-08-12 05:25:39,384 [myid:] - INFO  [main-SendThread(192.168.1.201:2181):ClientCnxn$SendThread@1207] - Session establishment complete on server 192.168.1.201/192.168.1.201:2181, sessionid = 0x1406f3c1ef90001, negotiated timeout = 30000

WATCHER::

WatchedEvent state:SyncConnected type:None path:null

[zk: 192.168.1.201(CONNECTED) 0]

集群已连接,下面我们要使用一下,ZooKeeper的命令行操作。

命令行操作
通过help打印命令行帮助


[zk: 192.168.1.201(CONNECTED) 1] help
ZooKeeper -server host:port cmd args
        connect host:port
        get path [watch]
        ls path [watch]
        set path data [version]
        rmr path
        delquota [-n|-b] path
        quit
        printwatches on|off
        create [-s] [-e] path data acl
        stat path [watch]
        close
        ls2 path [watch]
        history
        listquota path
        setAcl path acl
        getAcl path
        sync path
        redo cmdno
        addauth scheme auth
        delete path [version]
        setquota -n|-b val path

ZooKeeper的结构,很像是目录结构,我们看到了像ls这样熟悉的命令。


#ls,查看/目录内容
[zk: 192.168.1.201(CONNECTED) 1] ls /
[zookeeper]

#create,创建一个znode节点
[zk: 192.168.1.201(CONNECTED) 2] create /node conan
Created /node

#ls,再查看/目录
[zk: 192.168.1.201(CONNECTED) 3] ls /
[node, zookeeper]

#get,查看/node的数据信息
[zk: 192.168.1.201(CONNECTED) 4] get /node
conan
cZxid = 0x100000006
ctime = Mon Aug 12 05:32:49 CST 2013
mZxid = 0x100000006
mtime = Mon Aug 12 05:32:49 CST 2013
pZxid = 0x100000006
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 5
numChildren = 0

#set,修改数据
[zk: 192.168.1.201(CONNECTED) 5] set /node fens.me
cZxid = 0x100000006
ctime = Mon Aug 12 05:32:49 CST 2013
mZxid = 0x100000007
mtime = Mon Aug 12 05:34:32 CST 2013
pZxid = 0x100000006
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0

#get,再查看/node的数据信息,已改为fens.me
[zk: 192.168.1.201(CONNECTED) 6] get /node
fens.me
cZxid = 0x100000006
ctime = Mon Aug 12 05:32:49 CST 2013
mZxid = 0x100000007
mtime = Mon Aug 12 05:34:32 CST 2013
pZxid = 0x100000006
cversion = 0
dataVersion = 1
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 0

#delete,删除/node
[zk: 192.168.1.201(CONNECTED) 7] delete /node
[zk: 192.168.1.201(CONNECTED) 8] ls /
[zookeeper]

#quit,退出客户端连接
[zk: 192.168.1.201(CONNECTED) 19] quit
Quitting...
2013-08-12 05:40:29,304 [myid:] - INFO  [main:ZooKeeper@684] - Session: 0x1406f3c1ef90002 closed
2013-08-12 05:40:29,305 [myid:] - INFO  [main-EventThread:ClientCnxn$EventThread@509] - EventThread shut down

5. Java编程现实命令行操作


package org.conan.zookeeper.demo;

import java.io.IOException;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class BasicDemo1 {

    public static void main(String[] args) throws IOException, KeeperException, InterruptedException {
        // 创建一个与服务器的连接
        ZooKeeper zk = new ZooKeeper("192.168.1.201:2181", 60000, new Watcher() {
            // 监控所有被触发的事件
            public void process(WatchedEvent event) {
                System.out.println("EVENT:" + event.getType());
            }
        });

        // 查看根节点
        System.out.println("ls / => " + zk.getChildren("/", true));

        // 创建一个目录节点
        if (zk.exists("/node", true) == null) {
            zk.create("/node", "conan".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("create /node conan");
            // 查看/node节点数据
            System.out.println("get /node => " + new String(zk.getData("/node", false, null)));
            // 查看根节点
            System.out.println("ls / => " + zk.getChildren("/", true));
        }

        // 创建一个子目录节点
        if (zk.exists("/node/sub1", true) == null) {
            zk.create("/node/sub1", "sub1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("create /node/sub1 sub1");
            // 查看node节点
            System.out.println("ls /node => " + zk.getChildren("/node", true));
        }

        // 修改节点数据
        if (zk.exists("/node", true) != null) {
            zk.setData("/node", "changed".getBytes(), -1);
            // 查看/node节点数据
            System.out.println("get /node => " + new String(zk.getData("/node", false, null)));
        }

        // 删除节点
        if (zk.exists("/node/sub1", true) != null) {
            zk.delete("/node/sub1", -1);
            zk.delete("/node", -1);
            // 查看根节点
            System.out.println("ls / => " + zk.getChildren("/", true));
        }

        // 关闭连接
        zk.close();
    }
}

运行结果:

2013-08-12 15:33:29,699 [myid:] - INFO  [main:Environment@97] - Client environment:zookeeper.version=3.3.1-942149, built on 05/07/2010 17:14 GMT
2013-08-12 15:33:29,702 [myid:] - INFO  [main:Environment@97] - Client environment:host.name=PC201304202140
2013-08-12 15:33:29,702 [myid:] - INFO  [main:Environment@97] - Client environment:java.version=1.6.0_45
2013-08-12 15:33:29,702 [myid:] - INFO  [main:Environment@97] - Client environment:java.vendor=Sun Microsystems Inc.
2013-08-12 15:33:29,702 [myid:] - INFO  [main:Environment@97] - Client environment:java.home=D:\toolkit\java\jdk6\jre
2013-08-12 15:33:29,703 [myid:] - INFO  [main:Environment@97] - Client environment:java.class.path=D:\workspace\java\zkdemo\target\classes;C:\Users\Administrator\.m2\repository\org\apache\hadoop\zookeeper\3.3.1\zookeeper-3.3.1.jar;C:\Users\Administrator\.m2\repository\log4j\log4j\1.2.15\log4j-1.2.15.jar;C:\Users\Administrator\.m2\repository\javax\mail\mail\1.4\mail-1.4.jar;C:\Users\Administrator\.m2\repository\javax\activation\activation\1.1\activation-1.1.jar;C:\Users\Administrator\.m2\repository\jline\jline\0.9.94\jline-0.9.94.jar;C:\Users\Administrator\.m2\repository\junit\junit\3.8.1\junit-3.8.1.jar
2013-08-12 15:33:29,703 [myid:] - INFO  [main:Environment@97] - Client environment:java.library.path=D:\toolkit\java\jdk6\bin;C:\Windows\Sun\Java\bin;C:\Windows\system32;C:\Windows;D:\toolkit\Rtools\bin;D:\toolkit\Rtools\gcc-4.6.3\bin;C:\Program Files (x86)\Common Files\NetSarang;C:\Windows\system32;C:\Windows;C:\Windows\System32\Wbem;C:\Windows\System32\WindowsPowerShell\v1.0\;D:\toolkit\Git\cmd;D:\toolkit\Git\bin;C:\Program Files (x86)\Microsoft SQL Server\100\Tools\Binn\;C:\Program Files\Microsoft SQL Server\100\Tools\Binn\;C:\Program Files\Microsoft SQL Server\100\DTS\Binn\;c:\Program Files (x86)\Common Files\Ulead Systems\MPEG;C:\Program Files (x86)\QuickTime\QTSystem\;D:\toolkit\MiKTex\miktex\bin\x64\;D:\toolkit\sshclient;D:\toolkit\ant19\bin;D:\toolkit\eclipse;D:\toolkit\gradle15\bin;D:\toolkit\java\jdk6\bin;D:\toolkit\maven3\bin;D:\toolkit\mysql56\bin;D:\toolkit\python27;D:\toolkit\putty;C:\Program Files\R\R-3.0.1\bin\x64;D:\toolkit\mongodb243\bin;D:\toolkit\php54;D:\toolkit\nginx140;D:\toolkit\nodejs;D:\toolkit\npm12\bin;D:\toolkit\java\jdk6\jre\bin\server;.
2013-08-12 15:33:29,703 [myid:] - INFO  [main:Environment@97] - Client environment:java.io.tmpdir=C:\Users\ADMINI~1\AppData\Local\Temp\
2013-08-12 15:33:29,704 [myid:] - INFO  [main:Environment@97] - Client environment:java.compiler=
2013-08-12 15:33:29,704 [myid:] - INFO  [main:Environment@97] - Client environment:os.name=Windows 7
2013-08-12 15:33:29,704 [myid:] - INFO  [main:Environment@97] - Client environment:os.arch=amd64
2013-08-12 15:33:29,705 [myid:] - INFO  [main:Environment@97] - Client environment:os.version=6.1
2013-08-12 15:33:29,705 [myid:] - INFO  [main:Environment@97] - Client environment:user.name=Administrator
2013-08-12 15:33:29,705 [myid:] - INFO  [main:Environment@97] - Client environment:user.home=C:\Users\Administrator
2013-08-12 15:33:29,706 [myid:] - INFO  [main:Environment@97] - Client environment:user.dir=D:\workspace\java\zkdemo
2013-08-12 15:33:29,707 [myid:] - INFO  [main:ZooKeeper@373] - Initiating client connection, connectString=192.168.1.201:2181 sessionTimeout=60000 watcher=org.conan.zookeeper.demo.BasicDemo1$1@3dfeca64
2013-08-12 15:33:29,731 [myid:] - INFO  [main-SendThread():ClientCnxn$SendThread@1000] - Opening socket connection to server /192.168.1.201:2181
2013-08-12 15:33:38,736 [myid:] - INFO  [main-SendThread(192.168.1.201:2181):ClientCnxn$SendThread@908] - Socket connection established to 192.168.1.201/192.168.1.201:2181, initiating session
2013-08-12 15:33:38,804 [myid:] - INFO  [main-SendThread(192.168.1.201:2181):ClientCnxn$SendThread@701] - Session establishment complete on server 192.168.1.201/192.168.1.201:2181, sessionid = 0x1406f3c1ef9000d, negotiated timeout = 60000
EVENT:None
ls / => [zookeeper]
EVENT:NodeCreated
EVENT:NodeChildrenChanged
create /node conan
get /node => conan
ls / => [node, zookeeper]
EVENT:NodeCreated
create /node/sub1 sub1
ls /node => [sub1]
EVENT:NodeDataChanged
get /node => changed
EVENT:NodeDeleted
EVENT:NodeChildrenChanged
EVENT:NodeChildrenChanged
ls / => [zookeeper]
2013-08-12 15:33:38,877 [myid:] - INFO  [main:ZooKeeper@538] - Session: 0x1406f3c1ef9000d closed

pom.xml,maven配置文件

<?xml version="1.0" encoding="UTF-8"?>
<project
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<modelVersion>4.0.0</modelVersion>
<groupId>org.conan.zookeeper</groupId>
<artifactId>zkdemo</artifactId>
<version>0.0.1</version>
<name>zkdemo</name>
<description>zkdemo</description>

<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.1</version>
<exclusions>
<exclusion>
<groupId>javax.jms</groupId>
<artifactId>jms</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jdmk</groupId>
<artifactId>jmxtools</artifactId>
</exclusion>
<exclusion>
<groupId>com.sun.jmx</groupId>
<artifactId>jmxri</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</project>

基础讲完,接下来就让我动手管理起,我们自己的分布式系统吧。

转载请注明出处:
http://blog.fens.me/hadoop-zookeeper-intro/

打赏作者