ZooKeeper实现分布式FIFO队列

让Hadoop跑在云端系列文章,介绍了如何整合虚拟化和Hadoop,让Hadoop集群跑在VPS虚拟主机上,通过云向用户提供存储和计算的服务。

现在硬件越来越便宜,一台非品牌服务器,2颗24核CPU,配48G内存,2T的硬盘,已经降到2万块人民币以下了。这种配置如果简单地放几个web应用,显然是奢侈的浪费。就算是用来实现单节点的hadoop,对计算资源浪费也是非常高的。对于这么高性能的计算机,如何有效利用计算资源,就成为成本控制的一项重要议题了。

通过虚拟化技术,我们可以将一台服务器,拆分成12台VPS,每台2核CPU,4G内存,40G硬盘,并且支持资源重新分配。多么伟大的技术啊!现在我们有了12个节点的hadoop集群, 让Hadoop跑在云端,让世界加速。

关于作者:

  • 张丹(Conan), 程序员Java,R,PHP,Javascript
  • weibo:@Conan_Z
  • blog: http://blog.fens.me
  • email: bsspirit@gmail.com

转载请注明出处:
http://blog.fens.me/zookeeper-queue-fifo

zookeeper-fifo-queue

前言

ZooKeeper是一个强大的分布式协作系统,用ZooKeeper可以方便地实现先进先出(FIFO)队列。给“队列”的技术现实多一种选择,标准化我们的程序结构。另一篇,分步式同步队列实现,请参考:ZooKeeper实现分布式队列Queue

关于ZooKeeper的基本使用,请参考:ZooKeeper伪分步式集群安装及使用

目录

  1. 分布式先进先出(FIFO)队列
  2. 设计思路
  3. 程序实现

1. 分布式先进先出(FIFO)队列

在计算机科学中,消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式。消息队列提供了异步的通信协议,消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。

先进先出(FIFO)队列,是消息队列最基本的一种实现形式,先发出的先消费。

2. 设计思路

实现的思路也非常简单,在/queue-fifo的目录下创建 SEQUENTIAL 类型的子目录 /x(i),这样就能保证所有成员加入队列时都是有编号的,出队列时通过 getChildren( ) 方法可以返回当前所有的队列中的元素,然后消费其中最小的一个,这样就能保证FIFO。

zookeeper-queue-fifo

应用实例
zookeeper-queue-fifo-app
图标解释

  1. app1,app2,app3是3个独立的业务系统
  2. zk1,zk2,zk3是ZooKeeper集群的3个连接点
  3. /queue-fifo,是znode的队列,按顺序存储数据
  4. /queue-fifo/x1,是znode队列中,1号排对者,由app1提交
  5. /queue-fifo/x2,是znode队列中,2号排对者,由app2提交
  6. app3是消费者,通过zk3连接到znode队列中,找到/queue-fifo中顺序最少的节点消费,删除消费后的节点(红色线表示)

注:

  • 1). app1可以通过zk2提交,app2也可通过zk3提交
  • 2). app1可以提交3次请求,生成x1,x2,x3多个节点
  • 3). app1可以作为消费者,消费队列数据

3. 程序实现

1). 单节点模拟实验

模拟app1,通过zk1,生产2个节点,然后再消费3个节点。


    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        produce(zk, 1);
        produce(zk, 2);
        cosume(zk);
        cosume(zk);
        cosume(zk);
        zk.close();
    }

创建一个与服务器的连接


    public static ZooKeeper connection(String host) throws IOException {
        ZooKeeper zk = new ZooKeeper(host, 60000, null);
        return zk;
    }

出始化队列


    public static ZooKeeper connection(String host) throws IOException {
        return new ZooKeeper(host, 60000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
    }

生产者


    public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue-fifo/x" + x + " x" + x);
        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

消费者


    public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
        List list = zk.getChildren("/queue-fifo", true);
        if (list.size() > 0) {
            long min = Long.MAX_VALUE;
            for (String num : list) {
                if (min > Long.parseLong(num.substring(1))) {
                    min = Long.parseLong(num.substring(1));
                }
            }
            System.out.println("delete /queue/x" + min);
            zk.delete("/queue-fifo/x" + min, 0);
        } else {
            System.out.println("No node to cosume");
        }
    }

启动main函数


    public static void main(String[] args) throws Exception {
            doOne();
    }

运行结果:


/queue-fifo is exist!
create /queue-fifo/x1 x1
create /queue-fifo/x2 x2
delete /queue/x10000000032
delete /queue/x20000000033
No node to cosume

完全符合我的们预期,由于produce时,我们创建的节点模式是EPHEMERAL_SEQUENTIAL,所以系统会在x(i)(n),随机生成n=0000000032,输出为x10000000032。

接下来我们看分布式环境。

2). 分布式模拟实验
app1通过zk1生产x1, app2通过zk2生产x2, app3通过zk3消费3个节点


public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            produce(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            produce(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            cosume(zk);
            cosume(zk);
            cosume(zk);
            break;
        }
    }

启动main函数


    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

程序启动方法,分3次启动,命令行传不同的参数,分别是1,2,3
run1

run1: 执行app1–>zk1


#日志输出
/queue-fifo is exist!
create /queue-fifo/x1 x1

