实现分布式队列通过 Curator

Curator Framework 深入了解

本文受到 colobu 前辈文章的指引,深入了解 Curator Framework 的工作流程,十分感谢 colobu 前辈的博文给予的启发和指导。

分布式队列实现(DistributedQueue 实现)

DistributedQueue 是最普通的一种队列。 它设计以下四个类:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

创建队列使用 QueueBuilder,它也是其它队列的创建类,看看他的 builder 方法:

    public static <T> QueueBuilder<T> builder(CuratorFramework client, QueueConsumer<T> consumer, QueueSerializer<T> serializer, String queuePath) {
            return new QueueBuilder(client, consumer, serializer, queuePath);
        }

这里有四个入参,分别对应着客户端连接对象,consumer 对象,serializer 对象,path 节点对象。队列的消费就是通过 consumer 对象来实现的;serializer 对象负责存入 queue 的数据序列化和消费时的反序列化。

    // 创建了一个没有 consumer 的 builder
    QueueBuilder<String> builder = QueueBuilder.builder(client, null, createQueueSerializer(), PATH); 
    // 创建了一个 queue 对象
    DistributedQueue<String> queue = builder.buildQueue();
    // 启动 queue
    queue.start();

    // 这样操作就可以往 queue 里塞入消息了
    queue.put("Test Message.");

注意此时的 queue 是没有消费者的,如果需要消费者可以新建一个 queue_2 对象来消费对应 queuePath 的消息队列。当然也可以在创建 queue 对象的时候配置好 consumer 就可以即刻消费了。

    // 注意这里的第二个入参,配置了 consumer,此时的 queue_2 如果 start 会直接开始消费队列中的消息
    DistributedQueue<String> queue_2 = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH).buildQueue();

具体的逻辑可以看 queue.start() 时做了什么操作:

    // DistributedQueue.class
    private final boolean isProducerOnly;
    ...
    // 构造函数中做了如下判断
    this.isProducerOnly = consumer == null;

    // 下面就是配置了 consumer 时会进行的操作,通过 runLoop 方法去不停的消费队列
    // public void start() throws Exception
    if (!this.isProducerOnly) {
        this.service.submit(new Callable<Object>() {
             public Object call() {
                 DistributedQueue.this.runLoop();
                 return null;
             }
        });
    }

    // private DistributedQueue.ProcessMessageBytesCode processMessageBytes(String itemNode, byte[] bytes) throws Exception 
    // 伪代码 可以看到这个 processMessageBytes 方法是真正消费的地方,先把消息反序列化之后再使用 consumer 对象的 consumeMessage() 方法
    this.consumer.consumeMessage(ItemSerializer.deserialize(bytes, this.serializer));

上面的代码中还有个缺点,通过源码可知,消费队列是先将消息从队列中移除,再由 consumer 消费。 这两个步骤不是原子的,QueueBuilder 提供了 lockPath(String path) 方法以保证消费安全。当消费者消费数据时持有锁,这样其它消费者不能消费此消息。如果消费失败或者进程死掉,消息可以交给其它进程。这会带来一点性能的损失。 最好还是单消费者模式使用队列。

    // DistributedQueue.class
    // private void processChildren(List<String> children, long currentVersion) throws Exception
    // 这里就可以看到加锁和不加锁采用的是不同的策略
    if (isUsingLockSafety) {
        DistributedQueue.this.processWithLockSafety(itemNode, DistributedQueue.ProcessType.NORMAL);
    } else {
        DistributedQueue.this.processNormally(itemNode, DistributedQueue.ProcessType.NORMAL);
    }

分布式含ID队列实现(DistributedIdQueue 实现)

DistributedIdQueue 和上面的队列类似, 但是可以为队列中的每一个元素设置一个ID。 可以通过ID把队列中任意的元素移除。

通过下面方法创建:

builder.buildIdQueue()

放入元素时:

queue.put(aMessage, messageId);

移除元素时:

int numberRemoved = queue.remove(messageId);

看下他是如何实现 id 这个属性的:

    // DistributedIdQueue.class
    private String makeIdPath(String itemId) {
       return this.queue.makeItemPath() + '|' + fixId(itemId) + '|';
    }

    // DistributedQueue.class
    String makeItemPath() {
       return ZKPaths.makePath(this.queuePath, "queue-");
    }

可以看到他是直接通过 id 的值加入 path 生成了一个指定的节点存储数据,这样也可以逆向操作得到该节点的 path 从而删除元素。

添加元素调用的都是 DistributedQueue 中的 internalPut() 方法:

    boolean internalPut(T item, MultiItem<T> multiItem, String path, int maxWait, TimeUnit unit) throws Exception

DistributedIdQueue 和 DistributedQueue 添加元素的 put 方法实际上都是调用到这个方法。DistributedIdQueue 是自己构建了 path,而 DistributedQueue 是自动生成如下的节点 path。

queue-0000000009
queue-0000000008
queue-0000000007
queue-0000000006
queue-0000000005
queue-0000000004
queue-0000000003
queue-0000000002

分布式优先级队列实现(DistributedPriorityQueue 实现)

优先级队列对队列中的消息按照优先级进行排序。 Priority 越小越靠前, 优先被消费。

创建一个 DistributedPriorityQueue 的方式如下:

    DistributedPriorityQueue<String> queue = builder.buildPriorityQueue(0/*minItemsBeforeRefresh*/);