run2: 执行app2–>zk2


#日志输出
/queue-fifo is exist!
create /queue-fifo/x2 x2

run3: 执行app3–>zk3


#日志输出
/queue-fifo is exist!
delete /queue/x10000000034
delete /queue/x20000000035
No node to cosume

我们完成分布式队列的实验,由于时间仓促。文字说明及代码难免有一些问题,请发现问题的同学帮忙指正。

下面贴一下完整的代码:


package org.conan.zookeeper.demo;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

public class FIFOZooKeeper {

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            doOne();
        } else {
            doAction(Integer.parseInt(args[0]));
        }
    }

    public static void doOne() throws Exception {
        String host1 = "192.168.1.201:2181";
        ZooKeeper zk = connection(host1);
        initQueue(zk);
        produce(zk, 1);
        produce(zk, 2);
        cosume(zk);
        cosume(zk);
        cosume(zk);
        zk.close();
    }

    public static void doAction(int client) throws Exception {
        String host1 = "192.168.1.201:2181";
        String host2 = "192.168.1.201:2182";
        String host3 = "192.168.1.201:2183";

        ZooKeeper zk = null;
        switch (client) {
        case 1:
            zk = connection(host1);
            initQueue(zk);
            produce(zk, 1);
            break;
        case 2:
            zk = connection(host2);
            initQueue(zk);
            produce(zk, 2);
            break;
        case 3:
            zk = connection(host3);
            initQueue(zk);
            cosume(zk);
            cosume(zk);
            cosume(zk);
            break;
        }
    }

    // 创建一个与服务器的连接
    public static ZooKeeper connection(String host) throws IOException {
        return new ZooKeeper(host, 60000, new Watcher() {
            public void process(WatchedEvent event) {
            }
        });
    }

    public static void initQueue(ZooKeeper zk) throws KeeperException, InterruptedException {
        if (zk.exists("/queue-fifo", false) == null) {
            System.out.println("create /queue-fifo task-queue-fifo");
            zk.create("/queue-fifo", "task-queue-fifo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        } else {
            System.out.println("/queue-fifo is exist!");
        }
    }

    public static void produce(ZooKeeper zk, int x) throws KeeperException, InterruptedException {
        System.out.println("create /queue-fifo/x" + x + " x" + x);
        zk.create("/queue-fifo/x" + x, ("x" + x).getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    }

    public static void cosume(ZooKeeper zk) throws KeeperException, InterruptedException {
        List list = zk.getChildren("/queue-fifo", true);
        if (list.size() > 0) {
            long min = Long.MAX_VALUE;
            for (String num : list) {
                if (min > Long.parseLong(num.substring(1))) {
                    min = Long.parseLong(num.substring(1));
                }
            }
            System.out.println("delete /queue/x" + min);
            zk.delete("/queue-fifo/x" + min, 0);
        } else {
            System.out.println("No node to cosume");
        }
    }

}

转载请注明出处:
http://blog.fens.me/zookeeper-queue-fifo

打赏作者

This entry was posted in Hadoop实践, JAVA语言实践, 架构设计

0 0 votes
Article Rating
Subscribe
Notify of
guest

This site uses Akismet to reduce spam. Learn how your comment data is processed.

15 Comments
Oldest
Newest Most Voted
Inline Feedbacks
View all comments

[…] ZooKeeper实现分布式FIFO队列 […]

icesnow

请问你的/queue-info是建立在HDFS上么?如果不是,怎么保证分布环境中对该目录的访问?如果是,那么这种创建和消费应该是依赖于HDFS的分布式访问机制,如何体现ZooKeeper的分布式协调作用呢?

Conan Zhang

1. 不是。
2. zk单独维护一套分布式文件系统。
3. zk原理看官方文档。

phil

你这里序号都是自己分配的.现实中,app很难知道应该输入哪个号码.需要有一个序列号产生器.你文章中最好提下.

最好是利用zookeeper来实现递增.并且保证不同app入列的同步性.

Conan Zhang

文章只是一种业务模拟的情况,不能解决所有的问题。
ID的问题很常见,找个地方存一下就行了,和文章没什么关系。

ddd

扯蛋的设计,只有一个消费者

Conan Zhang

这是一个ZK的原型场景,produce和cosume在原型上都可以任意增加数量。不要匿名就乱喷,注册一个真实的姓名,说点实际的问题。

大鹏

问一个问题哈, zk已经集群了,我看你实例代码中只有一个消费者,如果这个消费的zk down 了的话 其他 zk是否可以充当消费者的角色

Conan Zhang

你可以多注册几个消费者,用于backup。

viscar

让hadoop运行在VPS上,你知道hadoop的应用场景不,让hadoop跑在运端就是一种错误的思路

Conan Zhang

不要以“今天”的观点,点评1年前文章的各种不对,已经意义不大了。

pseudocodes

只能说是toy式的设计,无法用于实际生产环境

yui

好奇怪的想法,有没有考虑过节点数量很多,消息更多的情况?光是取最小值就翘翘了

Conan Zhang

本文仅是一个原型。

王志

楼主的的确只是个原形,想法挺好,但是实际应用场景估计有限。

15
0
Would love your thoughts, please comment.x
()
x