可以看到只需要配置一个 minItemsBeforeRefresh 参数,这个参数用来对比当前是否需要进行重排序;需要强制重排序还需要配合 refreshOnWatch 参数,不过在 builder 创建 DistributedPriorityQueue 的时候就在 DistributedQueue 的构造参数上设置该值为 true 了。

    // DistributedPriorityQueue 的构造参数
    DistributedPriorityQueue(.../*很多入参*/) {
            Preconditions.checkArgument(minItemsBeforeRefresh >= 0, "minItemsBeforeRefresh cannot be negative");
            this.queue = new DistributedQueue(client, consumer, serializer, queuePath, threadFactory, executor, minItemsBeforeRefresh, true/*refreshOnWatch 直接设置为 true 了。*/, lockPath, maxItems, putInBackground, finalFlushMs);
        }

强制重排序的逻辑如下:

    // DistributedQueue.class
    //private void processChildren(List<String> children, long currentVersion) throws Exception 
    int min = this.minItemsBeforeRefresh;
    ...
    while(..){
        // min 就是强制刷新所需的最小的元素数量,或称之你的程序可以容忍的不排序的最小值。
        // 从源码可以看出 minItemsBeforeRefresh 被设置为 1 或者 0 都是可以直接触发重排序的一个决定因素
        if (min-- <= 0 && this.refreshOnWatch && currentVersion != this.childrenCache.getData().version) {
            // 这里的 processedLatch 是一个 Semaphore 对象 
            // final Semaphore processedLatch = new Semaphore(0);
            // 可以看到代码段的最下方的 acquire 代码,线程池消费完所有的代码之后才会 release 所有的信号量
            // 这里直接释放了,这样处理逻辑的代码可以直接退出
            // 然后在 runLoop 下一次循环的时候会进行 collection 的 sort
            processedLatch.release(children.size());
            break;
        }
        ...
        消费消息的代码
        ...
    }
    processedLatch.acquire(children.size());  

分布式Delay队列实现(DistributedDelayQueue 实现)

DistributedDelayQueue 中新增的元素有个delay值, 消费者隔一段时间才能收到元素。同样的可以通过 QueueBuilder 来创建该对象:

    DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

放入元素时可以指定 delayUntilEpoch

queue.put(aMessage, delayUntilEpoch);

注意 delayUntilEpoch 不是离现在的一个时间间隔, 比如20毫秒,而是未来的一个时间戳,如 System.currentTimeMillis() + 10 秒。 如果delayUntilEpoch的时间已经过去,消息会立刻被消费者接收。

延时队列的实现同样基于 DistributedQueue,在 runLoop 方法的逻辑中,会获取元素的 delay 值,默认直接返回 0,DistributedDelayQueue 重写了获取 delay 时间的方法:

    // DistributedQueue.class
    // private void runLoop() 
    maxWaitMs = this.getDelay((String)children.get(0));
    if (maxWaitMs <= 0L) {
        this.processChildren(children, currentVersion);
    }
    // DistributedDelayQueue.class
    // 构造函数
    DistributedDelayQueue(.../*很多入参*/) {
            Preconditions.checkArgument(minItemsBeforeRefresh >= 0, "minItemsBeforeRefresh cannot be negative");
            this.queue = new DistributedQueue<T>(.../*很多入参*/) {
                // override 了原有的方法,DistributedQueue 中的 getDelay 方法直接返回 0L
                protected long getDelay(String itemNode) {
                    return this.getDelay(itemNode, System.currentTimeMillis());
                }

                private long getDelay(String itemNode, long sortTime) {
                    long epoch = DistributedDelayQueue.getEpoch(itemNode);
                    return epoch - sortTime;
                }
                // 重写了排序的方法,根据 delay 的时间来排序了
                protected void sortChildren(List<String> children) {
                    final long sortTime = System.currentTimeMillis();
                    Collections.sort(children, new Comparator<String>() {
                        public int compare(String o1, String o2) {
                            long diff = getDelay(o1, sortTime) - getDelay(o2, sortTime);
                            return diff < 0L ? -1 : (diff > 0L ? 1 : 0);
                        }
                    });
                }
            };
        }

JDK Queue风格接口的分布式队列实现(SimpleDistributedQueue 实现)

前面虽然实现了各种队列,但并没有 JDK 中的队列接口风格实现。 SimpleDistributedQueue 提供了和JDK一致性的接口(但是没有实现Queue接口)。 创建很简单:

public SimpleDistributedQueue(CuratorFramework client, String path)

增加元素:

    public boolean offer(byte[] data) throws Exception
    // 即往一个既定的 path 下有以 qn- 开头的子路径,如 /path/qn-0000001 

删除元素:

    public byte[] take() throws Exception
    // 获取队列最前的元素,同时zk剔除该路径
    // 使用 CountDownLatch 来达到超时的设置,虽然 take 是没有设置超时的... 也就是要一致等待 zk 回应

另外还提供了其它方法:

    // 获取元素 同 element() 返回队列最前的元素
    public byte[] peek() throws Exception
    // 和 take() 实际上是一样的方法,但是这里会有超时配置,见上关于 take()的解释
    public byte[] poll(long timeout, TimeUnit unit) throws Exception
    // 和 remove() 操作一样
    public byte[] poll() throws Exception
    // 直接删掉队列最前的元素
    public byte[] remove() throws Exception
    public byte[] element() throws Exception

转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。可以在下面评论区评论,也可以邮件至 nickchenyx@gmail.com

Title:实现分布式队列通过 Curator

Count:2k

Author:nickChen

Created At:2018-04-12, 11:33:29

Updated At:2023-05-08, 23:27:10

Url:http://nickchenyx.github.io/2018/04/12/CuratorFramework-DistributedQueue/

Copyright: 'Attribution-non-commercial-shared in the same way 4.0' Reprint please keep the original link and author.